diff --git a/app/celery/tasks.py b/app/celery/tasks.py index a7713e668..dc69427a5 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -112,7 +112,7 @@ def process_job(job_id, sender_id=None): template_type=template.template_type, placeholders=template.placeholders ).rows: - process_row(row, template, job, service) + process_row(row, template, job, service, sender_id=sender_id) job_complete(job, start=start) @@ -134,7 +134,7 @@ def job_complete(job, resumed=False, start=None): ) -def process_row(row, template, job, service): +def process_row(row, template, job, service, sender_id=None): template_type = template.template_type encrypted = encryption.encrypt({ 'template': str(template.id), @@ -153,12 +153,17 @@ def process_row(row, template, job, service): send_fn = send_fns[template_type] + task_kwargs = {} + if sender_id: + task_kwargs['sender_id'] = sender_id + send_fn.apply_async( ( str(service.id), create_uuid(), encrypted, ), + task_kwargs, queue=QueueNames.DATABASE if not service.research_mode else QueueNames.RESEARCH_MODE ) diff --git a/app/job/rest.py b/app/job/rest.py index fd1ee56ef..6e64e045c 100644 --- a/app/job/rest.py +++ b/app/job/rest.py @@ -150,8 +150,10 @@ def create_job(service_id): dao_create_job(job) + sender_id = data.get('sender_id') + if job.job_status == JOB_STATUS_PENDING: - process_job.apply_async([str(job.id)], queue=QueueNames.JOBS) + process_job.apply_async([str(job.id)], {'sender_id': sender_id}, queue=QueueNames.JOBS) job_json = job_schema.dump(job).data job_json['statistics'] = [] diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 0348c6438..84ad84cab 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -122,12 +122,30 @@ def test_should_process_sms_job(sample_job, mocker): (str(sample_job.service_id), "uuid", "something_encrypted"), + {}, queue="database-tasks" ) job = jobs_dao.dao_get_job_by_id(sample_job.id) assert job.job_status == 'finished' +def test_should_process_sms_job_with_sender_id(sample_job, mocker, fake_uuid): + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms')) + mocker.patch('app.celery.tasks.save_sms.apply_async') + mocker.patch('app.encryption.encrypt', return_value="something_encrypted") + mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") + + process_job(sample_job.id, sender_id=fake_uuid) + + tasks.save_sms.apply_async.assert_called_once_with( + (str(sample_job.service_id), + "uuid", + "something_encrypted"), + {'sender_id': fake_uuid}, + queue="database-tasks" + ) + + @freeze_time("2016-01-01 11:09:00.061258") def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db, notify_db_session, @@ -238,6 +256,7 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db, "uuid", "something_encrypted", ), + {}, queue="database-tasks" ) @@ -282,12 +301,33 @@ def test_should_process_email_job(email_job_with_placeholders, mocker): "uuid", "something_encrypted", ), + {}, queue="database-tasks" ) job = jobs_dao.dao_get_job_by_id(email_job_with_placeholders.id) assert job.job_status == 'finished' +def test_should_process_email_job_with_sender_id(email_job_with_placeholders, mocker, fake_uuid): + email_csv = """email_address,name + test@test.com,foo + """ + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=email_csv) + mocker.patch('app.celery.tasks.save_email.apply_async') + mocker.patch('app.encryption.encrypt', return_value="something_encrypted") + mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") + + process_job(email_job_with_placeholders.id, sender_id=fake_uuid) + + tasks.save_email.apply_async.assert_called_once_with( + (str(email_job_with_placeholders.service_id), + "uuid", + "something_encrypted"), + {'sender_id': fake_uuid}, + queue="database-tasks" + ) + + @freeze_time("2016-01-01 11:09:00.061258") def test_should_process_letter_job(sample_letter_job, mocker): csv = """address_line_1,address_line_2,address_line_3,address_line_4,postcode,name @@ -393,8 +433,44 @@ def test_process_row_sends_letter_task(template_type, research_mode, expected_fu # encrypted data encrypt_mock.return_value, ), + {}, queue=expected_queue ) + + +def test_process_row_when_sender_id_is_provided(mocker, fake_uuid): + mocker.patch('app.celery.tasks.create_uuid', return_value='noti_uuid') + task_mock = mocker.patch('app.celery.tasks.save_sms.apply_async') + encrypt_mock = mocker.patch('app.celery.tasks.encryption.encrypt') + template = Mock(id='template_id', template_type=SMS_TYPE) + job = Mock(id='job_id', template_version='temp_vers') + service = Mock(id='service_id', research_mode=False) + + process_row( + Row( + {'foo': 'bar', 'to': 'recip'}, + index='row_num', + error_fn=lambda k, v: None, + recipient_column_headers=['to'], + placeholders={'foo'}, + template=template, + ), + template, + job, + service, + sender_id=fake_uuid + ) + + task_mock.assert_called_once_with( + ( + 'service_id', + 'noti_uuid', + # encrypted data + encrypt_mock.return_value, + ), + {'sender_id': fake_uuid}, + queue='database-tasks' + ) # -------- save_sms and save_email tests -------- # diff --git a/tests/app/job/test_rest.py b/tests/app/job/test_rest.py index 202996cfc..dcf0cf5f8 100644 --- a/tests/app/job/test_rest.py +++ b/tests/app/job/test_rest.py @@ -98,6 +98,7 @@ def test_create_unscheduled_job(client, sample_template, mocker, fake_uuid): app.celery.tasks.process_job.apply_async.assert_called_once_with( ([str(fake_uuid)]), + {'sender_id': None}, queue="job-tasks" ) @@ -113,6 +114,36 @@ def test_create_unscheduled_job(client, sample_template, mocker, fake_uuid): assert resp_json['data']['notification_count'] == 1 +def test_create_unscheduled_job_with_sender_id_in_metadata(client, sample_template, mocker, fake_uuid): + mocker.patch('app.celery.tasks.process_job.apply_async') + mocker.patch('app.job.rest.get_job_metadata_from_s3', return_value={ + 'template_id': str(sample_template.id), + 'original_file_name': 'thisisatest.csv', + 'notification_count': '1', + 'valid': 'True', + 'sender_id': fake_uuid + }) + data = { + 'id': fake_uuid, + 'created_by': str(sample_template.created_by.id), + } + path = '/service/{}/job'.format(sample_template.service.id) + auth_header = create_authorization_header() + headers = [('Content-Type', 'application/json'), auth_header] + + response = client.post( + path, + data=json.dumps(data), + headers=headers) + assert response.status_code == 201 + + app.celery.tasks.process_job.apply_async.assert_called_once_with( + ([str(fake_uuid)]), + {'sender_id': fake_uuid}, + queue="job-tasks" + ) + + @freeze_time("2016-01-01 12:00:00.000000") def test_create_scheduled_job(client, sample_template, mocker, fake_uuid): scheduled_date = (datetime.utcnow() + timedelta(hours=95, minutes=59)).isoformat()