diff --git a/app/aws/s3.py b/app/aws/s3.py
index a926bb101..cb4fef679 100644
--- a/app/aws/s3.py
+++ b/app/aws/s3.py
@@ -37,6 +37,18 @@ def get_job_location(service_id, job_id):
     )
 
 
+def get_job_and_metadata_from_s3(service_id, job_id):
+    """Return ``(contents, metadata)`` for the job's object in S3.
+
+    ``contents`` is the object body decoded as UTF-8 and ``metadata`` is the
+    ``Metadata`` entry of the same response. ``obj.get()`` is called once
+    and its response reused for both values.
+    """
+    obj = get_s3_object(*get_job_location(service_id, job_id))
+    response = obj.get()
+    return response['Body'].read().decode('utf-8'), response['Metadata']
+
+
 def get_job_from_s3(service_id, job_id):
     obj = get_s3_object(*get_job_location(service_id, job_id))
     return obj.get()['Body'].read().decode('utf-8')
diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py
index 383f80462..5edc35b0b 100644
--- a/app/celery/scheduled_tasks.py
+++ b/app/celery/scheduled_tasks.py
@@ -11,7 +11,7 @@ from sqlalchemy.exc import SQLAlchemyError
 from app import notify_celery, zendesk_client
 from app.celery.tasks import (
     process_job,
-    get_recipient_csv_and_template,
+    get_recipient_csv_and_template_and_sender_id,
     process_row
 )
 from app.config import QueueNames, TaskNames
@@ -239,12 +239,9 @@ def check_for_missing_rows_in_completed_jobs():
         job = x[1]
         missing_rows = find_missing_row_for_job(job.id, job.notification_count)
         for row_to_process in missing_rows:
