diff --git a/app/__init__.py b/app/__init__.py
index b33c78034..286fcf4bc 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -1,3 +1,4 @@
+import time
 import os
 import random
 import string
@@ -8,12 +9,14 @@ from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy
 from flask_marshmallow import Marshmallow
 from flask_migrate import Migrate
 from gds_metrics import GDSMetrics
+from gds_metrics.metrics import Gauge, Histogram
 from time import monotonic
 from notifications_utils.clients.zendesk.zendesk_client import ZendeskClient
 from notifications_utils.clients.statsd.statsd_client import StatsdClient
 from notifications_utils.clients.redis.redis_client import RedisClient
 from notifications_utils.clients.encryption.encryption_client import Encryption
 from notifications_utils import logging, request_helper
+from sqlalchemy import event
 from werkzeug.exceptions import HTTPException as WerkzeugHTTPException
 from werkzeug.local import LocalProxy
 
@@ -108,6 +111,9 @@ def create_app(application):
     from app.commands import setup_commands
     setup_commands(application)
 
+    # set up sqlalchemy events
+    setup_sqlalchemy_events(application)
+
     return application
 
 
@@ -255,17 +261,27 @@ def register_v2_blueprints(application):
 
 
 def init_app(app):
+
+    CONCURRENT_REQUESTS = Gauge(
+        'concurrent_web_request_count',
+        'How many concurrent requests are currently being served',
+    )
+
     @app.before_request
     def record_user_agent():
         statsd_client.incr("user-agent.{}".format(process_user_agent(request.headers.get('User-Agent', None))))
 
     @app.before_request
     def record_request_details():
+        CONCURRENT_REQUESTS.inc()
+
         g.start = monotonic()
         g.endpoint = request.endpoint
 
     @app.after_request
     def after_request(response):
+        CONCURRENT_REQUESTS.dec()
+
         response.headers.add('Access-Control-Allow-Origin', '*')
         response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
         response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE')
@@ -311,3 +327,63 @@ def process_user_agent(user_agent_string):
         return "non-notify-user-agent"
     else:
         return "unknown"
+
+
+def setup_sqlalchemy_events(app):
+
+    TOTAL_DB_CONNECTIONS = Gauge(
+        'db_connection_total_connected',
+        'How many db connections are currently held (potentially idle) by the server',
+    )
+
+    TOTAL_CHECKED_OUT_DB_CONNECTIONS = Gauge(
+        'db_connection_total_checked_out',
+        'How many db connections are currently checked out by web requests',
+    )
+
+    DB_CONNECTION_OPEN_DURATION_SECONDS = Histogram(
+        'db_connection_open_duration_seconds',
+        'How long db connections are held open for in seconds',
+        ['method', 'host', 'path']
+    )
+
+    # need this or db.engine isn't accessible
+    with app.app_context():
+        @event.listens_for(db.engine, 'connect')
+        def connect(dbapi_connection, connection_record):
+            # connection first opened with db
+            TOTAL_DB_CONNECTIONS.inc()
+
+        @event.listens_for(db.engine, 'close')
+        def close(dbapi_connection, connection_record):
+            # connection closed (probably only happens with overflow connections)
+            TOTAL_DB_CONNECTIONS.dec()
+
+        @event.listens_for(db.engine, 'checkout')
+        def checkout(dbapi_connection, connection_record, connection_proxy):
+            # connection given to a web worker
+            TOTAL_CHECKED_OUT_DB_CONNECTIONS.inc()
+
+            # this will overwrite any previous checkout_at timestamp
+            connection_record.info['checkout_at'] = time.monotonic()
+
+            # checkin runs after the request is already torn down so we'll need to capture request info earlier
+            connection_record.info['request_data'] = {
+                'method': request.method,
+                'host': request.host,
+                'url_rule': request.url_rule.rule if request.url_rule else 'No endpoint'
+            }
+
+        @event.listens_for(db.engine, 'checkin')
+        def checkin(dbapi_connection, connection_record):
+            # connection returned by a web worker
+            TOTAL_CHECKED_OUT_DB_CONNECTIONS.dec()
+
+            # duration that connection was held by a single web request
+            duration = time.monotonic() - connection_record.info['checkout_at']
+
+            DB_CONNECTION_OPEN_DURATION_SECONDS.labels(
+                connection_record.info['request_data']['method'],
+                connection_record.info['request_data']['host'],
+                connection_record.info['request_data']['url_rule']
+            ).observe(duration)
diff --git a/app/celery/celery.py b/app/celery/celery.py
index 28ff6fb5f..5c43d770b 100644
--- a/app/celery/celery.py
+++ b/app/celery/celery.py
@@ -1,5 +1,6 @@
 import time
 
+from gds_metrics.metrics import Histogram
 from celery import Celery, Task
 from celery.signals import worker_process_shutdown
 from flask import g, request
@@ -19,6 +20,12 @@ def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs):
 
 
 def make_task(app):
+    SQS_APPLY_ASYNC_DURATION = Histogram(
+        'sqs_apply_async_duration',
+        'Time taken to put task on queue',
+        ['task_name']
+    )
+
     class NotifyTask(Task):
         abstract = True
         start = None
@@ -52,7 +59,8 @@ def make_task(app):
 
             if has_request_context() and hasattr(request, 'request_id'):
                 kwargs['request_id'] = request.request_id
 
-            return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
+            with SQS_APPLY_ASYNC_DURATION.labels(self.name).time():
+                return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
 
     return NotifyTask
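
Aside (not part of the patch): the checkout/checkin listeners registered in setup_sqlalchemy_events rely on SQLAlchemy pool events firing as a connection is borrowed from and returned to the pool. The standalone sketch below only illustrates that lifecycle under simplified assumptions: a plain SQLAlchemy engine over in-memory SQLite rather than Flask-SQLAlchemy's db.engine, and an illustrative demo_engine name, so it shows the event ordering rather than the production wiring.

import time

from sqlalchemy import create_engine, event, text

# illustrative engine only; the patch attaches listeners to db.engine instead
demo_engine = create_engine('sqlite://')


@event.listens_for(demo_engine, 'checkout')
def on_checkout(dbapi_connection, connection_record, connection_proxy):
    # fires when the pool hands a connection to the caller
    connection_record.info['checkout_at'] = time.monotonic()


@event.listens_for(demo_engine, 'checkin')
def on_checkin(dbapi_connection, connection_record):
    # fires when the connection goes back to the pool
    held_for = time.monotonic() - connection_record.info['checkout_at']
    print('connection held for {:.6f}s'.format(held_for))


with demo_engine.connect() as connection:  # checkout fires here
    connection.execute(text('SELECT 1'))
# checkin fires when the connection is returned at the end of the with block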
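
The celery.py change uses the same labelled-metric pattern to time apply_async. Assuming gds_metrics re-exports the standard prometheus_client metric types (an assumption here, not something the patch states), .labels(...).time() returns a context manager that observes the elapsed wall-clock time of the wrapped block; the metric name and label value below are made up for illustration.

from prometheus_client import Histogram

DEMO_DURATION = Histogram(
    'demo_apply_async_duration',
    'Time taken to put task on queue',
    ['task_name'],
)

# the elapsed time of the block is recorded against the 'task_name' label
with DEMO_DURATION.labels('demo-task').time():
    pass  # stand-in for the real super().apply_async(...) call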