Add docs/code comments for message send flow

Andrew Shumway
2024-03-18 11:32:29 -06:00
parent 8470c2568e
commit 5f2508a975
5 changed files with 53 additions and 10 deletions

View File

@@ -91,6 +91,13 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300
)
def deliver_sms(self, notification_id):
'''
Branch off to the final step of delivering the
notification to SNS.
Logic is in place to check delivery receipts, and
additional logic helps developers by printing the
authentication code to the terminal.
'''
try:
current_app.logger.info(
"Start sending SMS for notification id: {}".format(notification_id)
@@ -108,7 +115,7 @@ def deliver_sms(self, notification_id):
current_app.logger.warning(
ansi_green + f"AUTHENTICATION CODE: {notification.content}" + ansi_reset
)
# Delivery branches off to send_to_providers.py
message_id = send_to_providers.send_sms_to_provider(notification)
# The scheduled time has to be in UTC. In any other timezone, the delay
# is ignored and the task fires immediately (although this probably only affects developer testing)
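With `max_retries=48` and `default_retry_delay=300`, a failed send keeps retrying for roughly 48 × 300 s ≈ 4 hours. Below is a minimal sketch of scheduling the delivery-receipt check with a UTC eta, as the comment above describes; the five-minute delay is an assumption, not something this diff specifies.

```python
from datetime import datetime, timedelta

# Hedged sketch: schedule check_sms_delivery_receipt a few minutes out.
# Per the comment above, the eta must be built from UTC; an eta in any
# other timezone loses its delay and the task fires immediately.
sent_at = datetime.utcnow()
check_sms_delivery_receipt.apply_async(
    [message_id, notification_id, sent_at],  # matches the signature above
    eta=sent_at + timedelta(minutes=5),      # assumed delay
)
```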

View File

@@ -31,6 +31,10 @@ from app.v2.errors import TotalRequestsError
@notify_celery.task(name="process-job")
def process_job(job_id, sender_id=None):
'''
Update the job status, fetch the CSV data from S3, and begin
processing the CSV rows.
'''
start = datetime.utcnow()
job = dao_get_job_by_id(job_id)
current_app.logger.info(
@@ -74,6 +78,7 @@ def process_job(job_id, sender_id=None):
for row in recipient_csv.get_rows():
process_row(row, template, job, service, sender_id=sender_id)
# Exit point of the message send flow for this job.
job_complete(job, start=start)
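Putting the hunks above together, a hedged sketch of `process_job`'s overall shape; the status transition and the tuple returned by `get_recipient_csv_and_template_and_sender_id` are assumptions:

```python
def process_job_sketch(job_id, sender_id=None):
    # Illustrative shape only, not the real implementation.
    start = datetime.utcnow()
    job = dao_get_job_by_id(job_id)
    job.job_status = JobStatus.IN_PROGRESS  # assumed status transition
    # Fetches the uploaded CSV from S3 (assumed return shape).
    recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
    for row in recipient_csv.get_rows():
        process_row(row, template, job, job.service, sender_id=sender_id)
    job_complete(job, start=start)  # exit point for the flow
```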
@@ -109,6 +114,10 @@ def get_recipient_csv_and_template_and_sender_id(job):
def process_row(row, template, job, service, sender_id=None):
'''
Branch off based on the notification type,
SMS or email.
'''
template_type = template.template_type
encrypted = encryption.encrypt(
{
@@ -121,6 +130,8 @@ def process_row(row, template, job, service, sender_id=None):
}
)
# Both save_sms and save_email have the same general
# persist logic.
send_fns = {NotificationType.SMS: save_sms, NotificationType.EMAIL: save_email}
send_fn = send_fns[template_type]
@@ -130,6 +141,7 @@ def process_row(row, template, job, service, sender_id=None):
task_kwargs["sender_id"] = sender_id
notification_id = create_uuid()
# Kick off persisting the notification in save_sms/save_email.
send_fn.apply_async(
(
str(service.id),
@@ -163,7 +175,14 @@ def __total_sending_limits_for_job_exceeded(service, job, job_id):
@notify_celery.task(bind=True, name="save-sms", max_retries=5, default_retry_delay=300)
def save_sms(self, service_id, notification_id, encrypted_notification, sender_id=None):
'''
Persist the notification to the database and place it
on the queue to be sent to SNS.
'''
notification = encryption.decrypt(encrypted_notification)
# The SerialisedService and SerialisedTemplate classes are
# used here to fetch the service and template from the cache
# rather than the database, to improve performance.
service = SerialisedService.from_id(service_id)
template = SerialisedTemplate.from_id_and_service_id(
notification["template"],
@@ -177,7 +196,8 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
).sms_sender
else:
reply_to_text = template.reply_to_text
# service_allowed_to_send_to returns False when a trial-mode service
# tries to send to a recipient that is neither on the team nor simulated.
if not service_allowed_to_send_to(notification["to"], service, KeyType.NORMAL):
current_app.logger.debug(
"SMS {} failed as restricted service".format(notification_id)
@@ -208,6 +228,7 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
reply_to_text=reply_to_text,
)
# Kick off the SNS delivery process in provider_tasks.py
provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)], queue=QueueNames.SEND_SMS
)
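The `SerialisedService` / `SerialisedTemplate` comment above describes a read-through cache; here is a minimal, self-contained sketch of that pattern (the key scheme and TTL are assumptions, not the app's actual ones):

```python
import json

def cached_fetch(redis_client, key, fetch_from_db, ttl_seconds=600):
    """Read-through cache sketch: try Redis first, fall back to the db."""
    cached = redis_client.get(key)
    if cached is not None:
        return json.loads(cached)
    value = fetch_from_db()  # e.g. a dao call against PostgreSQL
    redis_client.set(key, json.dumps(value), ex=ttl_seconds)  # assumed TTL
    return value
```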

View File

@@ -22,6 +22,11 @@ from app.serialised_models import SerialisedService, SerialisedTemplate
def send_sms_to_provider(notification):
'''
This is the last step in the message send flow.
We gather the necessary recipient, template, and
notification data and send it off to the provider.
'''
# We no longer store the personalisation in the db;
# it needs to be retrieved from S3 before generating content.
# However, the initial verify code is still sent through personalisation.
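Since personalisation now lives in S3, it has to be fetched before content is generated. A hypothetical sketch of that retrieval; the bucket name, key layout, and helper name are all assumptions:

```python
import boto3

def get_personalisation_sketch(service_id, job_id, row_number):
    # Hypothetical: pull one CSV row's personalisation before rendering.
    s3 = boto3.client("s3")
    obj = s3.get_object(
        Bucket="notify-csv-upload",                # assumed bucket
        Key=f"service-{service_id}/{job_id}.csv",  # assumed key layout
    )
    rows = obj["Body"].read().decode("utf-8").splitlines()
    header = rows[0].split(",")
    values = rows[row_number + 1].split(",")       # +1 skips the header
    return dict(zip(header, values))
```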
@@ -41,6 +46,7 @@ def send_sms_to_provider(notification):
return
if notification.status == NotificationStatus.CREATED:
# Get the provider here (currently this is only AWS SNS)
provider = provider_to_use(NotificationType.SMS, notification.international)
if not provider:
technical_failure(notification=notification)

View File

@@ -166,6 +166,10 @@ def get_jobs_by_service(service_id):
@job_blueprint.route("", methods=["POST"])
def create_job(service_id):
'''
Entry point from the UI for one-off messages
as well as CSV uploads.
'''
service = dao_fetch_service_by_id(service_id)
if not service.active:
raise InvalidRequest("Create job is not allowed: service is inactive", 403)
@@ -204,7 +208,7 @@ def create_job(service_id):
dao_create_job(job)
sender_id = data.get("sender_id")
# Kick off the job in tasks.py:process_job
if job.job_status == JobStatus.PENDING:
process_job.apply_async(
[str(job.id)], {"sender_id": sender_id}, queue=QueueNames.JOBS
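For orientation, a hypothetical client call that would land in `create_job` above; the route, payload fields, and auth scheme are assumptions:

```python
import requests

def create_job_sketch(base_url, token, service_id, upload_id, sender_id=None):
    # Hypothetical request; a PENDING job then kicks off process_job.
    resp = requests.post(
        f"{base_url}/service/{service_id}/job",               # assumed route
        json={"id": str(upload_id), "sender_id": sender_id},  # assumed payload
        headers={"Authorization": f"Bearer {token}"},         # assumed auth
    )
    resp.raise_for_status()
    return resp.json()
```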

View File

@@ -542,19 +542,24 @@ All commands use the `-g` or `--generate` to determine how many instances to loa
# How messages are queued and sent
Services used during message-send flow:
1. AWS S3
2. AWS SNS
3. AWS Cloudwatch
4. Redis
5. PostgreSQL
There are two ways for notifications to come into the API:
- Messages sent through the API enter through `app/notifications/post_notifications.py`
- One-off messages and CSV uploads both enter from the UI through `app/job/rest.py:create_job`
API messages come in one at a time, and end up at `persist_notification`, which writes to the database, and `provider_tasks.deliver_sms`,
which enqueues the sending.
One-off messages and batch messages both upload a CSV, which is then stored in S3 and queued as a `Job`. When the job runs, it iterates
through the rows in `tasks.py:process_row`, running `tasks.py:save_sms` (email notifications branch off through `tasks.py:save_email`) to write to the db with `persist_notification` and begin delivering the notification to the provider
through `provider_tasks.deliver_sms`. The exit point to the provider is `send_to_providers.py:send_sms_to_provider`.
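Condensed into one illustrative trace (stub bodies only; each real function is documented above):

```python
# Illustrative call chain for the CSV path; not real wiring.
def create_job():            # app/job/rest.py — UI entry point, queues the Job
    process_job()

def process_job():           # tasks.py — loads the CSV from S3, iterates rows
    process_row()

def process_row():           # tasks.py — branches on sms vs email
    save_sms()

def save_sms():              # tasks.py — persist_notification writes to the db
    deliver_sms()

def deliver_sms():           # provider_tasks.py — dequeued send task
    send_sms_to_provider()

def send_sms_to_provider():  # send_to_providers.py — exit point to AWS SNS
    pass
```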
# Writing public APIs