From ee484ec368a68205e4cfc6c32a27634491244878 Mon Sep 17 00:00:00 2001 From: Imdad Ahad Date: Fri, 12 May 2017 17:39:15 +0100 Subject: [PATCH 1/2] Add get_s3_file method for use in DVLA processing --- app/aws/s3.py | 11 ++++++++--- app/celery/tasks.py | 25 +++++++++++++++++++++++++ tests/app/aws/test_s3.py | 11 +++++++++++ tests/app/celery/test_tasks.py | 34 ++++++++++++++++++++++++++++++++++ 4 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 tests/app/aws/test_s3.py diff --git a/app/aws/s3.py b/app/aws/s3.py index 2aa7aac39..ed206b3a4 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -4,7 +4,12 @@ from flask import current_app FILE_LOCATION_STRUCTURE = 'service-{}-notify/{}.csv' -def get_s3_job_object(bucket_name, file_location): +def get_s3_file(bucket_name, file_location): + s3_file = get_s3_object(bucket_name, file_location) + return s3_file.get()['Body'].read().decode('utf-8') + + +def get_s3_object(bucket_name, file_location): s3 = resource('s3') return s3.Object(bucket_name, file_location) @@ -12,12 +17,12 @@ def get_s3_job_object(bucket_name, file_location): def get_job_from_s3(service_id, job_id): bucket_name = current_app.config['CSV_UPLOAD_BUCKET_NAME'] file_location = FILE_LOCATION_STRUCTURE.format(service_id, job_id) - obj = get_s3_job_object(bucket_name, file_location) + obj = get_s3_object(bucket_name, file_location) return obj.get()['Body'].read().decode('utf-8') def remove_job_from_s3(service_id, job_id): bucket_name = current_app.config['CSV_UPLOAD_BUCKET_NAME'] file_location = FILE_LOCATION_STRUCTURE.format(service_id, job_id) - obj = get_s3_job_object(bucket_name, file_location) + obj = get_s3_object(bucket_name, file_location) return obj.delete() diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 464e3e1be..48923d228 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -1,6 +1,7 @@ import random from datetime import (datetime) +from collections import namedtuple from flask import current_app from notifications_utils.recipients import ( @@ -354,3 +355,27 @@ def get_template_class(template_type): # since we don't need rendering capabilities (we only need to extract placeholders) both email and letter can # use the same base template return WithSubjectTemplate + + +@notify_celery.task(bind=True, name='update-letter-notifications-statuses') +@statsd(namespace="tasks") +def update_letter_notifications_statuses(self, filename): + bucket_location = '{}-ftp'.format(current_app.config['NOTIFY_EMAIL_DOMAIN']) + response_file = s3.get_s3_file(bucket_location, filename) + + try: + NotificationUpdate = namedtuple('NotificationUpdate', ['reference', 'status', 'page_count', 'cost_threshold']) + notification_updates = [NotificationUpdate(*line.split('|')) for line in response_file.splitlines()] + + except TypeError: + current_app.logger.exception('DVLA response file: {} has an invalid format'.format(filename)) + raise + + else: + if notification_updates: + for update in notification_updates: + current_app.logger.info('DVLA update: {}'.format(str(update))) + # TODO: Update notifications with desired status + return notification_updates + else: + current_app.logger.exception('DVLA response file contained no updates') diff --git a/tests/app/aws/test_s3.py b/tests/app/aws/test_s3.py new file mode 100644 index 000000000..38c9966dd --- /dev/null +++ b/tests/app/aws/test_s3.py @@ -0,0 +1,11 @@ +from app.aws.s3 import get_s3_file + + +def test_get_s3_file_makes_correct_call(sample_service, sample_job, mocker): + get_s3_mock = mocker.patch('app.aws.s3.get_s3_object') + get_s3_file('foo-bucket', 'bar-file.txt') + + get_s3_mock.assert_called_with( + 'foo-bucket', + 'bar-file.txt' + ) diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 8b9631b4f..882ac1fcb 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -1071,3 +1071,37 @@ def test_update_dvla_job_to_error(sample_letter_template, sample_letter_job): assert not n.sent_by assert 'error' == Job.query.filter_by(id=sample_letter_job.id).one().job_status + + +def test_update_letter_notifications_statuses_raises_for_invalid_format(notify_api, mocker): + invalid_file = 'ref-foo|Sent|1|Unsorted\nref-bar|Sent|2' + mocker.patch('app.celery.tasks.s3.get_s3_file', return_value=invalid_file) + + with pytest.raises(TypeError): + update_letter_notifications_statuses(filename='foo.txt') + + +def test_update_letter_notifications_statuses_calls_with_correct_bucket_location(notify_api, mocker): + s3_mock = mocker.patch('app.celery.tasks.s3.get_s3_object') + + with set_config(notify_api, 'NOTIFY_EMAIL_DOMAIN', 'foo.bar'): + update_letter_notifications_statuses(filename='foo.txt') + s3_mock.assert_called_with('{}-ftp'.format(current_app.config['NOTIFY_EMAIL_DOMAIN']), 'foo.txt') + + +def test_update_letter_notifications_statuses_builds_updates_list(notify_api, mocker): + valid_file = 'ref-foo|Sent|1|Unsorted\nref-bar|Sent|2|Sorted' + mocker.patch('app.celery.tasks.s3.get_s3_file', return_value=valid_file) + updates = update_letter_notifications_statuses(filename='foo.txt') + + assert len(updates) == 2 + + assert updates[0].reference == 'ref-foo' + assert updates[0].status == 'Sent' + assert updates[0].page_count == '1' + assert updates[0].cost_threshold == 'Unsorted' + + assert updates[1].reference == 'ref-bar' + assert updates[1].status == 'Sent' + assert updates[1].page_count == '2' + assert updates[1].cost_threshold == 'Sorted' From 4003edfa67ae7ba73fcabc684c664f04642ad5af Mon Sep 17 00:00:00 2001 From: Imdad Ahad Date: Mon, 15 May 2017 11:12:31 +0100 Subject: [PATCH 2/2] Add DVLA callback: * Process SNS callback, trigger the update notifications celery task * Put autoconfirm into its own method and use in callbacks --- .../notifications_letter_callback.py | 66 ++++++++----- .../notifications_ses_callback.py | 17 ++-- app/notifications/utils.py | 7 ++ tests/app/celery/test_tasks.py | 9 +- .../app/notifications/rest/test_callbacks.py | 97 ++++++++++++++++++- 5 files changed, 157 insertions(+), 39 deletions(-) diff --git a/app/notifications/notifications_letter_callback.py b/app/notifications/notifications_letter_callback.py index f7c357055..2f758b7ac 100644 --- a/app/notifications/notifications_letter_callback.py +++ b/app/notifications/notifications_letter_callback.py @@ -1,39 +1,57 @@ -from datetime import datetime +from functools import wraps from flask import ( Blueprint, jsonify, request, - current_app, - json + current_app ) -from app import statsd_client -from app.clients.email.aws_ses import get_aws_responses -from app.dao import ( - notifications_dao -) +from app.celery.tasks import update_letter_notifications_statuses +from app.v2.errors import register_errors +from app.notifications.utils import autoconfirm_subscription +from app.schema_validation import validate -from app.notifications.process_client_response import validate_callback_data letter_callback_blueprint = Blueprint('notifications_letter_callback', __name__) - -from app.errors import ( - register_errors, - InvalidRequest -) - register_errors(letter_callback_blueprint) +dvla_sns_callback_schema = { + "$schema": "http://json-schema.org/draft-04/schema#", + "description": "sns callback received on s3 update", + "type": "object", + "title": "dvla internal sns callback", + "properties": { + "Type": {"enum": ["Notification", "SubscriptionConfirmation"]}, + "MessageId": {"type": "string"}, + "Message": {"type": ["string", "object"]} + }, + "required": ["Type", "MessageId", "Message"] +} + + +def validate_schema(schema): + def decorator(f): + @wraps(f) + def wrapper(*args, **kw): + validate(request.json, schema) + return f(*args, **kw) + return wrapper + return decorator + + @letter_callback_blueprint.route('/notifications/letter/dvla', methods=['POST']) +@validate_schema(dvla_sns_callback_schema) def process_letter_response(): - try: - dvla_request = json.loads(request.data) - current_app.logger.info(dvla_request) - return jsonify( - result="success", message="DVLA callback succeeded" - ), 200 - except ValueError: - error = "DVLA callback failed: invalid json" - raise InvalidRequest(error, status_code=400) + req_json = request.json + if not autoconfirm_subscription(req_json): + # The callback should have one record for an S3 Put Event. + filename = req_json['Message']['Records'][0]['s3']['object']['key'] + current_app.logger.info('Received file from DVLA: {}'.format(filename)) + current_app.logger.info('DVLA callback: Calling task to update letter notifications') + update_letter_notifications_statuses.apply_async([filename], queue='notify') + + return jsonify( + result="success", message="DVLA callback succeeded" + ), 200 diff --git a/app/notifications/notifications_ses_callback.py b/app/notifications/notifications_ses_callback.py index ce8dec9ba..e78fb9394 100644 --- a/app/notifications/notifications_ses_callback.py +++ b/app/notifications/notifications_ses_callback.py @@ -13,9 +13,8 @@ from app.clients.email.aws_ses import get_aws_responses from app.dao import ( notifications_dao ) - from app.notifications.process_client_response import validate_callback_data -from app.notifications.utils import confirm_subscription +from app.notifications.utils import autoconfirm_subscription ses_callback_blueprint = Blueprint('notifications_ses_callback', __name__) @@ -32,14 +31,12 @@ def process_ses_response(): try: ses_request = json.loads(request.data) - if ses_request.get('Type') == 'SubscriptionConfirmation': - current_app.logger.info("SNS subscription confirmation url: {}".format(ses_request['SubscribeURL'])) - subscribed_topic = confirm_subscription(ses_request) - if subscribed_topic: - current_app.logger.info("Automatically subscribed to topic: {}".format(subscribed_topic)) - return jsonify( - result="success", message="SES callback succeeded" - ), 200 + subscribed_topic = autoconfirm_subscription(ses_request) + if subscribed_topic: + current_app.logger.info("Automatically subscribed to topic: {}".format(subscribed_topic)) + return jsonify( + result="success", message="SES callback succeeded" + ), 200 errors = validate_callback_data(data=ses_request, fields=['Message'], client_name=client_name) if errors: diff --git a/app/notifications/utils.py b/app/notifications/utils.py index 80346c475..5f1443f09 100644 --- a/app/notifications/utils.py +++ b/app/notifications/utils.py @@ -16,3 +16,10 @@ def confirm_subscription(confirmation_request): raise e return confirmation_request['TopicArn'] + + +def autoconfirm_subscription(req_json): + if req_json.get('Type') == 'SubscriptionConfirmation': + current_app.logger.info("SNS subscription confirmation url: {}".format(req_json['SubscribeURL'])) + subscribed_topic = confirm_subscription(req_json) + return subscribed_topic diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 882ac1fcb..560f68c27 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -12,15 +12,19 @@ from celery.exceptions import Retry from app import (encryption, DATETIME_FORMAT) from app.celery import provider_tasks from app.celery import tasks -from app.celery.tasks import s3, build_dvla_file, create_dvla_file_contents, update_dvla_job_to_error from app.celery.tasks import ( + s3, + build_dvla_file, + create_dvla_file_contents, + update_dvla_job_to_error, process_job, process_row, send_sms, send_email, persist_letter, get_template_class, - update_job_to_sent_to_dvla + update_job_to_sent_to_dvla, + update_letter_notifications_statuses ) from app.dao import jobs_dao, services_dao from app.models import ( @@ -34,6 +38,7 @@ from app.models import ( Job) from tests.app import load_example_csv +from tests.conftest import set_config from tests.app.conftest import ( sample_service, sample_template, diff --git a/tests/app/notifications/rest/test_callbacks.py b/tests/app/notifications/rest/test_callbacks.py index 780076b34..a15a607df 100644 --- a/tests/app/notifications/rest/test_callbacks.py +++ b/tests/app/notifications/rest/test_callbacks.py @@ -10,17 +10,65 @@ from app.dao.notifications_dao import ( get_notification_by_id ) from app.models import NotificationStatistics +from tests.app.notifications.test_notifications_ses_callback import ses_confirmation_callback from tests.app.conftest import sample_notification as create_sample_notification -def test_dvla_callback_should_not_need_auth(client): - data = json.dumps({"somekey": "somevalue"}) +def test_dvla_callback_returns_400_with_invalid_request(client): + data = json.dumps({"foo": "bar"}) response = client.post( path='/notifications/letter/dvla', data=data, - headers=[('Content-Type', 'application/json')]) + headers=[('Content-Type', 'application/json')] + ) + + assert response.status_code == 400 + + +def test_dvla_callback_autoconfirms_subscription(client, mocker): + autoconfirm_mock = mocker.patch('app.notifications.notifications_letter_callback.autoconfirm_subscription') + + data = ses_confirmation_callback() + response = client.post( + path='/notifications/letter/dvla', + data=data, + headers=[('Content-Type', 'application/json')] + ) assert response.status_code == 200 + assert autoconfirm_mock.called + + +def test_dvla_callback_autoconfirm_does_not_call_update_letter_notifications_task(client, mocker): + autoconfirm_mock = mocker.patch('app.notifications.notifications_letter_callback.autoconfirm_subscription') + update_task = \ + mocker.patch('app.notifications.notifications_letter_callback.update_letter_notifications_statuses.apply_async') + + data = ses_confirmation_callback() + response = client.post( + path='/notifications/letter/dvla', + data=data, + headers=[('Content-Type', 'application/json')] + ) + + assert response.status_code == 200 + assert autoconfirm_mock.called + assert not update_task.called + + +def test_dvla_callback_calls_update_letter_notifications_task(client, mocker): + update_task = \ + mocker.patch('app.notifications.notifications_letter_callback.update_letter_notifications_statuses.apply_async') + data = _sample_sns_s3_callback() + response = client.post( + path='/notifications/letter/dvla', + data=data, + headers=[('Content-Type', 'application/json')] + ) + + assert response.status_code == 200 + assert update_task.called + update_task.assert_called_with(['bar.txt'], queue='notify') def test_firetext_callback_should_not_need_auth(client, mocker): @@ -458,3 +506,46 @@ def test_firetext_callback_should_record_statsd(client, notify_db, notify_db_ses def get_notification_stats(service_id): return NotificationStatistics.query.filter_by(service_id=service_id).one() + + +def _sample_sns_s3_callback(): + return json.dumps({ + "SigningCertURL": "foo.pem", + "UnsubscribeURL": "bar", + "Signature": "some-signature", + "Type": "Notification", + "Timestamp": "2016-05-03T08:35:12.884Z", + "SignatureVersion": "1", + "MessageId": "6adbfe0a-d610-509a-9c47-af894e90d32d", + "Subject": "Amazon S3 Notification", + "TopicArn": "sample-topic-arn", + "Message": { + "Records": [{ + "eventVersion": "2.0", + "eventSource": "aws:s3", + "awsRegion": "eu-west-1", + "eventTime": "2017-05-03T08:35:12.826Z", + "eventName": "ObjectCreated:Put", + "userIdentity": {"principalId": "some-p-id"}, + "requestParameters": {"sourceIPAddress": "8.8.8.8"}, + "responseElements": {"x-amz-request-id": "some-req-id", "x-amz-id-2": "some-amz-id"}, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "some-config-id", + "bucket": { + "name": "some-bucket", + "ownerIdentity": {"principalId": "some-p-id"}, + "arn": "some-bucket-arn" + }, + "object": { + "key": "bar.txt", + "size": 200, + "eTag": "some-etag", + "versionId": "some-v-id", + "sequencer": "some-seq" + } + } + } + ] + } + })