Merge pull request #2659 from alphagov/get-sender-id-from-metadata

Get sender id from metadata
This commit is contained in:
Rebecca Law
2019-11-18 10:57:59 +00:00
committed by GitHub
5 changed files with 96 additions and 57 deletions

View File

@@ -37,6 +37,11 @@ def get_job_location(service_id, job_id):
)
def get_job_and_metadata_from_s3(service_id, job_id):
obj = get_s3_object(*get_job_location(service_id, job_id))
return obj.get()['Body'].read().decode('utf-8'), obj.get()['Metadata']
def get_job_from_s3(service_id, job_id):
obj = get_s3_object(*get_job_location(service_id, job_id))
return obj.get()['Body'].read().decode('utf-8')

View File

@@ -11,7 +11,7 @@ from sqlalchemy.exc import SQLAlchemyError
from app import notify_celery, zendesk_client
from app.celery.tasks import (
process_job,
get_recipient_csv_and_template,
get_recipient_csv_and_template_and_sender_id,
process_row
)
from app.config import QueueNames, TaskNames
@@ -239,12 +239,9 @@ def check_for_missing_rows_in_completed_jobs():
job = x[1]
missing_rows = find_missing_row_for_job(job.id, job.notification_count)
for row_to_process in missing_rows:
# The sender_id is passed in with job, at this point we no longer have the sender that is passed in.
# The notification will be created with the default sender.
# There is a bug to fix this https://www.pivotaltracker.com/story/show/169569144
recipient_csv, template = get_recipient_csv_and_template(job)
recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
for row in recipient_csv.get_rows():
if row.index == row_to_process.missing_row:
current_app.logger.info(
"Processing missing row: {} for job: {}".format(row_to_process.missing_row, job.id))
process_row(row, template, job, job.service)
process_row(row, template, job, job.service, sender_id=sender_id)

View File

@@ -97,7 +97,7 @@ def process_job(job_id, sender_id=None):
job.processing_started = start
dao_update_job(job)
recipient_csv, template = get_recipient_csv_and_template(job)
recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count))
@@ -124,16 +124,17 @@ def job_complete(job, resumed=False, start=None):
)
def get_recipient_csv_and_template(job):
def get_recipient_csv_and_template_and_sender_id(job):
db_template = dao_get_template_by_id(job.template_id, job.template_version)
TemplateClass = get_template_class(db_template.template_type)
template = TemplateClass(db_template.__dict__)
recipient_csv = RecipientCSV(file_data=s3.get_job_from_s3(str(job.service_id), str(job.id)),
contents, meta_data = s3.get_job_and_metadata_from_s3(service_id=str(job.service_id), job_id=str(job.id))
recipient_csv = RecipientCSV(file_data=contents,
template_type=template.template_type,
placeholders=template.placeholders)
return recipient_csv, template
return recipient_csv, template, meta_data.get("sender_id")
def process_row(row, template, job, service, sender_id=None):
@@ -601,11 +602,11 @@ def process_incomplete_job(job_id):
current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row))
recipient_csv, template = get_recipient_csv_and_template(job)
recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
for row in recipient_csv.get_rows():
if row.index > resume_from_row:
process_row(row, template, job, job.service)
process_row(row, template, job, job.service, sender_id=sender_id)
job_complete(job, resumed=True)

View File

@@ -409,7 +409,8 @@ def test_check_templated_letter_state_during_utc(mocker, sample_letter_template)
def test_check_for_missing_rows_in_completed_jobs(mocker, sample_email_template):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_email'), {"sender_id": None}))
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
process_row = mocker.patch('app.celery.scheduled_tasks.process_row')
@@ -423,12 +424,13 @@ def test_check_for_missing_rows_in_completed_jobs(mocker, sample_email_template)
check_for_missing_rows_in_completed_jobs()
process_row.assert_called_once_with(
mock.ANY, mock.ANY, job, job.service
mock.ANY, mock.ANY, job, job.service, sender_id=None
)
def test_check_for_missing_rows_in_completed_jobs_calls_save_email(mocker, sample_email_template):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_email'), {'sender_id': None}))
save_email_task = mocker.patch('app.celery.tasks.save_email.apply_async')
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
mocker.patch('app.celery.tasks.create_uuid', return_value='uuid')
@@ -450,3 +452,21 @@ def test_check_for_missing_rows_in_completed_jobs_calls_save_email(mocker, sampl
{},
queue="database-tasks"
)
def test_check_for_missing_rows_in_completed_jobs_uses_sender_id(mocker, sample_email_template, fake_uuid):
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_email'), {'sender_id': fake_uuid}))
mock_process_row = mocker.patch('app.celery.scheduled_tasks.process_row')
job = create_job(template=sample_email_template,
notification_count=5,
job_status=JOB_STATUS_FINISHED,
processing_finished=datetime.utcnow() - timedelta(minutes=11))
for i in range(0, 4):
create_notification(job=job, job_row_number=i)
check_for_missing_rows_in_completed_jobs()
mock_process_row.assert_called_once_with(
mock.ANY, mock.ANY, job, job.service, sender_id=fake_uuid
)

