create cronitor decorator that alerts if tasks fail

Make a decorator that pings cronitor before and after each task run.
Designed for use with nightly tasks, so we have visibility if they
fail. We have a bunch of cronitor monitors set up - 5 character keys
that go into a URL that we then make a GET to with a self-explanatory
url path (run/fail/complete).

The cronitor URLs are defined in the credentials repo as a dictionary
of celery task names to URL slugs. If the name passed in to the
decorator isn't in that dict, no cronitor ping is made (an error is
logged instead) and the task itself still runs unmonitored.

to use it, all you need to do is call `@cronitor(my_task_name)`
instead of `@notify_celery.task`, and make sure that the task name and
the matching slug are included in the credentials repo (or locally,
json dumped and stored in the CRONITOR_KEYS environment variable)
This commit is contained in:
Leo Hemsted
2019-01-16 14:11:03 +00:00
parent d783e2b236
commit 754c65a6a2
7 changed files with 96 additions and 15 deletions

View File

@@ -30,15 +30,30 @@ from app.exceptions import NotificationTechnicalFailureException
from app.models import ( from app.models import (
Notification, Notification,
NOTIFICATION_SENDING, NOTIFICATION_SENDING,
EMAIL_TYPE,
SMS_TYPE,
LETTER_TYPE, LETTER_TYPE,
KEY_TYPE_NORMAL KEY_TYPE_NORMAL
) )
from app.performance_platform import total_sent_notifications, processing_time from app.performance_platform import total_sent_notifications, processing_time
from app.cronitor import cronitor
@notify_celery.task(name="remove_csv_files") @notify_celery.task(name="remove_sms_email_jobs")
@cronitor("remove_sms_email_jobs")
@statsd(namespace="tasks") @statsd(namespace="tasks")
def remove_csv_files(job_types): def remove_sms_email_csv_files(job_types):
_remove_csv_files([EMAIL_TYPE, SMS_TYPE])
@notify_celery.task(name="remove_letter_jobs")
@cronitor("remove_letter_jobs")
@statsd(namespace="tasks")
def remove_letter_csv_files(job_types):
_remove_csv_files([LETTER_TYPE])
def _remove_csv_files(job_types):
jobs = dao_get_jobs_older_than_data_retention(notification_types=job_types) jobs = dao_get_jobs_older_than_data_retention(notification_types=job_types)
for job in jobs: for job in jobs:
s3.remove_job_from_s3(job.service_id, job.id) s3.remove_job_from_s3(job.service_id, job.id)
@@ -47,6 +62,7 @@ def remove_csv_files(job_types):
@notify_celery.task(name="delete-sms-notifications") @notify_celery.task(name="delete-sms-notifications")
@cronitor("delete-sms-notifications")
@statsd(namespace="tasks") @statsd(namespace="tasks")
def delete_sms_notifications_older_than_seven_days(): def delete_sms_notifications_older_than_seven_days():
try: try:
@@ -66,6 +82,7 @@ def delete_sms_notifications_older_than_seven_days():
@notify_celery.task(name="delete-email-notifications") @notify_celery.task(name="delete-email-notifications")
@cronitor("delete-email-notifications")
@statsd(namespace="tasks") @statsd(namespace="tasks")
def delete_email_notifications_older_than_seven_days(): def delete_email_notifications_older_than_seven_days():
try: try:
@@ -85,6 +102,7 @@ def delete_email_notifications_older_than_seven_days():
@notify_celery.task(name="delete-letter-notifications") @notify_celery.task(name="delete-letter-notifications")
@cronitor("delete-letter-notifications")
@statsd(namespace="tasks") @statsd(namespace="tasks")
def delete_letter_notifications_older_than_seven_days(): def delete_letter_notifications_older_than_seven_days():
try: try:
@@ -104,6 +122,7 @@ def delete_letter_notifications_older_than_seven_days():
@notify_celery.task(name='timeout-sending-notifications') @notify_celery.task(name='timeout-sending-notifications')
@cronitor('timeout-sending-notifications')
@statsd(namespace="tasks") @statsd(namespace="tasks")
def timeout_notifications(): def timeout_notifications():
technical_failure_notifications, temporary_failure_notifications = \ technical_failure_notifications, temporary_failure_notifications = \
@@ -128,6 +147,7 @@ def timeout_notifications():
@notify_celery.task(name='send-daily-performance-platform-stats') @notify_celery.task(name='send-daily-performance-platform-stats')
@cronitor('send-daily-performance-platform-stats')
@statsd(namespace="tasks") @statsd(namespace="tasks")
def send_daily_performance_platform_stats(): def send_daily_performance_platform_stats():
if performance_platform_client.active: if performance_platform_client.active:
@@ -168,6 +188,7 @@ def send_total_sent_notifications_to_performance_platform(day):
@notify_celery.task(name="delete-inbound-sms") @notify_celery.task(name="delete-inbound-sms")
@cronitor("delete-inbound-sms")
@statsd(namespace="tasks") @statsd(namespace="tasks")
def delete_inbound_sms_older_than_seven_days(): def delete_inbound_sms_older_than_seven_days():
try: try:
@@ -186,6 +207,7 @@ def delete_inbound_sms_older_than_seven_days():
@notify_celery.task(name="remove_transformed_dvla_files") @notify_celery.task(name="remove_transformed_dvla_files")
@cronitor("remove_transformed_dvla_files")
@statsd(namespace="tasks") @statsd(namespace="tasks")
def remove_transformed_dvla_files(): def remove_transformed_dvla_files():
jobs = dao_get_jobs_older_than_data_retention(notification_types=[LETTER_TYPE]) jobs = dao_get_jobs_older_than_data_retention(notification_types=[LETTER_TYPE])
@@ -194,6 +216,7 @@ def remove_transformed_dvla_files():
current_app.logger.info("Transformed dvla file for job {} has been removed from s3.".format(job.id)) current_app.logger.info("Transformed dvla file for job {} has been removed from s3.".format(job.id))
# TODO: remove me, i'm not being run by anything
@notify_celery.task(name="delete_dvla_response_files") @notify_celery.task(name="delete_dvla_response_files")
@statsd(namespace="tasks") @statsd(namespace="tasks")
def delete_dvla_response_files_older_than_seven_days(): def delete_dvla_response_files_older_than_seven_days():
@@ -221,6 +244,7 @@ def delete_dvla_response_files_older_than_seven_days():
@notify_celery.task(name="raise-alert-if-letter-notifications-still-sending") @notify_celery.task(name="raise-alert-if-letter-notifications-still-sending")
@cronitor("raise-alert-if-letter-notifications-still-sending")
@statsd(namespace="tasks") @statsd(namespace="tasks")
def raise_alert_if_letter_notifications_still_sending(): def raise_alert_if_letter_notifications_still_sending():
today = datetime.utcnow().date() today = datetime.utcnow().date()
@@ -257,6 +281,7 @@ def raise_alert_if_letter_notifications_still_sending():
@notify_celery.task(name='raise-alert-if-no-letter-ack-file') @notify_celery.task(name='raise-alert-if-no-letter-ack-file')
@cronitor('raise-alert-if-no-letter-ack-file')
@statsd(namespace="tasks") @statsd(namespace="tasks")
def letter_raise_alert_if_no_ack_file_for_zip(): def letter_raise_alert_if_no_ack_file_for_zip():
# get a list of zip files since yesterday # get a list of zip files since yesterday

