import json
from collections import namedtuple
from datetime import datetime

from celery.signals import worker_process_shutdown
from flask import current_app
from notifications_utils.recipients import RecipientCSV
from notifications_utils.s3 import s3upload
from notifications_utils.template import (
    SMSMessageTemplate,
    WithSubjectTemplate,
    LetterDVLATemplate
)
from requests import (
    HTTPError,
    request
)
from sqlalchemy.exc import SQLAlchemyError

from app import (
    create_uuid,
    create_random_identifier,
    DATETIME_FORMAT,
    notify_celery,
    encryption
)
from app.aws import s3
from app.celery import provider_tasks
from app.config import QueueNames
from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id
from app.dao.jobs_dao import (
    dao_update_job,
    dao_get_job_by_id,
    all_notifications_are_created_for_job,
    dao_get_all_notifications_for_job,
    dao_update_job_status
)
from app.dao.notifications_dao import (
    get_notification_by_id,
    dao_update_notifications_for_job_to_sent_to_dvla,
    dao_update_notifications_by_reference,
    dao_get_last_notification_added_for_job_id
)
from app.dao.provider_details_dao import get_current_provider
from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service
from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count
from app.dao.templates_dao import dao_get_template_by_id
from app.models import (
    Job,
    Notification,
    EMAIL_TYPE,
    JOB_STATUS_CANCELLED,
    JOB_STATUS_FINISHED,
    JOB_STATUS_IN_PROGRESS,
    JOB_STATUS_PENDING,
    JOB_STATUS_READY_TO_SEND,
    JOB_STATUS_SENT_TO_DVLA,
    JOB_STATUS_ERROR,
    KEY_TYPE_NORMAL,
    LETTER_TYPE,
    NOTIFICATION_SENDING,
    NOTIFICATION_TECHNICAL_FAILURE,
    SMS_TYPE,
)
from app.notifications.process_notifications import persist_notification
from app.service.utils import service_allowed_to_send_to
from app.statsd_decorators import statsd


@worker_process_shutdown.connect
def worker_process_shutdown(sender, signal, pid, exitcode):
    current_app.logger.info('Tasks worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode))


@notify_celery.task(name="process-job")
|
2016-08-05 10:44:43 +01:00
|
|
|
@statsd(namespace="tasks")
|
2016-02-24 17:12:30 +00:00
|
|
|
def process_job(job_id):
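    """Fetch the job's CSV from S3 and queue a save task for every row.

    Does nothing if the job is no longer pending; cancels the job if the
    service is inactive; stops early if the job would push the service
    over its daily message limit.
    """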
    start = datetime.utcnow()
    job = dao_get_job_by_id(job_id)

    if job.job_status != JOB_STATUS_PENDING:
        return

    service = job.service

    if not service.active:
        job.job_status = JOB_STATUS_CANCELLED
        dao_update_job(job)
        current_app.logger.warning(
            "Job {} has been cancelled, service {} is inactive".format(job_id, service.id))
        return

    if __sending_limits_for_job_exceeded(service, job, job_id):
        return

    job.job_status = JOB_STATUS_IN_PROGRESS
    job.processing_started = start
    dao_update_job(job)

    db_template = dao_get_template_by_id(job.template_id, job.template_version)

    TemplateClass = get_template_class(db_template.template_type)
    template = TemplateClass(db_template.__dict__)

    current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count))

    for row_number, recipient, personalisation in RecipientCSV(
            s3.get_job_from_s3(str(service.id), str(job_id)),
            template_type=template.template_type,
            placeholders=template.placeholders
    ).enumerated_recipients_and_personalisation:
        process_row(row_number, recipient, personalisation, template, job, service)

    job_complete(job, service, template.template_type, start=start)


def job_complete(job, service, template_type, resumed=False, start=None):
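    """Mark a job as complete once all of its rows have been queued.

    Letter jobs are handed off to the DVLA file-building pipeline (or
    short-circuited in research mode); other job types are marked
    finished immediately.
    """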
    if template_type == LETTER_TYPE:
        if service.research_mode:
            update_job_to_sent_to_dvla.apply_async([str(job.id)], queue=QueueNames.RESEARCH_MODE)
        else:
            build_dvla_file.apply_async([str(job.id)], queue=QueueNames.JOBS)
            current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job.id, QueueNames.JOBS))
    else:
        job.job_status = JOB_STATUS_FINISHED

    finished = datetime.utcnow()
    job.processing_finished = finished
    dao_update_job(job)

    if resumed:
        current_app.logger.info(
            "Resumed Job {} completed at {}".format(job.id, finished)
        )
    else:
        current_app.logger.info(
            "Job {} created at {} started at {} finished at {}".format(job.id, job.created_at, start, finished)
        )


def process_row(row_number, recipient, personalisation, template, job, service):
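    """Encrypt one CSV row and queue the save task for its notification type."""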
    template_type = template.template_type
    encrypted = encryption.encrypt({
        'template': str(template.id),
        'template_version': job.template_version,
        'job': str(job.id),
        'to': recipient,
        'row_number': row_number,
        'personalisation': dict(personalisation)
    })

    send_fns = {
        SMS_TYPE: send_sms,
        EMAIL_TYPE: send_email,
        LETTER_TYPE: persist_letter
    }

    send_fn = send_fns[template_type]

    send_fn.apply_async(
        (
            str(service.id),
            create_uuid(),
            encrypted,
            datetime.utcnow().strftime(DATETIME_FORMAT)
        ),
        queue=QueueNames.DATABASE if not service.research_mode else QueueNames.RESEARCH_MODE
    )


def __sending_limits_for_job_exceeded(service, job, job_id):
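    """Return True and fail the job if it would exceed the service's daily message limit."""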
    total_sent = fetch_todays_total_message_count(service.id)

    if total_sent + job.notification_count > service.message_limit:
        job.job_status = 'sending limits exceeded'
        job.processing_finished = datetime.utcnow()
        dao_update_job(job)
        current_app.logger.info(
            "Job {} size {} error. Sending limits {} exceeded".format(
                job_id, job.notification_count, service.message_limit)
        )
        return True
    return False


@notify_celery.task(bind=True, name="send-sms", max_retries=5, default_retry_delay=300)
|
2016-08-05 10:44:43 +01:00
|
|
|
@statsd(namespace="tasks")
|
2016-06-30 17:32:49 +01:00
|
|
|
def send_sms(self,
|
|
|
|
|
service_id,
|
|
|
|
|
notification_id,
|
|
|
|
|
encrypted_notification,
|
|
|
|
|
created_at,
|
|
|
|
|
api_key_id=None,
|
|
|
|
|
key_type=KEY_TYPE_NORMAL):
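    """Decrypt and persist an SMS notification, then queue it for delivery."""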
    notification = encryption.decrypt(encrypted_notification)
    service = dao_fetch_service_by_id(service_id)

    if not service_allowed_to_send_to(notification['to'], service, key_type):
        current_app.logger.info(
            "SMS {} failed as restricted service".format(notification_id)
        )
        return

    try:
        saved_notification = persist_notification(
            template_id=notification['template'],
            template_version=notification['template_version'],
            recipient=notification['to'],
            service=service,
            personalisation=notification.get('personalisation'),
            notification_type=SMS_TYPE,
            api_key_id=api_key_id,
            key_type=key_type,
            created_at=datetime.utcnow(),
            job_id=notification.get('job', None),
            job_row_number=notification.get('row_number', None),
            notification_id=notification_id
        )

        provider_tasks.deliver_sms.apply_async(
            [str(saved_notification.id)],
            queue=QueueNames.SEND_SMS if not service.research_mode else QueueNames.RESEARCH_MODE
        )

        current_app.logger.info(
            "SMS {} created at {} for job {}".format(saved_notification.id, created_at, notification.get('job', None))
        )

    except SQLAlchemyError as e:
        handle_exception(self, notification, notification_id, e)


@notify_celery.task(bind=True, name="send-email", max_retries=5, default_retry_delay=300)
|
2016-08-05 10:44:43 +01:00
|
|
|
@statsd(namespace="tasks")
|
2017-01-17 12:00:34 +00:00
|
|
|
def send_email(self,
|
|
|
|
|
service_id,
|
2016-06-30 17:32:49 +01:00
|
|
|
notification_id,
|
|
|
|
|
encrypted_notification,
|
|
|
|
|
created_at,
|
|
|
|
|
api_key_id=None,
|
|
|
|
|
key_type=KEY_TYPE_NORMAL):
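    """Decrypt and persist an email notification, then queue it for delivery."""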
    notification = encryption.decrypt(encrypted_notification)
    service = dao_fetch_service_by_id(service_id)

    if not service_allowed_to_send_to(notification['to'], service, key_type):
        current_app.logger.info("Email {} failed as restricted service".format(notification_id))
        return

    try:
        saved_notification = persist_notification(
            template_id=notification['template'],
            template_version=notification['template_version'],
            recipient=notification['to'],
            service=service,
            personalisation=notification.get('personalisation'),
            notification_type=EMAIL_TYPE,
            api_key_id=api_key_id,
            key_type=key_type,
            created_at=datetime.utcnow(),
            job_id=notification.get('job', None),
            job_row_number=notification.get('row_number', None),
            notification_id=notification_id
        )

        provider_tasks.deliver_email.apply_async(
            [str(saved_notification.id)],
            queue=QueueNames.SEND_EMAIL if not service.research_mode else QueueNames.RESEARCH_MODE
        )

        current_app.logger.info("Email {} created at {}".format(saved_notification.id, created_at))
    except SQLAlchemyError as e:
        handle_exception(self, notification, notification_id, e)


@notify_celery.task(bind=True, name="persist-letter", max_retries=5, default_retry_delay=300)
|
|
|
|
|
@statsd(namespace="tasks")
|
|
|
|
|
def persist_letter(
|
|
|
|
|
self,
|
|
|
|
|
service_id,
|
|
|
|
|
notification_id,
|
|
|
|
|
encrypted_notification,
|
|
|
|
|
created_at
|
|
|
|
|
):
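    """Decrypt and persist a letter notification.

    Letters have no single phone number or email address to store in the
    `to` field, so address_line_1 is stored as the recipient; it is what
    the front-end report pages display.
    """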
    notification = encryption.decrypt(encrypted_notification)

    # we store the recipient as just the first item of the person's address
    recipient = notification['personalisation']['addressline1']

    service = dao_fetch_service_by_id(service_id)
    try:
        saved_notification = persist_notification(
            template_id=notification['template'],
            template_version=notification['template_version'],
            recipient=recipient,
            service=service,
            personalisation=notification['personalisation'],
            notification_type=LETTER_TYPE,
            api_key_id=None,
            key_type=KEY_TYPE_NORMAL,
            created_at=datetime.utcnow(),
            job_id=notification['job'],
            job_row_number=notification['row_number'],
            notification_id=notification_id,
            reference=create_random_identifier()
        )

        current_app.logger.info("Letter {} created at {}".format(saved_notification.id, created_at))
    except SQLAlchemyError as e:
        handle_exception(self, notification, notification_id, e)


@notify_celery.task(bind=True, name="build-dvla-file", countdown=60, max_retries=15, default_retry_delay=300)
|
2017-03-15 15:26:58 +00:00
|
|
|
@statsd(namespace="tasks")
|
|
|
|
|
def build_dvla_file(self, job_id):
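    """Build the DVLA print file for a letter job and upload it to S3.

    Retries (up to max_retries) until every notification for the job
    has been persisted.
    """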
    try:
        if all_notifications_are_created_for_job(job_id):
            file_contents = create_dvla_file_contents_for_job(job_id)
            s3upload(
                filedata=file_contents + '\n',
                region=current_app.config['AWS_REGION'],
                bucket_name=current_app.config['DVLA_BUCKETS']['job'],
                file_location="{}-dvla-job.text".format(job_id)
            )
            dao_update_job_status(job_id, JOB_STATUS_READY_TO_SEND)
        else:
            current_app.logger.info("Not all notifications for job {} are persisted yet".format(job_id))
            self.retry(queue=QueueNames.RETRY, exc="Not all notifications for job {} are persisted yet".format(job_id))
    except Exception:
        current_app.logger.exception("build_dvla_file threw exception")
        raise


@notify_celery.task(bind=True, name='update-letter-job-to-sent')
@statsd(namespace="tasks")
def update_job_to_sent_to_dvla(self, job_id):
    # This task will be called by the FTP app to update the job to sent to dvla
    # and update all notifications for this job to sending, provider = DVLA
    provider = get_current_provider(LETTER_TYPE)

    updated_count = dao_update_notifications_for_job_to_sent_to_dvla(job_id, provider.identifier)
    dao_update_job_status(job_id, JOB_STATUS_SENT_TO_DVLA)

    current_app.logger.info("Updated {} letter notifications to sending. "
                            "Updated job {} to {}".format(updated_count, job_id, JOB_STATUS_SENT_TO_DVLA))


@notify_celery.task(bind=True, name='update-letter-job-to-error')
@statsd(namespace="tasks")
def update_dvla_job_to_error(self, job_id):
    dao_update_job_status(job_id, JOB_STATUS_ERROR)
    current_app.logger.info("Updated job {} to {}".format(job_id, JOB_STATUS_ERROR))


@notify_celery.task(bind=True, name='update-letter-notifications-to-sent')
@statsd(namespace="tasks")
def update_letter_notifications_to_sent_to_dvla(self, notification_references):
    # This task will be called by the FTP app to update notifications as sent to DVLA
    provider = get_current_provider(LETTER_TYPE)

    updated_count = dao_update_notifications_by_reference(
        notification_references,
        {
            'status': NOTIFICATION_SENDING,
            'sent_by': provider.identifier,
            'sent_at': datetime.utcnow(),
            'updated_at': datetime.utcnow()
        }
    )

    current_app.logger.info("Updated {} letter notifications to sending".format(updated_count))


@notify_celery.task(bind=True, name='update-letter-notifications-to-error')
@statsd(namespace="tasks")
def update_letter_notifications_to_error(self, notification_references):
    # This task will be called by the FTP app to mark notifications that
    # could not be sent to DVLA as technical failures
    updated_count = dao_update_notifications_by_reference(
        notification_references,
        {
            'status': NOTIFICATION_TECHNICAL_FAILURE,
            'updated_at': datetime.utcnow()
        }
    )

    current_app.logger.info("Updated {} letter notifications to technical-failure".format(updated_count))


def create_dvla_file_contents_for_job(job_id):
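    """Render the DVLA print-file contents for every notification in a job."""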
    notifications = dao_get_all_notifications_for_job(job_id)

    return create_dvla_file_contents_for_notifications(notifications)


def create_dvla_file_contents_for_notifications(notifications):
    file_contents = '\n'.join(
        str(LetterDVLATemplate(
            notification.template.__dict__,
            notification.personalisation,
            notification_reference=notification.reference,
            contact_block=notification.service.get_default_letter_contact(),
            org_id=notification.service.dvla_organisation.id,
        ))
        for notification in notifications
    )
    return file_contents


def handle_exception(task, notification, notification_id, exc):
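    """Retry a failed save task unless the notification was already persisted."""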
    if not get_notification_by_id(notification_id):
        retry_msg = '{task} notification for job {job} row number {row} and notification id {noti}'.format(
            task=task.__name__,
            job=notification.get('job', None),
            row=notification.get('row_number', None),
            noti=notification_id
        )
        # Sometimes SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems
        # SQLAlchemy throws a FlushError instead. So we check whether the notification id already exists and,
        # if it does, we do not send the task to the retry queue.
        current_app.logger.exception('Retry: ' + retry_msg)
        try:
            task.retry(queue=QueueNames.RETRY, exc=exc)
        except task.MaxRetriesExceededError:
            current_app.logger.exception('Retry: ' + retry_msg)


def get_template_class(template_type):
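    """Return the notifications_utils template class for a template type."""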
    if template_type == SMS_TYPE:
        return SMSMessageTemplate
    elif template_type in (EMAIL_TYPE, LETTER_TYPE):
        # since we don't need rendering capabilities (we only need to extract placeholders) both email and letter can
        # use the same base template
        return WithSubjectTemplate


@notify_celery.task(bind=True, name='update-letter-notifications-statuses')
@statsd(namespace="tasks")
def update_letter_notifications_statuses(self, filename):
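    """Fetch a DVLA response file from the FTP bucket and log each update it contains."""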
    bucket_location = '{}-ftp'.format(current_app.config['NOTIFY_EMAIL_DOMAIN'])
    response_file_content = s3.get_s3_file(bucket_location, filename)

    try:
        notification_updates = process_updates_from_file(response_file_content)
    except TypeError:
        current_app.logger.exception('DVLA response file: {} has an invalid format'.format(filename))
        raise
    else:
        for update in notification_updates:
            current_app.logger.info('DVLA update: {}'.format(str(update)))
            # TODO: Update notifications with desired status


def process_updates_from_file(response_file):
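    """Split a DVLA response file into NotificationUpdate tuples.

    Assumes one pipe-delimited record per line, for example (values are
    illustrative only, not a confirmed DVLA vocabulary):

        REF0001|Sent|2|Unsorted
    """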
    NotificationUpdate = namedtuple('NotificationUpdate', ['reference', 'status', 'page_count', 'cost_threshold'])
    notification_updates = [NotificationUpdate(*line.split('|')) for line in response_file.splitlines()]
    return notification_updates


@notify_celery.task(bind=True, name="send-inbound-sms", max_retries=5, default_retry_delay=300)
|
|
|
|
|
@statsd(namespace="tasks")
|
|
|
|
|
def send_inbound_sms_to_service(self, inbound_sms_id, service_id):
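    """POST a received SMS to the service's inbound SMS callback, retrying on 5xx responses."""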
    inbound_api = get_service_inbound_api_for_service(service_id=service_id)
    if not inbound_api:
        # No API data has been set for this service
        return

    inbound_sms = dao_get_inbound_sms_by_id(service_id=service_id,
                                            inbound_id=inbound_sms_id)
    data = {
        "id": str(inbound_sms.id),
        "source_number": inbound_sms.user_number,
        "destination_number": inbound_sms.notify_number,
        "message": inbound_sms.content,
        "date_received": inbound_sms.provider_date.strftime(DATETIME_FORMAT)
    }

    response = request(
        method="POST",
        url=inbound_api.url,
        data=json.dumps(data),
        headers={
            'Content-Type': 'application/json',
            'Authorization': 'Bearer {}'.format(inbound_api.bearer_token)
        },
        timeout=60
    )
    try:
        response.raise_for_status()
    except HTTPError as e:
        current_app.logger.exception("Exception raised in send_inbound_sms_to_service for service_id: {} and url: {}. "
                                     "\n{}".format(service_id, inbound_api.url, e))
        if e.response.status_code >= 500:
            try:
                self.retry(queue=QueueNames.RETRY,
                           exc='Unable to send_inbound_sms_to_service for service_id: {} and url: {}. \n{}'.format(
                               service_id, inbound_api.url, e))
            except self.MaxRetriesExceededError:
                current_app.logger.exception('Retry: send_inbound_sms_to_service has retried the max number of times')


@notify_celery.task(name='process-incomplete-jobs')
@statsd(namespace="tasks")
def process_incomplete_jobs(job_ids):
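    """Resume jobs that were interrupted part-way through processing."""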
    current_app.logger.info("Resuming Job(s) {}".format(job_ids))
    for job_id in job_ids:
        process_incomplete_job(job_id)


def process_incomplete_job(job_id):
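    """Re-process a job's CSV, skipping rows that already have notifications."""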
    job = dao_get_job_by_id(job_id)

    last_notification_added = dao_get_last_notification_added_for_job_id(job_id)

    if last_notification_added:
        resume_from_row = last_notification_added.job_row_number
    else:
        resume_from_row = -1  # the first row in the CSV is row 0

    current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row))

    db_template = dao_get_template_by_id(job.template_id, job.template_version)

    TemplateClass = get_template_class(db_template.template_type)
    template = TemplateClass(db_template.__dict__)

    for row_number, recipient, personalisation in RecipientCSV(
            s3.get_job_from_s3(str(job.service_id), str(job.id)),
            template_type=template.template_type,
            placeholders=template.placeholders
    ).enumerated_recipients_and_personalisation:
        if row_number > resume_from_row:
            process_row(row_number, recipient, personalisation, template, job, job.service)

    job_complete(job, job.service, template.template_type, resumed=True)