View File

@@ -100,15 +100,16 @@ def email_job_with_placeholders(notify_db, notify_db_session, sample_email_templ
def test_should_process_sms_job(sample_job, mocker):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('sms'), {'sender_id': None}))
mocker.patch('app.celery.tasks.save_sms.apply_async')
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
mocker.patch('app.celery.tasks.create_uuid', return_value="uuid")
process_job(sample_job.id)
s3.get_job_from_s3.assert_called_once_with(
str(sample_job.service.id),
str(sample_job.id)
s3.get_job_and_metadata_from_s3.assert_called_once_with(
service_id=str(sample_job.service.id),
job_id=str(sample_job.id)
)
assert encryption.encrypt.call_args[0][0]['to'] == '+441234123123'
assert encryption.encrypt.call_args[0][0]['template'] == str(sample_job.template.id)
@@ -127,7 +128,8 @@ def test_should_process_sms_job(sample_job, mocker):
def test_should_process_sms_job_with_sender_id(sample_job, mocker, fake_uuid):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('sms'), {'sender_id': fake_uuid}))
mocker.patch('app.celery.tasks.save_sms.apply_async')
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
mocker.patch('app.celery.tasks.create_uuid', return_value="uuid")
@@ -150,14 +152,15 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits(
service = create_service(message_limit=9)
template = create_template(service=service)
job = create_job(template=template, notification_count=10, original_file_name='multiple_sms.csv')
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_sms'), {'sender_id': None}))
mocker.patch('app.celery.tasks.process_row')
process_job(job.id)
job = jobs_dao.dao_get_job_by_id(job.id)
assert job.job_status == 'sending limits exceeded'
assert s3.get_job_from_s3.called is False
assert s3.get_job_and_metadata_from_s3.called is False
assert tasks.process_row.called is False
@@ -170,14 +173,15 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(
create_notification(template=template, job=job)
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('sms'), {'sender_id': None}))
mocker.patch('app.celery.tasks.process_row')
process_job(job.id)
job = jobs_dao.dao_get_job_by_id(job.id)
assert job.job_status == 'sending limits exceeded'
assert s3.get_job_from_s3.called is False
assert s3.get_job_and_metadata_from_s3.called is False
assert tasks.process_row.called is False
@@ -189,26 +193,26 @@ def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(noti
create_notification(template=template, job=job)
mocker.patch('app.celery.tasks.s3.get_job_from_s3')
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3')
mocker.patch('app.celery.tasks.process_row')
process_job(job.id)
job = jobs_dao.dao_get_job_by_id(job.id)
assert job.job_status == 'sending limits exceeded'
assert s3.get_job_from_s3.called is False
assert s3.get_job_and_metadata_from_s3.called is False
assert tasks.process_row.called is False
def test_should_not_process_job_if_already_pending(sample_template, mocker):
job = create_job(template=sample_template, job_status='scheduled')
mocker.patch('app.celery.tasks.s3.get_job_from_s3')
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3')
mocker.patch('app.celery.tasks.process_row')
process_job(job.id)
assert s3.get_job_from_s3.called is False
assert s3.get_job_and_metadata_from_s3.called is False
assert tasks.process_row.called is False
@@ -218,16 +222,17 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db_session,
template = create_template(service=service, template_type='email')
job = create_job(template=template, notification_count=10)
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_email'), {"sender_id": None}))
mocker.patch('app.celery.tasks.save_email.apply_async')
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
mocker.patch('app.celery.tasks.create_uuid', return_value="uuid")
process_job(job.id)
s3.get_job_from_s3.assert_called_once_with(
str(job.service.id),
str(job.id)
s3.get_job_and_metadata_from_s3.assert_called_once_with(
service_id=str(job.service.id),
job_id=str(job.id)
)
job = jobs_dao.dao_get_job_by_id(job.id)
assert job.job_status == 'finished'
@@ -243,14 +248,15 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db_session,
def test_should_not_create_save_task_for_empty_file(sample_job, mocker):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('empty'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('empty'), {"sender_id": None}))
mocker.patch('app.celery.tasks.save_sms.apply_async')
process_job(sample_job.id)
s3.get_job_from_s3.assert_called_once_with(
str(sample_job.service.id),
str(sample_job.id)
s3.get_job_and_metadata_from_s3.assert_called_once_with(
service_id=str(sample_job.service.id),
job_id=str(sample_job.id)
)
job = jobs_dao.dao_get_job_by_id(sample_job.id)
assert job.job_status == 'finished'
@@ -261,16 +267,16 @@ def test_should_process_email_job(email_job_with_placeholders, mocker):
email_csv = """email_address,name
test@test.com,foo
"""
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=email_csv)
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(email_csv, {"sender_id": None}))
mocker.patch('app.celery.tasks.save_email.apply_async')
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
mocker.patch('app.celery.tasks.create_uuid', return_value="uuid")
process_job(email_job_with_placeholders.id)
s3.get_job_from_s3.assert_called_once_with(
str(email_job_with_placeholders.service.id),
str(email_job_with_placeholders.id)
s3.get_job_and_metadata_from_s3.assert_called_once_with(
service_id=str(email_job_with_placeholders.service.id),
job_id=str(email_job_with_placeholders.id)
)
assert encryption.encrypt.call_args[0][0]['to'] == 'test@test.com'
assert encryption.encrypt.call_args[0][0]['template'] == str(email_job_with_placeholders.template.id)
@@ -293,7 +299,7 @@ def test_should_process_email_job_with_sender_id(email_job_with_placeholders, mo
email_csv = """email_address,name
test@test.com,foo
"""
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=email_csv)
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(email_csv, {"sender_id": fake_uuid}))
mocker.patch('app.celery.tasks.save_email.apply_async')
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
mocker.patch('app.celery.tasks.create_uuid', return_value="uuid")
@@ -314,15 +320,16 @@ def test_should_process_letter_job(sample_letter_job, mocker):
csv = """address_line_1,address_line_2,address_line_3,address_line_4,postcode,name
A1,A2,A3,A4,A_POST,Alice
"""
s3_mock = mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=csv)
s3_mock = mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(csv, {"sender_id": None}))
process_row_mock = mocker.patch('app.celery.tasks.process_row')
mocker.patch('app.celery.tasks.create_uuid', return_value="uuid")
process_job(sample_letter_job.id)
s3_mock.assert_called_once_with(
str(sample_letter_job.service.id),
str(sample_letter_job.id)
service_id=str(sample_letter_job.service.id),
job_id=str(sample_letter_job.id)
)
row_call = process_row_mock.mock_calls[0][1]
@@ -345,16 +352,17 @@ def test_should_process_letter_job(sample_letter_job, mocker):
def test_should_process_all_sms_job(sample_job_with_placeholdered_template,
mocker):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_sms'), {"sender_id": None}))
mocker.patch('app.celery.tasks.save_sms.apply_async')
mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
mocker.patch('app.celery.tasks.create_uuid', return_value="uuid")
process_job(sample_job_with_placeholdered_template.id)
s3.get_job_from_s3.assert_called_once_with(
str(sample_job_with_placeholdered_template.service.id),
str(sample_job_with_placeholdered_template.id)
s3.get_job_and_metadata_from_s3.assert_called_once_with(
service_id=str(sample_job_with_placeholdered_template.service.id),
job_id=str(sample_job_with_placeholdered_template.id)
)
assert encryption.encrypt.call_args[0][0]['to'] == '+441234123120'
assert encryption.encrypt.call_args[0][0]['template'] == str(sample_job_with_placeholdered_template.template.id)
@@ -1373,7 +1381,8 @@ def test_send_inbound_sms_to_service_does_not_retries_if_request_returns_404(not
def test_process_incomplete_job_sms(mocker, sample_template):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_sms'), {'sender_id': None}))
save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async')
job = create_job(template=sample_template, notification_count=10,
@@ -1398,7 +1407,8 @@ def test_process_incomplete_job_sms(mocker, sample_template):
def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_template):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_sms'), {'sender_id': None}))
mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async')
job = create_job(template=sample_template, notification_count=10,
@@ -1431,7 +1441,8 @@ def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_templ
def test_process_incomplete_jobs_sms(mocker, sample_template):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_sms'), {'sender_id': None}))
mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async')
job = create_job(template=sample_template, notification_count=10,
@@ -1473,7 +1484,8 @@ def test_process_incomplete_jobs_sms(mocker, sample_template):
def test_process_incomplete_jobs_no_notifications_added(mocker, sample_template):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_sms'), {'sender_id': None}))
mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async')
job = create_job(template=sample_template, notification_count=10,
@@ -1495,7 +1507,8 @@ def test_process_incomplete_jobs_no_notifications_added(mocker, sample_template)
def test_process_incomplete_jobs(mocker):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_sms'), {'sender_id': None}))
mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async')
jobs = []
@@ -1506,7 +1519,8 @@ def test_process_incomplete_jobs(mocker):
def test_process_incomplete_job_no_job_in_database(mocker, fake_uuid):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_sms'), {'sender_id': None}))
mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async')
with pytest.raises(expected_exception=Exception):
@@ -1517,7 +1531,8 @@ def test_process_incomplete_job_no_job_in_database(mocker, fake_uuid):
def test_process_incomplete_job_email(mocker, sample_email_template):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_email'), {'sender_id': None}))
mock_email_saver = mocker.patch('app.celery.tasks.save_email.apply_async')
job = create_job(template=sample_email_template, notification_count=10,
@@ -1541,7 +1556,8 @@ def test_process_incomplete_job_email(mocker, sample_email_template):
def test_process_incomplete_job_letter(mocker, sample_letter_template):
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_letter'))
mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3',
return_value=(load_example_csv('multiple_letter'), {'sender_id': None}))
mock_letter_saver = mocker.patch('app.celery.tasks.save_letter.apply_async')
job = create_job(template=sample_letter_template, notification_count=10,