View File

@@ -4,6 +4,7 @@ from flask import current_app
from notifications_utils.statsd_decorators import statsd from notifications_utils.statsd_decorators import statsd
from app import notify_celery from app import notify_celery
from app.cronitor import cronitor
from app.dao.fact_billing_dao import ( from app.dao.fact_billing_dao import (
fetch_billing_data_for_day, fetch_billing_data_for_day,
update_fact_billing update_fact_billing
@@ -12,6 +13,7 @@ from app.dao.fact_notification_status_dao import fetch_notification_status_for_d
@notify_celery.task(name="create-nightly-billing") @notify_celery.task(name="create-nightly-billing")
@cronitor("create-nightly-billing")
@statsd(namespace="tasks") @statsd(namespace="tasks")
def create_nightly_billing(day_start=None): def create_nightly_billing(day_start=None):
# day_start is a datetime.date() object. e.g. # day_start is a datetime.date() object. e.g.
@@ -34,6 +36,7 @@ def create_nightly_billing(day_start=None):
@notify_celery.task(name="create-nightly-notification-status") @notify_celery.task(name="create-nightly-notification-status")
@cronitor("create-nightly-notification-status")
@statsd(namespace="tasks") @statsd(namespace="tasks")
def create_nightly_notification_status(day_start=None): def create_nightly_notification_status(day_start=None):
# day_start is a datetime.date() object. e.g. # day_start is a datetime.date() object. e.g.

View File

@@ -5,10 +5,6 @@ import json
from celery.schedules import crontab from celery.schedules import crontab
from kombu import Exchange, Queue from kombu import Exchange, Queue
from app.models import (
EMAIL_TYPE, SMS_TYPE, LETTER_TYPE,
)
if os.environ.get('VCAP_SERVICES'): if os.environ.get('VCAP_SERVICES'):
# on cloudfoundry, config is a json blob in VCAP_SERVICES - unpack it, and populate # on cloudfoundry, config is a json blob in VCAP_SERVICES - unpack it, and populate
# standard environment variables from it # standard environment variables from it
@@ -108,6 +104,10 @@ class Config(object):
DEBUG = False DEBUG = False
NOTIFY_LOG_PATH = os.getenv('NOTIFY_LOG_PATH') NOTIFY_LOG_PATH = os.getenv('NOTIFY_LOG_PATH')
# Cronitor
CRONITOR_ENABLED = False
CRONITOR_KEYS = json.loads(os.environ.get('CRONITOR_KEYS', '{}'))
########################### ###########################
# Default config values ### # Default config values ###
########################### ###########################
@@ -157,7 +157,12 @@ class Config(object):
CELERY_TIMEZONE = 'Europe/London' CELERY_TIMEZONE = 'Europe/London'
CELERY_ACCEPT_CONTENT = ['json'] CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json' CELERY_TASK_SERIALIZER = 'json'
CELERY_IMPORTS = ('app.celery.tasks', 'app.celery.scheduled_tasks', 'app.celery.reporting_tasks') CELERY_IMPORTS = (
'app.celery.tasks',
'app.celery.scheduled_tasks',
'app.celery.reporting_tasks',
'app.celery.nightly_tasks',
)
CELERYBEAT_SCHEDULE = { CELERYBEAT_SCHEDULE = {
# app/celery/scheduled_tasks.py # app/celery/scheduled_tasks.py
'run-scheduled-jobs': { 'run-scheduled-jobs': {
@@ -238,17 +243,15 @@ class Config(object):
'options': {'queue': QueueNames.PERIODIC} 'options': {'queue': QueueNames.PERIODIC}
}, },
'remove_sms_email_jobs': { 'remove_sms_email_jobs': {
'task': 'remove_csv_files', 'task': 'remove_sms_email_jobs',
'schedule': crontab(hour=4, minute=0), 'schedule': crontab(hour=4, minute=0),
'options': {'queue': QueueNames.PERIODIC}, 'options': {'queue': QueueNames.PERIODIC},
'kwargs': {'job_types': [EMAIL_TYPE, SMS_TYPE]}
}, },
'remove_letter_jobs': { 'remove_letter_jobs': {
'task': 'remove_csv_files', 'task': 'remove_letter_jobs',
'schedule': crontab(hour=4, minute=20), # this has to run AFTER remove_transformed_dvla_files 'schedule': crontab(hour=4, minute=20), # this has to run AFTER remove_transformed_dvla_files
# since we mark jobs as archived # since we mark jobs as archived
'options': {'queue': QueueNames.PERIODIC}, 'options': {'queue': QueueNames.PERIODIC},
'kwargs': {'job_types': [LETTER_TYPE]}
}, },
'raise-alert-if-letter-notifications-still-sending': { 'raise-alert-if-letter-notifications-still-sending': {
'task': 'raise-alert-if-letter-notifications-still-sending', 'task': 'raise-alert-if-letter-notifications-still-sending',
@@ -436,6 +439,8 @@ class Live(Config):
API_RATE_LIMIT_ENABLED = True API_RATE_LIMIT_ENABLED = True
CHECK_PROXY_HEADER = True CHECK_PROXY_HEADER = True
CRONITOR_ENABLED = True
class CloudFoundryConfig(Config): class CloudFoundryConfig(Config):
pass pass

50
app/cronitor.py Normal file
View File

@@ -0,0 +1,50 @@
import requests
from functools import wraps
from flask import current_app
def cronitor(task_name):
    """Decorator that pings cronitor.io before and after each run of a task.

    Wrap a celery task with ``@cronitor(task_name)`` (directly below the
    ``@notify_celery.task`` decorator) to send a ``run`` ping before the task
    starts and a ``complete`` or ``fail`` ping afterwards, depending on
    whether the task raised.

    Pings are only sent when the ``CRONITOR_ENABLED`` config flag is set and
    ``task_name`` has a matching slug in the ``CRONITOR_KEYS`` config dict;
    otherwise the task runs normally without any monitoring calls.
    """
    def decorator(func):
        def ping_cronitor(command):
            if not current_app.config['CRONITOR_ENABLED']:
                return

            task_slug = current_app.config['CRONITOR_KEYS'].get(task_name)
            if not task_slug:
                current_app.logger.error(
                    'Cronitor enabled but task_name {} not found in environment'.format(task_name)
                )
                # no slug means there is no monitor to ping - bail out rather
                # than issuing a GET to https://cronitor.link/None/<command>
                return

            if command not in {'run', 'complete', 'fail'}:
                raise ValueError('command {} not a valid cronitor command'.format(command))

            try:
                resp = requests.get(
                    'https://cronitor.link/{}/{}'.format(task_slug, command),
                    # cronitor limits msg to 1000 characters
                    params={
                        'host': current_app.config['API_HOST_NAME'],
                    },
                    # never let a slow/unreachable monitoring API hang the task
                    timeout=30,
                )
                if resp.status_code != 200:
                    current_app.logger.warning('Cronitor API returned {} for task {}, body {}'.format(
                        resp.status_code,
                        task_name,
                        resp.text
                    ))
            except requests.RequestException as e:
                # monitoring is best-effort: a cronitor outage must not fail
                # the task, nor (when pinging from the finally block below)
                # mask the task's own exception
                current_app.logger.warning(
                    'Cronitor API request failed for task {}: {}'.format(task_name, repr(e))
                )

        @wraps(func)
        def inner_decorator(*args, **kwargs):
            ping_cronitor('run')
            # default to 'fail' so that exceptions which are not subclasses of
            # Exception (eg SystemExit, KeyboardInterrupt) still report a
            # failure instead of raising UnboundLocalError in the finally block
            status = 'fail'
            try:
                ret = func(*args, **kwargs)
                status = 'complete'
                return ret
            finally:
                ping_cronitor(status)
        return inner_decorator
    return decorator

View File

@@ -22,6 +22,7 @@ env:
SECRET_KEY: null SECRET_KEY: null
ROUTE_SECRET_KEY_1: null ROUTE_SECRET_KEY_1: null
ROUTE_SECRET_KEY_2: null ROUTE_SECRET_KEY_2: null
CRONITOR_KEYS: null
PERFORMANCE_PLATFORM_ENDPOINTS: null PERFORMANCE_PLATFORM_ENDPOINTS: null

View File

@@ -20,6 +20,7 @@ env:
SECRET_KEY: null SECRET_KEY: null
ROUTE_SECRET_KEY_1: null ROUTE_SECRET_KEY_1: null
ROUTE_SECRET_KEY_2: null ROUTE_SECRET_KEY_2: null
CRONITOR_KEYS: null
PERFORMANCE_PLATFORM_ENDPOINTS: null PERFORMANCE_PLATFORM_ENDPOINTS: null

View File

@@ -20,10 +20,6 @@ from app import db
from tests.app.db import create_service, create_template, create_notification from tests.app.db import create_service, create_template, create_notification
def test_reporting_should_have_decorated_tasks_functions():
assert create_nightly_billing.__wrapped__.__name__ == 'create_nightly_billing'
def mocker_get_rate( def mocker_get_rate(
non_letter_rates, letter_rates, notification_type, date, crown=None, rate_multiplier=None, post_class="second" non_letter_rates, letter_rates, notification_type, date, crown=None, rate_multiplier=None, post_class="second"
): ):