-            # The sender_id is passed in with job, at this point we no longer have the sender that is passed in.
-            # The notification will be created with the default sender.
- # There is a bug to fix this https://www.pivotaltracker.com/story/show/169569144 - recipient_csv, template = get_recipient_csv_and_template(job) + recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job) for row in recipient_csv.get_rows(): if row.index == row_to_process.missing_row: current_app.logger.info( "Processing missing row: {} for job: {}".format(row_to_process.missing_row, job.id)) - process_row(row, template, job, job.service) + process_row(row, template, job, job.service, sender_id=sender_id) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index f489f281c..588e14054 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -97,7 +97,7 @@ def process_job(job_id, sender_id=None): job.processing_started = start dao_update_job(job) - recipient_csv, template = get_recipient_csv_and_template(job) + recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job) current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count)) @@ -124,16 +124,17 @@ def job_complete(job, resumed=False, start=None): ) -def get_recipient_csv_and_template(job): +def get_recipient_csv_and_template_and_sender_id(job): db_template = dao_get_template_by_id(job.template_id, job.template_version) TemplateClass = get_template_class(db_template.template_type) template = TemplateClass(db_template.__dict__) - - recipient_csv = RecipientCSV(file_data=s3.get_job_from_s3(str(job.service_id), str(job.id)), + contents, meta_data = s3.get_job_and_metadata_from_s3(service_id=str(job.service_id), job_id=str(job.id)) + recipient_csv = RecipientCSV(file_data=contents, template_type=template.template_type, placeholders=template.placeholders) - return recipient_csv, template + + return recipient_csv, template, meta_data.get("sender_id") def process_row(row, template, job, service, sender_id=None): @@ -601,11 +602,11 @@ def process_incomplete_job(job_id): current_app.logger.info("Resuming job {} 
from row {}".format(job_id, resume_from_row)) - recipient_csv, template = get_recipient_csv_and_template(job) + recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job) for row in recipient_csv.get_rows(): if row.index > resume_from_row: - process_row(row, template, job, job.service) + process_row(row, template, job, job.service, sender_id=sender_id) job_complete(job, resumed=True) diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index f8d0f0a1d..7c4f1cc09 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -409,7 +409,8 @@ def test_check_templated_letter_state_during_utc(mocker, sample_letter_template) def test_check_for_missing_rows_in_completed_jobs(mocker, sample_email_template): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('multiple_email'), {"sender_id": None})) mocker.patch('app.encryption.encrypt', return_value="something_encrypted") process_row = mocker.patch('app.celery.scheduled_tasks.process_row') @@ -423,12 +424,13 @@ def test_check_for_missing_rows_in_completed_jobs(mocker, sample_email_template) check_for_missing_rows_in_completed_jobs() process_row.assert_called_once_with( - mock.ANY, mock.ANY, job, job.service + mock.ANY, mock.ANY, job, job.service, sender_id=None ) def test_check_for_missing_rows_in_completed_jobs_calls_save_email(mocker, sample_email_template): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('multiple_email'), {'sender_id': None})) save_email_task = mocker.patch('app.celery.tasks.save_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") 
mocker.patch('app.celery.tasks.create_uuid', return_value='uuid') @@ -450,3 +452,21 @@ def test_check_for_missing_rows_in_completed_jobs_calls_save_email(mocker, sampl {}, queue="database-tasks" ) + + +def test_check_for_missing_rows_in_completed_jobs_uses_sender_id(mocker, sample_email_template, fake_uuid): + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('multiple_email'), {'sender_id': fake_uuid})) + mock_process_row = mocker.patch('app.celery.scheduled_tasks.process_row') + + job = create_job(template=sample_email_template, + notification_count=5, + job_status=JOB_STATUS_FINISHED, + processing_finished=datetime.utcnow() - timedelta(minutes=11)) + for i in range(0, 4): + create_notification(job=job, job_row_number=i) + + check_for_missing_rows_in_completed_jobs() + mock_process_row.assert_called_once_with( + mock.ANY, mock.ANY, job, job.service, sender_id=fake_uuid + ) diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index fdffffc25..515ef5f8c 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -100,15 +100,16 @@ def email_job_with_placeholders(notify_db, notify_db_session, sample_email_templ def test_should_process_sms_job(sample_job, mocker): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('sms'), {'sender_id': None})) mocker.patch('app.celery.tasks.save_sms.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") process_job(sample_job.id) - s3.get_job_from_s3.assert_called_once_with( - str(sample_job.service.id), - str(sample_job.id) + s3.get_job_and_metadata_from_s3.assert_called_once_with( + service_id=str(sample_job.service.id), + job_id=str(sample_job.id) ) assert encryption.encrypt.call_args[0][0]['to'] == 
'+441234123123' assert encryption.encrypt.call_args[0][0]['template'] == str(sample_job.template.id) @@ -127,7 +128,8 @@ def test_should_process_sms_job(sample_job, mocker): def test_should_process_sms_job_with_sender_id(sample_job, mocker, fake_uuid): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('sms'), {'sender_id': fake_uuid})) mocker.patch('app.celery.tasks.save_sms.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") @@ -150,14 +152,15 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits( service = create_service(message_limit=9) template = create_template(service=service) job = create_job(template=template, notification_count=10, original_file_name='multiple_sms.csv') - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('multiple_sms'), {'sender_id': None})) mocker.patch('app.celery.tasks.process_row') process_job(job.id) job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits exceeded' - assert s3.get_job_from_s3.called is False + assert s3.get_job_and_metadata_from_s3.called is False assert tasks.process_row.called is False @@ -170,14 +173,15 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today( create_notification(template=template, job=job) - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('sms'), {'sender_id': None})) mocker.patch('app.celery.tasks.process_row') process_job(job.id) job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits 
exceeded' - assert s3.get_job_from_s3.called is False + assert s3.get_job_and_metadata_from_s3.called is False assert tasks.process_row.called is False @@ -189,26 +193,26 @@ def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(noti create_notification(template=template, job=job) - mocker.patch('app.celery.tasks.s3.get_job_from_s3') + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3') mocker.patch('app.celery.tasks.process_row') process_job(job.id) job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits exceeded' - assert s3.get_job_from_s3.called is False + assert s3.get_job_and_metadata_from_s3.called is False assert tasks.process_row.called is False def test_should_not_process_job_if_already_pending(sample_template, mocker): job = create_job(template=sample_template, job_status='scheduled') - mocker.patch('app.celery.tasks.s3.get_job_from_s3') + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3') mocker.patch('app.celery.tasks.process_row') process_job(job.id) - assert s3.get_job_from_s3.called is False + assert s3.get_job_and_metadata_from_s3.called is False assert tasks.process_row.called is False @@ -218,16 +222,17 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db_session, template = create_template(service=service, template_type='email') job = create_job(template=template, notification_count=10) - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('multiple_email'), {"sender_id": None})) mocker.patch('app.celery.tasks.save_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") process_job(job.id) - s3.get_job_from_s3.assert_called_once_with( - str(job.service.id), - str(job.id) + 
s3.get_job_and_metadata_from_s3.assert_called_once_with( + service_id=str(job.service.id), + job_id=str(job.id) ) job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'finished' @@ -243,14 +248,15 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db_session, def test_should_not_create_save_task_for_empty_file(sample_job, mocker): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('empty')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('empty'), {"sender_id": None})) mocker.patch('app.celery.tasks.save_sms.apply_async') process_job(sample_job.id) - s3.get_job_from_s3.assert_called_once_with( - str(sample_job.service.id), - str(sample_job.id) + s3.get_job_and_metadata_from_s3.assert_called_once_with( + service_id=str(sample_job.service.id), + job_id=str(sample_job.id) ) job = jobs_dao.dao_get_job_by_id(sample_job.id) assert job.job_status == 'finished' @@ -261,16 +267,16 @@ def test_should_process_email_job(email_job_with_placeholders, mocker): email_csv = """email_address,name test@test.com,foo """ - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=email_csv) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(email_csv, {"sender_id": None})) mocker.patch('app.celery.tasks.save_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") process_job(email_job_with_placeholders.id) - s3.get_job_from_s3.assert_called_once_with( - str(email_job_with_placeholders.service.id), - str(email_job_with_placeholders.id) + s3.get_job_and_metadata_from_s3.assert_called_once_with( + service_id=str(email_job_with_placeholders.service.id), + job_id=str(email_job_with_placeholders.id) ) assert encryption.encrypt.call_args[0][0]['to'] == 'test@test.com' assert encryption.encrypt.call_args[0][0]['template'] == 
str(email_job_with_placeholders.template.id) @@ -293,7 +299,7 @@ def test_should_process_email_job_with_sender_id(email_job_with_placeholders, mo email_csv = """email_address,name test@test.com,foo """ - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=email_csv) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(email_csv, {"sender_id": fake_uuid})) mocker.patch('app.celery.tasks.save_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") @@ -314,15 +320,16 @@ def test_should_process_letter_job(sample_letter_job, mocker): csv = """address_line_1,address_line_2,address_line_3,address_line_4,postcode,name A1,A2,A3,A4,A_POST,Alice """ - s3_mock = mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=csv) + s3_mock = mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(csv, {"sender_id": None})) process_row_mock = mocker.patch('app.celery.tasks.process_row') mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") process_job(sample_letter_job.id) s3_mock.assert_called_once_with( - str(sample_letter_job.service.id), - str(sample_letter_job.id) + service_id=str(sample_letter_job.service.id), + job_id=str(sample_letter_job.id) ) row_call = process_row_mock.mock_calls[0][1] @@ -345,16 +352,17 @@ def test_should_process_letter_job(sample_letter_job, mocker): def test_should_process_all_sms_job(sample_job_with_placeholdered_template, mocker): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('multiple_sms'), {"sender_id": None})) mocker.patch('app.celery.tasks.save_sms.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") 
process_job(sample_job_with_placeholdered_template.id) - s3.get_job_from_s3.assert_called_once_with( - str(sample_job_with_placeholdered_template.service.id), - str(sample_job_with_placeholdered_template.id) + s3.get_job_and_metadata_from_s3.assert_called_once_with( + service_id=str(sample_job_with_placeholdered_template.service.id), + job_id=str(sample_job_with_placeholdered_template.id) ) assert encryption.encrypt.call_args[0][0]['to'] == '+441234123120' assert encryption.encrypt.call_args[0][0]['template'] == str(sample_job_with_placeholdered_template.template.id) @@ -1373,7 +1381,8 @@ def test_send_inbound_sms_to_service_does_not_retries_if_request_returns_404(not def test_process_incomplete_job_sms(mocker, sample_template): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('multiple_sms'), {'sender_id': None})) save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') job = create_job(template=sample_template, notification_count=10, @@ -1398,7 +1407,8 @@ def test_process_incomplete_job_sms(mocker, sample_template): def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_template): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('multiple_sms'), {'sender_id': None})) mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') job = create_job(template=sample_template, notification_count=10, @@ -1431,7 +1441,8 @@ def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_templ def test_process_incomplete_jobs_sms(mocker, sample_template): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + 
return_value=(load_example_csv('multiple_sms'), {'sender_id': None})) mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') job = create_job(template=sample_template, notification_count=10, @@ -1473,7 +1484,8 @@ def test_process_incomplete_jobs_sms(mocker, sample_template): def test_process_incomplete_jobs_no_notifications_added(mocker, sample_template): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('multiple_sms'), {'sender_id': None})) mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') job = create_job(template=sample_template, notification_count=10, @@ -1495,7 +1507,8 @@ def test_process_incomplete_jobs_no_notifications_added(mocker, sample_template) def test_process_incomplete_jobs(mocker): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('multiple_sms'), {'sender_id': None})) mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') jobs = [] @@ -1506,7 +1519,8 @@ def test_process_incomplete_jobs(mocker): def test_process_incomplete_job_no_job_in_database(mocker, fake_uuid): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('multiple_sms'), {'sender_id': None})) mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') with pytest.raises(expected_exception=Exception): @@ -1517,7 +1531,8 @@ def test_process_incomplete_job_no_job_in_database(mocker, fake_uuid): def test_process_incomplete_job_email(mocker, sample_email_template): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email')) + 
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('multiple_email'), {'sender_id': None})) mock_email_saver = mocker.patch('app.celery.tasks.save_email.apply_async') job = create_job(template=sample_email_template, notification_count=10, @@ -1541,7 +1556,8 @@ def test_process_incomplete_job_email(mocker, sample_email_template): def test_process_incomplete_job_letter(mocker, sample_letter_template): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_letter')) + mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', + return_value=(load_example_csv('multiple_letter'), {'sender_id': None})) mock_letter_saver = mocker.patch('app.celery.tasks.save_letter.apply_async') job = create_job(template=sample_letter_template, notification_count=10,