Files
notifications-api/app/celery/celery.py
Leo Hemsted 6e32ca5996 add prometheus metrics for connection pools (both web and sql)
add the following prometheus metrics to keep track of general app
instance health.

 # sqs_apply_async_duration

How long the actual SQS call (a standard web request to AWS) takes.
A histogram with default bucket sizes, split up per task that was
created.

 # concurrent_web_request_count

How many web requests this app is currently serving. This is split up
per process, so we'd expect multiple responses per app instance.

 # db_connection_total_connected

How many connections this app (process) has open to the database.
They might be idle.

 # db_connection_total_checked_out

How many connections this app (process) has open that are
currently in use by a web worker.

 # db_connection_open_duration_seconds

A histogram, per endpoint, of how long the db connection was taken from
the pool for. It won't have any data if a connection was never opened.
2020-06-12 14:52:22 +01:00

79 lines
2.8 KiB
Python

import time
from gds_metrics.metrics import Histogram
from celery import Celery, Task
from celery.signals import worker_process_shutdown
from flask import g, request
from flask.ctx import has_request_context
@worker_process_shutdown.connect
def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs):
    """Log the PID and exit code of a celery worker process as it shuts down.

    Registered as a receiver of celery's ``worker_process_shutdown`` signal.
    """
    # Deferred import: importing this at module level would be circular.
    from app import notify_celery

    # Once the worker has restarted at least once there is no app context any
    # more, so current_app can't be used to create one. Build a fresh context
    # from the original flask app held on the celery object instead.
    flask_app = notify_celery._app
    with flask_app.app_context():
        message = 'worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode)
        flask_app.logger.info(message)
def make_task(app):
    """Build the celery ``Task`` base class bound to the given flask app.

    :param app: the flask application whose app context and logger the
        returned task class uses.
    :return: the ``NotifyTask`` class (passed to celery as ``task_cls``).
    """
    # Histogram of how long apply_async takes, labelled by task name.
    SQS_APPLY_ASYNC_DURATION = Histogram(
        'sqs_apply_async_duration',
        'Time taken to put task on queue',
        ['task_name']
    )

    class NotifyTask(Task):
        """Task base class adding flask context, timing logs and request id propagation."""

        abstract = True
        # time.monotonic() reading taken by __call__ just before the task body runs
        start = None

        def on_success(self, retval, task_id, args, kwargs):
            """Log how long the task took, measured from the start of ``__call__``."""
            # Monotonic clock: the difference can't go negative or jump if the
            # system wall clock is adjusted while the task is running.
            elapsed_time = time.monotonic() - self.start
            app.logger.info(
                "{task_name} took {time}".format(
                    task_name=self.name, time="{0:.4f}".format(elapsed_time)
                )
            )

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            """Log the failure through the flask app's logger, then defer to celery."""
            # ensure task will log exceptions to correct handlers
            app.logger.exception('Celery task: {} failed'.format(self.name))
            super().on_failure(exc, task_id, args, kwargs, einfo)

        def __call__(self, *args, **kwargs):
            """Run the task inside a flask app context and record its start time."""
            # ensure task has flask context to access config, logger, etc
            with app.app_context():
                self.start = time.monotonic()
                # Remove 'request_id' from the kwargs (so the task doesn't get an unexpected kwarg), then add it to g
                # so that it gets logged
                g.request_id = kwargs.pop('request_id', None)
                return super().__call__(*args, **kwargs)

        def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                        link=None, link_error=None, **options):
            """Queue the task, forwarding the current request id and timing the call.

            When called inside a flask request that has a ``request_id``, that id is
            added to the task kwargs under ``'request_id'``; ``__call__`` pops it
            again before the task body runs.
            """
            # Work on a copy so the request id is never written into a dict the
            # caller still holds (and may reuse for another task).
            kwargs = dict(kwargs) if kwargs else {}

            if has_request_context() and hasattr(request, 'request_id'):
                kwargs['request_id'] = request.request_id

            with SQS_APPLY_ASYNC_DURATION.labels(self.name).time():
                return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)

    return NotifyTask
class NotifyCelery(Celery):
    """Celery application that is configured from a flask app via ``init_app``."""

    def init_app(self, app):
        """Initialise this celery app using the given flask app.

        :param app: flask application providing ``import_name`` and a config
            containing ``BROKER_URL``.
        """
        main_name = app.import_name
        broker_url = app.config['BROKER_URL']
        # Base task class bound to this flask app.
        task_class = make_task(app)

        super().__init__(main_name, broker=broker_url, task_cls=task_class)

        # Copy the flask config across to celery's own configuration.
        self.conf.update(app.config)
        # Keep hold of the original flask app so other code can build an app
        # context from it.
        self._app = app