Merge branch 'main' into aanand-sqlalchemy-version-update

This commit is contained in:
Aditi Anand
2024-03-29 15:51:11 -04:00
29 changed files with 630 additions and 451 deletions

View File

@@ -3,15 +3,16 @@ import secrets
import string
import time
import uuid
from contextlib import contextmanager
from time import monotonic
from celery import current_task
from celery import Celery, Task, current_task
from flask import current_app, g, has_request_context, jsonify, make_response, request
from flask.ctx import has_app_context
from flask_marshmallow import Marshmallow
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy
from notifications_utils import logging, request_helper
from notifications_utils.celery import NotifyCelery
from notifications_utils.clients.encryption.encryption_client import Encryption
from notifications_utils.clients.redis.redis_client import RedisClient
from notifications_utils.clients.zendesk.zendesk_client import ZendeskClient
@@ -27,6 +28,25 @@ from app.clients.email.aws_ses_stub import AwsSesStubClient
from app.clients.sms.aws_sns import AwsSnsClient
class NotifyCelery(Celery):
    """Celery subclass integrated with a Flask application.

    Binds a Flask-aware task class via ``init_app`` and propagates the
    inbound request id into outgoing task headers so that logs emitted by
    workers can be correlated with the originating web request.
    """

    def init_app(self, app):
        self.task_cls = make_task(app)
        # Configure Celery app with options from the main app config.
        self.config_from_object(app.config["CELERY"])

    def send_task(self, name, args=None, kwargs=None, **other_kwargs):
        headers = other_kwargs.get("headers") or {}
        other_kwargs["headers"] = headers
        # Prefer the id attached to the active HTTP request; fall back to
        # one stashed on ``g`` when running inside a bare app context.
        if has_request_context() and hasattr(request, "request_id"):
            headers["notify_request_id"] = request.request_id
        elif has_app_context() and "request_id" in g:
            headers["notify_request_id"] = g.request_id
        return super().send_task(name, args, kwargs, **other_kwargs)
class SQLAlchemy(_SQLAlchemy):
"""We need to subclass SQLAlchemy in order to override create_engine options"""
@@ -366,3 +386,58 @@ def setup_sqlalchemy_events(app):
@event.listens_for(db.engine, "checkin")
def checkin(dbapi_connection, connection_record): # noqa
pass
def make_task(app):
    """Build a Celery ``Task`` subclass bound to the given Flask *app*.

    Every task produced from the returned class runs inside the Flask
    application context (so config, logging and ``g`` are available),
    records its start time, and logs duration on success or a traceback
    on failure — with the originating request id attached for tracing.
    """

    class NotifyTask(Task):
        abstract = True
        # Monotonic start time; set by __call__ and read by on_success.
        start = None

        @property
        def queue_name(self):
            # Routing key recorded by the broker, or "none" when absent.
            info = self.request.delivery_info or {}
            return info.get("routing_key", "none")

        @property
        def request_id(self):
            # Note that each header is a direct attribute of the
            # task context (aka "request").
            return self.request.get("notify_request_id")

        @contextmanager
        def app_context(self):
            with app.app_context():
                # Add 'request_id' to 'g' so that it gets logged.
                g.request_id = self.request_id
                yield

        def on_success(self, retval, task_id, args, kwargs):  # noqa
            # enables request id tracing for these logs
            with self.app_context():
                elapsed = time.monotonic() - self.start
                message = "Celery task {task_name} (queue: {queue_name}) took {time}".format(
                    task_name=self.name,
                    queue_name=self.queue_name,
                    time="{0:.4f}".format(elapsed),
                )
                app.logger.info(message)

        def on_failure(self, exc, task_id, args, kwargs, einfo):  # noqa
            # enables request id tracing for these logs
            with self.app_context():
                message = "Celery task {task_name} (queue: {queue_name}) failed".format(
                    task_name=self.name,
                    queue_name=self.queue_name,
                )
                app.logger.exception(message)

        def __call__(self, *args, **kwargs):
            # ensure task has flask context to access config, logger, etc
            with self.app_context():
                self.start = time.monotonic()
                return super().__call__(*args, **kwargs)

    return NotifyTask

View File

