Merge pull request #1755 from alphagov/replay-service-status-updates

Refactor service callback task
This commit is contained in:
Rebecca Law
2018-03-09 12:06:38 +00:00
committed by GitHub
3 changed files with 259 additions and 66 deletions

View File

@@ -6,6 +6,7 @@ from app import (
db,
DATETIME_FORMAT,
notify_celery,
encryption
)
from app.dao.notifications_dao import (
get_notification_by_id,
@@ -23,10 +24,63 @@ from app.config import QueueNames
@notify_celery.task(bind=True, name="send-delivery-status", max_retries=5, default_retry_delay=300)
@statsd(namespace="tasks")
def send_delivery_status_to_service(self, notification_id):
def send_delivery_status_to_service(self, notification_id,
encrypted_status_update=None
):
if not encrypted_status_update:
process_update_with_notification_id(self, notification_id=notification_id)
else:
try:
status_update = encryption.decrypt(encrypted_status_update)
data = {
"id": str(notification_id),
"reference": status_update['notification_client_reference'],
"to": status_update['notification_to'],
"status": status_update['notification_status'],
"created_at": status_update['notification_created_at'],
"completed_at": status_update['notification_updated_at'],
"sent_at": status_update['notification_sent_at'],
"notification_type": status_update['notification_type']
}
response = request(
method="POST",
url=status_update['service_callback_api_url'],
data=json.dumps(data),
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer {}'.format(status_update['service_callback_api_bearer_token'])
},
timeout=60
)
current_app.logger.info('send_delivery_status_to_service sending {} to {}, response {}'.format(
notification_id,
status_update['service_callback_api_url'],
response.status_code
))
response.raise_for_status()
except RequestException as e:
current_app.logger.warning(
"send_delivery_status_to_service request failed for service_id: {} and url: {}. exc: {}".format(
notification_id,
status_update['service_callback_api_url'],
e
)
)
if not isinstance(e, HTTPError) or e.response.status_code >= 500:
try:
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
current_app.logger.exception(
"""Retry: send_delivery_status_to_service has retried the max num of times
for notification: {}""".format(notification_id)
)
def process_update_with_notification_id(self, notification_id):
retry = False
try:
# TODO: do we need to do rate limit this?
notification = get_notification_by_id(notification_id)
service_callback_api = get_service_callback_api_for_service(service_id=notification.service_id)
if not service_callback_api:
@@ -41,9 +95,9 @@ def send_delivery_status_to_service(self, notification_id):
"reference": str(notification.client_reference),
"to": notification.to,
"status": notification.status,
"created_at": notification.created_at.strftime(DATETIME_FORMAT), # the time service sent the request
"completed_at": notification.updated_at.strftime(DATETIME_FORMAT), # the last time the status was updated
"sent_at": notification.sent_at.strftime(DATETIME_FORMAT), # the time the email was sent
"created_at": notification.created_at.strftime(DATETIME_FORMAT),
"completed_at": notification.updated_at.strftime(DATETIME_FORMAT),
"sent_at": notification.sent_at.strftime(DATETIME_FORMAT),
"notification_type": notification.notification_type
}
@@ -83,4 +137,7 @@ def send_delivery_status_to_service(self, notification_id):
try:
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
current_app.logger.exception('Retry: send_delivery_status_to_service has retried the max num of times')
current_app.logger.exception(
"""Retry: send_delivery_status_to_service has retried the max num of times
for notification: {}""".format(notification_id)
)

View File

@@ -1,29 +1,33 @@
import functools
import uuid
from datetime import datetime, timedelta
from decimal import Decimal
import functools
import flask
from flask import current_app
import click
import flask
from click_datetime import Datetime as click_dt
from flask import current_app
from sqlalchemy.orm.exc import NoResultFound
from app import db
from app import db, DATETIME_FORMAT, encryption
from app.celery.scheduled_tasks import send_total_sent_notifications_to_performance_platform
from app.celery.service_callback_tasks import send_delivery_status_to_service
from app.config import QueueNames
from app.dao.monthly_billing_dao import (
create_or_update_monthly_billing,
get_monthly_billing_by_notification_type,
get_service_ids_that_need_billing_populated
)
from app.models import PROVIDERS, User, SMS_TYPE, EMAIL_TYPE
from app.dao.provider_rates_dao import create_provider_rates as dao_create_provider_rates
from app.dao.service_callback_api_dao import get_service_callback_api_for_service
from app.dao.services_dao import (
delete_service_and_all_associated_db_objects,
dao_fetch_all_services_by_user
)
from app.dao.provider_rates_dao import create_provider_rates as dao_create_provider_rates
from app.dao.users_dao import (delete_model_user, delete_user_verify_codes)
from app.utils import get_midnight_for_day_before, get_london_midnight_in_utc
from app.models import PROVIDERS, User, SMS_TYPE, EMAIL_TYPE, Notification
from app.performance_platform.processing_time import (send_processing_time_for_start_and_end)
from app.celery.scheduled_tasks import send_total_sent_notifications_to_performance_platform
from app.utils import get_midnight_for_day_before, get_london_midnight_in_utc
@click.group(name='command', help='Additional commands')
@@ -311,5 +315,55 @@ def insert_inbound_numbers_from_file(file_name):
file.close()
@notify_command(name='replay-service-callbacks')
@click.option('-f', '--file_name', required=True,
help="""Full path of the file to upload, file is a contains client references of
notifications that need the status to be sent to the service.""")
@click.option('-s', '--service_id', required=True,
help="""The service that the callbacks are for""")
def replay_service_callbacks(file_name, service_id):
print("Start send service callbacks for service: ", service_id)
callback_api = get_service_callback_api_for_service(service_id=service_id)
if not callback_api:
print("Callback api was not found for service: {}".format(service_id))
return
errors = []
notifications = []
file = open(file_name)
for ref in file:
try:
notification = Notification.query.filter_by(client_reference=ref.strip()).one()
notifications.append(notification)
except NoResultFound as e:
errors.append("Reference: {} was not found in notifications.".format(ref))
for e in errors:
print(e)
if errors:
raise Exception("Some notifications for the given references were not found")
for n in notifications:
data = {
"notification_id": str(n.id),
"notification_client_reference": n.client_reference,
"notification_to": n.to,
"notification_status": n.status,
"notification_created_at": n.created_at.strftime(DATETIME_FORMAT),
"notification_updated_at": n.updated_at.strftime(DATETIME_FORMAT),
"notification_sent_at": n.sent_at.strftime(DATETIME_FORMAT),
"notification_type": n.notification_type,
"service_callback_api_url": callback_api.url,
"service_callback_api_bearer_token": callback_api.bearer_token,
}
encrypted_status_update = encryption.encrypt(data)
send_delivery_status_to_service.apply_async([str(n.id), encrypted_status_update],
queue=QueueNames.CALLBACKS)
print("Replay service status for service: {}. Sent {} notification status updates to the queue".format(
service_id, len(notifications)))
def setup_commands(application):
application.cli.add_command(command_group)