diff --git a/app/celery/provider_tasks.py b/app/celery/provider_tasks.py index 47e4c0bd8..743d37b72 100644 --- a/app/celery/provider_tasks.py +++ b/app/celery/provider_tasks.py @@ -91,6 +91,13 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at): bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300 ) def deliver_sms(self, notification_id): + ''' + This logic will branch off to the final step in delivering + the notification to sns. + Logic is in place for delivery receipts. + Additional logic to help devs output authentication code to + terminal. + ''' try: current_app.logger.info( "Start sending SMS for notification id: {}".format(notification_id) @@ -108,7 +115,7 @@ def deliver_sms(self, notification_id): current_app.logger.warning( ansi_green + f"AUTHENTICATION CODE: {notification.content}" + ansi_reset ) - + # Code branches off to send_to_providers.py message_id = send_to_providers.send_sms_to_provider(notification) # We have to put it in UTC. For other timezones, the delay # will be ignored and it will fire immediately (although this probably only affects developer testing) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index b7cb43c6d..f2bcec884 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -31,6 +31,10 @@ from app.v2.errors import TotalRequestsError @notify_celery.task(name="process-job") def process_job(job_id, sender_id=None): + ''' + We update job status, get the csv data from s3 and begin processing + csv rows. + ''' start = datetime.utcnow() job = dao_get_job_by_id(job_id) current_app.logger.info( @@ -74,6 +78,7 @@ def process_job(job_id, sender_id=None): for row in recipient_csv.get_rows(): process_row(row, template, job, service, sender_id=sender_id) + # End point/Exit point for message send flow. job_complete(job, start=start) @@ -109,6 +114,10 @@ def get_recipient_csv_and_template_and_sender_id(job): def process_row(row, template, job, service, sender_id=None): + ''' + The process will branch off based on notification type, + sms or email. + ''' template_type = template.template_type encrypted = encryption.encrypt( { @@ -121,6 +130,8 @@ def process_row(row, template, job, service, sender_id=None): } ) + # Both save_sms and save_email have the same general + # persist logic. send_fns = {NotificationType.SMS: save_sms, NotificationType.EMAIL: save_email} send_fn = send_fns[template_type] @@ -130,6 +141,7 @@ def process_row(row, template, job, service, sender_id=None): task_kwargs["sender_id"] = sender_id notification_id = create_uuid() + # Kick-off persisting notification in save_sms/save_email. send_fn.apply_async( ( str(service.id), @@ -163,7 +175,14 @@ def __total_sending_limits_for_job_exceeded(service, job, job_id): @notify_celery.task(bind=True, name="save-sms", max_retries=5, default_retry_delay=300) def save_sms(self, service_id, notification_id, encrypted_notification, sender_id=None): + ''' + We are persisting the notification to the db here and + placing notification in queue to send to sns. + ''' notification = encryption.decrypt(encrypted_notification) + # SerialisedService and SerialisedTemplate classes are + # used here to grab the same service and template from the cache + # to improve performance. service = SerialisedService.from_id(service_id) template = SerialisedTemplate.from_id_and_service_id( notification["template"], @@ -177,7 +196,8 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i ).sms_sender else: reply_to_text = template.reply_to_text - + # Return False when trial mode services try sending notifications + # to non-team and non-simulated recipients. if not service_allowed_to_send_to(notification["to"], service, KeyType.NORMAL): current_app.logger.debug( "SMS {} failed as restricted service".format(notification_id) @@ -208,6 +228,7 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i reply_to_text=reply_to_text, ) + # Kick off sns process in provider_tasks.py provider_tasks.deliver_sms.apply_async( [str(saved_notification.id)], queue=QueueNames.SEND_SMS ) diff --git a/app/delivery/send_to_providers.py b/app/delivery/send_to_providers.py index 02d657e17..e7ab80fbf 100644 --- a/app/delivery/send_to_providers.py +++ b/app/delivery/send_to_providers.py @@ -22,6 +22,11 @@ from app.serialised_models import SerialisedService, SerialisedTemplate def send_sms_to_provider(notification): + ''' + This is the last step in the message send flow. + We get necessary data for recipient, template, + notification and send it off to the provider. + ''' # we no longer store the personalisation in the db, # need to retrieve from s3 before generating content # However, we are still sending the initial verify code through personalisation @@ -41,6 +46,7 @@ def send_sms_to_provider(notification): return if notification.status == NotificationStatus.CREATED: + # We get the provider here (which is only aws sns) provider = provider_to_use(NotificationType.SMS, notification.international) if not provider: technical_failure(notification=notification) diff --git a/app/job/rest.py b/app/job/rest.py index c53a82296..48a456ec4 100644 --- a/app/job/rest.py +++ b/app/job/rest.py @@ -166,6 +166,10 @@ def get_jobs_by_service(service_id): @job_blueprint.route("", methods=["POST"]) def create_job(service_id): + ''' + Entry point from UI for one-off messages + as well as CSV uploads. + ''' service = dao_fetch_service_by_id(service_id) if not service.active: raise InvalidRequest("Create job is not allowed: service is inactive ", 403) @@ -204,7 +208,7 @@ def create_job(service_id): dao_create_job(job) sender_id = data.get("sender_id") - + # Kick off job in tasks.py if job.job_status == JobStatus.PENDING: process_job.apply_async( [str(job.id)], {"sender_id": sender_id}, queue=QueueNames.JOBS diff --git a/docs/all.md b/docs/all.md index e20f127cb..5a87cba95 100644 --- a/docs/all.md +++ b/docs/all.md @@ -542,19 +542,24 @@ All commands use the `-g` or `--generate` to determine how many instances to loa # How messages are queued and sent +Services used during message-send flow: +1. AWS S3 +2. AWS SNS +3. AWS Cloudwatch +4. Redis +5. PostgreSQL + There are several ways for notifications to come into the API. - Messages sent through the API enter through `app/notifications/post_notifications.py` -- One-off messages sent from the UI enter through `create_one_off_notification` in `app/service/rest.py` -- CSV uploads enter through `app/job/rest.py` +- One-off messages and CSV uploads both enter from the UI through `app/job/rest.py:create_job` -API messages and one-off UI messages come in one at a time, and take slightly-separate routes -that both end up at `persist_notification`, which writes to the database, and `provider_tasks.deliver_sms`, +API messages come in one at a time, and end up at `persist_notification`, which writes to the database, and `provider_tasks.deliver_sms`, which enqueues the sending. -For CSV uploads, the CSV is first stored in S3 and queued as a `Job`. When the job runs, it iterates -through the rows, running `process_job.save_sms` to send notifications through `persist_notification` and -`provider_tasks.deliver_sms`. +One-off messages and batch messages both upload a CSV, which are then first stored in S3 and queued as a `Job`. When the job runs, it iterates +through the rows from `tasks.py:process_row`, running `tasks.py:save_sms` (email notifications branch off through `tasks.py:save_email`) to write to the db with `persist_notification` and begin the process of delivering the notification to the provider +through `provider_tasks.deliver_sms`. The exit point to the provider is in `send_to_providers.py:send_sms`. # Writing public APIs