From 6e32ca59963b53a8a404b9362a56fc689d6fd4c2 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Fri, 24 Apr 2020 14:36:21 +0100 Subject: [PATCH 1/7] add prometheus metrics for connection pools (both web and sql) add the following prometheus metrics to keep track of general app instance health. # sqs_apply_async_duration how long does the actual SQS call (a standard web request to AWS) take. a histogram with default bucket sizes, split up per task that was created. # concurrent_web_request_count how many web requests is this app currently serving. this is split up per process, so we'd expect multiple responses per app instance # db_connection_total_connected how many connections does this app (process) have open to the database. They might be idle. # db_connection_total_checked_out how many connections does this app (process) have open that are currently in use by a web worker # db_connection_open_duration_seconds a histogram per endpoint of how long the db connection was taken from the pool for. won't have any data if a connection was never opened. 
--- app/__init__.py | 76 ++++++++++++++++++++++++++++++++++++++++++++ app/celery/celery.py | 10 +++++- 2 files changed, 85 insertions(+), 1 deletion(-) diff --git a/app/__init__.py b/app/__init__.py index b33c78034..286fcf4bc 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,3 +1,4 @@ +import time import os import random import string @@ -8,12 +9,14 @@ from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy from flask_marshmallow import Marshmallow from flask_migrate import Migrate from gds_metrics import GDSMetrics +from gds_metrics.metrics import Gauge, Histogram from time import monotonic from notifications_utils.clients.zendesk.zendesk_client import ZendeskClient from notifications_utils.clients.statsd.statsd_client import StatsdClient from notifications_utils.clients.redis.redis_client import RedisClient from notifications_utils.clients.encryption.encryption_client import Encryption from notifications_utils import logging, request_helper +from sqlalchemy import event from werkzeug.exceptions import HTTPException as WerkzeugHTTPException from werkzeug.local import LocalProxy @@ -108,6 +111,9 @@ def create_app(application): from app.commands import setup_commands setup_commands(application) + # set up sqlalchemy events + setup_sqlalchemy_events(application) + return application @@ -255,17 +261,27 @@ def register_v2_blueprints(application): def init_app(app): + + CONCURRENT_REQUESTS = Gauge( + 'concurrent_web_request_count', + 'How many concurrent requests are currently being served', + ) + @app.before_request def record_user_agent(): statsd_client.incr("user-agent.{}".format(process_user_agent(request.headers.get('User-Agent', None)))) @app.before_request def record_request_details(): + CONCURRENT_REQUESTS.inc() + g.start = monotonic() g.endpoint = request.endpoint @app.after_request def after_request(response): + CONCURRENT_REQUESTS.dec() + response.headers.add('Access-Control-Allow-Origin', '*') response.headers.add('Access-Control-Allow-Headers', 
'Content-Type,Authorization') response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE') @@ -311,3 +327,63 @@ def process_user_agent(user_agent_string): return "non-notify-user-agent" else: return "unknown" + + +def setup_sqlalchemy_events(app): + + TOTAL_DB_CONNECTIONS = Gauge( + 'db_connection_total_connected', + 'How many db connections are currently held (potentially idel) by the server', + ) + + TOTAL_CHECKED_OUT_DB_CONNECTIONS = Gauge( + 'db_connection_total_checked_out', + 'How many db connections are currently checked out by web requests', + ) + + DB_CONNECTION_OPEN_DURATION_SECONDS = Histogram( + 'db_connection_open_duration_seconds', + 'How long db connections are held open for in seconds', + ['method', 'host', 'path'] + ) + + # need this or db.engine isn't accessible + with app.app_context(): + @event.listens_for(db.engine, 'connect') + def connect(dbapi_connection, connection_record): + # connection first opened with db + TOTAL_DB_CONNECTIONS.inc() + + @event.listens_for(db.engine, 'close') + def close(dbapi_connection, connection_record): + # connection closed (probably only happens with overflow connections) + TOTAL_DB_CONNECTIONS.dec() + + @event.listens_for(db.engine, 'checkout') + def checkout(dbapi_connection, connection_record, connection_proxy): + # connection given to a web worker + TOTAL_CHECKED_OUT_DB_CONNECTIONS.inc() + + # this will overwrite any previous checkout_at timestamp + connection_record.info['checkout_at'] = time.monotonic() + + # checkin runs after the request is already torn down so we'll need to capture request info earlier + connection_record.info['request_data'] = { + 'method': request.method, + 'host': request.host, + 'url_rule': request.url_rule.rule if request.url_rule else 'No endpoint' + } + + @event.listens_for(db.engine, 'checkin') + def checkin(dbapi_connection, connection_record): + # connection returned by a web worker + TOTAL_CHECKED_OUT_DB_CONNECTIONS.dec() + + # duration that connection was held 
by a single web request + duration = time.monotonic() - connection_record.info['checkout_at'] + + DB_CONNECTION_OPEN_DURATION_SECONDS.labels( + connection_record.info['request_data']['method'], + connection_record.info['request_data']['host'], + connection_record.info['request_data']['url_rule'] + ).observe(duration) diff --git a/app/celery/celery.py b/app/celery/celery.py index 28ff6fb5f..5c43d770b 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -1,5 +1,6 @@ import time +from gds_metrics.metrics import Histogram from celery import Celery, Task from celery.signals import worker_process_shutdown from flask import g, request @@ -19,6 +20,12 @@ def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs): def make_task(app): + SQS_APPLY_ASYNC_DURATION = Histogram( + 'sqs_apply_async_duration', + 'Time taken to put task on queue', + ['task_name'] + ) + class NotifyTask(Task): abstract = True start = None @@ -52,7 +59,8 @@ def make_task(app): if has_request_context() and hasattr(request, 'request_id'): kwargs['request_id'] = request.request_id - return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options) + with SQS_APPLY_ASYNC_DURATION.labels(self.name).time(): + return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options) return NotifyTask From c4dc0f64c5378158e46200d8df4bb79f780ed569 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Fri, 24 Apr 2020 15:15:41 +0100 Subject: [PATCH 2/7] add sqlalchemy connection metrics for celery tasks grab the worker app name and task name rather than the web host and endpoint. also add a fallback for if we're not in a web request or a celery task.
I think that'll probably happen when we use alembic, or if we do things from within flask shell --- app/__init__.py | 36 +++++++++++++++++++++++++++++------- app/celery/celery.py | 6 +++--- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 286fcf4bc..236a81a6c 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -4,7 +4,8 @@ import random import string import uuid -from flask import _request_ctx_stack, request, g, jsonify, make_response +from celery import current_task +from flask import _request_ctx_stack, request, g, jsonify, make_response, current_app, has_request_context from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy from flask_marshmallow import Marshmallow from flask_migrate import Migrate @@ -367,12 +368,33 @@ def setup_sqlalchemy_events(app): # this will overwrite any previous checkout_at timestamp connection_record.info['checkout_at'] = time.monotonic() - # checkin runs after the request is already torn down so we'll need to capture request info earlier - connection_record.info['request_data'] = { - 'method': request.method, - 'host': request.host, - 'url_rule': request.url_rule.rule if request.url_rule else 'No endpoint' - } + # checkin runs after the request is already torn down so we'll need to capture request info earlier. + # checkout runs when the connection is first used, ie: when we first make a query, so within the request or + # task + + # web requests + if has_request_context(): + connection_record.info['request_data'] = { + 'method': request.method, + 'host': request.host, + 'url_rule': request.url_rule.rule if request.url_rule else 'No endpoint' + } + # celery apps + elif current_task: + print(current_task) + connection_record.info['request_data'] = { + 'method': 'celery', + 'host': current_app.config['NOTIFY_APP_NAME'], # worker name + 'url_rule': current_task.name, # task name + } + # anything else. migrations possibly. 
+ else: + current_app.logger.warning('Checked out sqlalchemy connection from outside of request/task') + connection_record.info['request_data'] = { + 'method': 'unknown', + 'host': 'unknown', + 'url_rule': 'unknown', + } @event.listens_for(db.engine, 'checkin') def checkin(dbapi_connection, connection_record): diff --git a/app/celery/celery.py b/app/celery/celery.py index 5c43d770b..76e4f464e 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -20,8 +20,8 @@ def log_on_worker_shutdown(sender, signal, pid, exitcode, **kwargs): def make_task(app): - SQS_APPLY_ASYNC_DURATION = Histogram( - 'sqs_apply_async_duration', + SQS_APPLY_ASYNC_DURATION_SECONDS = Histogram( + 'sqs_apply_async_duration_seconds', 'Time taken to put task on queue', ['task_name'] ) @@ -59,7 +59,7 @@ def make_task(app): if has_request_context() and hasattr(request, 'request_id'): kwargs['request_id'] = request.request_id - with SQS_APPLY_ASYNC_DURATION.labels(self.name).time(): + with SQS_APPLY_ASYNC_DURATION_SECONDS.labels(self.name).time(): return super().apply_async(args, kwargs, task_id, producer, link, link_error, **options) return NotifyTask From faa8faa0c40902b56725e7e6adc65545ed59bfae Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Fri, 24 Apr 2020 16:18:44 +0100 Subject: [PATCH 3/7] bump prometheus-client to 0.7.1 there's multiple performance improvements from prometheus-client 0.2.0. 
pin this bump while we wait for gds metrics client to increase its dependency --- requirements-app.txt | 2 ++ requirements.txt | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/requirements-app.txt b/requirements-app.txt index 7b693e54f..15cb80641 100644 --- a/requirements-app.txt +++ b/requirements-app.txt @@ -28,4 +28,6 @@ awscli-cwlogs>=1.4,<1.5 git+https://github.com/alphagov/notifications-utils.git@39.4.4#egg=notifications-utils==39.4.4 +# gds-metrics requires prometheus 0.2.0, override that requirement as 0.7.1 brings significant performance gains +prometheus-client==0.7.1 gds-metrics==0.2.0 diff --git a/requirements.txt b/requirements.txt index 330b5f583..5a114515e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -30,6 +30,8 @@ awscli-cwlogs>=1.4,<1.5 git+https://github.com/alphagov/notifications-utils.git@39.4.4#egg=notifications-utils==39.4.4 +# gds-metrics requires prometheus 0.2.0, override that requirement as 0.7.1 brings significant performance gains +prometheus-client==0.7.1 gds-metrics==0.2.0 ## The following requirements were added by pip freeze: @@ -66,7 +68,6 @@ mistune==0.8.4 monotonic==1.5 orderedset==2.0.1 phonenumbers==8.11.2 -prometheus-client==0.2.0 pyasn1==0.4.8 pycparser==2.20 PyPDF2==1.26.0 From cd9b80f415dba8445b5bc393dd02847afb18d94a Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Mon, 27 Apr 2020 15:27:08 +0100 Subject: [PATCH 4/7] set test_errors app fixture to session scope we have one global metrics variable `metrics = GDSMetrics()`, and we then call `metrics.init_app` from within the flask application set up. The v2/test_errors.py app_for_test fixture calls create_app, would call metrics.init_app multiple times for the same metrics instance. This causes errors, so change the fixture to session level so it only calls once per test run.
--- app/__init__.py | 6 +++--- tests/app/v2/test_errors.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 236a81a6c..aefc92d08 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -368,9 +368,9 @@ def setup_sqlalchemy_events(app): # this will overwrite any previous checkout_at timestamp connection_record.info['checkout_at'] = time.monotonic() - # checkin runs after the request is already torn down so we'll need to capture request info earlier. - # checkout runs when the connection is first used, ie: when we first make a query, so within the request or - # task + # checkin runs after the request is already torn down, therefore we add the request_data onto the + # connection_record as otherwise it won't have that information when checkin actually runs. + # Note: this is not a problem for checkouts as the checkout always happens within a web request or task # web requests if has_request_context(): diff --git a/tests/app/v2/test_errors.py b/tests/app/v2/test_errors.py index 1b3416bce..7c1c6b09a 100644 --- a/tests/app/v2/test_errors.py +++ b/tests/app/v2/test_errors.py @@ -3,8 +3,8 @@ from flask import url_for from sqlalchemy.exc import DataError -@pytest.fixture(scope='function') -def app_for_test(mocker): +@pytest.fixture(scope='session') +def app_for_test(): import flask from flask import Blueprint from app.authentication.auth import AuthError From d9b3b31a6a0c1af683830452ab31e2fee7fd5d63 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Mon, 27 Apr 2020 17:05:40 +0100 Subject: [PATCH 5/7] add loadtesting to manifest so we can deploy a separate app --- manifest.yml.j2 | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/manifest.yml.j2 b/manifest.yml.j2 index 80c991309..42a21862d 100644 --- a/manifest.yml.j2 +++ b/manifest.yml.j2 @@ -25,6 +25,23 @@ }, }, + 'notify-api-loadtesting': { + 'comments': 'DELETE ME LATER', + 'NOTIFY_APP_NAME': 'api', + 'disk_quota': '2G', + 
'sqlalchemy_pool_size': 30, + 'routes': { + 'staging': [], + }, + 'health-check-type': 'port', + 'health-check-invocation-timeout': 3, + 'instances': { + 'preview': None, + 'staging': 4, + 'production': None + }, + }, + 'notify-delivery-celery-beat': {'memory': '128M'}, 'notify-delivery-worker-jobs': {}, 'notify-delivery-worker-research': {}, @@ -70,7 +87,7 @@ applications: services: - notify-db - logit-ssl-syslog-drain - {% if CF_APP == 'notify-api' %} + {% if CF_APP in ('notify-api', 'notify-api-loadtesting') %} - notify-prometheus {% endif %} From 15ce9fe3f92e081e25a01d96a585a981bd76f1f1 Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Wed, 13 May 2020 11:06:27 +0100 Subject: [PATCH 6/7] add metrics for redis timings --- app/notifications/process_notifications.py | 14 ++++++++++++-- app/notifications/validators.py | 15 ++++++++++++--- gunicorn_config.py | 2 +- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/app/notifications/process_notifications.py b/app/notifications/process_notifications.py index 54d222891..3e8be9a89 100644 --- a/app/notifications/process_notifications.py +++ b/app/notifications/process_notifications.py @@ -34,6 +34,15 @@ from app.dao.notifications_dao import ( from app.v2.errors import BadRequestError +from gds_metrics import Histogram + + +REDIS_GET_AND_INCR_DAILY_LIMIT_DURATION_SECONDS = Histogram( + 'redis_get_and_incr_daily_limit_duration_seconds', + 'Time taken to get and possibly increment the daily limit cache key', +) + + def create_content_for_notification(template, personalisation): template_object = template._as_utils_template_with_personalisation(personalisation) check_placeholders(template_object) @@ -115,8 +124,9 @@ def persist_notification( if not simulated: dao_create_notification(notification) if key_type != KEY_TYPE_TEST: - if redis_store.get(redis.daily_limit_cache_key(service.id)): - redis_store.incr(redis.daily_limit_cache_key(service.id)) + with REDIS_GET_AND_INCR_DAILY_LIMIT_DURATION_SECONDS.time(): + 
if redis_store.get(redis.daily_limit_cache_key(service.id)): + redis_store.incr(redis.daily_limit_cache_key(service.id)) current_app.logger.info( "{} {} created at {}".format(notification_type, notification_id, notification_created_at) diff --git a/app/notifications/validators.py b/app/notifications/validators.py index 3b9cf6c62..fb5880064 100644 --- a/app/notifications/validators.py +++ b/app/notifications/validators.py @@ -22,15 +22,24 @@ from app.utils import get_public_notify_type_text from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id from app.dao.service_letter_contact_dao import dao_get_letter_contact_by_id +from gds_metrics.metrics import Histogram + + +REDIS_EXCEEDED_RATE_LIMIT_DURATION_SECONDS = Histogram( + 'redis_exceeded_rate_limit_duration_seconds', + 'Time taken to check rate limit', +) + def check_service_over_api_rate_limit(service, api_key): if current_app.config['API_RATE_LIMIT_ENABLED'] and current_app.config['REDIS_ENABLED']: cache_key = rate_limit_cache_key(service.id, api_key.key_type) rate_limit = service.rate_limit interval = 60 - if redis_store.exceeded_rate_limit(cache_key, rate_limit, interval): - current_app.logger.info("service {} has been rate limited for throughput".format(service.id)) - raise RateLimitError(rate_limit, interval, api_key.key_type) + with REDIS_EXCEEDED_RATE_LIMIT_DURATION_SECONDS.time(): + if redis_store.exceeded_rate_limit(cache_key, rate_limit, interval): + current_app.logger.info("service {} has been rate limited for throughput".format(service.id)) + raise RateLimitError(rate_limit, interval, api_key.key_type) def check_service_over_daily_message_limit(key_type, service): diff --git a/gunicorn_config.py b/gunicorn_config.py index d32eaba2d..87db63ded 100644 --- a/gunicorn_config.py +++ b/gunicorn_config.py @@ -7,7 +7,7 @@ from gds_metrics.gunicorn import child_exit # noqa workers = 4 worker_class = "eventlet" -worker_connections = 256 +worker_connections = 30 errorlog = 
"/home/vcap/logs/gunicorn_error.log" bind = "0.0.0.0:{}".format(os.getenv("PORT")) statsd_host = "{}:8125".format(os.getenv("STATSD_HOST")) From 8b4a424df105313fc7d36ebf29b9ecdcb238fd95 Mon Sep 17 00:00:00 2001 From: David McDonald Date: Fri, 12 Jun 2020 14:52:04 +0100 Subject: [PATCH 7/7] Tidy up --- app/__init__.py | 13 ++++++------- gunicorn_config.py | 2 +- manifest.yml.j2 | 19 +------------------ tests/app/v2/test_errors.py | 2 +- 4 files changed, 9 insertions(+), 27 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index aefc92d08..62fe06c50 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -68,6 +68,11 @@ clients = Clients() api_user = LocalProxy(lambda: _request_ctx_stack.top.api_user) authenticated_service = LocalProxy(lambda: _request_ctx_stack.top.authenticated_service) +CONCURRENT_REQUESTS = Gauge( + 'concurrent_web_request_count', + 'How many concurrent requests are currently being served', +) + def create_app(application): from app.config import configs @@ -263,11 +268,6 @@ def register_v2_blueprints(application): def init_app(app): - CONCURRENT_REQUESTS = Gauge( - 'concurrent_web_request_count', - 'How many concurrent requests are currently being served', - ) - @app.before_request def record_user_agent(): statsd_client.incr("user-agent.{}".format(process_user_agent(request.headers.get('User-Agent', None)))) @@ -334,7 +334,7 @@ def setup_sqlalchemy_events(app): TOTAL_DB_CONNECTIONS = Gauge( 'db_connection_total_connected', - 'How many db connections are currently held (potentially idel) by the server', + 'How many db connections are currently held (potentially idle) by the server', ) TOTAL_CHECKED_OUT_DB_CONNECTIONS = Gauge( @@ -381,7 +381,6 @@ def setup_sqlalchemy_events(app): } # celery apps elif current_task: - print(current_task) connection_record.info['request_data'] = { 'method': 'celery', 'host': current_app.config['NOTIFY_APP_NAME'], # worker name diff --git a/gunicorn_config.py b/gunicorn_config.py index 87db63ded..d32eaba2d 
100644 --- a/gunicorn_config.py +++ b/gunicorn_config.py @@ -7,7 +7,7 @@ from gds_metrics.gunicorn import child_exit # noqa workers = 4 worker_class = "eventlet" -worker_connections = 30 +worker_connections = 256 errorlog = "/home/vcap/logs/gunicorn_error.log" bind = "0.0.0.0:{}".format(os.getenv("PORT")) statsd_host = "{}:8125".format(os.getenv("STATSD_HOST")) diff --git a/manifest.yml.j2 b/manifest.yml.j2 index 42a21862d..80c991309 100644 --- a/manifest.yml.j2 +++ b/manifest.yml.j2 @@ -25,23 +25,6 @@ }, }, - 'notify-api-loadtesting': { - 'comments': 'DELETE ME LATER', - 'NOTIFY_APP_NAME': 'api', - 'disk_quota': '2G', - 'sqlalchemy_pool_size': 30, - 'routes': { - 'staging': [], - }, - 'health-check-type': 'port', - 'health-check-invocation-timeout': 3, - 'instances': { - 'preview': None, - 'staging': 4, - 'production': None - }, - }, - 'notify-delivery-celery-beat': {'memory': '128M'}, 'notify-delivery-worker-jobs': {}, 'notify-delivery-worker-research': {}, @@ -87,7 +70,7 @@ applications: services: - notify-db - logit-ssl-syslog-drain - {% if CF_APP in ('notify-api', 'notify-api-loadtesting') %} + {% if CF_APP == 'notify-api' %} - notify-prometheus {% endif %} diff --git a/tests/app/v2/test_errors.py b/tests/app/v2/test_errors.py index 7c1c6b09a..390afb605 100644 --- a/tests/app/v2/test_errors.py +++ b/tests/app/v2/test_errors.py @@ -3,7 +3,7 @@ from flask import url_for from sqlalchemy.exc import DataError -@pytest.fixture(scope='session') +@pytest.fixture(scope='function') def app_for_test(): import flask from flask import Blueprint