Mirror of https://github.com/GSA/notifications-api.git
Removed all existing statsd logging and replaced with:

- statsd decorator. Infers the stat name from the decorated function call, delegates the statsd call to the statsd client, and calls incr and timing for each decorated method (see the sketch after this list). This is applied to all tasks and to all dao methods that touch the notifications/notification_history tables.
- statsd client changed to prefix all stats with "notification.api.".
- Relies on https://github.com/alphagov/notifications-utils/pull/61 for request logging. Once integrated we pass the statsd client to the logger, allowing us to statsd all API calls. This passes the start time and the method to be called (NOT the url) onto the global Flask object. We then construct statsd counters and timers in the following way: notifications.api.POST.notifications.send_notification.200. This should allow us to aggregate to the level of:
  - API or ADMIN
  - POST or GET etc.
  - modules
  - methods
  - status codes

Finally, we count the callbacks received from 3rd parties against their mapped status.
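A minimal sketch of that decorator pattern, assuming a client object exposing incr and timing methods; the stand-in _StatsClient, its print calls, and the module-level statsd_client name are illustrative assumptions, not the repo's actual app/statsd_decorators.py:

```python
import functools
import time


class _StatsClient:
    """Stand-in for the real statsd client, which prefixes every stat with 'notification.api.'."""

    def incr(self, stat):
        print("incr notification.api.{}".format(stat))

    def timing(self, stat, elapsed):
        print("timing notification.api.{} {:.3f}s".format(stat, elapsed))


statsd_client = _StatsClient()


def statsd(namespace):
    """Count and time each call to the decorated function as <namespace>.<function name>."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            stat = "{}.{}".format(namespace, func.__name__)  # e.g. "tasks.process_job"
            start = time.monotonic()
            try:
                return func(*args, **kwargs)
            finally:
                # incr counts the call; timing records how long it took
                statsd_client.incr(stat)
                statsd_client.timing(stat, time.monotonic() - start)
        return wrapper
    return decorator
```

Applied as in the file below, e.g. @statsd(namespace="tasks") on process_job, this yields counters and timers named tasks.process_job, which the client then prefixes to notification.api.tasks.process_job.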
216 lines
6.9 KiB
Python
import itertools

from datetime import datetime

from flask import current_app
from monotonic import monotonic
from notifications_utils.recipients import (
    RecipientCSV,
    allowed_to_send_to
)
from notifications_utils.template import Template
from sqlalchemy.exc import SQLAlchemyError

from app import (
    create_uuid,
    DATETIME_FORMAT,
    DATE_FORMAT,
    notify_celery,
    encryption
)
from app.aws import s3
from app.celery.provider_tasks import send_sms_to_provider, send_email_to_provider
from app.dao.jobs_dao import (
    dao_update_job,
    dao_get_job_by_id
)
from app.dao.notifications_dao import (
    dao_create_notification,
    dao_get_notification_statistics_for_service_and_day
)
from app.dao.services_dao import dao_fetch_service_by_id
from app.dao.templates_dao import dao_get_template_by_id
from app.models import (
    Notification,
    EMAIL_TYPE,
    SMS_TYPE,
    KEY_TYPE_NORMAL
)
from app.statsd_decorators import statsd


@notify_celery.task(name="process-job")
@statsd(namespace="tasks")
def process_job(job_id):
    start = datetime.utcnow()
    job = dao_get_job_by_id(job_id)

    service = job.service

    stats = dao_get_notification_statistics_for_service_and_day(
        service_id=service.id,
        day=job.created_at.strftime(DATE_FORMAT)
    )

    total_sent = 0
    if stats:
        total_sent = stats.emails_requested + stats.sms_requested

    if total_sent + job.notification_count > service.message_limit:
        job.status = 'sending limits exceeded'
        job.processing_finished = datetime.utcnow()
        dao_update_job(job)
        current_app.logger.info(
            "Job {} size {} error. Sending limits {} exceeded".format(
                job_id, job.notification_count, service.message_limit)
        )
        return

    job.status = 'in progress'
    dao_update_job(job)

    template = Template(
        dao_get_template_by_id(job.template_id, job.template_version).__dict__
    )

    for row_number, recipient, personalisation in RecipientCSV(
            s3.get_job_from_s3(str(service.id), str(job_id)),
            template_type=template.template_type,
            placeholders=template.placeholders
    ).enumerated_recipients_and_personalisation:

        encrypted = encryption.encrypt({
            'template': str(template.id),
            'template_version': job.template_version,
            'job': str(job.id),
            'to': recipient,
            'row_number': row_number,
            'personalisation': {
                key: personalisation.get(key)
                for key in template.placeholders
            }
        })

        if template.template_type == SMS_TYPE:
            send_sms.apply_async((
                str(job.service_id),
                create_uuid(),
                encrypted,
                datetime.utcnow().strftime(DATETIME_FORMAT)),
                queue='bulk-sms'
            )

        if template.template_type == EMAIL_TYPE:
            send_email.apply_async((
                str(job.service_id),
                create_uuid(),
                encrypted,
                datetime.utcnow().strftime(DATETIME_FORMAT)),
                queue='bulk-email')

    finished = datetime.utcnow()
    job.status = 'finished'
    job.processing_started = start
    job.processing_finished = finished
    dao_update_job(job)
    remove_job.apply_async((str(job_id),), queue='remove-job')
    current_app.logger.info(
        "Job {} created at {} started at {} finished at {}".format(job_id, job.created_at, start, finished)
    )


@notify_celery.task(name="remove-job")
@statsd(namespace="tasks")
def remove_job(job_id):
    job = dao_get_job_by_id(job_id)
    s3.remove_job_from_s3(job.service.id, str(job_id))
    current_app.logger.info("Job {} has been removed from s3.".format(job_id))


@notify_celery.task(bind=True, name="send-sms", max_retries=5, default_retry_delay=5)
@statsd(namespace="tasks")
def send_sms(self,
             service_id,
             notification_id,
             encrypted_notification,
             created_at,
             api_key_id=None,
             key_type=KEY_TYPE_NORMAL):
    notification = encryption.decrypt(encrypted_notification)
    service = dao_fetch_service_by_id(service_id)

    if not service_allowed_to_send_to(notification['to'], service):
        current_app.logger.info(
            "SMS {} failed as restricted service".format(notification_id)
        )
        return

    try:
        _save_notification(created_at, notification, notification_id, service_id, SMS_TYPE, api_key_id, key_type)

        send_sms_to_provider.apply_async((service_id, notification_id), queue='sms')

        current_app.logger.info(
            "SMS {} created at {}".format(notification_id, created_at)
        )

    except SQLAlchemyError as e:
        current_app.logger.exception(e)
        raise self.retry(queue="retry", exc=e)


@notify_celery.task(bind=True, name="send-email", max_retries=5, default_retry_delay=5)
@statsd(namespace="tasks")
def send_email(self, service_id,
               notification_id,
               encrypted_notification,
               created_at,
               api_key_id=None,
               key_type=KEY_TYPE_NORMAL):
    notification = encryption.decrypt(encrypted_notification)
    service = dao_fetch_service_by_id(service_id)

    if not service_allowed_to_send_to(notification['to'], service):
        current_app.logger.info("Email {} failed as restricted service".format(notification_id))
        return

    try:
        _save_notification(created_at, notification, notification_id, service_id, EMAIL_TYPE, api_key_id, key_type)

        send_email_to_provider.apply_async((service_id, notification_id), queue='email')

        current_app.logger.info("Email {} created at {}".format(notification_id, created_at))
    except SQLAlchemyError as e:
        current_app.logger.exception(e)
        raise self.retry(queue="retry", exc=e)


def _save_notification(created_at, notification, notification_id, service_id, notification_type, api_key_id, key_type):
    notification_db_object = Notification(
        id=notification_id,
        template_id=notification['template'],
        template_version=notification['template_version'],
        to=notification['to'],
        service_id=service_id,
        job_id=notification.get('job', None),
        job_row_number=notification.get('row_number', None),
        status='created',
        created_at=datetime.strptime(created_at, DATETIME_FORMAT),
        personalisation=notification.get('personalisation'),
        notification_type=notification_type,
        api_key_id=api_key_id,
        key_type=key_type
    )
    dao_create_notification(notification_db_object, notification_type)


def service_allowed_to_send_to(recipient, service):
    if not service.restricted:
        return True

    return allowed_to_send_to(
        recipient,
        itertools.chain.from_iterable(
            [user.mobile_number, user.email_address] for user in service.users
        )
    )