From 897ad6a95739e9ce41fef512f6b5dd46c03dcccd Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Fri, 7 Oct 2016 10:47:48 +0100 Subject: [PATCH 1/5] prevent race conditions in run_scheduled_jobs queuing jobs multiple times we were running into issues where multiple beats queue up the run_scheduled_jobs task at the same time, and concurrency issues with selecting scheduled jobs causes both tasks to trigger processing of the job. Use with_for_update, which calls through to the postgres SELECT ... FOR UPDATE which locks other SELECT FOR UPDATES (ie other threads running same code) until the rows are set to pending and the transaction completes - so the second thread will not find any rows --- app/celery/scheduled_tasks.py | 11 +++++++++-- app/dao/jobs_dao.py | 1 + lambda_function/README.md | 9 +++++++++ 3 files changed, 19 insertions(+), 2 deletions(-) create mode 100644 lambda_function/README.md diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 4448079f0..097448ed8 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -27,17 +27,24 @@ def remove_csv_files(): @notify_celery.task(name="run-scheduled-jobs") @statsd(namespace="tasks") def run_scheduled_jobs(): + from app import db try: jobs = dao_get_scheduled_jobs() for job in jobs: job.job_status = JOB_STATUS_PENDING - dao_update_job(job) - process_job.apply_async([str(job.id)], queue="process-job") + from time import sleep + sleep(1) + print('SCHEDULING' + str(job.id)) + # dao_update_job(job) + # process_job.apply_async([str(job.id)], queue="process-job") current_app.logger.info( "Job ID {} added to process job queue".format(job.id) ) + db.session.add_all(jobs) + db.session.commit() except SQLAlchemyError as e: current_app.logger.exception("Failed to run scheduled jobs", e) + db.session.rollback() raise diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py index 41ba00940..b9b677708 100644 --- a/app/dao/jobs_dao.py +++ b/app/dao/jobs_dao.py @@ -52,6 +52,7 @@ def dao_get_scheduled_jobs(): Job.scheduled_for < datetime.utcnow() ) \ .order_by(asc(Job.scheduled_for)) \ + .with_for_update() \ .all() diff --git a/lambda_function/README.md b/lambda_function/README.md new file mode 100644 index 000000000..3a8df5a1a --- /dev/null +++ b/lambda_function/README.md @@ -0,0 +1,9 @@ +# Lambda function + +This code is used to setup a lambda function to call into our API to deliver messages. + +Basic expected flow is that an request to GOV.UK Notify to send a text/email will persist in our DB then invoke an async lambda function. + +This function will call /deliver/notification/{id} which will make the API call to our delivery partners, updating the DB with the response. + +## EXPERIMENTAL AT THIS STAGE From 16dd16c0261872fa7e18cb58548e8c98b2a0548e Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Fri, 7 Oct 2016 12:28:42 +0100 Subject: [PATCH 2/5] move updating into the dao fn this helps manage the transaction by keeping it inside one function in the dao, so after the function completes you know that the transaction has been released and concurrent processing can resume --- app/celery/scheduled_tasks.py | 21 +++++---------------- app/dao/jobs_dao.py | 22 +++++++++++++++++++--- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 097448ed8..dea2196fc 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta +from datetime import datetime from flask import current_app from sqlalchemy.exc import SQLAlchemyError @@ -6,12 +6,11 @@ from sqlalchemy.exc import SQLAlchemyError from app.aws import s3 from app import notify_celery from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago -from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job, dao_get_jobs_older_than +from app.dao.jobs_dao import dao_set_scheduled_jobs_to_pending, dao_get_jobs_older_than from app.dao.notifications_dao import (delete_notifications_created_more_than_a_week_ago, dao_timeout_notifications) from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago from app.statsd_decorators import statsd -from app.models import JOB_STATUS_PENDING from app.celery.tasks import process_job @@ -27,24 +26,14 @@ def remove_csv_files(): @notify_celery.task(name="run-scheduled-jobs") @statsd(namespace="tasks") def run_scheduled_jobs(): - from app import db try: - jobs = dao_get_scheduled_jobs() - for job in jobs: - job.job_status = JOB_STATUS_PENDING - from time import sleep - sleep(1) - print('SCHEDULING' + str(job.id)) - # dao_update_job(job) - # process_job.apply_async([str(job.id)], queue="process-job") + for job in dao_set_scheduled_jobs_to_pending(): + process_job.apply_async([str(job.id)], queue="process-job") current_app.logger.info( - "Job ID {} added to process job queue".format(job.id) + "Job ID {} added to process job queue".format(job.id) ) - db.session.add_all(jobs) - db.session.commit() except SQLAlchemyError as e: current_app.logger.exception("Failed to run scheduled jobs", e) - db.session.rollback() raise diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py index b9b677708..9d674b222 100644 --- a/app/dao/jobs_dao.py +++ b/app/dao/jobs_dao.py @@ -4,7 +4,7 @@ from sqlalchemy import func, desc, asc, cast, Date as sql_date from app import db from app.dao import days_ago -from app.models import Job, NotificationHistory, JOB_STATUS_SCHEDULED +from app.models import Job, NotificationHistory, JOB_STATUS_SCHEDULED, JOB_STATUS_PENDING from app.statsd_decorators import statsd @@ -45,8 +45,15 @@ def dao_get_job_by_id(job_id): return Job.query.filter_by(id=job_id).one() -def dao_get_scheduled_jobs(): - return Job.query \ +def dao_set_scheduled_jobs_to_pending(): + """ + Sets all past scheduled jobs to pending, and then returns them for further processing. + + this is used in the run_scheduled_jobs task, so we put a FOR UPDATE lock on the job table for the duration of + the transaction so that if the task is run more than once concurrently, one task will block the other select + from completing until it commits. + """ + jobs = Job.query \ .filter( Job.job_status == JOB_STATUS_SCHEDULED, Job.scheduled_for < datetime.utcnow() @@ -55,6 +62,15 @@ def dao_get_scheduled_jobs(): .with_for_update() \ .all() + for job in jobs: + job.job_status = JOB_STATUS_PENDING + + db.session.add_all(jobs) + db.session.commit() + + return jobs + + def dao_get_future_scheduled_job_by_id_and_service_id(job_id, service_id): return Job.query \ From 27c7a08523e7bc1e6c2ed5c5ab0e016c56783fc3 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Fri, 7 Oct 2016 12:48:58 +0100 Subject: [PATCH 3/5] use assert rather than relying on mock functions mocks create any property you access, so calling functions on them is inherently risky due to typos quietly doing nothing. instead assert `.called is False`, which will fail noisily if you typo --- tests/app/celery/test_tasks.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index cd23c709a..cdc0b27b9 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -155,10 +155,10 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db, process_job(job.id) - s3.get_job_from_s3.assert_not_called() job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits exceeded' - tasks.send_sms.apply_async.assert_not_called() + assert s3.get_job_from_s3.called is False + assert tasks.send_sms.apply_async.called is False def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify_db, @@ -178,8 +178,8 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits exceeded' - s3.get_job_from_s3.assert_not_called() - tasks.send_sms.apply_async.assert_not_called() + assert s3.get_job_from_s3.called is False + assert tasks.send_sms.apply_async.called is False def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(notify_db, notify_db_session, mocker): @@ -198,8 +198,8 @@ def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(noti job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits exceeded' - s3.get_job_from_s3.assert_not_called() - tasks.send_email.apply_async.assert_not_called() + assert s3.get_job_from_s3.called is False + assert tasks.send_email.apply_async.called is False @freeze_time("2016-01-01 11:09:00.061258") @@ -208,17 +208,17 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not template = sample_email_template(notify_db, notify_db_session, service=service) job = sample_job(notify_db, notify_db_session, service=service, template=template) - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('email')) + mocker.patch('app.celery.tasks.s3.get_job_from_s3') mocker.patch('app.celery.tasks.send_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") process_job(job.id) - s3.get_job_from_s3.assert_not_called job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits exceeded' - tasks.send_email.apply_async.assert_not_called + assert s3.get_job_from_s3.called is False + assert tasks.send_email.apply_async.called is False @freeze_time("2016-01-01 11:09:00.061258") @@ -491,7 +491,7 @@ def test_should_not_send_sms_if_restricted_service_and_invalid_number(notify_db, encryption.encrypt(notification), datetime.utcnow().strftime(DATETIME_FORMAT) ) - provider_tasks.deliver_sms.apply_async.assert_not_called() + assert provider_tasks.deliver_sms.apply_async.called is False with pytest.raises(NoResultFound): Notification.query.filter_by(id=notification_id).one() @@ -631,7 +631,7 @@ def test_should_not_send_sms_if_team_key_and_recipient_not_in_team(notify_db, no encryption.encrypt(notification), datetime.utcnow().strftime(DATETIME_FORMAT) ) - provider_tasks.send_sms_to_provider.apply_async.assert_not_called() + assert provider_tasks.send_sms_to_provider.apply_async.called is False with pytest.raises(NoResultFound): Notification.query.filter_by(id=notification_id).one() @@ -786,7 +786,7 @@ def test_send_sms_should_go_to_retry_queue_if_database_errors(sample_template, m encryption.encrypt(notification), now.strftime(DATETIME_FORMAT) ) - provider_tasks.deliver_sms.apply_async.assert_not_called() + assert provider_tasks.deliver_sms.apply_async.called is False tasks.send_sms.retry.assert_called_with(exc=expected_exception, queue='retry') with pytest.raises(NoResultFound) as e: @@ -813,7 +813,7 @@ def test_send_email_should_go_to_retry_queue_if_database_errors(sample_email_tem encryption.encrypt(notification), now.strftime(DATETIME_FORMAT) ) - provider_tasks.deliver_email.apply_async.assert_not_called() + assert provider_tasks.deliver_email.apply_async.called is False tasks.send_email.retry.assert_called_with(exc=expected_exception, queue='retry') with pytest.raises(NoResultFound) as e: From d22d055e21e942564bc9ffbd754fdacf61f9aa88 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Fri, 7 Oct 2016 12:54:04 +0100 Subject: [PATCH 4/5] only process jobs if they're pending help prevent issues where scheduled jobs are processed twice. note this is NOT a watertight solution - it holds no locks, and there is no guarantee that the status won't have updated between asserting that its status is 'pending' and updating it to be 'in progress' --- app/celery/tasks.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index e784a876f..eca526ee8 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -36,6 +36,9 @@ def process_job(job_id): start = datetime.utcnow() job = dao_get_job_by_id(job_id) + if job.job_status != 'pending': + return + service = job.service total_sent = fetch_todays_total_message_count(service.id) From bdb4da4976d91282d3ebbf5b7161567efec49627 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Fri, 7 Oct 2016 12:55:48 +0100 Subject: [PATCH 5/5] tests n stuff --- app/celery/scheduled_tasks.py | 4 +--- app/dao/jobs_dao.py | 1 - tests/app/celery/test_tasks.py | 12 ++++++++++++ tests/app/dao/test_jobs_dao.py | 25 ++++++++++++++++++------- 4 files changed, 31 insertions(+), 11 deletions(-) diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index dea2196fc..f9ee251b8 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -29,9 +29,7 @@ def run_scheduled_jobs(): try: for job in dao_set_scheduled_jobs_to_pending(): process_job.apply_async([str(job.id)], queue="process-job") - current_app.logger.info( - "Job ID {} added to process job queue".format(job.id) - ) + current_app.logger.info("Job ID {} added to process job queue".format(job.id)) except SQLAlchemyError as e: current_app.logger.exception("Failed to run scheduled jobs", e) raise diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py index 9d674b222..e78b27645 100644 --- a/app/dao/jobs_dao.py +++ b/app/dao/jobs_dao.py @@ -71,7 +71,6 @@ def dao_set_scheduled_jobs_to_pending(): return jobs - def dao_get_future_scheduled_job_by_id_and_service_id(job_id, service_id): return Job.query \ .filter( diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index cdc0b27b9..0f11ecf42 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -221,6 +221,18 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not assert tasks.send_email.apply_async.called is False +def test_should_not_process_job_if_already_pending(notify_db, notify_db_session, mocker): + job = sample_job(notify_db, notify_db_session, job_status='scheduled') + + mocker.patch('app.celery.tasks.s3.get_job_from_s3') + mocker.patch('app.celery.tasks.send_sms.apply_async') + + process_job(job.id) + + assert s3.get_job_from_s3.called is False + assert tasks.send_sms.apply_async.called is False + + @freeze_time("2016-01-01 11:09:00.061258") def test_should_process_email_job_if_exactly_on_send_limits(notify_db, notify_db_session, diff --git a/tests/app/dao/test_jobs_dao.py b/tests/app/dao/test_jobs_dao.py index f9a77192f..e13a00795 100644 --- a/tests/app/dao/test_jobs_dao.py +++ b/tests/app/dao/test_jobs_dao.py @@ -8,7 +8,7 @@ from app.dao.jobs_dao import ( dao_create_job, dao_update_job, dao_get_jobs_by_service_id, - dao_get_scheduled_jobs, + dao_set_scheduled_jobs_to_pending, dao_get_future_scheduled_job_by_id_and_service_id, dao_get_notification_outcomes_for_job, dao_get_jobs_older_than @@ -223,31 +223,42 @@ def test_update_job(sample_job): assert job_from_db.job_status == 'in progress' -def test_get_scheduled_jobs_gets_all_jobs_in_scheduled_state_scheduled_before_now(notify_db, notify_db_session): +def test_set_scheduled_jobs_to_pending_gets_all_jobs_in_scheduled_state_before_now(notify_db, notify_db_session): one_minute_ago = datetime.utcnow() - timedelta(minutes=1) one_hour_ago = datetime.utcnow() - timedelta(minutes=60) job_new = create_job(notify_db, notify_db_session, scheduled_for=one_minute_ago, job_status='scheduled') job_old = create_job(notify_db, notify_db_session, scheduled_for=one_hour_ago, job_status='scheduled') - jobs = dao_get_scheduled_jobs() + jobs = dao_set_scheduled_jobs_to_pending() assert len(jobs) == 2 assert jobs[0].id == job_old.id assert jobs[1].id == job_new.id -def test_get_scheduled_jobs_gets_ignores_jobs_not_scheduled(notify_db, notify_db_session): +def test_set_scheduled_jobs_to_pending_gets_ignores_jobs_not_scheduled(notify_db, notify_db_session): one_minute_ago = datetime.utcnow() - timedelta(minutes=1) create_job(notify_db, notify_db_session) job_scheduled = create_job(notify_db, notify_db_session, scheduled_for=one_minute_ago, job_status='scheduled') - jobs = dao_get_scheduled_jobs() + jobs = dao_set_scheduled_jobs_to_pending() assert len(jobs) == 1 assert jobs[0].id == job_scheduled.id -def test_get_scheduled_jobs_gets_ignores_jobs_scheduled_in_the_future(sample_scheduled_job): - jobs = dao_get_scheduled_jobs() +def test_set_scheduled_jobs_to_pending_gets_ignores_jobs_scheduled_in_the_future(sample_scheduled_job): + jobs = dao_set_scheduled_jobs_to_pending() assert len(jobs) == 0 +def test_set_scheduled_jobs_to_pending_updates_rows(notify_db, notify_db_session): + one_minute_ago = datetime.utcnow() - timedelta(minutes=1) + one_hour_ago = datetime.utcnow() - timedelta(minutes=60) + create_job(notify_db, notify_db_session, scheduled_for=one_minute_ago, job_status='scheduled') + create_job(notify_db, notify_db_session, scheduled_for=one_hour_ago, job_status='scheduled') + jobs = dao_set_scheduled_jobs_to_pending() + assert len(jobs) == 2 + assert jobs[0].job_status == 'pending' + assert jobs[1].job_status == 'pending' + + def test_get_future_scheduled_job_gets_a_job_yet_to_send(sample_scheduled_job): result = dao_get_future_scheduled_job_by_id_and_service_id(sample_scheduled_job.id, sample_scheduled_job.service_id) assert result.id == sample_scheduled_job.id