Merge pull request #1304 from alphagov/ken-alert-missing-notis

Added alert when job.notification_count doesn't match the total number of notifications for the job
This commit is contained in:
Richard Chapman
2017-10-13 11:12:23 +01:00
committed by GitHub
8 changed files with 149 additions and 16 deletions

View File

@@ -5,6 +5,7 @@ from datetime import (
from celery.signals import worker_process_shutdown
from flask import current_app
from sqlalchemy import or_, and_
from sqlalchemy.exc import SQLAlchemyError
from notifications_utils.s3 import s3upload
@@ -18,8 +19,8 @@ from app.dao.invited_user_dao import delete_invitations_created_more_than_two_da
from app.dao.jobs_dao import (
dao_get_letter_job_ids_by_status,
dao_set_scheduled_jobs_to_pending,
dao_get_jobs_older_than_limited_by
)
dao_get_jobs_older_than_limited_by,
dao_get_job_by_id)
from app.dao.monthly_billing_dao import (
get_service_ids_that_need_billing_populated,
create_or_update_monthly_billing
@@ -38,12 +39,14 @@ from app.dao.provider_details_dao import (
dao_toggle_sms_provider
)
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
from app.models import LETTER_TYPE, JOB_STATUS_READY_TO_SEND
from app.models import LETTER_TYPE, JOB_STATUS_READY_TO_SEND, JOB_STATUS_SENT_TO_DVLA, JOB_STATUS_FINISHED, Job, \
EMAIL_TYPE, SMS_TYPE, JOB_STATUS_IN_PROGRESS
from app.notifications.process_notifications import send_notification_to_queue
from app.statsd_decorators import statsd
from app.celery.tasks import process_job, create_dvla_file_contents_for_notifications
from app.config import QueueNames, TaskNames
from app.utils import convert_utc_to_bst
from app.v2.errors import JobIncompleteError
@worker_process_shutdown.connect
@@ -360,3 +363,30 @@ def run_letter_api_notifications():
QueueNames.PROCESS_FTP
)
)
@notify_celery.task(name='check-job-status')
@statsd(namespace="tasks")
def check_job_status():
    """
    Periodic task: flag jobs that are stuck 'in progress'.

    Finds jobs whose status is JOB_STATUS_IN_PROGRESS and whose
    processing_started falls between 30 and 35 minutes ago, and raises
    JobIncompleteError listing their ids so the error handler / alerting
    picks them up. Re-processing of the missing csv rows is intended to
    happen in a separate task; this task only performs the check.

    NOTE(review): the original design notes mention restricting the check
    to sms/email template types and to scheduled_at/created_at, but the
    query below filters only on job_status and processing_started —
    confirm this is intentional.

    Raises:
        JobIncompleteError: if any matching incomplete jobs are found.
    """
    # Snapshot the clock once so both window edges are computed from the
    # same instant (two utcnow() calls could drift across the boundary).
    now = datetime.utcnow()
    thirty_minutes_ago = now - timedelta(minutes=30)
    thirty_five_minutes_ago = now - timedelta(minutes=35)

    jobs_not_complete_after_30_minutes = Job.query.filter(
        Job.job_status == JOB_STATUS_IN_PROGRESS,
        and_(thirty_five_minutes_ago < Job.processing_started, Job.processing_started < thirty_minutes_ago)
    ).all()

    job_ids = [str(x.id) for x in jobs_not_complete_after_30_minutes]
    if job_ids:
        raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids))

View File

@@ -1,5 +1,5 @@
import json
from datetime import (datetime)
from datetime import datetime
from collections import namedtuple
from celery.signals import worker_process_init, worker_process_shutdown
@@ -7,9 +7,15 @@ from flask import current_app
from notifications_utils.recipients import (
RecipientCSV
)
from notifications_utils.template import SMSMessageTemplate, WithSubjectTemplate, LetterDVLATemplate
from requests import HTTPError
from requests import request
from notifications_utils.template import (
SMSMessageTemplate,
WithSubjectTemplate,
LetterDVLATemplate
)
from requests import (
HTTPError,
request
)
from sqlalchemy.exc import SQLAlchemyError
from app import (
create_uuid,
@@ -84,6 +90,7 @@ def process_job(job_id):
return
job.job_status = JOB_STATUS_IN_PROGRESS
job.processing_started = start
dao_update_job(job)
db_template = dao_get_template_by_id(job.template_id, job.template_version)
@@ -91,6 +98,8 @@ def process_job(job_id):
TemplateClass = get_template_class(db_template.template_type)
template = TemplateClass(db_template.__dict__)
current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count))
for row_number, recipient, personalisation in RecipientCSV(
s3.get_job_from_s3(str(service.id), str(job_id)),
template_type=template.template_type,
@@ -108,7 +117,6 @@ def process_job(job_id):
job.job_status = JOB_STATUS_FINISHED
finished = datetime.utcnow()
job.processing_started = start
job.processing_finished = finished
dao_update_job(job)
current_app.logger.info(

View File

@@ -234,6 +234,11 @@ class Config(object):
'task': 'run-letter-api-notifications',
'schedule': crontab(hour=17, minute=40),
'options': {'queue': QueueNames.PERIODIC}
},
'check-job-status': {
'task': 'check-job-status',
'schedule': crontab(),
'options': {'queue': QueueNames.PERIODIC}
}
}
CELERY_QUEUES = []

View File

@@ -7,6 +7,23 @@ from app.authentication.auth import AuthError
from app.errors import InvalidRequest
class JobIncompleteError(Exception):
    """Raised when one or more jobs have not finished within the expected window.

    Carries an HTTP-style status_code (500) and serialises itself to the
    v2 API error envelope via to_dict_v2().
    """

    def __init__(self, message):
        # Pass the message to Exception so str(e)/repr(e) and logging
        # show it (the base class otherwise renders an empty string).
        super().__init__(message)
        self.message = message
        self.status_code = 500

    def to_dict_v2(self):
        """Return the error in the v2 API response shape."""
        return {
            'status_code': self.status_code,
            "errors": [
                {
                    "error": 'JobIncompleteError',
                    "message": self.message
                }
            ]
        }
class TooManyRequestsError(InvalidRequest):
status_code = 429
message_template = 'Exceeded send limits ({}) for today'
@@ -49,6 +66,10 @@ def register_errors(blueprint):
current_app.logger.exception(error)
return jsonify(json.loads(error.message)), 400
@blueprint.errorhandler(JobIncompleteError)
def job_incomplete_error(error):
return jsonify(error.to_dict_v2()), 500
@blueprint.errorhandler(NoResultFound)
@blueprint.errorhandler(DataError)
def no_result_found(e):