mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-01 07:35:34 -05:00
Merge branch 'master' into ken-add-letters-pdf-s3-bucket
This commit is contained in:
@@ -5,7 +5,7 @@ from requests import request, RequestException, HTTPError
|
||||
|
||||
from app.models import SMS_TYPE
|
||||
from app.config import QueueNames
|
||||
from app.celery.callback_tasks import process_ses_results
|
||||
from app.celery.process_ses_receipts_tasks import process_ses_results
|
||||
|
||||
temp_fail = "7700900003"
|
||||
perm_fail = "7700900002"
|
||||
|
||||
71
app/celery/service_callback_tasks.py
Normal file
71
app/celery/service_callback_tasks.py
Normal file
@@ -0,0 +1,71 @@
|
||||
import json
|
||||
from app import (
|
||||
DATETIME_FORMAT,
|
||||
notify_celery,
|
||||
)
|
||||
from app.dao.notifications_dao import (
|
||||
get_notification_by_id,
|
||||
)
|
||||
|
||||
from app.statsd_decorators import statsd
|
||||
from app.dao.service_callback_api_dao import get_service_callback_api_for_service
|
||||
from requests import (
|
||||
HTTPError,
|
||||
request,
|
||||
RequestException
|
||||
)
|
||||
from flask import current_app
|
||||
from app.config import QueueNames
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name="send-delivery-status", max_retries=5, default_retry_delay=300)
|
||||
@statsd(namespace="tasks")
|
||||
def send_delivery_status_to_service(self, notification_id):
|
||||
# TODO: do we need to do rate limit this?
|
||||
notification = get_notification_by_id(notification_id)
|
||||
service_callback_api = get_service_callback_api_for_service(service_id=notification.service_id)
|
||||
if not service_callback_api:
|
||||
# No delivery receipt API info set
|
||||
return
|
||||
|
||||
data = {
|
||||
"id": str(notification_id),
|
||||
"reference": str(notification.client_reference),
|
||||
"to": notification.to,
|
||||
"status": notification.status,
|
||||
"created_at": notification.created_at.strftime(DATETIME_FORMAT), # the time GOV.UK email sent the request
|
||||
"updated_at": notification.updated_at.strftime(DATETIME_FORMAT), # the last time the status was updated
|
||||
"sent_at": notification.sent_at.strftime(DATETIME_FORMAT), # the time the email was sent
|
||||
"notification_type": notification.notification_type
|
||||
}
|
||||
|
||||
try:
|
||||
response = request(
|
||||
method="POST",
|
||||
url=service_callback_api.url,
|
||||
data=json.dumps(data),
|
||||
headers={
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': 'Bearer {}'.format(service_callback_api.bearer_token)
|
||||
},
|
||||
timeout=60
|
||||
)
|
||||
current_app.logger.info('send_delivery_status_to_service sending {} to {}, response {}'.format(
|
||||
notification_id,
|
||||
service_callback_api.url,
|
||||
response.status_code
|
||||
))
|
||||
response.raise_for_status()
|
||||
except RequestException as e:
|
||||
current_app.logger.warning(
|
||||
"send_delivery_status_to_service request failed for service_id: {} and url: {}. exc: {}".format(
|
||||
notification_id,
|
||||
service_callback_api.url,
|
||||
e
|
||||
)
|
||||
)
|
||||
if not isinstance(e, HTTPError) or e.response.status_code >= 500:
|
||||
try:
|
||||
self.retry(queue=QueueNames.RETRY)
|
||||
except self.MaxRetriesExceededError:
|
||||
current_app.logger.exception('Retry: send_delivery_status_to_service has retried the max num of times')
|
||||
@@ -29,6 +29,7 @@ from app import (
|
||||
)
|
||||
from app.aws import s3
|
||||
from app.celery import provider_tasks
|
||||
from app.celery.service_callback_tasks import send_delivery_status_to_service
|
||||
from app.config import QueueNames
|
||||
from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id
|
||||
from app.dao.jobs_dao import (
|
||||
@@ -42,12 +43,14 @@ from app.dao.notifications_dao import (
|
||||
get_notification_by_id,
|
||||
dao_update_notifications_for_job_to_sent_to_dvla,
|
||||
dao_update_notifications_by_reference,
|
||||
dao_get_last_notification_added_for_job_id
|
||||
dao_get_last_notification_added_for_job_id,
|
||||
dao_get_notifications_by_references
|
||||
)
|
||||
from app.dao.provider_details_dao import get_current_provider
|
||||
from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service
|
||||
from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
from app.dao.service_callback_api_dao import get_service_callback_api_for_service
|
||||
from app.models import (
|
||||
DVLA_RESPONSE_STATUS_SENT,
|
||||
EMAIL_TYPE,
|
||||
@@ -391,6 +394,12 @@ def update_letter_notifications_to_error(self, notification_references):
|
||||
)
|
||||
|
||||
current_app.logger.info("Updated {} letter notifications to technical-failure".format(updated_count))
|
||||
notifications = dao_get_notifications_by_references(references=notification_references)
|
||||
# queue callback task only if the service_callback_api exists
|
||||
service_callback_api = get_service_callback_api_for_service(service_id=notifications[0].service_id)
|
||||
if service_callback_api:
|
||||
for notification in notifications:
|
||||
send_delivery_status_to_service.apply_async([str(notification.id)], queue=QueueNames.NOTIFY)
|
||||
|
||||
|
||||
def create_dvla_file_contents_for_job(job_id):
|
||||
@@ -455,7 +464,7 @@ def update_letter_notifications_statuses(self, filename):
|
||||
for update in notification_updates:
|
||||
status = NOTIFICATION_DELIVERED if update.status == DVLA_RESPONSE_STATUS_SENT \
|
||||
else NOTIFICATION_TECHNICAL_FAILURE
|
||||
notification = dao_update_notifications_by_reference(
|
||||
updated_count = dao_update_notifications_by_reference(
|
||||
references=[update.reference],
|
||||
update_dict={"status": status,
|
||||
"billable_units": update.page_count,
|
||||
@@ -463,7 +472,7 @@ def update_letter_notifications_statuses(self, filename):
|
||||
}
|
||||
)
|
||||
|
||||
if not notification:
|
||||
if not updated_count:
|
||||
msg = "Update letter notification file {filename} failed: notification either not found " \
|
||||
"or already updated from delivered. Status {status} for notification reference {reference}".format(
|
||||
filename=filename, status=status, reference=update.reference)
|
||||
@@ -472,6 +481,12 @@ def update_letter_notifications_statuses(self, filename):
|
||||
current_app.logger.info(
|
||||
'DVLA file: {filename}, notification updated to {status}: {reference}'.format(
|
||||
filename=filename, status=status, reference=str(update.reference)))
|
||||
notifications = dao_get_notifications_by_references(references=[update.reference])
|
||||
# queue callback task only if the service_callback_api exists
|
||||
service_callback_api = get_service_callback_api_for_service(service_id=notifications[0].service_id)
|
||||
if service_callback_api:
|
||||
for notification in notifications:
|
||||
send_delivery_status_to_service.apply_async([str(notification.id)], queue=QueueNames.NOTIFY)
|
||||
|
||||
|
||||
def process_updates_from_file(response_file):
|
||||
|
||||
@@ -459,6 +459,13 @@ def dao_get_notifications_by_to_field(service_id, search_term, statuses=None):
|
||||
return results
|
||||
|
||||
|
||||
@statsd(namespace="dao")
|
||||
def dao_get_notifications_by_references(references):
|
||||
return Notification.query.filter(
|
||||
Notification.reference.in_(references)
|
||||
).all()
|
||||
|
||||
|
||||
@statsd(namespace="dao")
|
||||
def dao_created_scheduled_notification(scheduled_notification):
|
||||
db.session.add(scheduled_notification)
|
||||
|
||||
@@ -172,6 +172,7 @@ def dao_create_service(service, user, service_id=None, service_permissions=None)
|
||||
service.id = service_id or uuid.uuid4() # must be set now so version history model can use same id
|
||||
service.active = True
|
||||
service.research_mode = False
|
||||
service.crown = service.organisation_type == 'central'
|
||||
|
||||
for permission in service_permissions:
|
||||
service_permission = ServicePermission(service_id=service.id, permission=permission)
|
||||
|
||||
@@ -249,6 +249,7 @@ class Service(db.Model, Versioned):
|
||||
db.String(255),
|
||||
nullable=True,
|
||||
)
|
||||
crown = db.Column(db.Boolean, index=False, nullable=False, default=True)
|
||||
|
||||
association_proxy('permissions', 'service_permission_types')
|
||||
|
||||
|
||||
@@ -10,8 +10,11 @@ from app.clients.email.aws_ses import get_aws_responses
|
||||
from app.dao import (
|
||||
notifications_dao
|
||||
)
|
||||
from app.dao.service_callback_api_dao import get_service_callback_api_for_service
|
||||
from app.celery.statistics_tasks import create_outcome_notification_statistic_tasks
|
||||
from app.notifications.process_client_response import validate_callback_data
|
||||
from app.celery.service_callback_tasks import send_delivery_status_to_service
|
||||
from app.config import QueueNames
|
||||
|
||||
|
||||
def process_ses_response(ses_request):
|
||||
@@ -75,7 +78,7 @@ def process_ses_response(ses_request):
|
||||
)
|
||||
|
||||
create_outcome_notification_statistic_tasks(notification)
|
||||
|
||||
_check_and_queue_callback_task(notification.id, notification.service_id)
|
||||
return
|
||||
|
||||
except KeyError:
|
||||
@@ -90,3 +93,10 @@ def process_ses_response(ses_request):
|
||||
def remove_emails_from_bounce(bounce_dict):
|
||||
for recip in bounce_dict['bouncedRecipients']:
|
||||
recip.pop('emailAddress')
|
||||
|
||||
|
||||
def _check_and_queue_callback_task(notification_id, service_id):
|
||||
# queue callback task only if the service_callback_api exists
|
||||
service_callback_api = get_service_callback_api_for_service(service_id=service_id)
|
||||
if service_callback_api:
|
||||
send_delivery_status_to_service.apply_async([str(notification_id)], queue=QueueNames.NOTIFY)
|
||||
|
||||
@@ -8,6 +8,9 @@ from app.dao import notifications_dao
|
||||
from app.clients.sms.firetext import get_firetext_responses
|
||||
from app.clients.sms.mmg import get_mmg_responses
|
||||
from app.celery.statistics_tasks import create_outcome_notification_statistic_tasks
|
||||
from app.celery.service_callback_tasks import send_delivery_status_to_service
|
||||
from app.config import QueueNames
|
||||
from app.dao.service_callback_api_dao import get_service_callback_api_for_service
|
||||
|
||||
|
||||
sms_response_mapper = {
|
||||
@@ -82,6 +85,11 @@ def process_sms_client_response(status, reference, client_name):
|
||||
)
|
||||
|
||||
create_outcome_notification_statistic_tasks(notification)
|
||||
# queue callback task only if the service_callback_api exists
|
||||
service_callback_api = get_service_callback_api_for_service(service_id=notification.service_id)
|
||||
|
||||
if service_callback_api:
|
||||
send_delivery_status_to_service.apply_async([str(notification.id)], queue=QueueNames.NOTIFY)
|
||||
|
||||
success = "{} callback succeeded. reference {} updated".format(client_name, reference)
|
||||
return success, errors
|
||||
|
||||
@@ -178,7 +178,11 @@ def update_service(service_id):
|
||||
service_going_live = fetched_service.restricted and not req_json.get('restricted', True)
|
||||
current_data = dict(service_schema.dump(fetched_service).data.items())
|
||||
current_data.update(request.get_json())
|
||||
|
||||
update_dict = service_schema.load(current_data).data
|
||||
org_type = req_json.get('organisation_type', None)
|
||||
if org_type:
|
||||
update_dict.crown = org_type == 'central'
|
||||
dao_update_service(update_dict)
|
||||
|
||||
# bridging code between frontend is deployed and data has not been migrated yet. Can only update current year
|
||||
|
||||
Reference in New Issue
Block a user