diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 4448079f0..f9ee251b8 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta +from datetime import datetime from flask import current_app from sqlalchemy.exc import SQLAlchemyError @@ -6,12 +6,11 @@ from sqlalchemy.exc import SQLAlchemyError from app.aws import s3 from app import notify_celery from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago -from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job, dao_get_jobs_older_than +from app.dao.jobs_dao import dao_set_scheduled_jobs_to_pending, dao_get_jobs_older_than from app.dao.notifications_dao import (delete_notifications_created_more_than_a_week_ago, dao_timeout_notifications) from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago from app.statsd_decorators import statsd -from app.models import JOB_STATUS_PENDING from app.celery.tasks import process_job @@ -28,14 +27,9 @@ def remove_csv_files(): @statsd(namespace="tasks") def run_scheduled_jobs(): try: - jobs = dao_get_scheduled_jobs() - for job in jobs: - job.job_status = JOB_STATUS_PENDING - dao_update_job(job) + for job in dao_set_scheduled_jobs_to_pending(): process_job.apply_async([str(job.id)], queue="process-job") - current_app.logger.info( - "Job ID {} added to process job queue".format(job.id) - ) + current_app.logger.info("Job ID {} added to process job queue".format(job.id)) except SQLAlchemyError as e: current_app.logger.exception("Failed to run scheduled jobs", e) raise diff --git a/app/celery/tasks.py b/app/celery/tasks.py index e784a876f..eca526ee8 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -36,6 +36,9 @@ def process_job(job_id): start = datetime.utcnow() job = dao_get_job_by_id(job_id) + if job.job_status != 'pending': + return + service = job.service total_sent = 
fetch_todays_total_message_count(service.id) diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py index 41ba00940..e78b27645 100644 --- a/app/dao/jobs_dao.py +++ b/app/dao/jobs_dao.py @@ -4,7 +4,7 @@ from sqlalchemy import func, desc, asc, cast, Date as sql_date from app import db from app.dao import days_ago -from app.models import Job, NotificationHistory, JOB_STATUS_SCHEDULED +from app.models import Job, NotificationHistory, JOB_STATUS_SCHEDULED, JOB_STATUS_PENDING from app.statsd_decorators import statsd @@ -45,15 +45,31 @@ def dao_get_job_by_id(job_id): return Job.query.filter_by(id=job_id).one() -def dao_get_scheduled_jobs(): - return Job.query \ +def dao_set_scheduled_jobs_to_pending(): + """ + Sets all past scheduled jobs to pending, and then returns them for further processing. + + This is used in the run_scheduled_jobs task, so we put a FOR UPDATE lock on the job table for the duration of + the transaction so that if the task is run more than once concurrently, one task will block the other select + from completing until it commits. + """ + jobs = Job.query \ .filter( Job.job_status == JOB_STATUS_SCHEDULED, Job.scheduled_for < datetime.utcnow() ) \ .order_by(asc(Job.scheduled_for)) \ + .with_for_update() \ .all() + for job in jobs: + job.job_status = JOB_STATUS_PENDING + + db.session.add_all(jobs) + db.session.commit() + + return jobs + + def dao_get_future_scheduled_job_by_id_and_service_id(job_id, service_id): return Job.query \ diff --git a/lambda_function/README.md b/lambda_function/README.md new file mode 100644 index 000000000..3a8df5a1a --- /dev/null +++ b/lambda_function/README.md @@ -0,0 +1,9 @@ +# Lambda function + +This code is used to set up a lambda function to call into our API to deliver messages. + +Basic expected flow is that a request to GOV.UK Notify to send a text/email will persist in our DB then invoke an async lambda function.
+ +This function will call /deliver/notification/{id} which will make the API call to our delivery partners, updating the DB with the response. + +## EXPERIMENTAL AT THIS STAGE diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index cd23c709a..0f11ecf42 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -155,10 +155,10 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db, process_job(job.id) - s3.get_job_from_s3.assert_not_called() job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits exceeded' - tasks.send_sms.apply_async.assert_not_called() + assert s3.get_job_from_s3.called is False + assert tasks.send_sms.apply_async.called is False def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify_db, @@ -178,8 +178,8 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits exceeded' - s3.get_job_from_s3.assert_not_called() - tasks.send_sms.apply_async.assert_not_called() + assert s3.get_job_from_s3.called is False + assert tasks.send_sms.apply_async.called is False def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(notify_db, notify_db_session, mocker): @@ -198,8 +198,8 @@ def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(noti job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits exceeded' - s3.get_job_from_s3.assert_not_called() - tasks.send_email.apply_async.assert_not_called() + assert s3.get_job_from_s3.called is False + assert tasks.send_email.apply_async.called is False @freeze_time("2016-01-01 11:09:00.061258") @@ -208,17 +208,29 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not template = sample_email_template(notify_db, notify_db_session, service=service) job = sample_job(notify_db, notify_db_session, 
service=service, template=template) - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('email')) + mocker.patch('app.celery.tasks.s3.get_job_from_s3') mocker.patch('app.celery.tasks.send_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") process_job(job.id) - s3.get_job_from_s3.assert_not_called job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits exceeded' - tasks.send_email.apply_async.assert_not_called + assert s3.get_job_from_s3.called is False + assert tasks.send_email.apply_async.called is False + + +def test_should_not_process_job_if_already_pending(notify_db, notify_db_session, mocker): + job = sample_job(notify_db, notify_db_session, job_status='scheduled') + + mocker.patch('app.celery.tasks.s3.get_job_from_s3') + mocker.patch('app.celery.tasks.send_sms.apply_async') + + process_job(job.id) + + assert s3.get_job_from_s3.called is False + assert tasks.send_sms.apply_async.called is False @freeze_time("2016-01-01 11:09:00.061258") @@ -491,7 +503,7 @@ def test_should_not_send_sms_if_restricted_service_and_invalid_number(notify_db, encryption.encrypt(notification), datetime.utcnow().strftime(DATETIME_FORMAT) ) - provider_tasks.deliver_sms.apply_async.assert_not_called() + assert provider_tasks.deliver_sms.apply_async.called is False with pytest.raises(NoResultFound): Notification.query.filter_by(id=notification_id).one() @@ -631,7 +643,7 @@ def test_should_not_send_sms_if_team_key_and_recipient_not_in_team(notify_db, no encryption.encrypt(notification), datetime.utcnow().strftime(DATETIME_FORMAT) ) - provider_tasks.send_sms_to_provider.apply_async.assert_not_called() + assert provider_tasks.send_sms_to_provider.apply_async.called is False with pytest.raises(NoResultFound): Notification.query.filter_by(id=notification_id).one() @@ -786,7 +798,7 @@ def 
test_send_sms_should_go_to_retry_queue_if_database_errors(sample_template, m encryption.encrypt(notification), now.strftime(DATETIME_FORMAT) ) - provider_tasks.deliver_sms.apply_async.assert_not_called() + assert provider_tasks.deliver_sms.apply_async.called is False tasks.send_sms.retry.assert_called_with(exc=expected_exception, queue='retry') with pytest.raises(NoResultFound) as e: @@ -813,7 +825,7 @@ def test_send_email_should_go_to_retry_queue_if_database_errors(sample_email_tem encryption.encrypt(notification), now.strftime(DATETIME_FORMAT) ) - provider_tasks.deliver_email.apply_async.assert_not_called() + assert provider_tasks.deliver_email.apply_async.called is False tasks.send_email.retry.assert_called_with(exc=expected_exception, queue='retry') with pytest.raises(NoResultFound) as e: diff --git a/tests/app/dao/test_jobs_dao.py b/tests/app/dao/test_jobs_dao.py index f9a77192f..e13a00795 100644 --- a/tests/app/dao/test_jobs_dao.py +++ b/tests/app/dao/test_jobs_dao.py @@ -8,7 +8,7 @@ from app.dao.jobs_dao import ( dao_create_job, dao_update_job, dao_get_jobs_by_service_id, - dao_get_scheduled_jobs, + dao_set_scheduled_jobs_to_pending, dao_get_future_scheduled_job_by_id_and_service_id, dao_get_notification_outcomes_for_job, dao_get_jobs_older_than @@ -223,31 +223,42 @@ def test_update_job(sample_job): assert job_from_db.job_status == 'in progress' -def test_get_scheduled_jobs_gets_all_jobs_in_scheduled_state_scheduled_before_now(notify_db, notify_db_session): +def test_set_scheduled_jobs_to_pending_gets_all_jobs_in_scheduled_state_before_now(notify_db, notify_db_session): one_minute_ago = datetime.utcnow() - timedelta(minutes=1) one_hour_ago = datetime.utcnow() - timedelta(minutes=60) job_new = create_job(notify_db, notify_db_session, scheduled_for=one_minute_ago, job_status='scheduled') job_old = create_job(notify_db, notify_db_session, scheduled_for=one_hour_ago, job_status='scheduled') - jobs = dao_get_scheduled_jobs() + jobs = 
dao_set_scheduled_jobs_to_pending() assert len(jobs) == 2 assert jobs[0].id == job_old.id assert jobs[1].id == job_new.id -def test_get_scheduled_jobs_gets_ignores_jobs_not_scheduled(notify_db, notify_db_session): +def test_set_scheduled_jobs_to_pending_gets_ignores_jobs_not_scheduled(notify_db, notify_db_session): one_minute_ago = datetime.utcnow() - timedelta(minutes=1) create_job(notify_db, notify_db_session) job_scheduled = create_job(notify_db, notify_db_session, scheduled_for=one_minute_ago, job_status='scheduled') - jobs = dao_get_scheduled_jobs() + jobs = dao_set_scheduled_jobs_to_pending() assert len(jobs) == 1 assert jobs[0].id == job_scheduled.id -def test_get_scheduled_jobs_gets_ignores_jobs_scheduled_in_the_future(sample_scheduled_job): - jobs = dao_get_scheduled_jobs() +def test_set_scheduled_jobs_to_pending_gets_ignores_jobs_scheduled_in_the_future(sample_scheduled_job): + jobs = dao_set_scheduled_jobs_to_pending() assert len(jobs) == 0 +def test_set_scheduled_jobs_to_pending_updates_rows(notify_db, notify_db_session): + one_minute_ago = datetime.utcnow() - timedelta(minutes=1) + one_hour_ago = datetime.utcnow() - timedelta(minutes=60) + create_job(notify_db, notify_db_session, scheduled_for=one_minute_ago, job_status='scheduled') + create_job(notify_db, notify_db_session, scheduled_for=one_hour_ago, job_status='scheduled') + jobs = dao_set_scheduled_jobs_to_pending() + assert len(jobs) == 2 + assert jobs[0].job_status == 'pending' + assert jobs[1].job_status == 'pending' + + def test_get_future_scheduled_job_gets_a_job_yet_to_send(sample_scheduled_job): result = dao_get_future_scheduled_job_by_id_and_service_id(sample_scheduled_job.id, sample_scheduled_job.service_id) assert result.id == sample_scheduled_job.id