Files
notifications-api/app/notifications/notifications_letter_callback.py
Katie Smith b440f3f904 Use Draft-07 and Draft7Validator everywhere
We were using the Draft4Validator in one place, so this updates it to
the Draft7Validator instead.

The schemas were mostly using draft 4 of the JSON schema, though there
were a couple of schemas that were already of version 7. This updates
them all to version 7, which is the latest version fully supported by
the jsonschema Python package. There are some breaking changes in the
newer version of the schema, but I could not see anywhere would these
affect us. Some of these schemas were not valid in version 4, but are
now valid in version 7 because `"required": []` was not valid in earlier
versions.
2022-04-14 14:46:10 +01:00

62 lines
2.2 KiB
Python

import json
from functools import wraps
from flask import Blueprint, current_app, jsonify, request
from app.celery.tasks import (
record_daily_sorted_counts,
update_letter_notifications_statuses,
)
from app.config import QueueNames
from app.notifications.utils import autoconfirm_subscription
from app.schema_validation import validate
from app.v2.errors import register_errors
letter_callback_blueprint = Blueprint('notifications_letter_callback', __name__)
register_errors(letter_callback_blueprint)
dvla_sns_callback_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "sns callback received on s3 update",
"type": "object",
"title": "dvla internal sns callback",
"properties": {
"Type": {"enum": ["Notification", "SubscriptionConfirmation"]},
"MessageId": {"type": "string"},
"Message": {"type": ["string", "object"]}
},
"required": ["Type", "MessageId", "Message"]
}
def validate_schema(schema):
def decorator(f):
@wraps(f)
def wrapper(*args, **kw):
validate(request.get_json(force=True), schema)
return f(*args, **kw)
return wrapper
return decorator
@letter_callback_blueprint.route('/notifications/letter/dvla', methods=['POST'])
@validate_schema(dvla_sns_callback_schema)
def process_letter_response():
req_json = request.get_json(force=True)
current_app.logger.debug('Received SNS callback: {}'.format(req_json))
if not autoconfirm_subscription(req_json):
# The callback should have one record for an S3 Put Event.
message = json.loads(req_json['Message'])
filename = message['Records'][0]['s3']['object']['key']
current_app.logger.info('Received file from DVLA: {}'.format(filename))
if filename.lower().endswith('rs.txt') or filename.lower().endswith('rsp.txt'):
current_app.logger.info('DVLA callback: Calling task to update letter notifications')
update_letter_notifications_statuses.apply_async([filename], queue=QueueNames.NOTIFY)
record_daily_sorted_counts.apply_async([filename], queue=QueueNames.NOTIFY)
return jsonify(
result="success", message="DVLA callback succeeded"
), 200