diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py
index d29dbcc8e..520567618 100644
--- a/app/celery/scheduled_tasks.py
+++ b/app/celery/scheduled_tasks.py
@@ -9,11 +9,19 @@ from sqlalchemy import and_
 from sqlalchemy.exc import SQLAlchemyError
 
 from app import notify_celery, zendesk_client
-from app.celery.tasks import process_job
+from app.celery.tasks import (
+    process_job,
+    get_recipient_csv_and_template,
+    process_row
+)
 from app.config import QueueNames, TaskNames
 from app.dao.invited_org_user_dao import delete_org_invitations_created_more_than_two_days_ago
 from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
-from app.dao.jobs_dao import dao_set_scheduled_jobs_to_pending, find_jobs_with_missing_rows, find_missing_row_for_job
+from app.dao.jobs_dao import (
+    dao_set_scheduled_jobs_to_pending,
+    find_jobs_with_missing_rows,
+    find_missing_row_for_job
+)
 from app.dao.jobs_dao import dao_update_job
 from app.dao.notifications_dao import (
     is_delivery_slow_for_provider,
@@ -223,26 +231,19 @@ def check_templated_letter_state():
             ticket_type=zendesk_client.TYPE_INCIDENT
         )
 
-@notify_celery.task(name='find-missing-rows-from-completed-jobs')
-def find_missing_rows_from_completed_jobs():
+
+@notify_celery.task(name='check-for-missing-rows-in-completed-jobs')
+def check_for_missing_rows_in_completed_jobs():
     jobs_and_job_size = find_jobs_with_missing_rows()
     for x in jobs_and_job_size:
-        missing_rows = find_missing_row_for_job(jobs_and_job_size.job_id, jobs_and_job_size.notification_count)
-
-        # job = dao_get_job_by_id(job_id)
-        # db_template = dao_get_template_by_id(job.template_id, job.template_version)
-        #
-        # TemplateClass = get_template_class(db_template.template_type)
-        # template = TemplateClass(db_template.__dict__)
-        #
-        # for row in RecipientCSV(
-        #     s3.get_job_from_s3(str(job.service_id), str(job.id)),
-        #     template_type=template.template_type,
-        #     placeholders=template.placeholders
-        # ).get_rows():
-        #     if row.index == job_row_number:
-        #         notification_id = process_row(row, template, job, job.service)
-        #         current_app.logger.info("Process row {} for job {} created notification_id: {}".format(
-        #             job_row_number, job_id, notification_id))
-
-        pass
+        job = x[1]
+        recipient_csv, template = get_recipient_csv_and_template(job)
+        missing_rows = find_missing_row_for_job(job.id, job.notification_count)
+        for row_to_process in missing_rows:
+            # The sender_id that was passed in when the job was first processed is no longer
+            # available here, so the notification will be created with the default sender.
+            # There is a bug to fix this: https://www.pivotaltracker.com/story/show/169569144
+            for row in recipient_csv.get_rows():
+                if row.index == row_to_process.missing_row:
+                    current_app.logger.info("Processing row: {} for job: {}".format(row_to_process.missing_row, job.id))
+                    process_row(row, template, job, job.service)
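The nested loops above re-scan the CSV once per missing row. Jobs can have up to 50,000 rows, so a follow-up could collect the missing indices into a set and make a single pass over the CSV. A sketch only, built from the helpers this diff introduces, not part of the change itself:

    def reprocess_missing_rows(job):
        # One set lookup per CSV row instead of one CSV scan per missing row.
        missing = {r.missing_row for r in find_missing_row_for_job(job.id, job.notification_count)}
        recipient_csv, template = get_recipient_csv_and_template(job)
        for row in recipient_csv.get_rows():
            if row.index in missing:
                process_row(row, template, job, job.service)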
diff --git a/app/celery/tasks.py b/app/celery/tasks.py
index 22cf3643d..f489f281c 100644
--- a/app/celery/tasks.py
+++ b/app/celery/tasks.py
@@ -97,18 +97,11 @@ def process_job(job_id, sender_id=None):
     job.processing_started = start
     dao_update_job(job)
 
-    db_template = dao_get_template_by_id(job.template_id, job.template_version)
-
-    TemplateClass = get_template_class(db_template.template_type)
-    template = TemplateClass(db_template.__dict__)
+    recipient_csv, template = get_recipient_csv_and_template(job)
 
     current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count))
 
-    for row in RecipientCSV(
-        s3.get_job_from_s3(str(service.id), str(job_id)),
-        template_type=template.template_type,
-        placeholders=template.placeholders
-    ).get_rows():
+    for row in recipient_csv.get_rows():
         process_row(row, template, job, service, sender_id=sender_id)
 
     job_complete(job, start=start)
@@ -131,6 +124,18 @@ def job_complete(job, resumed=False, start=None):
     )
 
 
+def get_recipient_csv_and_template(job):
+    db_template = dao_get_template_by_id(job.template_id, job.template_version)
+
+    TemplateClass = get_template_class(db_template.template_type)
+    template = TemplateClass(db_template.__dict__)
+
+    recipient_csv = RecipientCSV(file_data=s3.get_job_from_s3(str(job.service_id), str(job.id)),
+                                 template_type=template.template_type,
+                                 placeholders=template.placeholders)
+    return recipient_csv, template
+
+
 def process_row(row, template, job, service, sender_id=None):
     template_type = template.template_type
     encrypted = encryption.encrypt({
@@ -596,16 +601,9 @@ def process_incomplete_job(job_id):
 
     current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row))
 
-    db_template = dao_get_template_by_id(job.template_id, job.template_version)
+    recipient_csv, template = get_recipient_csv_and_template(job)
 
-    TemplateClass = get_template_class(db_template.template_type)
-    template = TemplateClass(db_template.__dict__)
-
-    for row in RecipientCSV(
-        s3.get_job_from_s3(str(job.service_id), str(job.id)),
-        template_type=template.template_type,
-        placeholders=template.placeholders
-    ).get_rows():
+    for row in recipient_csv.get_rows():
         if row.index > resume_from_row:
             process_row(row, template, job, job.service)
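The extracted get_recipient_csv_and_template helper is only exercised indirectly through its three call sites and the tests below. A direct unit test could look roughly like this sketch; the fixtures mirror the test files in this diff, and the 'email address' CSV header is an assumption about the email CSV format:

    def test_get_recipient_csv_and_template(mocker, sample_email_template):
        mocker.patch('app.celery.tasks.s3.get_job_from_s3',
                     return_value='email address\nfoo@example.com')
        job = create_job(template=sample_email_template)

        recipient_csv, template = get_recipient_csv_and_template(job)

        assert template.template_type == 'email'
        assert len(list(recipient_csv.get_rows())) == 1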
diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py
index b84b56a68..2f549a4de 100644
--- a/app/dao/jobs_dao.py
+++ b/app/dao/jobs_dao.py
@@ -191,17 +191,17 @@ def can_letter_job_be_cancelled(job):
 
 
 def find_jobs_with_missing_rows():
+    # Jobs are at most 50,000 rows and it typically takes about 5 minutes to create all of
+    # those notifications, so only look at jobs that finished more than 10 minutes ago.
+    ten_minutes_ago = datetime.utcnow() - timedelta(minutes=10)
     jobs_with_rows_missing = db.session.query(
         func.count(Notification.id).label('count_notifications'),
-        Job.notification_count,
-        Job.id.label('job_id'),
-        Job.service_id
+        Job
     ).filter(
-        Job.job_status == JOB_STATUS_FINISHED
+        Job.job_status == JOB_STATUS_FINISHED,
+        Job.processing_finished < ten_minutes_ago
     ).group_by(
-        Job.notification_count,
-        Job.id.label('job_id'),
-        Job.service_id
+        Job
     ).having(
         func.count(Notification.id) != Job.notification_count
     )
@@ -211,7 +211,7 @@ def find_missing_row_for_job(job_id, job_size):
     expected_row_numbers = db.session.query(
-        func.generate_series(0, job_size-1).label('row')
+        func.generate_series(0, job_size - 1).label('row')
     ).subquery()
 
     query = db.session.query(
@@ -220,6 +220,6 @@
     ).outerjoin(
         Notification, and_(expected_row_numbers.c.row == Notification.job_row_number, Notification.job_id == job_id)
     ).filter(
-        Notification.job_row_number == None  #noqa
+        Notification.job_row_number == None  # noqa
     )
     return query.all()
diff --git a/migrations/versions/0308_add_uq_key_row_number.py b/migrations/versions/0308_add_uq_key_row_number.py
new file mode 100644
index 000000000..017204084
--- /dev/null
+++ b/migrations/versions/0308_add_uq_key_row_number.py
@@ -0,0 +1,19 @@
+"""
+
+Revision ID: 0308_add_uq_key_row_number
+Revises: 0307_delete_dm_datetime
+Create Date: 2019-11-05 10:12:03.627850
+
+"""
+from alembic import op
+
+revision = '0308_add_uq_key_row_number'
+down_revision = '0307_delete_dm_datetime'
+
+
+def upgrade():
+    op.create_unique_constraint('uq_notifications_job_row_number', 'notifications', ['job_id', 'job_row_number'])
+
+
+def downgrade():
+    op.drop_constraint('uq_notifications_job_row_number', 'notifications')
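find_missing_row_for_job generates the full series of expected row numbers, outer-joins the job's notifications on job_row_number, and keeps the rows where the join found nothing. In plain Python the same computation is a set difference; an illustrative sketch:

    def missing_rows(job_size, notified_row_numbers):
        expected = set(range(job_size))  # SQL: generate_series(0, job_size - 1)
        # Rows with no matching notification are the ones the outer join leaves NULL.
        return sorted(expected - set(notified_row_numbers))

    assert missing_rows(5, [0, 1, 3, 4]) == [2]
    assert missing_rows(5, range(5)) == []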
diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py
index 4bfa5de00..f8d0f0a1d 100644
--- a/tests/app/celery/test_scheduled_tasks.py
+++ b/tests/app/celery/test_scheduled_tasks.py
@@ -3,6 +3,7 @@ from unittest.mock import call
 
 import pytest
 from freezegun import freeze_time
+from unittest import mock
 
 from app import db
 from app.celery import scheduled_tasks
@@ -16,6 +17,7 @@ from app.celery.scheduled_tasks import (
     replay_created_notifications,
     check_precompiled_letter_state,
     check_templated_letter_state,
+    check_for_missing_rows_in_completed_jobs
 )
 from app.config import QueueNames, TaskNames
 from app.dao.jobs_dao import dao_get_job_by_id
@@ -32,6 +34,7 @@ from app.models import (
     NOTIFICATION_PENDING_VIRUS_CHECK,
 )
 from app.v2.errors import JobIncompleteError
+from tests.app import load_example_csv
 
 from tests.app.db import (
     create_notification,
@@ -403,3 +406,47 @@ def test_check_templated_letter_state_during_utc(mocker, sample_letter_template)
         subject="[test] Letters still in 'created' status",
         ticket_type='incident'
     )
+
+
+def test_check_for_missing_rows_in_completed_jobs(mocker, sample_email_template):
+    mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email'))
+    mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
+    process_row = mocker.patch('app.celery.scheduled_tasks.process_row')
+
+    job = create_job(template=sample_email_template,
+                     notification_count=5,
+                     job_status=JOB_STATUS_FINISHED,
+                     processing_finished=datetime.utcnow() - timedelta(minutes=11))
+    for i in range(0, 4):
+        create_notification(job=job, job_row_number=i)
+
+    check_for_missing_rows_in_completed_jobs()
+
+    process_row.assert_called_once_with(
+        mock.ANY, mock.ANY, job, job.service
+    )
+
+
+def test_check_for_missing_rows_in_completed_jobs_calls_save_email(mocker, sample_email_template):
+    mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email'))
+    save_email_task = mocker.patch('app.celery.tasks.save_email.apply_async')
+    mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
+    mocker.patch('app.celery.tasks.create_uuid', return_value='uuid')
+
+    job = create_job(template=sample_email_template,
+                     notification_count=5,
+                     job_status=JOB_STATUS_FINISHED,
+                     processing_finished=datetime.utcnow() - timedelta(minutes=11))
+    for i in range(0, 4):
+        create_notification(job=job, job_row_number=i)
+
+    check_for_missing_rows_in_completed_jobs()
+    save_email_task.assert_called_once_with(
+        (
+            str(job.service_id),
+            "uuid",
+            "something_encrypted",
+        ),
+        {},
+        queue="database-tasks"
+    )
diff --git a/tests/app/dao/test_jobs_dao.py b/tests/app/dao/test_jobs_dao.py
index 3d32f9174..129deb016 100644
--- a/tests/app/dao/test_jobs_dao.py
+++ b/tests/app/dao/test_jobs_dao.py
@@ -4,6 +4,7 @@ from functools import partial
 
 import pytest
 from freezegun import freeze_time
+from sqlalchemy.exc import IntegrityError
 
 from app.dao.jobs_dao import (
     can_letter_job_be_cancelled,
@@ -415,21 +416,45 @@ def test_can_letter_job_be_cancelled_returns_false_and_error_message_if_notifica
 
 
 def test_find_jobs_with_missing_rows(sample_email_template):
-    job = create_job(template=sample_email_template, notification_count=5, job_status=JOB_STATUS_FINISHED)
+    job = create_job(template=sample_email_template,
+                     notification_count=5,
+                     job_status=JOB_STATUS_FINISHED,
+                     processing_finished=datetime.utcnow() - timedelta(minutes=11)
+                     )
     for i in range(0, 4):
         create_notification(job=job, job_row_number=i)
 
     results = find_jobs_with_missing_rows()
 
     assert len(results) == 1
-    assert results[0] == (4, 4, job.id, job.service_id)
+    assert results[0][0] == 4
+    assert results[0][1] == job
+
+
+def test_find_jobs_with_missing_rows_returns_nothing_for_a_job_completed_less_than_10_minutes_ago(
+        sample_email_template
+):
+    job = create_job(template=sample_email_template,
+                     notification_count=5,
+                     job_status=JOB_STATUS_FINISHED,
+                     processing_finished=datetime.utcnow() - timedelta(minutes=9)
+                     )
+    for i in range(0, 4):
+        create_notification(job=job, job_row_number=i)
+
+    results = find_jobs_with_missing_rows()
+
+    assert len(results) == 0
 
 
 @pytest.mark.parametrize('status', ['pending', 'in progress', 'cancelled', 'scheduled'])
 def test_find_jobs_with_missing_rows_doesnt_return_jobs_that_are_not_finished(
     sample_email_template, status
 ):
-    job = create_job(template=sample_email_template, notification_count=5, job_status=status)
+    job = create_job(template=sample_email_template,
+                     notification_count=5,
+                     job_status=status,
+                     processing_finished=datetime.utcnow() - timedelta(minutes=11))
     for i in range(0, 4):
         create_notification(job=job, job_row_number=i)
 
@@ -439,7 +464,10 @@ def test_find_jobs_with_missing_rows_doesnt_return_jobs_that_are_not_finished(
 
 
 def test_find_missing_row_for_job(sample_email_template):
-    job = create_job(template=sample_email_template, notification_count=5, job_status=JOB_STATUS_FINISHED)
+    job = create_job(template=sample_email_template,
+                     notification_count=5,
+                     job_status=JOB_STATUS_FINISHED,
+                     processing_finished=datetime.utcnow() - timedelta(minutes=11))
     create_notification(job=job, job_row_number=0)
     create_notification(job=job, job_row_number=1)
     create_notification(job=job, job_row_number=3)
@@ -451,7 +479,8 @@
 
 
 def test_find_missing_row_for_job_more_than_one_missing_row(sample_email_template):
-    job = create_job(template=sample_email_template, notification_count=5, job_status=JOB_STATUS_FINISHED)
+    job = create_job(template=sample_email_template, notification_count=5, job_status=JOB_STATUS_FINISHED,
+                     processing_finished=datetime.utcnow() - timedelta(minutes=11))
     create_notification(job=job, job_row_number=0)
     create_notification(job=job, job_row_number=1)
     create_notification(job=job, job_row_number=4)
@@ -463,10 +492,18 @@
 
 
 def test_find_missing_row_for_job_return_none_when_row_isnt_missing(sample_email_template):
-    job = create_job(template=sample_email_template, notification_count=5, job_status=JOB_STATUS_FINISHED)
+    job = create_job(template=sample_email_template, notification_count=5, job_status=JOB_STATUS_FINISHED,
+                     processing_finished=datetime.utcnow() - timedelta(minutes=11))
    for i in range(0, 5):
         create_notification(job=job, job_row_number=i)
 
     results = find_missing_row_for_job(job.id, 5)
     print(results)
     assert len(results) == 0
+
+
+def test_unique_key_on_job_id_and_job_row_number(sample_email_template):
+    job = create_job(template=sample_email_template)
+    create_notification(job=job, job_row_number=0)
+    with pytest.raises(expected_exception=IntegrityError):
+        create_notification(job=job, job_row_number=0)
diff --git a/tests/app/db.py b/tests/app/db.py
index a52904bed..341670b90 100644
--- a/tests/app/db.py
+++ b/tests/app/db.py
@@ -377,6 +377,7 @@ def create_job(
     job_status='pending',
     scheduled_for=None,
     processing_started=None,
+    processing_finished=None,
     original_file_name='some.csv',
     archived=False
 ):
@@ -393,6 +394,7 @@ def create_job(
         'job_status': job_status,
         'scheduled_for': scheduled_for,
         'processing_started': processing_started,
+        'processing_finished': processing_finished,
         'archived': archived
     }
     job = Job(**data)
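The new unique constraint on (job_id, job_row_number) turns a double-processed row into an IntegrityError at insert time, as test_unique_key_on_job_id_and_job_row_number shows. Code that might race to create the same notification could treat that error as "row already processed"; a hypothetical sketch, not part of this diff:

    from sqlalchemy.exc import IntegrityError

    def insert_notification_once(session, notification):
        """Insert, treating a duplicate (job_id, job_row_number) as already processed."""
        try:
            session.add(notification)
            session.commit()
            return True
        except IntegrityError:
            session.rollback()  # another worker already created this row's notification
            return False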