From db5a50c5a7e32d9d7018a5385c78b5e62fcf6d36 Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Tue, 5 Nov 2019 16:47:00 +0000 Subject: [PATCH] Adding a scheduled task to processing missing rows from job Sometimes a job finishes but has missed a row in the middle. It is a mystery why this is happening, it could be that the task to save the notifications has been dropped. So until we solve the missing let's find missing rows and process them. A new scheduled task has been added to find any "finished" jobs that do not have enough notifications created. If there are missing notifications the job processes those rows for the job. Adding the new task to beat schedule will be done in the next commit. A unique key constraint has been added to Notifications to ensure that the row is not added twice. Any index or constraint can affect performance, but this unique constraint should not affect it enough for us to notice. --- app/celery/scheduled_tasks.py | 47 +++++++++--------- app/celery/tasks.py | 34 ++++++------- app/dao/jobs_dao.py | 18 +++---- .../versions/0308_add_uq_key_row_number.py | 19 +++++++ tests/app/celery/test_scheduled_tasks.py | 47 ++++++++++++++++++ tests/app/dao/test_jobs_dao.py | 49 ++++++++++++++++--- tests/app/db.py | 2 + 7 files changed, 160 insertions(+), 56 deletions(-) create mode 100644 migrations/versions/0308_add_uq_key_row_number.py diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index d29dbcc8e..520567618 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -9,11 +9,19 @@ from sqlalchemy import and_ from sqlalchemy.exc import SQLAlchemyError from app import notify_celery, zendesk_client -from app.celery.tasks import process_job +from app.celery.tasks import ( + process_job, + get_recipient_csv_and_template, + process_row +) from app.config import QueueNames, TaskNames from app.dao.invited_org_user_dao import delete_org_invitations_created_more_than_two_days_ago from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago -from app.dao.jobs_dao import dao_set_scheduled_jobs_to_pending, find_jobs_with_missing_rows, find_missing_row_for_job +from app.dao.jobs_dao import ( + dao_set_scheduled_jobs_to_pending, + find_jobs_with_missing_rows, + find_missing_row_for_job +) from app.dao.jobs_dao import dao_update_job from app.dao.notifications_dao import ( is_delivery_slow_for_provider, @@ -223,26 +231,19 @@ def check_templated_letter_state(): ticket_type=zendesk_client.TYPE_INCIDENT ) -@notify_celery.task(name='find-missing-rows-from-completed-jobs') -def find_missing_rows_from_completed_jobs(): + +@notify_celery.task(name='check-for-missing-rows-in-completed-jobs') +def check_for_missing_rows_in_completed_jobs(): jobs_and_job_size = find_jobs_with_missing_rows() for x in jobs_and_job_size: - missing_rows = find_missing_row_for_job(jobs_and_job_size.job_id, jobs_and_job_size.notification_count) - - # job = dao_get_job_by_id(job_id) - # db_template = dao_get_template_by_id(job.template_id, job.template_version) - # - # TemplateClass = get_template_class(db_template.template_type) - # template = TemplateClass(db_template.__dict__) - # - # for row in RecipientCSV( - # s3.get_job_from_s3(str(job.service_id), str(job.id)), - # template_type=template.template_type, - # placeholders=template.placeholders - # ).get_rows(): - # if row.index == job_row_number: - # notification_id = process_row(row, template, job, job.service) - # current_app.logger.info("Process row {} for job {} created notification_id: {}".format( - # job_row_number, job_id, notification_id)) - - pass + job = x[1] + missing_rows = find_missing_row_for_job(job.id, job.notification_count) + for row_to_process in missing_rows: + # The sender_id is passed in with job, at this point we no longer have the sender that is passed in. + # The notification will be created with the default sender. + # There is a bug to fix this https://www.pivotaltracker.com/story/show/169569144 + recipient_csv, template = get_recipient_csv_and_template(job) + for row in recipient_csv.get_rows(): + if row.index == row_to_process.missing_row: + current_app.logger.info("Processing row: {} for job: {}") + process_row(row, template, job, job.service) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 22cf3643d..f489f281c 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -97,18 +97,11 @@ def process_job(job_id, sender_id=None): job.processing_started = start dao_update_job(job) - db_template = dao_get_template_by_id(job.template_id, job.template_version) - - TemplateClass = get_template_class(db_template.template_type) - template = TemplateClass(db_template.__dict__) + recipient_csv, template = get_recipient_csv_and_template(job) current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count)) - for row in RecipientCSV( - s3.get_job_from_s3(str(service.id), str(job_id)), - template_type=template.template_type, - placeholders=template.placeholders - ).get_rows(): + for row in recipient_csv.get_rows(): process_row(row, template, job, service, sender_id=sender_id) job_complete(job, start=start) @@ -131,6 +124,18 @@ def job_complete(job, resumed=False, start=None): ) +def get_recipient_csv_and_template(job): + db_template = dao_get_template_by_id(job.template_id, job.template_version) + + TemplateClass = get_template_class(db_template.template_type) + template = TemplateClass(db_template.__dict__) + + recipient_csv = RecipientCSV(file_data=s3.get_job_from_s3(str(job.service_id), str(job.id)), + template_type=template.template_type, + placeholders=template.placeholders) + return recipient_csv, template + + def process_row(row, template, job, service, sender_id=None): template_type = template.template_type encrypted = encryption.encrypt({ @@ -596,16 +601,9 @@ def process_incomplete_job(job_id): current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row)) - db_template = dao_get_template_by_id(job.template_id, job.template_version) + recipient_csv, template = get_recipient_csv_and_template(job) - TemplateClass = get_template_class(db_template.template_type) - template = TemplateClass(db_template.__dict__) - - for row in RecipientCSV( - s3.get_job_from_s3(str(job.service_id), str(job.id)), - template_type=template.template_type, - placeholders=template.placeholders - ).get_rows(): + for row in recipient_csv.get_rows(): if row.index > resume_from_row: process_row(row, template, job, job.service) diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py index b84b56a68..2f549a4de 100644 --- a/app/dao/jobs_dao.py +++ b/app/dao/jobs_dao.py @@ -191,17 +191,17 @@ def can_letter_job_be_cancelled(job): def find_jobs_with_missing_rows(): + # Jobs can be a maximum of 50,000 rows. It typically takes 5 minutes to create all those notifications. + # Using 10 minutes as a condition seems reasonable. + ten_minutes_ago = datetime.utcnow() - timedelta(minutes=10) jobs_with_rows_missing = db.session.query( func.count(Notification.id).label('count_notifications'), - Job.notification_count, - Job.id.label('job_id'), - Job.service_id + Job ).filter( - Job.job_status == JOB_STATUS_FINISHED + Job.job_status == JOB_STATUS_FINISHED, + Job.processing_finished < ten_minutes_ago ).group_by( - Job.notification_count, - Job.id.label('job_id'), - Job.service_id + Job ).having( func.count(Notification.id) != Job.notification_count ) @@ -211,7 +211,7 @@ def find_jobs_with_missing_rows(): def find_missing_row_for_job(job_id, job_size): expected_row_numbers = db.session.query( - func.generate_series(0, job_size-1).label('row') + func.generate_series(0, job_size - 1).label('row') ).subquery() query = db.session.query( @@ -220,6 +220,6 @@ def find_missing_row_for_job(job_id, job_size): ).outerjoin( Notification, and_(expected_row_numbers.c.row == Notification.job_row_number, Notification.job_id == job_id) ).filter( - Notification.job_row_number == None #noqa + Notification.job_row_number == None # noqa ) return query.all() diff --git a/migrations/versions/0308_add_uq_key_row_number.py b/migrations/versions/0308_add_uq_key_row_number.py new file mode 100644 index 000000000..017204084 --- /dev/null +++ b/migrations/versions/0308_add_uq_key_row_number.py @@ -0,0 +1,19 @@ +""" + +Revision ID: 0308_add_uq_key_row_number +Revises: 0307_delete_dm_datetime +Create Date: 2019-11-05 10:12:03.627850 + +""" +from alembic import op + +revision = '0308_add_uq_key_row_number' +down_revision = '0307_delete_dm_datetime' + + +def upgrade(): + op.create_unique_constraint('uq_notifications_job_row_number', 'notifications', ['job_id', 'job_row_number']) + + +def downgrade(): + op.drop_constraint('uq_notifications_job_row_number') diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 4bfa5de00..f8d0f0a1d 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -3,6 +3,7 @@ from unittest.mock import call import pytest from freezegun import freeze_time +from mock import mock from app import db from app.celery import scheduled_tasks @@ -16,6 +17,7 @@ from app.celery.scheduled_tasks import ( replay_created_notifications, check_precompiled_letter_state, check_templated_letter_state, + check_for_missing_rows_in_completed_jobs ) from app.config import QueueNames, TaskNames from app.dao.jobs_dao import dao_get_job_by_id @@ -32,6 +34,7 @@ from app.models import ( NOTIFICATION_PENDING_VIRUS_CHECK, ) from app.v2.errors import JobIncompleteError +from tests.app import load_example_csv from tests.app.db import ( create_notification, @@ -403,3 +406,47 @@ def test_check_templated_letter_state_during_utc(mocker, sample_letter_template) subject="[test] Letters still in 'created' status", ticket_type='incident' ) + + +def test_check_for_missing_rows_in_completed_jobs(mocker, sample_email_template): + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email')) + mocker.patch('app.encryption.encrypt', return_value="something_encrypted") + process_row = mocker.patch('app.celery.scheduled_tasks.process_row') + + job = create_job(template=sample_email_template, + notification_count=5, + job_status=JOB_STATUS_FINISHED, + processing_finished=datetime.utcnow() - timedelta(minutes=11)) + for i in range(0, 4): + create_notification(job=job, job_row_number=i) + + check_for_missing_rows_in_completed_jobs() + + process_row.assert_called_once_with( + mock.ANY, mock.ANY, job, job.service + ) + + +def test_check_for_missing_rows_in_completed_jobs_calls_save_email(mocker, sample_email_template): + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email')) + save_email_task = mocker.patch('app.celery.tasks.save_email.apply_async') + mocker.patch('app.encryption.encrypt', return_value="something_encrypted") + mocker.patch('app.celery.tasks.create_uuid', return_value='uuid') + + job = create_job(template=sample_email_template, + notification_count=5, + job_status=JOB_STATUS_FINISHED, + processing_finished=datetime.utcnow() - timedelta(minutes=11)) + for i in range(0, 4): + create_notification(job=job, job_row_number=i) + + check_for_missing_rows_in_completed_jobs() + save_email_task.assert_called_once_with( + ( + str(job.service_id), + "uuid", + "something_encrypted", + ), + {}, + queue="database-tasks" + ) diff --git a/tests/app/dao/test_jobs_dao.py b/tests/app/dao/test_jobs_dao.py index 3d32f9174..129deb016 100644 --- a/tests/app/dao/test_jobs_dao.py +++ b/tests/app/dao/test_jobs_dao.py @@ -4,6 +4,7 @@ from functools import partial import pytest from freezegun import freeze_time +from sqlalchemy.exc import IntegrityError from app.dao.jobs_dao import ( can_letter_job_be_cancelled, @@ -415,21 +416,45 @@ def test_can_letter_job_be_cancelled_returns_false_and_error_message_if_notifica def test_find_jobs_with_missing_rows(sample_email_template): - job = create_job(template=sample_email_template, notification_count=5, job_status=JOB_STATUS_FINISHED) + job = create_job(template=sample_email_template, + notification_count=5, + job_status=JOB_STATUS_FINISHED, + processing_finished=datetime.utcnow() - timedelta(minutes=11) + ) for i in range(0, 4): create_notification(job=job, job_row_number=i) results = find_jobs_with_missing_rows() assert len(results) == 1 - assert results[0] == (4, 4, job.id, job.service_id) + assert results[0][0] == 4 + assert results[0][1] == job + + +def test_find_jobs_with_missing_rows_returns_nothing_for_a_job_completed_less_than_10_minutes_ago( + sample_email_template +): + job = create_job(template=sample_email_template, + notification_count=5, + job_status=JOB_STATUS_FINISHED, + processing_finished=datetime.utcnow() - timedelta(minutes=9) + ) + for i in range(0, 4): + create_notification(job=job, job_row_number=i) + + results = find_jobs_with_missing_rows() + + assert len(results) == 0 @pytest.mark.parametrize('status', ['pending', 'in progress', 'cancelled', 'scheduled']) def test_find_jobs_with_missing_rows_doesnt_return_jobs_that_are_not_finished( sample_email_template, status ): - job = create_job(template=sample_email_template, notification_count=5, job_status=status) + job = create_job(template=sample_email_template, + notification_count=5, + job_status=status, + processing_finished=datetime.utcnow() - timedelta(minutes=11)) for i in range(0, 4): create_notification(job=job, job_row_number=i) @@ -439,7 +464,10 @@ def test_find_jobs_with_missing_rows_doesnt_return_jobs_that_are_not_finished( def test_find_missing_row_for_job(sample_email_template): - job = create_job(template=sample_email_template, notification_count=5, job_status=JOB_STATUS_FINISHED) + job = create_job(template=sample_email_template, + notification_count=5, + job_status=JOB_STATUS_FINISHED, + processing_finished=datetime.utcnow() - timedelta(minutes=11)) create_notification(job=job, job_row_number=0) create_notification(job=job, job_row_number=1) create_notification(job=job, job_row_number=3) @@ -451,7 +479,8 @@ def test_find_missing_row_for_job(sample_email_template): def test_find_missing_row_for_job_more_than_one_missing_row(sample_email_template): - job = create_job(template=sample_email_template, notification_count=5, job_status=JOB_STATUS_FINISHED) + job = create_job(template=sample_email_template, notification_count=5, job_status=JOB_STATUS_FINISHED, + processing_finished=datetime.utcnow() - timedelta(minutes=11)) create_notification(job=job, job_row_number=0) create_notification(job=job, job_row_number=1) create_notification(job=job, job_row_number=4) @@ -463,10 +492,18 @@ def test_find_missing_row_for_job_more_than_one_missing_row(sample_email_templat def test_find_missing_row_for_job_return_none_when_row_isnt_missing(sample_email_template): - job = create_job(template=sample_email_template, notification_count=5, job_status=JOB_STATUS_FINISHED) + job = create_job(template=sample_email_template, notification_count=5, job_status=JOB_STATUS_FINISHED, + processing_finished=datetime.utcnow() - timedelta(minutes=11)) for i in range(0, 5): create_notification(job=job, job_row_number=i) results = find_missing_row_for_job(job.id, 5) print(results) assert len(results) == 0 + + +def test_unique_key_on_job_id_and_job_row_number(sample_email_template): + job = create_job(template=sample_email_template) + create_notification(job=job, job_row_number=0) + with pytest.raises(expected_exception=IntegrityError): + create_notification(job=job, job_row_number=0) diff --git a/tests/app/db.py b/tests/app/db.py index a52904bed..341670b90 100644 --- a/tests/app/db.py +++ b/tests/app/db.py @@ -377,6 +377,7 @@ def create_job( job_status='pending', scheduled_for=None, processing_started=None, + processing_finished=None, original_file_name='some.csv', archived=False ): @@ -393,6 +394,7 @@ def create_job( 'job_status': job_status, 'scheduled_for': scheduled_for, 'processing_started': processing_started, + 'processing_finished': processing_finished, 'archived': archived } job = Job(**data)