@@ -114,8 +114,7 @@ def extract_phones(job):
job_row = 0
for row in job:
row = row.split(",")
current_app.logger.info(f"PHONE INDEX IS NOW {phone_index}")
current_app.logger.info(f"LENGTH OF ROW IS {len(row)}")
if phone_index >= len(row):
phones[job_row] = "Unavailable"
current_app.logger.error(

View File

@@ -91,6 +91,7 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300
)
def deliver_sms(self, notification_id):
"""Branch off to the final step in delivering the notification to sns and get delivery receipts."""
try:
current_app.logger.info(
"Start sending SMS for notification id: {}".format(notification_id)
@@ -108,7 +109,7 @@ def deliver_sms(self, notification_id):
current_app.logger.warning(
ansi_green + f"AUTHENTICATION CODE: {notification.content}" + ansi_reset
)
# Code branches off to send_to_providers.py
message_id = send_to_providers.send_sms_to_provider(notification)
# We have to put it in UTC. For other timezones, the delay
# will be ignored and it will fire immediately (although this probably only affects developer testing)

View File

@@ -31,6 +31,7 @@ from app.v2.errors import TotalRequestsError
@notify_celery.task(name="process-job")
def process_job(job_id, sender_id=None):
"""Update job status, get csv data from s3, and begin processing csv rows."""
start = datetime.utcnow()
job = dao_get_job_by_id(job_id)
current_app.logger.info(
@@ -74,6 +75,7 @@ def process_job(job_id, sender_id=None):
for row in recipient_csv.get_rows():
process_row(row, template, job, service, sender_id=sender_id)
# End point/Exit point for message send flow.
job_complete(job, start=start)
@@ -109,6 +111,7 @@ def get_recipient_csv_and_template_and_sender_id(job):
def process_row(row, template, job, service, sender_id=None):
"""Branch off based on notification type, sms or email."""
template_type = template.template_type
encrypted = encryption.encrypt(
{
@@ -121,6 +124,8 @@ def process_row(row, template, job, service, sender_id=None):
}
)
# Both save_sms and save_email have the same general
# persist logic.
send_fns = {NotificationType.SMS: save_sms, NotificationType.EMAIL: save_email}
send_fn = send_fns[template_type]
@@ -130,6 +135,7 @@ def process_row(row, template, job, service, sender_id=None):
task_kwargs["sender_id"] = sender_id
notification_id = create_uuid()
# Kick-off persisting notification in save_sms/save_email.
send_fn.apply_async(
(
str(service.id),
@@ -163,7 +169,11 @@ def __total_sending_limits_for_job_exceeded(service, job, job_id):
@notify_celery.task(bind=True, name="save-sms", max_retries=5, default_retry_delay=300)
def save_sms(self, service_id, notification_id, encrypted_notification, sender_id=None):
"""Persist notification to db and place notification in queue to send to sns."""
notification = encryption.decrypt(encrypted_notification)
# SerialisedService and SerialisedTemplate classes are
# used here to grab the same service and template from the cache
# to improve performance.
service = SerialisedService.from_id(service_id)
template = SerialisedTemplate.from_id_and_service_id(
notification["template"],
@@ -177,7 +187,8 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
).sms_sender
else:
reply_to_text = template.reply_to_text
# Return False when trial mode services try sending notifications
# to non-team and non-simulated recipients.
if not service_allowed_to_send_to(notification["to"], service, KeyType.NORMAL):
current_app.logger.debug(
"SMS {} failed as restricted service".format(notification_id)
@@ -208,6 +219,7 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
reply_to_text=reply_to_text,
)
# Kick off sns process in provider_tasks.py
provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)], queue=QueueNames.SEND_SMS
)

View File

@@ -161,7 +161,7 @@
"",
"Open this link to create an account on Notify.gov:",
"",
"((url))",
"[Join Organization](((url)))",
"",
"",
"This invitation will stop working at midnight tomorrow. This is to keep ((organization_name)) secure."

View File

@@ -36,7 +36,6 @@ def get_login_gov_user(login_uuid, email_address):
should be removed.
"""
print(User.query.filter_by(login_uuid=login_uuid).first())
user = User.query.filter_by(login_uuid=login_uuid).first()
if user:
if user.email_address != email_address:

View File

@@ -22,6 +22,11 @@ from app.serialised_models import SerialisedService, SerialisedTemplate
def send_sms_to_provider(notification):
"""Final step in the message send flow.
Get data for recipient, template,
notification and send it to sns.
"""
# we no longer store the personalisation in the db,
# need to retrieve from s3 before generating content
# However, we are still sending the initial verify code through personalisation
@@ -41,6 +46,7 @@ def send_sms_to_provider(notification):
return
if notification.status == NotificationStatus.CREATED:
# We get the provider here (which is only aws sns)
provider = provider_to_use(NotificationType.SMS, notification.international)
if not provider:
technical_failure(notification=notification)

View File

@@ -1,8 +1,4 @@
from strenum import StrEnum # type: ignore [import-not-found]
# In 3.11 this is in the enum library. We will not need this external library any more.
# The line will simply change from importing from strenum to importing from enum.
# And the strenum library can then be removed from poetry.
from enum import StrEnum
class TemplateType(StrEnum):

View File

@@ -166,6 +166,7 @@ def get_jobs_by_service(service_id):
@job_blueprint.route("", methods=["POST"])
def create_job(service_id):
"""Entry point from UI for one-off messages as well as CSV uploads."""
service = dao_fetch_service_by_id(service_id)
if not service.active:
raise InvalidRequest("Create job is not allowed: service is inactive ", 403)
@@ -204,7 +205,7 @@ def create_job(service_id):
dao_create_job(job)
sender_id = data.get("sender_id")
# Kick off job in tasks.py
if job.job_status == JobStatus.PENDING:
process_job.apply_async(
[str(job.id)], {"sender_id": sender_id}, queue=QueueNames.JOBS

View File

@@ -1,4 +1,5 @@
import json
import os
from flask import Blueprint, current_app, jsonify, request
from itsdangerous import BadData, SignatureExpired
@@ -27,6 +28,7 @@ from app.organization.organization_schema import (
post_update_invited_org_user_status_schema,
)
from app.schema_validation import validate
from app.utils import hilite
organization_invite_blueprint = Blueprint("organization_invite", __name__)
@@ -58,10 +60,7 @@ def invite_user_to_org(organization_id):
else invited_org_user.invited_by.name
),
"organization_name": invited_org_user.organization.name,
"url": invited_org_user_url(
invited_org_user.id,
data.get("invite_link_host"),
),
"url": os.environ["LOGIN_DOT_GOV_REGISTRATION_URL"],
}
saved_notification = persist_notification(
template_id=template.id,
@@ -74,13 +73,28 @@ def invite_user_to_org(organization_id):
key_type=KeyType.NORMAL,
reply_to_text=invited_org_user.invited_by.email_address,
)
saved_notification.personalisation = personalisation
redis_store.set(
f"email-personalisation-{saved_notification.id}",
json.dumps(personalisation),
ex=1800,
)
saved_notification.personalisation = personalisation
# This is for the login.gov path, note 24 hour expiry to match
# The expiration of invitations.
redis_key = f"organization-invite-{invited_org_user.email_address}"
redis_store.set(
redis_key,
organization_id,
ex=3600 * 24,
)
current_app.logger.info(
hilite(f"STORING THIS ORGANIZATION ID IN REDIS {redis_store.get(redis_key)}")
)
current_app.logger.info(
hilite(f"URL: {os.environ['LOGIN_DOT_GOV_REGISTRATION_URL']}")
)
send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY)
return jsonify(data=invited_org_user.serialize()), 201

View File

@@ -1,6 +1,9 @@
import json
from flask import Blueprint, abort, current_app, jsonify, request
from sqlalchemy.exc import IntegrityError
from app import redis_store
from app.config import QueueNames
from app.dao.annual_billing_dao import set_default_free_allowance_for_service
from app.dao.dao_utils import transaction
@@ -210,6 +213,12 @@ def send_notifications_on_mou_signed(organization_id):
reply_to_text=notify_service.get_default_reply_to_email_address(),
)
saved_notification.personalisation = personalisation
redis_store.set(
f"email-personalisation-{saved_notification.id}",
json.dumps(personalisation),
ex=60 * 60,
)
send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY)
personalisation = {

View File

@@ -1,5 +1,6 @@
import itertools
from flask import current_app
from notifications_utils.recipients import allowed_to_send_to
from app.dao.services_dao import dao_fetch_service_by_id
@@ -45,6 +46,9 @@ def service_allowed_to_send_to(
member.recipient for member in service.guest_list if allow_guest_list_recipients
]
# As per discussion we have decided to allow official simulated
# numbers to go out in trial mode for development purposes.
guest_list_members.extend(current_app.config["SIMULATED_SMS_NUMBERS"])
if (key_type == KeyType.NORMAL and service.restricted) or (
key_type == KeyType.TEAM
):

View File

@@ -1,4 +1,5 @@
import json
import os
from datetime import datetime
from flask import Blueprint, current_app, jsonify, request
@@ -24,6 +25,7 @@ from app.notifications.process_notifications import (
send_notification_to_queue,
)
from app.schemas import invited_user_schema
from app.utils import hilite
service_invite = Blueprint("service_invite", __name__)
@@ -39,7 +41,7 @@ def _create_service_invite(invited_user, invite_link_host):
personalisation = {
"user_name": invited_user.from_user.name,
"service_name": invited_user.service.name,
"url": invited_user_url(invited_user.id, invite_link_host),
"url": os.environ["LOGIN_DOT_GOV_REGISTRATION_URL"],
}
saved_notification = persist_notification(
@@ -47,11 +49,7 @@ def _create_service_invite(invited_user, invite_link_host):
template_version=template.version,
recipient=invited_user.email_address,
service=service,
personalisation={
"user_name": invited_user.from_user.name,
"service_name": invited_user.service.name,
"url": invited_user_url(invited_user.id, invite_link_host),
},
personalisation={},
notification_type=NotificationType.EMAIL,
api_key_id=None,
key_type=KeyType.NORMAL,
@@ -63,6 +61,29 @@ def _create_service_invite(invited_user, invite_link_host):
json.dumps(personalisation),
ex=1800,
)
# The raw permissions are in the form "a,b,c,d"
# but need to be in the form ["a", "b", "c", "d"]
data = {}
permissions = invited_user.permissions
permissions = permissions.split(",")
permission_list = []
for permission in permissions:
permission_list.append(f"{permission}")
data["from_user_id"] = (str(invited_user.from_user.id),)
data["service_id"] = str(invited_user.service.id)
data["permissions"] = permission_list
data["folder_permissions"] = invited_user.folder_permissions
# This is for the login.gov service invite on the
# "Set Up Your Profile" path.
redis_store.set(
f"service-invite-{invited_user.email_address}",
json.dumps(data),
ex=3600 * 24,
)
current_app.logger.info(
hilite(f"STORING ALL THIS IN REDIS FOR SERVICE INVITE {json.dumps(data)}")
)
send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY)

View File

@@ -140,6 +140,11 @@ def update_user_attribute(user_id):
)
saved_notification.personalisation = personalisation
redis_store.set(
f"email-personalisation-{saved_notification.id}",
json.dumps(personalisation),
ex=60 * 60,
)
send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY)
return jsonify(data=user_to_update.serialize()), 200
@@ -361,6 +366,12 @@ def create_2fa_code(
# Assume that we never want to observe the Notify service's research mode
# setting for this notification - we still need to be able to log into the
# admin even if we're doing user research using this service:
redis_store.set(
f"email-personalisation-{saved_notification.id}",
json.dumps(personalisation),
ex=60 * 60,
)
send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY)
@@ -394,6 +405,11 @@ def send_user_confirm_new_email(user_id):
)
saved_notification.personalisation = personalisation
redis_store.set(
f"email-personalisation-{saved_notification.id}",
json.dumps(personalisation),
ex=60 * 60,
)
send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY)
return jsonify({}), 204
@@ -487,6 +503,12 @@ def send_already_registered_email(user_id):
current_app.logger.info("Sending notification to queue")
redis_store.set(
f"email-personalisation-{saved_notification.id}",
json.dumps(personalisation),
ex=60 * 60,
)
send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY)
current_app.logger.info("Sent notification to queue")
@@ -544,6 +566,9 @@ def get_user_login_gov_user():
login_uuid = request_args["login_uuid"]
email = request_args["email"]
user = get_login_gov_user(login_uuid, email)
if user is None:
return jsonify({})
result = user.serialize()
return jsonify(data=result)
@@ -614,6 +639,11 @@ def send_user_reset_password():
)
saved_notification.personalisation = personalisation
redis_store.set(
f"email-personalisation-{saved_notification.id}",
json.dumps(personalisation),
ex=60 * 60,
)
send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY)
return jsonify({}), 204
@@ -688,9 +718,9 @@ def get_orgs_and_services(user):
"id": service.id,
"name": service.name,
"restricted": service.restricted,
"organization": service.organization.id
if service.organization
else None,
"organization": (
service.organization.id if service.organization else None
),
}
for service in user.services
if service.active

View File

@@ -123,3 +123,11 @@ def get_reference_from_personalisation(personalisation):
if personalisation:
return personalisation.get("reference")
return None
def hilite(message):
    """Wrap *message* in ANSI green escape codes for terminal output.

    Debugging helper only: do ``print(hilite(message))`` while debugging,
    then remove the print statements before committing.
    """
    green, reset = "\033[32m", "\033[0m"
    return f"{green}{message}{reset}"