mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-20 15:31:15 -05:00
Revert "Process SNS request triggered by a DVLA S3 update"
This commit is contained in:
@@ -4,24 +4,20 @@ from flask import current_app
|
||||
FILE_LOCATION_STRUCTURE = 'service-{}-notify/{}.csv'
|
||||
|
||||
|
||||
def get_s3_object(bucket_name, file_location):
|
||||
def get_s3_job_object(bucket_name, file_location):
|
||||
s3 = resource('s3')
|
||||
s3_object = s3.Object(bucket_name, file_location)
|
||||
return s3_object.get()['Body'].read()
|
||||
return s3.Object(bucket_name, file_location)
|
||||
|
||||
|
||||
def get_job_from_s3(service_id, job_id):
|
||||
job = _job_from_s3(service_id, job_id)
|
||||
return job
|
||||
bucket_name = current_app.config['CSV_UPLOAD_BUCKET_NAME']
|
||||
file_location = FILE_LOCATION_STRUCTURE.format(service_id, job_id)
|
||||
obj = get_s3_job_object(bucket_name, file_location)
|
||||
return obj.get()['Body'].read().decode('utf-8')
|
||||
|
||||
|
||||
def remove_job_from_s3(service_id, job_id):
|
||||
job = _job_from_s3(service_id, job_id)
|
||||
return job.delete()
|
||||
|
||||
|
||||
def _job_from_s3():
|
||||
bucket_name = current_app.config['CSV_UPLOAD_BUCKET_NAME']
|
||||
file_location = FILE_LOCATION_STRUCTURE.format(service_id, job_id)
|
||||
obj = get_s3_object(bucket_name, file_location).decode('utf-8')
|
||||
return obj
|
||||
obj = get_s3_job_object(bucket_name, file_location)
|
||||
return obj.delete()
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import random
|
||||
|
||||
from datetime import (datetime)
|
||||
from collections import namedtuple
|
||||
|
||||
from flask import current_app
|
||||
from notifications_utils.recipients import (
|
||||
@@ -24,10 +23,7 @@ from app.dao.jobs_dao import (
|
||||
all_notifications_are_created_for_job,
|
||||
dao_get_all_notifications_for_job,
|
||||
dao_update_job_status)
|
||||
from app.dao.notifications_dao import (
|
||||
get_notification_by_id,
|
||||
dao_update_notifications_sent_to_dvla
|
||||
)
|
||||
from app.dao.notifications_dao import get_notification_by_id, dao_update_notifications_sent_to_dvla
|
||||
from app.dao.provider_details_dao import get_current_provider
|
||||
from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
@@ -358,27 +354,3 @@ def get_template_class(template_type):
|
||||
# since we don't need rendering capabilities (we only need to extract placeholders) both email and letter can
|
||||
# use the same base template
|
||||
return WithSubjectTemplate
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name='update-letter-notifications-statuses')
|
||||
@statsd(namespace="tasks")
|
||||
def update_letter_notifications_statuses(self, filename):
|
||||
bucket_location = '{}-ftp'.format(current_app.config['NOTIFY_EMAIL_DOMAIN'])
|
||||
response_file = s3.get_s3_object(bucket_location, filename).decode('utf-8')
|
||||
|
||||
try:
|
||||
NotificationUpdate = namedtuple('NotificationUpdate', ['reference', 'status', 'page_count', 'cost_threshold'])
|
||||
notification_updates = [NotificationUpdate(*line.split('|')) for line in response_file.splitlines()]
|
||||
|
||||
except TypeError:
|
||||
current_app.logger.exception('DVLA response file: {} has an invalid format'.format(filename))
|
||||
raise
|
||||
|
||||
else:
|
||||
if notification_updates:
|
||||
for update in notification_updates:
|
||||
current_app.logger.info('DVLA update: {}'.format(str(update)))
|
||||
# TODO: Update notifications with desired status
|
||||
return notification_updates
|
||||
else:
|
||||
current_app.logger.exception('DVLA response file contained no updates')
|
||||
|
||||
@@ -27,8 +27,7 @@ from app.models import (
|
||||
NOTIFICATION_PERMANENT_FAILURE,
|
||||
KEY_TYPE_NORMAL, KEY_TYPE_TEST,
|
||||
LETTER_TYPE,
|
||||
NOTIFICATION_SENT
|
||||
)
|
||||
NOTIFICATION_SENT)
|
||||
|
||||
from app.dao.dao_utils import transactional
|
||||
from app.statsd_decorators import statsd
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
from datetime import datetime
|
||||
from functools import wraps
|
||||
|
||||
from flask import (
|
||||
Blueprint,
|
||||
@@ -10,54 +9,31 @@ from flask import (
|
||||
)
|
||||
|
||||
from app import statsd_client
|
||||
from app.celery.tasks import update_letter_notifications_statuses
|
||||
from app.clients.email.aws_ses import get_aws_responses
|
||||
from app.dao import notifications_dao
|
||||
from app.v2.errors import register_errors
|
||||
from app.notifications.process_client_response import validate_callback_data
|
||||
from app.notifications.utils import autoconfirm_subscription
|
||||
from app.schema_validation import validate
|
||||
from app.dao import (
|
||||
notifications_dao
|
||||
)
|
||||
|
||||
from app.notifications.process_client_response import validate_callback_data
|
||||
|
||||
letter_callback_blueprint = Blueprint('notifications_letter_callback', __name__)
|
||||
|
||||
from app.errors import (
|
||||
register_errors,
|
||||
InvalidRequest
|
||||
)
|
||||
|
||||
register_errors(letter_callback_blueprint)
|
||||
|
||||
|
||||
dvla_sns_callback_schema = {
|
||||
"$schema": "http://json-schema.org/draft-04/schema#",
|
||||
"description": "sns callback received on s3 update",
|
||||
"type": "object",
|
||||
"title": "dvla internal sns callback",
|
||||
"properties": {
|
||||
"Type": {"enum": ["Notification", "SubscriptionConfirmation"]},
|
||||
"MessageId": {"type": "string"},
|
||||
"Message": {"type": ["string", "object"]}
|
||||
},
|
||||
"required": ["Type", "MessageId", "Message"]
|
||||
}
|
||||
|
||||
|
||||
def validate_schema(schema):
|
||||
def decorator(f):
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kw):
|
||||
validate(request.json, schema)
|
||||
return f(*args, **kw)
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
@letter_callback_blueprint.route('/notifications/letter/dvla', methods=['POST'])
|
||||
@validate_schema(dvla_sns_callback_schema)
|
||||
def process_letter_response():
|
||||
req_json = request.json
|
||||
if not autoconfirm_subscription(req_json):
|
||||
# The callback should have one record for an S3 Put Event.
|
||||
filename = req_json['Message']['Records'][0]['s3']['object']['key']
|
||||
current_app.logger.info('Received file from DVLA: {}'.format(filename))
|
||||
current_app.logger.info('DVLA callback: Calling task to update letter notifications')
|
||||
update_letter_notifications_statuses.apply_async([filename], queue='notify')
|
||||
|
||||
try:
|
||||
dvla_request = json.loads(request.data)
|
||||
current_app.logger.info(dvla_request)
|
||||
return jsonify(
|
||||
result="success", message="DVLA callback succeeded"
|
||||
), 200
|
||||
except ValueError:
|
||||
error = "DVLA callback failed: invalid json"
|
||||
raise InvalidRequest(error, status_code=400)
|
||||
|
||||
@@ -13,8 +13,9 @@ from app.clients.email.aws_ses import get_aws_responses
|
||||
from app.dao import (
|
||||
notifications_dao
|
||||
)
|
||||
|
||||
from app.notifications.process_client_response import validate_callback_data
|
||||
from app.notifications.utils import autoconfirm_subscription
|
||||
from app.notifications.utils import confirm_subscription
|
||||
|
||||
ses_callback_blueprint = Blueprint('notifications_ses_callback', __name__)
|
||||
|
||||
@@ -31,7 +32,9 @@ def process_ses_response():
|
||||
try:
|
||||
ses_request = json.loads(request.data)
|
||||
|
||||
subscribed_topic = autoconfirm_subscription(ses_request)
|
||||
if ses_request.get('Type') == 'SubscriptionConfirmation':
|
||||
current_app.logger.info("SNS subscription confirmation url: {}".format(ses_request['SubscribeURL']))
|
||||
subscribed_topic = confirm_subscription(ses_request)
|
||||
if subscribed_topic:
|
||||
current_app.logger.info("Automatically subscribed to topic: {}".format(subscribed_topic))
|
||||
return jsonify(
|
||||
|
||||
@@ -16,10 +16,3 @@ def confirm_subscription(confirmation_request):
|
||||
raise e
|
||||
|
||||
return confirmation_request['TopicArn']
|
||||
|
||||
|
||||
def autoconfirm_subscription(req_json):
|
||||
if req_json.get('Type') == 'SubscriptionConfirmation':
|
||||
current_app.logger.info("SNS subscription confirmation url: {}".format(req_json['SubscribeURL']))
|
||||
subscribed_topic = confirm_subscription(req_json)
|
||||
return subscribed_topic
|
||||
|
||||
@@ -12,19 +12,15 @@ from celery.exceptions import Retry
|
||||
from app import (encryption, DATETIME_FORMAT)
|
||||
from app.celery import provider_tasks
|
||||
from app.celery import tasks
|
||||
from app.celery.tasks import s3, build_dvla_file, create_dvla_file_contents, update_dvla_job_to_error
|
||||
from app.celery.tasks import (
|
||||
s3,
|
||||
build_dvla_file,
|
||||
create_dvla_file_contents,
|
||||
update_dvla_job_to_error,
|
||||
process_job,
|
||||
process_row,
|
||||
send_sms,
|
||||
send_email,
|
||||
persist_letter,
|
||||
get_template_class,
|
||||
update_job_to_sent_to_dvla,
|
||||
update_letter_notifications_statuses
|
||||
update_job_to_sent_to_dvla
|
||||
)
|
||||
from app.dao import jobs_dao, services_dao
|
||||
from app.models import (
|
||||
@@ -38,7 +34,6 @@ from app.models import (
|
||||
Job)
|
||||
|
||||
from tests.app import load_example_csv
|
||||
from tests.conftest import set_config
|
||||
from tests.app.conftest import (
|
||||
sample_service,
|
||||
sample_template,
|
||||
@@ -1076,37 +1071,3 @@ def test_update_dvla_job_to_error(sample_letter_template, sample_letter_job):
|
||||
assert not n.sent_by
|
||||
|
||||
assert 'error' == Job.query.filter_by(id=sample_letter_job.id).one().job_status
|
||||
|
||||
|
||||
def test_update_letter_notifications_statuses_raises_for_invalid_format(notify_api, mocker):
|
||||
invalid_file = b'ref-foo|Sent|1|Unsorted\nref-bar|Sent|2'
|
||||
mocker.patch('app.celery.tasks.s3.get_s3_object', return_value=invalid_file)
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
update_letter_notifications_statuses(filename='foo.txt')
|
||||
|
||||
|
||||
def test_update_letter_notifications_statuses_calls_with_correct_bucket_location(notify_api, mocker):
|
||||
s3_mock = mocker.patch('app.celery.tasks.s3.get_s3_object')
|
||||
|
||||
with set_config(notify_api, 'NOTIFY_EMAIL_DOMAIN', 'foo.bar'):
|
||||
update_letter_notifications_statuses(filename='foo.txt')
|
||||
s3_mock.assert_called_with('{}-ftp'.format(current_app.config['NOTIFY_EMAIL_DOMAIN']), 'foo.txt')
|
||||
|
||||
|
||||
def test_update_letter_notifications_statuses_builds_updates_list(notify_api, mocker):
|
||||
valid_file = b'ref-foo|Sent|1|Unsorted\nref-bar|Sent|2|Sorted'
|
||||
mocker.patch('app.celery.tasks.s3.get_s3_object', return_value=valid_file)
|
||||
updates = update_letter_notifications_statuses(filename='foo.txt')
|
||||
|
||||
assert len(updates) == 2
|
||||
|
||||
assert updates[0].reference == 'ref-foo'
|
||||
assert updates[0].status == 'Sent'
|
||||
assert updates[0].page_count == '1'
|
||||
assert updates[0].cost_threshold == 'Unsorted'
|
||||
|
||||
assert updates[1].reference == 'ref-bar'
|
||||
assert updates[1].status == 'Sent'
|
||||
assert updates[1].page_count == '2'
|
||||
assert updates[1].cost_threshold == 'Sorted'
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import pytest
|
||||
import uuid
|
||||
|
||||
from datetime import datetime
|
||||
@@ -7,73 +6,21 @@ from flask import json
|
||||
from freezegun import freeze_time
|
||||
|
||||
import app.celery.tasks
|
||||
from app.errors import InvalidRequest
|
||||
from app.dao.notifications_dao import (
|
||||
get_notification_by_id
|
||||
)
|
||||
from app.models import NotificationStatistics
|
||||
from tests.app.notifications.test_notifications_ses_callback import ses_confirmation_callback
|
||||
from tests.app.conftest import sample_notification as create_sample_notification
|
||||
|
||||
|
||||
def test_dvla_callback_returns_400_with_invalid_request(client):
|
||||
data = json.dumps({"foo": "bar"})
|
||||
def test_dvla_callback_should_not_need_auth(client):
|
||||
data = json.dumps({"somekey": "somevalue"})
|
||||
response = client.post(
|
||||
path='/notifications/letter/dvla',
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json')]
|
||||
)
|
||||
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
def test_dvla_callback_autoconfirms_subscription(client, mocker):
|
||||
autoconfirm_mock = mocker.patch('app.notifications.notifications_letter_callback.autoconfirm_subscription')
|
||||
|
||||
data = ses_confirmation_callback()
|
||||
response = client.post(
|
||||
path='/notifications/letter/dvla',
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json')]
|
||||
)
|
||||
headers=[('Content-Type', 'application/json')])
|
||||
|
||||
assert response.status_code == 200
|
||||
assert autoconfirm_mock.called
|
||||
|
||||
|
||||
def test_dvla_callback_autoconfirm_does_not_call_update_letter_notifications_task(client, mocker):
|
||||
autoconfirm_mock = mocker.patch('app.notifications.notifications_letter_callback.autoconfirm_subscription')
|
||||
update_task = \
|
||||
mocker.patch('app.notifications.notifications_letter_callback.update_letter_notifications_statuses.apply_async')
|
||||
|
||||
data = ses_confirmation_callback()
|
||||
response = client.post(
|
||||
path='/notifications/letter/dvla',
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json')]
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert autoconfirm_mock.called
|
||||
assert not update_task.called
|
||||
|
||||
|
||||
def test_dvla_callback_calls_update_letter_notifications_task(client, mocker):
|
||||
update_task = \
|
||||
mocker.patch('app.notifications.notifications_letter_callback.update_letter_notifications_statuses.apply_async')
|
||||
data = _sample_sns_s3_callback()
|
||||
response = client.post(
|
||||
path='/notifications/letter/dvla',
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json')]
|
||||
)
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
|
||||
assert response.status_code == 200
|
||||
assert update_task.called
|
||||
update_task.assert_called_with(['bar.txt'], queue='notify')
|
||||
|
||||
|
||||
def test_firetext_callback_should_not_need_auth(client, mocker):
|
||||
@@ -511,46 +458,3 @@ def test_firetext_callback_should_record_statsd(client, notify_db, notify_db_ses
|
||||
|
||||
def get_notification_stats(service_id):
|
||||
return NotificationStatistics.query.filter_by(service_id=service_id).one()
|
||||
|
||||
|
||||
def _sample_sns_s3_callback():
|
||||
return json.dumps({
|
||||
"SigningCertURL": "foo.pem",
|
||||
"UnsubscribeURL": "bar",
|
||||
"Signature": "some-signature",
|
||||
"Type": "Notification",
|
||||
"Timestamp": "2016-05-03T08:35:12.884Z",
|
||||
"SignatureVersion": "1",
|
||||
"MessageId": "6adbfe0a-d610-509a-9c47-af894e90d32d",
|
||||
"Subject": "Amazon S3 Notification",
|
||||
"TopicArn": "sample-topic-arn",
|
||||
"Message": {
|
||||
"Records": [{
|
||||
"eventVersion": "2.0",
|
||||
"eventSource": "aws:s3",
|
||||
"awsRegion": "eu-west-1",
|
||||
"eventTime": "2017-05-03T08:35:12.826Z",
|
||||
"eventName": "ObjectCreated:Put",
|
||||
"userIdentity": {"principalId": "some-p-id"},
|
||||
"requestParameters": {"sourceIPAddress": "8.8.8.8"},
|
||||
"responseElements": {"x-amz-request-id": "some-req-id", "x-amz-id-2": "some-amz-id"},
|
||||
"s3": {
|
||||
"s3SchemaVersion": "1.0",
|
||||
"configurationId": "some-config-id",
|
||||
"bucket": {
|
||||
"name": "some-bucket",
|
||||
"ownerIdentity": {"principalId": "some-p-id"},
|
||||
"arn": "some-bucket-arn"
|
||||
},
|
||||
"object": {
|
||||
"key": "bar.txt",
|
||||
"size": 200,
|
||||
"eTag": "some-etag",
|
||||
"versionId": "some-v-id",
|
||||
"sequencer": "some-seq"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user