mirror of
https://github.com/GSA/notifications-api.git
synced 2026-04-04 01:19:31 -04:00
Merge pull request #2819 from alphagov/additional-prometheus-metrics
Additional prometheus metrics
This commit is contained in:
@@ -1,19 +1,23 @@
|
||||
import time
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
import uuid
|
||||
|
||||
from flask import _request_ctx_stack, request, g, jsonify, make_response
|
||||
from celery import current_task
|
||||
from flask import _request_ctx_stack, request, g, jsonify, make_response, current_app, has_request_context
|
||||
from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy
|
||||
from flask_marshmallow import Marshmallow
|
||||
from flask_migrate import Migrate
|
||||
from gds_metrics import GDSMetrics
|
||||
from gds_metrics.metrics import Gauge, Histogram
|
||||
from time import monotonic
|
||||
from notifications_utils.clients.zendesk.zendesk_client import ZendeskClient
|
||||
from notifications_utils.clients.statsd.statsd_client import StatsdClient
|
||||
from notifications_utils.clients.redis.redis_client import RedisClient
|
||||
from notifications_utils.clients.encryption.encryption_client import Encryption
|
||||
from notifications_utils import logging, request_helper
|
||||
from sqlalchemy import event
|
||||
from werkzeug.exceptions import HTTPException as WerkzeugHTTPException
|
||||
from werkzeug.local import LocalProxy
|
||||
|
||||
@@ -64,6 +68,11 @@ clients = Clients()
|
||||
api_user = LocalProxy(lambda: _request_ctx_stack.top.api_user)
|
||||
authenticated_service = LocalProxy(lambda: _request_ctx_stack.top.authenticated_service)
|
||||
|
||||
CONCURRENT_REQUESTS = Gauge(
|
||||
'concurrent_web_request_count',
|
||||
'How many concurrent requests are currently being served',
|
||||
)
|
||||
|
||||
|
||||
def create_app(application):
|
||||
from app.config import configs
|
||||
@@ -108,6 +117,9 @@ def create_app(application):
|
||||
from app.commands import setup_commands
|
||||
setup_commands(application)
|
||||
|
||||
# set up sqlalchemy events
|
||||
setup_sqlalchemy_events(application)
|
||||
|
||||
return application
|
||||
|
||||
|
||||
@@ -255,17 +267,22 @@ def register_v2_blueprints(application):
|
||||
|
||||
|
||||
def init_app(app):
|
||||
|
||||
@app.before_request
|
||||
def record_user_agent():
|
||||
statsd_client.incr("user-agent.{}".format(process_user_agent(request.headers.get('User-Agent', None))))
|
||||
|
||||
@app.before_request
|
||||
def record_request_details():
|
||||
CONCURRENT_REQUESTS.inc()
|
||||
|
||||
g.start = monotonic()
|
||||
g.endpoint = request.endpoint
|
||||
|
||||
@app.after_request
|
||||
def after_request(response):
|
||||
CONCURRENT_REQUESTS.dec()
|
||||
|
||||
response.headers.add('Access-Control-Allow-Origin', '*')
|
||||
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
|
||||
response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE')
|
||||
@@ -311,3 +328,83 @@ def process_user_agent(user_agent_string):
|
||||
return "non-notify-user-agent"
|
||||
else:
|
||||
return "unknown"
|
||||
|
||||
|
||||
def setup_sqlalchemy_events(app):
|
||||
|
||||
TOTAL_DB_CONNECTIONS = Gauge(
|
||||
'db_connection_total_connected',
|
||||
'How many db connections are currently held (potentially idle) by the server',
|
||||
)
|
||||
|
||||
TOTAL_CHECKED_OUT_DB_CONNECTIONS = Gauge(
|
||||
'db_connection_total_checked_out',
|
||||
'How many db connections are currently checked out by web requests',
|
||||
)
|
||||
|
||||
DB_CONNECTION_OPEN_DURATION_SECONDS = Histogram(
|
||||
'db_connection_open_duration_seconds',
|
||||
'How long db connections are held open for in seconds',
|
||||
['method', 'host', 'path']
|
||||
)
|
||||
|
||||
# need this or db.engine isn't accessible
|
||||
with app.app_context():
|
||||
@event.listens_for(db.engine, 'connect')
|
||||
def connect(dbapi_connection, connection_record):
|
||||
# connection first opened with db
|
||||
TOTAL_DB_CONNECTIONS.inc()
|
||||
|
||||
@event.listens_for(db.engine, 'close')
|
||||
def close(dbapi_connection, connection_record):
|
||||
# connection closed (probably only happens with overflow connections)
|
||||
TOTAL_DB_CONNECTIONS.dec()
|
||||
|
||||
@event.listens_for(db.engine, 'checkout')
|
||||
def checkout(dbapi_connection, connection_record, connection_proxy):
|
||||
# connection given to a web worker
|
||||
TOTAL_CHECKED_OUT_DB_CONNECTIONS.inc()
|
||||
|
||||
# this will overwrite any previous checkout_at timestamp
|
||||
connection_record.info['checkout_at'] = time.monotonic()
|
||||
|
||||
# checkin runs after the request is already torn down, therefore we add the request_data onto the
|
||||
# connection_record as otherwise it won't have that information when checkin actually runs.
|
||||
# Note: this is not a problem for checkouts as the checkout always happens within a web request or task
|
||||
|
||||
# web requests
|
||||
if has_request_context():
|
||||
connection_record.info['request_data'] = {
|
||||
'method': request.method,
|
||||
'host': request.host,
|
||||
'url_rule': request.url_rule.rule if request.url_rule else 'No endpoint'
|
||||
}
|
||||
# celery apps
|
||||
elif current_task:
|
||||
connection_record.info['request_data'] = {
|
||||
'method': 'celery',
|
||||
'host': current_app.config['NOTIFY_APP_NAME'], # worker name
|
||||
'url_rule': current_task.name, # task name
|
||||
}
|
||||
# anything else. migrations possibly.
|
||||
else:
|
||||
current_app.logger.warning('Checked out sqlalchemy connection from outside of request/task')
|
||||
connection_record.info['request_data'] = {
|
||||
'method': 'unknown',
|
||||
'host': 'unknown',
|
||||
'url_rule': 'unknown',
|
||||
}
|
||||
|
||||
@event.listens_for(db.engine, 'checkin')
|
||||
def checkin(dbapi_connection, connection_record):
|
||||
# connection returned by a web worker
|
||||
TOTAL_CHECKED_OUT_DB_CONNECTIONS.dec()
|
||||
|
||||
# duration that connection was held by a single web request
|
||||
duration = time.monotonic() - connection_record.info['checkout_at']
|
||||
|
||||
DB_CONNECTION_OPEN_DURATION_SECONDS.labels(
|
||||
connection_record.info['request_data']['method'],
|
||||
connection_record.info['request_data']['host'],
|
||||
connection_record.info['request_data']['url_rule']
|
||||
).observe(duration)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import time
|
||||
|
||||
from gds_metrics.metrics import Histogram
|
||||
from celery import Celery, Task
|
||||
from celery.signals import worker_process_shutdown
|
||||
from flask import g, request
|
||||
@@ -19,6 +20,12 @@ def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs):
|
||||
|
||||
|
||||
def make_task(app):
|
||||
SQS_APPLY_ASYNC_DURATION_SECONDS = Histogram(
|
||||
'sqs_apply_async_duration_seconds',
|
||||
'Time taken to put task on queue',
|
||||
['task_name']
|
||||
)
|
||||
|
||||
class NotifyTask(Task):
|
||||
abstract = True
|
||||
start = None
|
||||
@@ -52,7 +59,8 @@ def make_task(app):
|
||||
if has_request_context() and hasattr(request, 'request_id'):
|
||||
kwargs['request_id'] = request.request_id
|
||||
|
||||
return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
|
||||
with SQS_APPLY_ASYNC_DURATION_SECONDS.labels(self.name).time():
|
||||
return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
|
||||
|
||||
return NotifyTask
|
||||
|
||||
|
||||
@@ -34,6 +34,15 @@ from app.dao.notifications_dao import (
|
||||
from app.v2.errors import BadRequestError
|
||||
|
||||
|
||||
from gds_metrics import Histogram
|
||||
|
||||
|
||||
REDIS_GET_AND_INCR_DAILY_LIMIT_DURATION_SECONDS = Histogram(
|
||||
'redis_get_and_incr_daily_limit_duration_seconds',
|
||||
'Time taken to get and possibly incremement the daily limit cache key',
|
||||
)
|
||||
|
||||
|
||||
def create_content_for_notification(template, personalisation):
|
||||
template_object = template._as_utils_template_with_personalisation(personalisation)
|
||||
check_placeholders(template_object)
|
||||
@@ -115,8 +124,9 @@ def persist_notification(
|
||||
if not simulated:
|
||||
dao_create_notification(notification)
|
||||
if key_type != KEY_TYPE_TEST:
|
||||
if redis_store.get(redis.daily_limit_cache_key(service.id)):
|
||||
redis_store.incr(redis.daily_limit_cache_key(service.id))
|
||||
with REDIS_GET_AND_INCR_DAILY_LIMIT_DURATION_SECONDS.time():
|
||||
if redis_store.get(redis.daily_limit_cache_key(service.id)):
|
||||
redis_store.incr(redis.daily_limit_cache_key(service.id))
|
||||
|
||||
current_app.logger.info(
|
||||
"{} {} created at {}".format(notification_type, notification_id, notification_created_at)
|
||||
|
||||
@@ -22,15 +22,24 @@ from app.utils import get_public_notify_type_text
|
||||
from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id
|
||||
from app.dao.service_letter_contact_dao import dao_get_letter_contact_by_id
|
||||
|
||||
from gds_metrics.metrics import Histogram
|
||||
|
||||
|
||||
REDIS_EXCEEDED_RATE_LIMIT_DURATION_SECONDS = Histogram(
|
||||
'redis_exceeded_rate_limit_duration_seconds',
|
||||
'Time taken to check rate limit',
|
||||
)
|
||||
|
||||
|
||||
def check_service_over_api_rate_limit(service, api_key):
|
||||
if current_app.config['API_RATE_LIMIT_ENABLED'] and current_app.config['REDIS_ENABLED']:
|
||||
cache_key = rate_limit_cache_key(service.id, api_key.key_type)
|
||||
rate_limit = service.rate_limit
|
||||
interval = 60
|
||||
if redis_store.exceeded_rate_limit(cache_key, rate_limit, interval):
|
||||
current_app.logger.info("service {} has been rate limited for throughput".format(service.id))
|
||||
raise RateLimitError(rate_limit, interval, api_key.key_type)
|
||||
with REDIS_EXCEEDED_RATE_LIMIT_DURATION_SECONDS.time():
|
||||
if redis_store.exceeded_rate_limit(cache_key, rate_limit, interval):
|
||||
current_app.logger.info("service {} has been rate limited for throughput".format(service.id))
|
||||
raise RateLimitError(rate_limit, interval, api_key.key_type)
|
||||
|
||||
|
||||
def check_service_over_daily_message_limit(key_type, service):
|
||||
|
||||
@@ -28,4 +28,6 @@ awscli-cwlogs>=1.4,<1.5
|
||||
|
||||
git+https://github.com/alphagov/notifications-utils.git@39.4.4#egg=notifications-utils==39.4.4
|
||||
|
||||
# gds-metrics requires prometheseus 0.2.0, override that requirement as 0.7.1 brings significant performance gains
|
||||
prometheus-client==0.7.1
|
||||
gds-metrics==0.2.0
|
||||
|
||||
@@ -30,6 +30,8 @@ awscli-cwlogs>=1.4,<1.5
|
||||
|
||||
git+https://github.com/alphagov/notifications-utils.git@39.4.4#egg=notifications-utils==39.4.4
|
||||
|
||||
# gds-metrics requires prometheseus 0.2.0, override that requirement as 0.7.1 brings significant performance gains
|
||||
prometheus-client==0.7.1
|
||||
gds-metrics==0.2.0
|
||||
|
||||
## The following requirements were added by pip freeze:
|
||||
@@ -66,7 +68,6 @@ mistune==0.8.4
|
||||
monotonic==1.5
|
||||
orderedset==2.0.1
|
||||
phonenumbers==8.11.2
|
||||
prometheus-client==0.2.0
|
||||
pyasn1==0.4.8
|
||||
pycparser==2.20
|
||||
PyPDF2==1.26.0
|
||||
|
||||
@@ -4,7 +4,7 @@ from sqlalchemy.exc import DataError
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def app_for_test(mocker):
|
||||
def app_for_test():
|
||||
import flask
|
||||
from flask import Blueprint
|
||||
from app.authentication.auth import AuthError
|
||||
|
||||
Reference in New Issue
Block a user