mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-03 18:01:08 -05:00
Merge pull request #704 from alphagov/concurrency
fix run_scheduled_jobs concurrency bug
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
|
||||
from flask import current_app
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
@@ -6,12 +6,11 @@ from sqlalchemy.exc import SQLAlchemyError
|
||||
from app.aws import s3
|
||||
from app import notify_celery
|
||||
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
|
||||
from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job, dao_get_jobs_older_than
|
||||
from app.dao.jobs_dao import dao_set_scheduled_jobs_to_pending, dao_get_jobs_older_than
|
||||
from app.dao.notifications_dao import (delete_notifications_created_more_than_a_week_ago,
|
||||
dao_timeout_notifications)
|
||||
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
||||
from app.statsd_decorators import statsd
|
||||
from app.models import JOB_STATUS_PENDING
|
||||
from app.celery.tasks import process_job
|
||||
|
||||
|
||||
@@ -28,14 +27,9 @@ def remove_csv_files():
|
||||
@statsd(namespace="tasks")
|
||||
def run_scheduled_jobs():
|
||||
try:
|
||||
jobs = dao_get_scheduled_jobs()
|
||||
for job in jobs:
|
||||
job.job_status = JOB_STATUS_PENDING
|
||||
dao_update_job(job)
|
||||
for job in dao_set_scheduled_jobs_to_pending():
|
||||
process_job.apply_async([str(job.id)], queue="process-job")
|
||||
current_app.logger.info(
|
||||
"Job ID {} added to process job queue".format(job.id)
|
||||
)
|
||||
current_app.logger.info("Job ID {} added to process job queue".format(job.id))
|
||||
except SQLAlchemyError as e:
|
||||
current_app.logger.exception("Failed to run scheduled jobs", e)
|
||||
raise
|
||||
|
||||
@@ -36,6 +36,9 @@ def process_job(job_id):
|
||||
start = datetime.utcnow()
|
||||
job = dao_get_job_by_id(job_id)
|
||||
|
||||
if job.job_status != 'pending':
|
||||
return
|
||||
|
||||
service = job.service
|
||||
|
||||
total_sent = fetch_todays_total_message_count(service.id)
|
||||
|
||||
@@ -4,7 +4,7 @@ from sqlalchemy import func, desc, asc, cast, Date as sql_date
|
||||
|
||||
from app import db
|
||||
from app.dao import days_ago
|
||||
from app.models import Job, NotificationHistory, JOB_STATUS_SCHEDULED
|
||||
from app.models import Job, NotificationHistory, JOB_STATUS_SCHEDULED, JOB_STATUS_PENDING
|
||||
from app.statsd_decorators import statsd
|
||||
|
||||
|
||||
@@ -45,15 +45,31 @@ def dao_get_job_by_id(job_id):
|
||||
return Job.query.filter_by(id=job_id).one()
|
||||
|
||||
|
||||
def dao_get_scheduled_jobs():
|
||||
return Job.query \
|
||||
def dao_set_scheduled_jobs_to_pending():
|
||||
"""
|
||||
Sets all past scheduled jobs to pending, and then returns them for further processing.
|
||||
|
||||
this is used in the run_scheduled_jobs task, so we put a FOR UPDATE lock on the job table for the duration of
|
||||
the transaction so that if the task is run more than once concurrently, one task will block the other select
|
||||
from completing until it commits.
|
||||
"""
|
||||
jobs = Job.query \
|
||||
.filter(
|
||||
Job.job_status == JOB_STATUS_SCHEDULED,
|
||||
Job.scheduled_for < datetime.utcnow()
|
||||
) \
|
||||
.order_by(asc(Job.scheduled_for)) \
|
||||
.with_for_update() \
|
||||
.all()
|
||||
|
||||
for job in jobs:
|
||||
job.job_status = JOB_STATUS_PENDING
|
||||
|
||||
db.session.add_all(jobs)
|
||||
db.session.commit()
|
||||
|
||||
return jobs
|
||||
|
||||
|
||||
def dao_get_future_scheduled_job_by_id_and_service_id(job_id, service_id):
|
||||
return Job.query \
|
||||
|
||||
9
lambda_function/README.md
Normal file
9
lambda_function/README.md
Normal file
@@ -0,0 +1,9 @@
|
||||
# Lambda function
|
||||
|
||||
This code is used to setup a lambda function to call into our API to deliver messages.
|
||||
|
||||
Basic expected flow is that an request to GOV.UK Notify to send a text/email will persist in our DB then invoke an async lambda function.
|
||||
|
||||
This function will call /deliver/notification/{id} which will make the API call to our delivery partners, updating the DB with the response.
|
||||
|
||||
## EXPERIMENTAL AT THIS STAGE
|
||||
@@ -155,10 +155,10 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db,
|
||||
|
||||
process_job(job.id)
|
||||
|
||||
s3.get_job_from_s3.assert_not_called()
|
||||
job = jobs_dao.dao_get_job_by_id(job.id)
|
||||
assert job.job_status == 'sending limits exceeded'
|
||||
tasks.send_sms.apply_async.assert_not_called()
|
||||
assert s3.get_job_from_s3.called is False
|
||||
assert tasks.send_sms.apply_async.called is False
|
||||
|
||||
|
||||
def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify_db,
|
||||
@@ -178,8 +178,8 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify
|
||||
|
||||
job = jobs_dao.dao_get_job_by_id(job.id)
|
||||
assert job.job_status == 'sending limits exceeded'
|
||||
s3.get_job_from_s3.assert_not_called()
|
||||
tasks.send_sms.apply_async.assert_not_called()
|
||||
assert s3.get_job_from_s3.called is False
|
||||
assert tasks.send_sms.apply_async.called is False
|
||||
|
||||
|
||||
def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(notify_db, notify_db_session, mocker):
|
||||
@@ -198,8 +198,8 @@ def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(noti
|
||||
|
||||
job = jobs_dao.dao_get_job_by_id(job.id)
|
||||
assert job.job_status == 'sending limits exceeded'
|
||||
s3.get_job_from_s3.assert_not_called()
|
||||
tasks.send_email.apply_async.assert_not_called()
|
||||
assert s3.get_job_from_s3.called is False
|
||||
assert tasks.send_email.apply_async.called is False
|
||||
|
||||
|
||||
@freeze_time("2016-01-01 11:09:00.061258")
|
||||
@@ -208,17 +208,29 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not
|
||||
template = sample_email_template(notify_db, notify_db_session, service=service)
|
||||
job = sample_job(notify_db, notify_db_session, service=service, template=template)
|
||||
|
||||
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('email'))
|
||||
mocker.patch('app.celery.tasks.s3.get_job_from_s3')
|
||||
mocker.patch('app.celery.tasks.send_email.apply_async')
|
||||
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
|
||||
mocker.patch('app.celery.tasks.create_uuid', return_value="uuid")
|
||||
|
||||
process_job(job.id)
|
||||
|
||||
s3.get_job_from_s3.assert_not_called
|
||||
job = jobs_dao.dao_get_job_by_id(job.id)
|
||||
assert job.job_status == 'sending limits exceeded'
|
||||
tasks.send_email.apply_async.assert_not_called
|
||||
assert s3.get_job_from_s3.called is False
|
||||
assert tasks.send_email.apply_async.called is False
|
||||
|
||||
|
||||
def test_should_not_process_job_if_already_pending(notify_db, notify_db_session, mocker):
|
||||
job = sample_job(notify_db, notify_db_session, job_status='scheduled')
|
||||
|
||||
mocker.patch('app.celery.tasks.s3.get_job_from_s3')
|
||||
mocker.patch('app.celery.tasks.send_sms.apply_async')
|
||||
|
||||
process_job(job.id)
|
||||
|
||||
assert s3.get_job_from_s3.called is False
|
||||
assert tasks.send_sms.apply_async.called is False
|
||||
|
||||
|
||||
@freeze_time("2016-01-01 11:09:00.061258")
|
||||
@@ -491,7 +503,7 @@ def test_should_not_send_sms_if_restricted_service_and_invalid_number(notify_db,
|
||||
encryption.encrypt(notification),
|
||||
datetime.utcnow().strftime(DATETIME_FORMAT)
|
||||
)
|
||||
provider_tasks.deliver_sms.apply_async.assert_not_called()
|
||||
assert provider_tasks.deliver_sms.apply_async.called is False
|
||||
with pytest.raises(NoResultFound):
|
||||
Notification.query.filter_by(id=notification_id).one()
|
||||
|
||||
@@ -631,7 +643,7 @@ def test_should_not_send_sms_if_team_key_and_recipient_not_in_team(notify_db, no
|
||||
encryption.encrypt(notification),
|
||||
datetime.utcnow().strftime(DATETIME_FORMAT)
|
||||
)
|
||||
provider_tasks.send_sms_to_provider.apply_async.assert_not_called()
|
||||
assert provider_tasks.send_sms_to_provider.apply_async.called is False
|
||||
with pytest.raises(NoResultFound):
|
||||
Notification.query.filter_by(id=notification_id).one()
|
||||
|
||||
@@ -786,7 +798,7 @@ def test_send_sms_should_go_to_retry_queue_if_database_errors(sample_template, m
|
||||
encryption.encrypt(notification),
|
||||
now.strftime(DATETIME_FORMAT)
|
||||
)
|
||||
provider_tasks.deliver_sms.apply_async.assert_not_called()
|
||||
assert provider_tasks.deliver_sms.apply_async.called is False
|
||||
tasks.send_sms.retry.assert_called_with(exc=expected_exception, queue='retry')
|
||||
|
||||
with pytest.raises(NoResultFound) as e:
|
||||
@@ -813,7 +825,7 @@ def test_send_email_should_go_to_retry_queue_if_database_errors(sample_email_tem
|
||||
encryption.encrypt(notification),
|
||||
now.strftime(DATETIME_FORMAT)
|
||||
)
|
||||
provider_tasks.deliver_email.apply_async.assert_not_called()
|
||||
assert provider_tasks.deliver_email.apply_async.called is False
|
||||
tasks.send_email.retry.assert_called_with(exc=expected_exception, queue='retry')
|
||||
|
||||
with pytest.raises(NoResultFound) as e:
|
||||
|
||||
@@ -8,7 +8,7 @@ from app.dao.jobs_dao import (
|
||||
dao_create_job,
|
||||
dao_update_job,
|
||||
dao_get_jobs_by_service_id,
|
||||
dao_get_scheduled_jobs,
|
||||
dao_set_scheduled_jobs_to_pending,
|
||||
dao_get_future_scheduled_job_by_id_and_service_id,
|
||||
dao_get_notification_outcomes_for_job,
|
||||
dao_get_jobs_older_than
|
||||
@@ -223,31 +223,42 @@ def test_update_job(sample_job):
|
||||
assert job_from_db.job_status == 'in progress'
|
||||
|
||||
|
||||
def test_get_scheduled_jobs_gets_all_jobs_in_scheduled_state_scheduled_before_now(notify_db, notify_db_session):
|
||||
def test_set_scheduled_jobs_to_pending_gets_all_jobs_in_scheduled_state_before_now(notify_db, notify_db_session):
|
||||
one_minute_ago = datetime.utcnow() - timedelta(minutes=1)
|
||||
one_hour_ago = datetime.utcnow() - timedelta(minutes=60)
|
||||
job_new = create_job(notify_db, notify_db_session, scheduled_for=one_minute_ago, job_status='scheduled')
|
||||
job_old = create_job(notify_db, notify_db_session, scheduled_for=one_hour_ago, job_status='scheduled')
|
||||
jobs = dao_get_scheduled_jobs()
|
||||
jobs = dao_set_scheduled_jobs_to_pending()
|
||||
assert len(jobs) == 2
|
||||
assert jobs[0].id == job_old.id
|
||||
assert jobs[1].id == job_new.id
|
||||
|
||||
|
||||
def test_get_scheduled_jobs_gets_ignores_jobs_not_scheduled(notify_db, notify_db_session):
|
||||
def test_set_scheduled_jobs_to_pending_gets_ignores_jobs_not_scheduled(notify_db, notify_db_session):
|
||||
one_minute_ago = datetime.utcnow() - timedelta(minutes=1)
|
||||
create_job(notify_db, notify_db_session)
|
||||
job_scheduled = create_job(notify_db, notify_db_session, scheduled_for=one_minute_ago, job_status='scheduled')
|
||||
jobs = dao_get_scheduled_jobs()
|
||||
jobs = dao_set_scheduled_jobs_to_pending()
|
||||
assert len(jobs) == 1
|
||||
assert jobs[0].id == job_scheduled.id
|
||||
|
||||
|
||||
def test_get_scheduled_jobs_gets_ignores_jobs_scheduled_in_the_future(sample_scheduled_job):
|
||||
jobs = dao_get_scheduled_jobs()
|
||||
def test_set_scheduled_jobs_to_pending_gets_ignores_jobs_scheduled_in_the_future(sample_scheduled_job):
|
||||
jobs = dao_set_scheduled_jobs_to_pending()
|
||||
assert len(jobs) == 0
|
||||
|
||||
|
||||
def test_set_scheduled_jobs_to_pending_updates_rows(notify_db, notify_db_session):
|
||||
one_minute_ago = datetime.utcnow() - timedelta(minutes=1)
|
||||
one_hour_ago = datetime.utcnow() - timedelta(minutes=60)
|
||||
create_job(notify_db, notify_db_session, scheduled_for=one_minute_ago, job_status='scheduled')
|
||||
create_job(notify_db, notify_db_session, scheduled_for=one_hour_ago, job_status='scheduled')
|
||||
jobs = dao_set_scheduled_jobs_to_pending()
|
||||
assert len(jobs) == 2
|
||||
assert jobs[0].job_status == 'pending'
|
||||
assert jobs[1].job_status == 'pending'
|
||||
|
||||
|
||||
def test_get_future_scheduled_job_gets_a_job_yet_to_send(sample_scheduled_job):
|
||||
result = dao_get_future_scheduled_job_by_id_and_service_id(sample_scheduled_job.id, sample_scheduled_job.service_id)
|
||||
assert result.id == sample_scheduled_job.id
|
||||
|
||||
Reference in New Issue
Block a user