mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-23 08:51:30 -05:00
dd sqlalchemy connection metrics for celery tasks
grab the worker app name and task name rather than the web host and endpoint. also add a fallback for if we're not in a web request or a celery task. I think that'll probably happen when we use alembic, or if we do things from within flask shell
This commit is contained in:
committed by
David McDonald
parent
6e32ca5996
commit
c4dc0f64c5
@@ -4,7 +4,8 @@ import random
|
|||||||
import string
|
import string
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from flask import _request_ctx_stack, request, g, jsonify, make_response
|
from celery import current_task
|
||||||
|
from flask import _request_ctx_stack, request, g, jsonify, make_response, current_app, has_request_context
|
||||||
from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy
|
from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy
|
||||||
from flask_marshmallow import Marshmallow
|
from flask_marshmallow import Marshmallow
|
||||||
from flask_migrate import Migrate
|
from flask_migrate import Migrate
|
||||||
@@ -367,12 +368,33 @@ def setup_sqlalchemy_events(app):
|
|||||||
# this will overwrite any previous checkout_at timestamp
|
# this will overwrite any previous checkout_at timestamp
|
||||||
connection_record.info['checkout_at'] = time.monotonic()
|
connection_record.info['checkout_at'] = time.monotonic()
|
||||||
|
|
||||||
# checkin runs after the request is already torn down so we'll need to capture request info earlier
|
# checkin runs after the request is already torn down so we'll need to capture request info earlier.
|
||||||
connection_record.info['request_data'] = {
|
# checkout runs when the connection is first used, ie: when we first make a query, so within the request or
|
||||||
'method': request.method,
|
# task
|
||||||
'host': request.host,
|
|
||||||
'url_rule': request.url_rule.rule if request.url_rule else 'No endpoint'
|
# web requests
|
||||||
}
|
if has_request_context():
|
||||||
|
connection_record.info['request_data'] = {
|
||||||
|
'method': request.method,
|
||||||
|
'host': request.host,
|
||||||
|
'url_rule': request.url_rule.rule if request.url_rule else 'No endpoint'
|
||||||
|
}
|
||||||
|
# celery apps
|
||||||
|
elif current_task:
|
||||||
|
print(current_task)
|
||||||
|
connection_record.info['request_data'] = {
|
||||||
|
'method': 'celery',
|
||||||
|
'host': current_app.config['NOTIFY_APP_NAME'], # worker name
|
||||||
|
'url_rule': current_task.name, # task name
|
||||||
|
}
|
||||||
|
# anything else. migrations possibly.
|
||||||
|
else:
|
||||||
|
current_app.logger.warning('Checked out sqlalchemy connection from outside of request/task')
|
||||||
|
connection_record.info['request_data'] = {
|
||||||
|
'method': 'unknown',
|
||||||
|
'host': 'unknown',
|
||||||
|
'url_rule': 'unknown',
|
||||||
|
}
|
||||||
|
|
||||||
@event.listens_for(db.engine, 'checkin')
|
@event.listens_for(db.engine, 'checkin')
|
||||||
def checkin(dbapi_connection, connection_record):
|
def checkin(dbapi_connection, connection_record):
|
||||||
|
|||||||
@@ -20,8 +20,8 @@ def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs):
|
|||||||
|
|
||||||
|
|
||||||
def make_task(app):
|
def make_task(app):
|
||||||
SQS_APPLY_ASYNC_DURATION = Histogram(
|
SQS_APPLY_ASYNC_DURATION_SECONDS = Histogram(
|
||||||
'sqs_apply_async_duration',
|
'sqs_apply_async_duration_seconds',
|
||||||
'Time taken to put task on queue',
|
'Time taken to put task on queue',
|
||||||
['task_name']
|
['task_name']
|
||||||
)
|
)
|
||||||
@@ -59,7 +59,7 @@ def make_task(app):
|
|||||||
if has_request_context() and hasattr(request, 'request_id'):
|
if has_request_context() and hasattr(request, 'request_id'):
|
||||||
kwargs['request_id'] = request.request_id
|
kwargs['request_id'] = request.request_id
|
||||||
|
|
||||||
with SQS_APPLY_ASYNC_DURATION.labels(self.name).time():
|
with SQS_APPLY_ASYNC_DURATION_SECONDS.labels(self.name).time():
|
||||||
return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
|
return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
|
||||||
|
|
||||||
return NotifyTask
|
return NotifyTask
|
||||||
|
|||||||
Reference in New Issue
Block a user