From 5f2508a975104d809dfa0c481b3e0ce800a6e674 Mon Sep 17 00:00:00 2001
From: Andrew Shumway
Date: Mon, 18 Mar 2024 11:32:29 -0600
Subject: [PATCH 1/4] Add docs/code comments for message send flow

---
 app/celery/provider_tasks.py      |  9 ++++++++-
 app/celery/tasks.py               | 23 ++++++++++++++++++++++-
 app/delivery/send_to_providers.py |  6 ++++++
 app/job/rest.py                   |  6 +++++-
 docs/all.md                       | 19 ++++++++++++-------
 5 files changed, 53 insertions(+), 10 deletions(-)

diff --git a/app/celery/provider_tasks.py b/app/celery/provider_tasks.py
index 47e4c0bd8..743d37b72 100644
--- a/app/celery/provider_tasks.py
+++ b/app/celery/provider_tasks.py
@@ -91,6 +91,13 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
     bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300
 )
 def deliver_sms(self, notification_id):
+    '''
+    This logic will branch off to the final step in delivering
+    the notification to sns.
+    Logic is in place for delivery receipts.
+    Additional logic to help devs output authentication code to
+    terminal.
+    '''
     try:
         current_app.logger.info(
             "Start sending SMS for notification id: {}".format(notification_id)
@@ -108,7 +115,7 @@ def deliver_sms(self, notification_id):
         current_app.logger.warning(
             ansi_green + f"AUTHENTICATION CODE: {notification.content}" + ansi_reset
         )
-
+        # Code branches off to send_to_providers.py
         message_id = send_to_providers.send_sms_to_provider(notification)
         # We have to put it in UTC. For other timezones, the delay
         # will be ignored and it will fire immediately (although this probably only affects developer testing)
diff --git a/app/celery/tasks.py b/app/celery/tasks.py
index b7cb43c6d..f2bcec884 100644
--- a/app/celery/tasks.py
+++ b/app/celery/tasks.py
@@ -31,6 +31,10 @@ from app.v2.errors import TotalRequestsError
 
 @notify_celery.task(name="process-job")
 def process_job(job_id, sender_id=None):
+    '''
+    We update job status, get the csv data from s3 and begin processing
+    csv rows.
+    '''
     start = datetime.utcnow()
     job = dao_get_job_by_id(job_id)
     current_app.logger.info(
@@ -74,6 +78,7 @@ def process_job(job_id, sender_id=None):
 
     for row in recipient_csv.get_rows():
         process_row(row, template, job, service, sender_id=sender_id)
+    # End point/Exit point for message send flow.
     job_complete(job, start=start)
 
 
@@ -109,6 +114,10 @@ def get_recipient_csv_and_template_and_sender_id(job):
 
 
 def process_row(row, template, job, service, sender_id=None):
+    '''
+    The process will branch off based on notification type,
+    sms or email.
+    '''
     template_type = template.template_type
     encrypted = encryption.encrypt(
         {
@@ -121,6 +130,8 @@ def process_row(row, template, job, service, sender_id=None):
         }
     )
 
+    # Both save_sms and save_email have the same general
+    # persist logic.
     send_fns = {NotificationType.SMS: save_sms, NotificationType.EMAIL: save_email}
 
     send_fn = send_fns[template_type]
@@ -130,6 +141,7 @@ def process_row(row, template, job, service, sender_id=None):
         task_kwargs["sender_id"] = sender_id
 
     notification_id = create_uuid()
+    # Kick-off persisting notification in save_sms/save_email.
     send_fn.apply_async(
         (
             str(service.id),
@@ -163,7 +175,14 @@ def __total_sending_limits_for_job_exceeded(service, job, job_id):
 
 @notify_celery.task(bind=True, name="save-sms", max_retries=5, default_retry_delay=300)
 def save_sms(self, service_id, notification_id, encrypted_notification, sender_id=None):
+    '''
+    We are persisting the notification to the db here and
+    placing notification in queue to send to sns.
+    '''
     notification = encryption.decrypt(encrypted_notification)
+    # SerialisedService and SerialisedTemplate classes are
+    # used here to grab the same service and template from the cache
+    # to improve performance.
     service = SerialisedService.from_id(service_id)
     template = SerialisedTemplate.from_id_and_service_id(
         notification["template"],
@@ -177,7 +196,8 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
         ).sms_sender
     else:
         reply_to_text = template.reply_to_text
-
+    # Return False when trial mode services try sending notifications
+    # to non-team and non-simulated recipients.
     if not service_allowed_to_send_to(notification["to"], service, KeyType.NORMAL):
         current_app.logger.debug(
             "SMS {} failed as restricted service".format(notification_id)
@@ -208,6 +228,7 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
         reply_to_text=reply_to_text,
     )
 
+    # Kick off sns process in provider_tasks.py
     provider_tasks.deliver_sms.apply_async(
         [str(saved_notification.id)], queue=QueueNames.SEND_SMS
     )
diff --git a/app/delivery/send_to_providers.py b/app/delivery/send_to_providers.py
index 02d657e17..e7ab80fbf 100644
--- a/app/delivery/send_to_providers.py
+++ b/app/delivery/send_to_providers.py
@@ -22,6 +22,11 @@ from app.serialised_models import SerialisedService, SerialisedTemplate
 
 
 def send_sms_to_provider(notification):
+    '''
+    This is the last step in the message send flow.
+    We get necessary data for recipient, template,
+    notification and send it off to the provider.
+    '''
     # we no longer store the personalisation in the db,
     # need to retrieve from s3 before generating content
     # However, we are still sending the initial verify code through personalisation
@@ -41,6 +46,7 @@ def send_sms_to_provider(notification):
         return
 
     if notification.status == NotificationStatus.CREATED:
+        # We get the provider here (which is only aws sns)
         provider = provider_to_use(NotificationType.SMS, notification.international)
         if not provider:
             technical_failure(notification=notification)
diff --git a/app/job/rest.py b/app/job/rest.py
index c53a82296..48a456ec4 100644
--- a/app/job/rest.py
+++ b/app/job/rest.py
@@ -166,6 +166,10 @@ def get_jobs_by_service(service_id):
 
 @job_blueprint.route("", methods=["POST"])
 def create_job(service_id):
+    '''
+    Entry point from UI for one-off messages
+    as well as CSV uploads.
+    '''
     service = dao_fetch_service_by_id(service_id)
     if not service.active:
         raise InvalidRequest("Create job is not allowed: service is inactive ", 403)
@@ -204,7 +208,7 @@ def create_job(service_id):
     dao_create_job(job)
 
     sender_id = data.get("sender_id")
-
+    # Kick off job in tasks.py
     if job.job_status == JobStatus.PENDING:
         process_job.apply_async(
             [str(job.id)], {"sender_id": sender_id}, queue=QueueNames.JOBS
diff --git a/docs/all.md b/docs/all.md
index e20f127cb..5a87cba95 100644
--- a/docs/all.md
+++ b/docs/all.md
@@ -542,19 +542,24 @@ All commands use the `-g` or `--generate` to determine how many instances to load
 
 # How messages are queued and sent
 
+Services used during the message-send flow:
+1. AWS S3
+2. AWS SNS
+3. AWS CloudWatch
+4. Redis
+5. PostgreSQL
+
 There are several ways for notifications to come into the API.
 
 - Messages sent through the API enter through `app/notifications/post_notifications.py`
-- One-off messages sent from the UI enter through `create_one_off_notification` in `app/service/rest.py`
-- CSV uploads enter through `app/job/rest.py`
+- One-off messages and CSV uploads both enter from the UI through `app/job/rest.py:create_job`
 
-API messages and one-off UI messages come in one at a time, and take slightly-separate routes
-that both end up at `persist_notification`, which writes to the database, and `provider_tasks.deliver_sms`,
+API messages come in one at a time, and end up at `persist_notification`, which writes to the database, and `provider_tasks.deliver_sms`,
 which enqueues the sending.
 
-For CSV uploads, the CSV is first stored in S3 and queued as a `Job`. When the job runs, it iterates
-through the rows, running `process_job.save_sms` to send notifications through `persist_notification` and
-`provider_tasks.deliver_sms`.
+One-off messages and batch messages both upload a CSV, which is then stored in S3 and queued as a `Job`. When the job runs, it iterates
+through the rows, handing each row to `tasks.py:process_row`, which queues `tasks.py:save_sms` (email notifications branch off through `tasks.py:save_email`) to write to the db with `persist_notification` and begin delivering the notification to the provider
+through `provider_tasks.deliver_sms`. The exit point to the provider is in `send_to_providers.py:send_sms_to_provider`.
 
 # Writing public APIs
 

From 6599c284cca4495f89a35b7f1bf351a02d23e0c3 Mon Sep 17 00:00:00 2001
From: Andrew Shumway
Date: Mon, 18 Mar 2024 11:45:40 -0600
Subject: [PATCH 2/4] Add docs/code comments for message-send-flow

---
 app/celery/provider_tasks.py                     |  4 ++--
 app/celery/tasks.py                              | 12 ++++++------
 app/delivery/send_to_providers.py                |  4 ++--
 app/job/rest.py                                  |  4 ++--
 migrations/versions/0410_enums_for_everything.py |  6 ++++--
 5 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/app/celery/provider_tasks.py b/app/celery/provider_tasks.py
index 743d37b72..e8d120a56 100644
--- a/app/celery/provider_tasks.py
+++ b/app/celery/provider_tasks.py
@@ -91,13 +91,13 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
     bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300
 )
 def deliver_sms(self, notification_id):
-    '''
+    """
     This logic will branch off to the final step in delivering
     the notification to sns.
     Logic is in place for delivery receipts.
     Additional logic to help devs output authentication code to
     terminal.
-    '''
+    """
     try:
         current_app.logger.info(
             "Start sending SMS for notification id: {}".format(notification_id)
diff --git a/app/celery/tasks.py b/app/celery/tasks.py
index f2bcec884..540e29177 100644
--- a/app/celery/tasks.py
+++ b/app/celery/tasks.py
@@ -31,10 +31,10 @@ from app.v2.errors import TotalRequestsError
 
 @notify_celery.task(name="process-job")
 def process_job(job_id, sender_id=None):
-    '''
+    """
     We update job status, get the csv data from s3 and begin processing
     csv rows.
-    '''
+    """
     start = datetime.utcnow()
     job = dao_get_job_by_id(job_id)
     current_app.logger.info(
@@ -114,10 +114,10 @@ def get_recipient_csv_and_template_and_sender_id(job):
 
 
 def process_row(row, template, job, service, sender_id=None):
-    '''
+    """
     The process will branch off based on notification type,
     sms or email.
-    '''
+    """
     template_type = template.template_type
     encrypted = encryption.encrypt(
         {
@@ -175,10 +175,10 @@ def __total_sending_limits_for_job_exceeded(service, job, job_id):
 
 @notify_celery.task(bind=True, name="save-sms", max_retries=5, default_retry_delay=300)
 def save_sms(self, service_id, notification_id, encrypted_notification, sender_id=None):
-    '''
+    """
     We are persisting the notification to the db here and
     placing notification in queue to send to sns.
-    '''
+    """
     notification = encryption.decrypt(encrypted_notification)
     # SerialisedService and SerialisedTemplate classes are
     # used here to grab the same service and template from the cache
diff --git a/app/delivery/send_to_providers.py b/app/delivery/send_to_providers.py
index e7ab80fbf..5e9373a60 100644
--- a/app/delivery/send_to_providers.py
+++ b/app/delivery/send_to_providers.py
@@ -22,11 +22,11 @@ from app.serialised_models import SerialisedService, SerialisedTemplate
 
 
 def send_sms_to_provider(notification):
-    '''
+    """
     This is the last step in the message send flow.
     We get necessary data for recipient, template,
     notification and send it off to the provider.
-    '''
+    """
     # we no longer store the personalisation in the db,
     # need to retrieve from s3 before generating content
     # However, we are still sending the initial verify code through personalisation
diff --git a/app/job/rest.py b/app/job/rest.py
index 48a456ec4..46fe698da 100644
--- a/app/job/rest.py
+++ b/app/job/rest.py
@@ -166,10 +166,10 @@ def get_jobs_by_service(service_id):
 
 @job_blueprint.route("", methods=["POST"])
 def create_job(service_id):
-    '''
+    """
     Entry point from UI for one-off messages
     as well as CSV uploads.
-    '''
+    """
     service = dao_fetch_service_by_id(service_id)
     if not service.active:
         raise InvalidRequest("Create job is not allowed: service is inactive ", 403)
diff --git a/migrations/versions/0410_enums_for_everything.py b/migrations/versions/0410_enums_for_everything.py
index b6c9042c6..4467e0e9d 100644
--- a/migrations/versions/0410_enums_for_everything.py
+++ b/migrations/versions/0410_enums_for_everything.py
@@ -469,7 +469,8 @@ def upgrade():
         postgresql_using=enum_using("notification_type", NotificationType),
     )
     # Clobbering bad data here. These are values we don't use any more, and anything with them is unnecessary.
-    op.execute("""
+    op.execute(
+        """
         delete from
             service_permissions
         where
@@ -480,7 +481,8 @@ def upgrade():
             'international_letters',
             'broadcast'
         );
-    """)
+    """
+    )
     op.alter_column(
         "service_permissions",
         "permission",

From d3b895ddc6ad2b1ad492b5258f5db45646995005 Mon Sep 17 00:00:00 2001
From: Andrew Shumway
Date: Wed, 27 Mar 2024 10:32:40 -0600
Subject: [PATCH 3/4] PEP-257 changes

---
 app/celery/provider_tasks.py      |  8 +-------
 app/celery/tasks.py               | 15 +++------------
 app/delivery/send_to_providers.py |  8 ++++----
 app/job/rest.py                   |  5 +----
 4 files changed, 9 insertions(+), 27 deletions(-)

diff --git a/app/celery/provider_tasks.py b/app/celery/provider_tasks.py
index e8d120a56..5d8d05ca2 100644
--- a/app/celery/provider_tasks.py
+++ b/app/celery/provider_tasks.py
@@ -91,13 +91,7 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
     bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300
 )
 def deliver_sms(self, notification_id):
-    """
-    This logic will branch off to the final step in delivering
-    the notification to sns.
-    Logic is in place for delivery receipts.
-    Additional logic to help devs output authentication code to
-    terminal.
-    """
+    """Branch off to the final step in delivering the notification to sns and get delivery receipts."""
     try:
         current_app.logger.info(
             "Start sending SMS for notification id: {}".format(notification_id)
diff --git a/app/celery/tasks.py b/app/celery/tasks.py
index 540e29177..ff7ead5ef 100644
--- a/app/celery/tasks.py
+++ b/app/celery/tasks.py
@@ -31,10 +31,7 @@ from app.v2.errors import TotalRequestsError
 
 @notify_celery.task(name="process-job")
 def process_job(job_id, sender_id=None):
-    """
-    We update job status, get the csv data from s3 and begin processing
-    csv rows.
-    """
+    """ Update job status, get csv data from s3, and begin processing csv rows."""
     start = datetime.utcnow()
     job = dao_get_job_by_id(job_id)
     current_app.logger.info(
@@ -114,10 +111,7 @@ def get_recipient_csv_and_template_and_sender_id(job):
 
 
 def process_row(row, template, job, service, sender_id=None):
-    """
-    The process will branch off based on notification type,
-    sms or email.
-    """
+    """Branch off based on notification type, sms or email."""
     template_type = template.template_type
     encrypted = encryption.encrypt(
         {
@@ -175,10 +169,7 @@ def __total_sending_limits_for_job_exceeded(service, job, job_id):
 
 @notify_celery.task(bind=True, name="save-sms", max_retries=5, default_retry_delay=300)
 def save_sms(self, service_id, notification_id, encrypted_notification, sender_id=None):
-    """
-    We are persisting the notification to the db here and
-    placing notification in queue to send to sns.
-    """
+    """Persist notification to db and place notification in queue to send to sns."""
     notification = encryption.decrypt(encrypted_notification)
     # SerialisedService and SerialisedTemplate classes are
     # used here to grab the same service and template from the cache
diff --git a/app/delivery/send_to_providers.py b/app/delivery/send_to_providers.py
index 5e9373a60..98f86396a 100644
--- a/app/delivery/send_to_providers.py
+++ b/app/delivery/send_to_providers.py
@@ -22,10 +22,10 @@ from app.serialised_models import SerialisedService, SerialisedTemplate
 
 
 def send_sms_to_provider(notification):
-    """
-    This is the last step in the message send flow.
-    We get necessary data for recipient, template,
-    notification and send it off to the provider.
+    """Final step in the message send flow.
+
+    Get data for recipient, template,
+    notification and send it to sns.
     """
     # we no longer store the personalisation in the db,
     # need to retrieve from s3 before generating content
diff --git a/app/job/rest.py b/app/job/rest.py
index 46fe698da..d139fe9ae 100644
--- a/app/job/rest.py
+++ b/app/job/rest.py
@@ -166,10 +166,7 @@ def get_jobs_by_service(service_id):
 
 @job_blueprint.route("", methods=["POST"])
 def create_job(service_id):
-    """
-    Entry point from UI for one-off messages
-    as well as CSV uploads.
-    """
+    """Entry point from UI for one-off messages as well as CSV uploads."""
     service = dao_fetch_service_by_id(service_id)
     if not service.active:
         raise InvalidRequest("Create job is not allowed: service is inactive ", 403)

From 1547e80e37bdd7f3c29e90edc63046524f4aaa03 Mon Sep 17 00:00:00 2001
From: Andrew Shumway
Date: Thu, 28 Mar 2024 09:35:53 -0600
Subject: [PATCH 4/4] Fix spacing

---
 app/celery/tasks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/app/celery/tasks.py b/app/celery/tasks.py
index ff7ead5ef..9ab84cb2e 100644
--- a/app/celery/tasks.py
+++ b/app/celery/tasks.py
@@ -31,7 +31,7 @@ from app.v2.errors import TotalRequestsError
 
 @notify_celery.task(name="process-job")
 def process_job(job_id, sender_id=None):
-    """ Update job status, get csv data from s3, and begin processing csv rows."""
+    """Update job status, get csv data from s3, and begin processing csv rows."""
     start = datetime.utcnow()
     job = dao_get_job_by_id(job_id)
     current_app.logger.info(
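
For orientation, the CSV path these patches document can be condensed into a small sketch. This is illustrative only, not the real code: the actual functions are Celery tasks with different signatures, the CSV lives in S3, and persistence goes through `persist_notification` and the database. Only the order of the hand-offs and the file each stage lives in are taken from the diffs above; the dict shapes and return values are invented for the example.

```python
# Simplified, linear walk through the message-send flow described in the patches.
# Each function stands in for the real task of the same name; queues, S3, and the
# db are replaced by plain function calls and dicts so the hand-offs are visible.
import uuid


def create_job(csv_rows):
    """Stands in for app/job/rest.py:create_job, the UI entry point."""
    job = {"id": str(uuid.uuid4()), "rows": csv_rows, "status": "pending"}
    return process_job(job)  # the real code enqueues this on the JOBS queue


def process_job(job):
    """Stands in for app/celery/tasks.py:process_job: read rows and fan out."""
    job["status"] = "in progress"
    notifications = [process_row(row, job) for row in job["rows"]]
    job["status"] = "finished"  # job_complete() in the real code
    return notifications


def process_row(row, job):
    """Stands in for app/celery/tasks.py:process_row: build one notification."""
    notification = {"id": str(uuid.uuid4()), "to": row["phone number"], "job_id": job["id"]}
    return save_sms(notification)  # email templates branch off to save_email


def save_sms(notification):
    """Stands in for app/celery/tasks.py:save_sms: persist, then queue delivery."""
    notification["status"] = "created"  # persist_notification() writes the db row
    return deliver_sms(notification)  # enqueued on the SEND_SMS queue in the real code


def deliver_sms(notification):
    """Stands in for app/celery/provider_tasks.py:deliver_sms."""
    return send_sms_to_provider(notification)


def send_sms_to_provider(notification):
    """Stands in for app/delivery/send_to_providers.py: exit point to AWS SNS (stubbed)."""
    notification["status"] = "sending"
    return notification


if __name__ == "__main__":
    print(create_job([{"phone number": "+12025550123"}]))
```

Running the sketch prints a single notification dict that has passed through every stage, which mirrors the path one CSV row takes from `create_job` to the provider.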