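"""Celery tasks that run on a schedule: cleaning up old data, processing
scheduled jobs and notifications, driving the letter pipeline and raising
monitoring alerts."""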
from datetime import (
    date,
    datetime,
    timedelta
)

from celery.signals import worker_process_shutdown
from flask import current_app
from sqlalchemy import and_
from sqlalchemy.exc import SQLAlchemyError
from notifications_utils.s3 import s3upload

from app.aws import s3
from app import notify_celery
from app.dao.services_dao import (
    dao_fetch_monthly_historical_stats_by_template
)
from app.dao.stats_template_usage_by_month_dao import insert_or_update_stats_for_template
from app.performance_platform import total_sent_notifications, processing_time
from app import performance_platform_client, deskpro_client
from app.dao.date_util import get_month_start_and_end_date_in_utc
from app.dao.inbound_sms_dao import delete_inbound_sms_created_more_than_a_week_ago
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
from app.dao.jobs_dao import (
    dao_get_letter_job_ids_by_status,
    dao_set_scheduled_jobs_to_pending,
    dao_get_jobs_older_than_limited_by
)
from app.dao.monthly_billing_dao import (
    get_service_ids_that_need_billing_populated,
    create_or_update_monthly_billing
)
from app.dao.notifications_dao import (
    dao_timeout_notifications,
    is_delivery_slow_for_provider,
    delete_notifications_created_more_than_a_week_ago_by_type,
    dao_get_count_of_letters_to_process_for_date,
    dao_get_scheduled_notifications,
    set_scheduled_notification_to_processed,
    dao_set_created_live_letter_api_notifications_to_pending,
)
from app.dao.statistics_dao import dao_timeout_job_statistics
from app.dao.provider_details_dao import (
    get_current_provider,
    dao_toggle_sms_provider
)
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
from app.models import (
    Job,
    Notification,
    NOTIFICATION_SENDING,
    LETTER_TYPE,
    JOB_STATUS_IN_PROGRESS,
    JOB_STATUS_READY_TO_SEND
)
from app.notifications.process_notifications import send_notification_to_queue
from app.statsd_decorators import statsd
from app.celery.tasks import (
    create_dvla_file_contents_for_notifications,
    process_job
)
from app.config import QueueNames, TaskNames
from app.utils import convert_utc_to_bst
from app.v2.errors import JobIncompleteError, NoAckFileReceived
from app.dao.service_callback_api_dao import get_service_callback_api_for_service
from app.celery.service_callback_tasks import send_delivery_status_to_service
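
# The tasks below are registered by name and invoked from a Celery beat
# schedule rather than called directly. A minimal sketch of that wiring,
# assuming crontab-style entries in the Celery config (the real schedule
# and its timings live elsewhere in the app config):
#
#     CELERYBEAT_SCHEDULE = {
#         'run-scheduled-jobs': {
#             'task': 'run-scheduled-jobs',
#             'schedule': crontab(minute='0,15,30,45'),
#         },
#     }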


@worker_process_shutdown.connect
def log_worker_process_shutdown(sender, signal, pid, exitcode):
    current_app.logger.info('Scheduled tasks worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode))


@notify_celery.task(name="remove_csv_files")
@statsd(namespace="tasks")
def remove_csv_files(job_types):
    jobs = dao_get_jobs_older_than_limited_by(job_types=job_types)
    for job in jobs:
        s3.remove_job_from_s3(job.service_id, job.id)
        current_app.logger.info("Job ID {} has been removed from s3.".format(job.id))


@notify_celery.task(name="run-scheduled-jobs")
@statsd(namespace="tasks")
def run_scheduled_jobs():
    try:
        for job in dao_set_scheduled_jobs_to_pending():
            process_job.apply_async([str(job.id)], queue=QueueNames.JOBS)
            current_app.logger.info("Job ID {} added to process job queue".format(job.id))
    except SQLAlchemyError:
        current_app.logger.exception("Failed to run scheduled jobs")
        raise


@notify_celery.task(name='send-scheduled-notifications')
@statsd(namespace="tasks")
def send_scheduled_notifications():
    try:
        scheduled_notifications = dao_get_scheduled_notifications()
        for notification in scheduled_notifications:
            send_notification_to_queue(notification, notification.service.research_mode)
            set_scheduled_notification_to_processed(notification.id)
        current_app.logger.info(
            "Sent {} scheduled notifications to the provider queue".format(len(scheduled_notifications)))
    except SQLAlchemyError:
        current_app.logger.exception("Failed to send scheduled notifications")
        raise


@notify_celery.task(name="delete-verify-codes")
@statsd(namespace="tasks")
def delete_verify_codes():
    try:
        start = datetime.utcnow()
        deleted = delete_codes_older_created_more_than_a_day_ago()
        current_app.logger.info(
            "Delete job started {} finished {} deleted {} verify codes".format(start, datetime.utcnow(), deleted)
        )
    except SQLAlchemyError:
        current_app.logger.exception("Failed to delete verify codes")
        raise


@notify_celery.task(name="delete-sms-notifications")
@statsd(namespace="tasks")
def delete_sms_notifications_older_than_seven_days():
    try:
        start = datetime.utcnow()
        deleted = delete_notifications_created_more_than_a_week_ago_by_type('sms')
        current_app.logger.info(
            "Delete sms job started {} finished {} deleted {} sms notifications".format(
                start,
                datetime.utcnow(),
                deleted
            )
        )
    except SQLAlchemyError:
        current_app.logger.exception("Failed to delete sms notifications")
        raise
@notify_celery.task(name="delete-email-notifications")
|
2016-08-05 10:44:43 +01:00
|
|
|
@statsd(namespace="tasks")
|
2017-05-23 13:40:36 +01:00
|
|
|
def delete_email_notifications_older_than_seven_days():
|
2016-06-20 13:33:53 +01:00
|
|
|
try:
|
|
|
|
|
start = datetime.utcnow()
|
2017-05-23 13:40:36 +01:00
|
|
|
deleted = delete_notifications_created_more_than_a_week_ago_by_type('email')
|
2016-06-20 13:33:53 +01:00
|
|
|
current_app.logger.info(
|
2017-05-23 13:40:36 +01:00
|
|
|
"Delete {} job started {} finished {} deleted {} email notifications".format(
|
|
|
|
|
'email',
|
2016-06-20 13:33:53 +01:00
|
|
|
start,
|
|
|
|
|
datetime.utcnow(),
|
|
|
|
|
deleted
|
|
|
|
|
)
|
|
|
|
|
)
|
2017-08-24 17:08:39 +01:00
|
|
|
except SQLAlchemyError:
|
2017-05-23 13:40:36 +01:00
|
|
|
current_app.logger.exception("Failed to delete sms notifications")
|
2016-06-20 13:33:53 +01:00
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
|


@notify_celery.task(name="delete-letter-notifications")
@statsd(namespace="tasks")
def delete_letter_notifications_older_than_seven_days():
    try:
        start = datetime.utcnow()
        deleted = delete_notifications_created_more_than_a_week_ago_by_type('letter')
        current_app.logger.info(
            "Delete letter job started {} finished {} deleted {} letter notifications".format(
                start,
                datetime.utcnow(),
                deleted
            )
        )
    except SQLAlchemyError:
        current_app.logger.exception("Failed to delete letter notifications")
        raise
@notify_celery.task(name="delete-invitations")
|
2016-08-05 10:44:43 +01:00
|
|
|
@statsd(namespace="tasks")
|
2016-06-20 13:33:53 +01:00
|
|
|
def delete_invitations():
|
|
|
|
|
try:
|
|
|
|
|
start = datetime.utcnow()
|
|
|
|
|
deleted = delete_invitations_created_more_than_two_days_ago()
|
|
|
|
|
current_app.logger.info(
|
|
|
|
|
"Delete job started {} finished {} deleted {} invitations".format(start, datetime.utcnow(), deleted)
|
|
|
|
|
)
|
2017-08-24 17:08:39 +01:00
|
|
|
except SQLAlchemyError:
|
2016-10-17 17:44:17 +01:00
|
|
|
current_app.logger.exception("Failed to delete invitations")
|
2016-06-20 13:33:53 +01:00
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|


@notify_celery.task(name='timeout-sending-notifications')
@statsd(namespace="tasks")
def timeout_notifications():
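    """Mark notifications stuck in a sending state as timed out, then queue a
    delivery status callback for each one whose service has a callback API
    configured."""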
    notifications = dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))

    if notifications:
        for notification in notifications:
            # queue the callback task only if the service has a callback API configured
            service_callback_api = get_service_callback_api_for_service(service_id=notification.service_id)
            if service_callback_api:
                send_delivery_status_to_service.apply_async([str(notification.id)], queue=QueueNames.CALLBACKS)

        current_app.logger.info(
            "Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))


@notify_celery.task(name='send-daily-performance-platform-stats')
@statsd(namespace="tasks")
def send_daily_performance_platform_stats():
    if performance_platform_client.active:
        send_total_sent_notifications_to_performance_platform()
        processing_time.send_processing_time_to_performance_platform()


def send_total_sent_notifications_to_performance_platform():
    count_dict = total_sent_notifications.get_total_sent_notifications_yesterday()
    email_sent_count = count_dict.get('email').get('count')
    sms_sent_count = count_dict.get('sms').get('count')
    start_date = count_dict.get('start_date')

    current_app.logger.info(
        "Attempting to update performance platform for date {} with email count {} and sms count {}"
        .format(start_date, email_sent_count, sms_sent_count)
    )

    total_sent_notifications.send_total_notifications_sent_for_day_stats(
        start_date,
        'sms',
        sms_sent_count
    )

    total_sent_notifications.send_total_notifications_sent_for_day_stats(
        start_date,
        'email',
        email_sent_count
    )


@notify_celery.task(name='switch-current-sms-provider-on-slow-delivery')
@statsd(namespace="tasks")
def switch_current_sms_provider_on_slow_delivery():
    """
    Switch providers if there are at least two slow delivery notifications (more than four minutes)
    in the last ten minutes. Search from the time we last switched to the current provider.
    """
    functional_test_provider_service_id = current_app.config.get('FUNCTIONAL_TEST_PROVIDER_SERVICE_ID')
    functional_test_provider_template_id = current_app.config.get('FUNCTIONAL_TEST_PROVIDER_SMS_TEMPLATE_ID')

    if functional_test_provider_service_id and functional_test_provider_template_id:
        current_provider = get_current_provider('sms')
        slow_delivery_notifications = is_delivery_slow_for_provider(
            provider=current_provider.identifier,
            threshold=2,
            sent_at=max(datetime.utcnow() - timedelta(minutes=10), current_provider.updated_at),
            delivery_time=timedelta(minutes=4),
            service_id=functional_test_provider_service_id,
            template_id=functional_test_provider_template_id
        )

        if slow_delivery_notifications:
            current_app.logger.warning(
                'Slow delivery notifications detected for provider {}'.format(
                    current_provider.identifier
                )
            )

            dao_toggle_sms_provider(current_provider.identifier)


@notify_celery.task(name='timeout-job-statistics')
@statsd(namespace="tasks")
def timeout_job_statistics():
    updated = dao_timeout_job_statistics(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
    if updated:
        current_app.logger.info(
            "Timeout period reached for {} job statistics, failure count has been updated.".format(updated))


@notify_celery.task(name="delete-inbound-sms")
@statsd(namespace="tasks")
def delete_inbound_sms_older_than_seven_days():
    try:
        start = datetime.utcnow()
        deleted = delete_inbound_sms_created_more_than_a_week_ago()
        current_app.logger.info(
            "Delete inbound sms job started {} finished {} deleted {} inbound sms notifications".format(
                start,
                datetime.utcnow(),
                deleted
            )
        )
    except SQLAlchemyError:
        current_app.logger.exception("Failed to delete inbound sms notifications")
        raise


@notify_celery.task(name="remove_transformed_dvla_files")
@statsd(namespace="tasks")
def remove_transformed_dvla_files():
    jobs = dao_get_jobs_older_than_limited_by(job_types=[LETTER_TYPE])
    for job in jobs:
        s3.remove_transformed_dvla_file(job.id)
        current_app.logger.info("Transformed dvla file for job {} has been removed from s3.".format(job.id))


@notify_celery.task(name="delete_dvla_response_files")
@statsd(namespace="tasks")
def delete_dvla_response_files_older_than_seven_days():
    try:
        start = datetime.utcnow()
        bucket_objects = s3.get_s3_bucket_objects(
            current_app.config['DVLA_RESPONSE_BUCKET_NAME'],
            'root/dispatch'
        )
        older_than_seven_days = s3.filter_s3_bucket_objects_within_date_range(bucket_objects)

        for f in older_than_seven_days:
            s3.remove_s3_object(current_app.config['DVLA_RESPONSE_BUCKET_NAME'], f['Key'])

        current_app.logger.info(
            "Delete dvla response files started {} finished {} deleted {} files".format(
                start,
                datetime.utcnow(),
                len(older_than_seven_days)
            )
        )
    except SQLAlchemyError:
        current_app.logger.exception("Failed to delete dvla response files")
        raise


@notify_celery.task(name="raise-alert-if-letter-notifications-still-sending")
@statsd(namespace="tasks")
def raise_alert_if_letter_notifications_still_sending():
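    """Open a Deskpro alert ticket if any letters sent before today are still
    in the 'sending' state. Does nothing at weekends; on Mondays it looks back
    as far as Friday."""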
    today = datetime.utcnow().date()

    # Do nothing on the weekend
    if today.isoweekday() in [6, 7]:
        return

    # on Monday check letters sent since Friday, otherwise just yesterday's
    if today.isoweekday() == 1:
        offset_days = 3
    else:
        offset_days = 1

    still_sending = Notification.query.filter(
        Notification.notification_type == LETTER_TYPE,
        Notification.status == NOTIFICATION_SENDING,
        Notification.sent_at >= today - timedelta(days=offset_days),
        Notification.sent_at < today
    ).count()

    if still_sending:
        deskpro_client.create_ticket(
            subject="Letters still sending",
            message="There are {} letters in the 'sending' state from {}".format(
                still_sending,
                (today - timedelta(days=offset_days)).strftime('%A %d %B')
            ),
            ticket_type="alert"
        )


@notify_celery.task(name="populate_monthly_billing")
@statsd(namespace="tasks")
def populate_monthly_billing():
    # for every service with billable units this month, update the billing
    # totals up to yesterday; this overwrites the existing amounts
    yesterday = datetime.utcnow() - timedelta(days=1)
    yesterday_in_bst = convert_utc_to_bst(yesterday)
    start_date, end_date = get_month_start_and_end_date_in_utc(yesterday_in_bst)
    services = get_service_ids_that_need_billing_populated(start_date=start_date, end_date=end_date)

    for service in services:
        create_or_update_monthly_billing(service_id=service.service_id, billing_month=end_date)
@notify_celery.task(name="run-letter-jobs")
|
|
|
|
|
@statsd(namespace="tasks")
|
|
|
|
|
def run_letter_jobs():
|
2017-08-24 11:57:46 +01:00
|
|
|
job_ids = dao_get_letter_job_ids_by_status(JOB_STATUS_READY_TO_SEND)
|
2017-09-22 14:06:31 +01:00
|
|
|
if job_ids:
|
|
|
|
|
notify_celery.send_task(
|
|
|
|
|
name=TaskNames.DVLA_JOBS,
|
|
|
|
|
args=(job_ids,),
|
|
|
|
|
queue=QueueNames.PROCESS_FTP
|
|
|
|
|
)
|
|
|
|
|
current_app.logger.info("Queued {} ready letter job ids onto {}".format(len(job_ids), QueueNames.PROCESS_FTP))
|
2017-09-13 15:25:05 +01:00
|
|
|
|
|
|
|
|
|


@notify_celery.task(name="trigger-letter-pdfs-for-day")
@statsd(namespace="tasks")
def trigger_letter_pdfs_for_day():
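    """Trigger the 'collate-letter-pdfs-for-day' task if there are any letters
    to process for today."""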
    letter_pdfs_count = dao_get_count_of_letters_to_process_for_date()
    if letter_pdfs_count:
        notify_celery.send_task(
            name='collate-letter-pdfs-for-day',
            args=(date.today().strftime("%Y-%m-%d"),),
            queue=QueueNames.LETTERS
        )
        current_app.logger.info("{} letter pdfs to be processed by the {} task".format(
            letter_pdfs_count, 'collate-letter-pdfs-for-day'))


@notify_celery.task(name="run-letter-api-notifications")
@statsd(namespace="tasks")
def run_letter_api_notifications():
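    """Set created live letter API notifications to pending, upload their DVLA
    file contents to S3 and hand the file over to the FTP tasks."""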
    current_time = datetime.utcnow().isoformat()

    notifications = dao_set_created_live_letter_api_notifications_to_pending()

    if notifications:
        file_contents = create_dvla_file_contents_for_notifications(notifications)

        filename = '{}-dvla-notifications.txt'.format(current_time)
        s3upload(
            filedata=file_contents + '\n',
            region=current_app.config['AWS_REGION'],
            bucket_name=current_app.config['DVLA_BUCKETS']['notification'],
            file_location=filename
        )

        notify_celery.send_task(
            name=TaskNames.DVLA_NOTIFICATIONS,
            kwargs={'filename': filename},
            queue=QueueNames.PROCESS_FTP
        )
        current_app.logger.info(
            "Queued {} ready letter api notifications onto {}".format(
                len(notifications),
                QueueNames.PROCESS_FTP
            )
        )


@notify_celery.task(name='check-job-status')
@statsd(namespace="tasks")
def check_job_status():
    """
    Every x minutes, check for jobs that are still in progress after 30 minutes:

        select * from jobs
        where job_status == 'in progress'
        and template_type in ('sms', 'email')
        and scheduled_at or created_at is older than 30 minutes

    If there are any results, raise an error; the rows missing from the csv are
    processed in another task - this task only does the check.
    """
    thirty_minutes_ago = datetime.utcnow() - timedelta(minutes=30)
    thirty_five_minutes_ago = datetime.utcnow() - timedelta(minutes=35)

    jobs_not_complete_after_30_minutes = Job.query.filter(
        Job.job_status == JOB_STATUS_IN_PROGRESS,
        and_(thirty_five_minutes_ago < Job.processing_started, Job.processing_started < thirty_minutes_ago)
    ).order_by(Job.processing_started).all()

    job_ids = [str(x.id) for x in jobs_not_complete_after_30_minutes]
    if job_ids:
        notify_celery.send_task(
            name=TaskNames.PROCESS_INCOMPLETE_JOBS,
            args=(job_ids,),
            queue=QueueNames.JOBS
        )
        raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids))


@notify_celery.task(name='daily-stats-template-usage-by-month')
@statsd(namespace="tasks")
def daily_stats_template_usage_by_month():
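    """Refresh the per-template monthly usage stats from the historical
    notification data."""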
    results = dao_fetch_monthly_historical_stats_by_template()

    for result in results:
        if result.template_id:
            insert_or_update_stats_for_template(
                result.template_id,
                result.month,
                result.year,
                result.count
            )


@notify_celery.task(name='raise-alert-if-no-letter-ack-file')
@statsd(namespace="tasks")
def letter_raise_alert_if_no_ack_file_for_zip():
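    """Compare the letter zip files uploaded today against the acknowledgement
    files DVLA has sent back since yesterday; raise NoAckFileReceived for any
    zip with no matching ack entry, and log ack entries that are not for
    today."""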
    # get a list of zip files sent today
    zip_file_list = []

    for key in s3.get_list_of_files_by_suffix(bucket_name=current_app.config['LETTERS_PDF_BUCKET_NAME'],
                                              subfolder=datetime.utcnow().strftime('%Y-%m-%d') + '/zips_sent',
                                              suffix='.TXT'):
        # slice off the '.TXT' suffix; rstrip('.TXT') would strip any trailing
        # '.', 'T' or 'X' characters from the name itself
        zip_file_list.append(key.upper()[:-len('.TXT')])

    # get the acknowledgement files received since yesterday
    ack_file_list = []
    yesterday = datetime.utcnow() - timedelta(days=1)
    for key in s3.get_list_of_files_by_suffix(bucket_name=current_app.config['DVLA_RESPONSE_BUCKET_NAME'],
                                              subfolder='root/dispatch', suffix='.ACK.txt', last_modified=yesterday):
        ack_file_list.append(key)

    today_str = datetime.utcnow().strftime('%Y%m%d')
    zip_not_today = []

    for key in ack_file_list:
        if today_str in key:
            content = s3.get_s3_file(current_app.config['DVLA_RESPONSE_BUCKET_NAME'], key)
            for line in content.split('\n'):
                if not line:
                    continue
                # each line names one acknowledged zip file before the first '|'
                acked_zip = line.split('|')[0].upper()
                matched = [zf for zf in zip_file_list if acked_zip in zf]
                if matched:
                    # iterate over the matches, not the list we are removing from
                    for zf in matched:
                        zip_file_list.remove(zf)
                else:
                    zip_not_today.append(acked_zip)

    if zip_file_list:
        raise NoAckFileReceived(message=zip_file_list)

    if zip_not_today:
        current_app.logger.info(
            "letter ack contains zip that is not for today {}".format(zip_not_today)
        )