import os import secrets import string import time import uuid from time import monotonic from celery import current_task from flask import ( current_app, g, has_request_context, jsonify, make_response, request, ) from flask_marshmallow import Marshmallow from flask_migrate import Migrate from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy from gds_metrics import GDSMetrics from gds_metrics.metrics import Gauge, Histogram from notifications_utils import logging, request_helper from notifications_utils.celery import NotifyCelery from notifications_utils.clients.encryption.encryption_client import Encryption from notifications_utils.clients.redis.redis_client import RedisClient from notifications_utils.clients.statsd.statsd_client import StatsdClient from notifications_utils.clients.zendesk.zendesk_client import ZendeskClient from sqlalchemy import event from werkzeug.exceptions import HTTPException as WerkzeugHTTPException from werkzeug.local import LocalProxy from app.clients import NotificationProviderClients from app.clients.document_download import DocumentDownloadClient from app.clients.email.aws_ses import AwsSesClient from app.clients.email.aws_ses_stub import AwsSesStubClient from app.clients.sms.aws_sns import AwsSnsClient class SQLAlchemy(_SQLAlchemy): """We need to subclass SQLAlchemy in order to override create_engine options""" def apply_driver_hacks(self, app, info, options): sa_url, options = super().apply_driver_hacks(app, info, options) if 'connect_args' not in options: options['connect_args'] = {} options['connect_args']["options"] = "-c statement_timeout={}".format( int(app.config['SQLALCHEMY_STATEMENT_TIMEOUT']) * 1000 ) return (sa_url, options) db = SQLAlchemy() migrate = Migrate() ma = Marshmallow() notify_celery = NotifyCelery() aws_ses_client = AwsSesClient() aws_ses_stub_client = AwsSesStubClient() aws_sns_client = AwsSnsClient() encryption = Encryption() zendesk_client = ZendeskClient() statsd_client = StatsdClient() redis_store = RedisClient() document_download_client = DocumentDownloadClient() metrics = GDSMetrics() notification_provider_clients = NotificationProviderClients() api_user = LocalProxy(lambda: g.api_user) authenticated_service = LocalProxy(lambda: g.authenticated_service) CONCURRENT_REQUESTS = Gauge( 'concurrent_web_request_count', 'How many concurrent requests are currently being served', ) def create_app(application): from app.config import configs notify_environment = os.environ['NOTIFY_ENVIRONMENT'] application.config.from_object(configs[notify_environment]) application.config['NOTIFY_APP_NAME'] = application.name init_app(application) # Metrics intentionally high up to give the most accurate timing and reliability that the metric is recorded metrics.init_app(application) request_helper.init_app(application) db.init_app(application) migrate.init_app(application, db=db) ma.init_app(application) zendesk_client.init_app(application) statsd_client.init_app(application) logging.init_app(application) aws_sns_client.init_app(application, statsd_client=statsd_client) aws_ses_client.init_app(statsd_client=statsd_client) aws_ses_stub_client.init_app( statsd_client=statsd_client, stub_url=application.config['SES_STUB_URL'] ) # If a stub url is provided for SES, then use the stub client rather than the real SES boto client email_clients = [aws_ses_stub_client] if application.config['SES_STUB_URL'] else [aws_ses_client] notification_provider_clients.init_app( sms_clients=[aws_sns_client], email_clients=email_clients ) notify_celery.init_app(application) encryption.init_app(application) redis_store.init_app(application) document_download_client.init_app(application) register_blueprint(application) register_v2_blueprints(application) # avoid circular imports by importing this file later from app.commands import setup_commands setup_commands(application) # set up sqlalchemy events setup_sqlalchemy_events(application) return application def register_blueprint(application): from app.authentication.auth import ( requires_admin_auth, requires_auth, requires_no_auth, ) from app.billing.rest import billing_blueprint from app.complaint.complaint_rest import complaint_blueprint from app.docs import docs as docs_blueprint from app.email_branding.rest import email_branding_blueprint from app.events.rest import events as events_blueprint from app.inbound_number.rest import inbound_number_blueprint from app.inbound_sms.rest import inbound_sms as inbound_sms_blueprint from app.job.rest import job_blueprint from app.notifications.notifications_ses_callback import ( ses_callback_blueprint, ) from app.notifications.receive_notifications import ( receive_notifications_blueprint, ) from app.notifications.rest import notifications as notifications_blueprint from app.organisation.invite_rest import organisation_invite_blueprint from app.organisation.rest import organisation_blueprint from app.performance_dashboard.rest import performance_dashboard_blueprint from app.platform_stats.rest import platform_stats_blueprint from app.provider_details.rest import ( provider_details as provider_details_blueprint, ) from app.service.callback_rest import service_callback_blueprint from app.service.rest import service_blueprint from app.service_invite.rest import ( service_invite as service_invite_blueprint, ) from app.status.healthcheck import status as status_blueprint from app.template.rest import template_blueprint from app.template_folder.rest import template_folder_blueprint from app.template_statistics.rest import ( template_statistics as template_statistics_blueprint, ) from app.upload.rest import upload_blueprint from app.user.rest import user_blueprint from app.webauthn.rest import webauthn_blueprint service_blueprint.before_request(requires_admin_auth) application.register_blueprint(service_blueprint, url_prefix='/service') user_blueprint.before_request(requires_admin_auth) application.register_blueprint(user_blueprint, url_prefix='/user') webauthn_blueprint.before_request(requires_admin_auth) application.register_blueprint(webauthn_blueprint) template_blueprint.before_request(requires_admin_auth) application.register_blueprint(template_blueprint) status_blueprint.before_request(requires_no_auth) application.register_blueprint(status_blueprint) docs_blueprint.before_request(requires_no_auth) application.register_blueprint(docs_blueprint) # delivery receipts ses_callback_blueprint.before_request(requires_no_auth) application.register_blueprint(ses_callback_blueprint) # inbound sms receive_notifications_blueprint.before_request(requires_no_auth) application.register_blueprint(receive_notifications_blueprint) notifications_blueprint.before_request(requires_auth) application.register_blueprint(notifications_blueprint) job_blueprint.before_request(requires_admin_auth) application.register_blueprint(job_blueprint) service_invite_blueprint.before_request(requires_admin_auth) application.register_blueprint(service_invite_blueprint) organisation_invite_blueprint.before_request(requires_admin_auth) application.register_blueprint(organisation_invite_blueprint) inbound_number_blueprint.before_request(requires_admin_auth) application.register_blueprint(inbound_number_blueprint) inbound_sms_blueprint.before_request(requires_admin_auth) application.register_blueprint(inbound_sms_blueprint) template_statistics_blueprint.before_request(requires_admin_auth) application.register_blueprint(template_statistics_blueprint) events_blueprint.before_request(requires_admin_auth) application.register_blueprint(events_blueprint) provider_details_blueprint.before_request(requires_admin_auth) application.register_blueprint(provider_details_blueprint, url_prefix='/provider-details') email_branding_blueprint.before_request(requires_admin_auth) application.register_blueprint(email_branding_blueprint, url_prefix='/email-branding') billing_blueprint.before_request(requires_admin_auth) application.register_blueprint(billing_blueprint) service_callback_blueprint.before_request(requires_admin_auth) application.register_blueprint(service_callback_blueprint) organisation_blueprint.before_request(requires_admin_auth) application.register_blueprint(organisation_blueprint, url_prefix='/organisations') complaint_blueprint.before_request(requires_admin_auth) application.register_blueprint(complaint_blueprint) performance_dashboard_blueprint.before_request(requires_admin_auth) application.register_blueprint(performance_dashboard_blueprint) platform_stats_blueprint.before_request(requires_admin_auth) application.register_blueprint(platform_stats_blueprint, url_prefix='/platform-stats') template_folder_blueprint.before_request(requires_admin_auth) application.register_blueprint(template_folder_blueprint) upload_blueprint.before_request(requires_admin_auth) application.register_blueprint(upload_blueprint) def register_v2_blueprints(application): from app.authentication.auth import requires_auth from app.v2.inbound_sms.get_inbound_sms import v2_inbound_sms_blueprint from app.v2.notifications import ( # noqa get_notifications, post_notifications, v2_notification_blueprint, ) from app.v2.template import ( # noqa get_template, post_template, v2_template_blueprint, ) from app.v2.templates.get_templates import v2_templates_blueprint v2_notification_blueprint.before_request(requires_auth) application.register_blueprint(v2_notification_blueprint) v2_templates_blueprint.before_request(requires_auth) application.register_blueprint(v2_templates_blueprint) v2_template_blueprint.before_request(requires_auth) application.register_blueprint(v2_template_blueprint) v2_inbound_sms_blueprint.before_request(requires_auth) application.register_blueprint(v2_inbound_sms_blueprint) def init_app(app): @app.before_request def record_request_details(): CONCURRENT_REQUESTS.inc() g.start = monotonic() g.endpoint = request.endpoint @app.after_request def after_request(response): CONCURRENT_REQUESTS.dec() response.headers.add('X-Content-Type-Options', 'nosniff') return response @app.errorhandler(Exception) def exception(error): app.logger.exception(error) # error.code is set for our exception types. msg = getattr(error, 'message', str(error)) code = getattr(error, 'code', 500) response = make_response( jsonify(result='error', message=msg), code, error.get_headers() ) response.content_type = "application/json" return response @app.errorhandler(WerkzeugHTTPException) def werkzeug_exception(e): response = make_response( jsonify(result='error', message=e.description), e.code, e.get_headers() ) response.content_type = 'application/json' return response @app.errorhandler(404) def page_not_found(e): msg = e.description or "Not found" response = make_response( jsonify(result='error', message=msg), 404, e.get_headers() ) response.content_type = 'application/json' return response def create_uuid(): return str(uuid.uuid4()) def create_random_identifier(): return ''.join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(16)) def setup_sqlalchemy_events(app): TOTAL_DB_CONNECTIONS = Gauge( 'db_connection_total_connected', 'How many db connections are currently held (potentially idle) by the server', ) TOTAL_CHECKED_OUT_DB_CONNECTIONS = Gauge( 'db_connection_total_checked_out', 'How many db connections are currently checked out by web requests', ) DB_CONNECTION_OPEN_DURATION_SECONDS = Histogram( 'db_connection_open_duration_seconds', 'How long db connections are held open for in seconds', ['method', 'host', 'path'] ) # need this or db.engine isn't accessible with app.app_context(): @event.listens_for(db.engine, 'connect') def connect(dbapi_connection, connection_record): # connection first opened with db TOTAL_DB_CONNECTIONS.inc() @event.listens_for(db.engine, 'close') def close(dbapi_connection, connection_record): # connection closed (probably only happens with overflow connections) TOTAL_DB_CONNECTIONS.dec() @event.listens_for(db.engine, 'checkout') def checkout(dbapi_connection, connection_record, connection_proxy): try: # connection given to a web worker TOTAL_CHECKED_OUT_DB_CONNECTIONS.inc() # this will overwrite any previous checkout_at timestamp connection_record.info['checkout_at'] = time.monotonic() # checkin runs after the request is already torn down, therefore we add the request_data onto the # connection_record as otherwise it won't have that information when checkin actually runs. # Note: this is not a problem for checkouts as the checkout always happens within a web request or task # web requests if has_request_context(): connection_record.info['request_data'] = { 'method': request.method, 'host': request.host, 'url_rule': request.url_rule.rule if request.url_rule else 'No endpoint' } # celery apps elif current_task: connection_record.info['request_data'] = { 'method': 'celery', 'host': current_app.config['NOTIFY_APP_NAME'], # worker name 'url_rule': current_task.name, # task name } # anything else. migrations possibly, or flask cli commands. else: current_app.logger.warning('Checked out sqlalchemy connection from outside of request/task') connection_record.info['request_data'] = { 'method': 'unknown', 'host': 'unknown', 'url_rule': 'unknown', } except Exception: current_app.logger.exception("Exception caught for checkout event.") @event.listens_for(db.engine, 'checkin') def checkin(dbapi_connection, connection_record): try: # connection returned by a web worker TOTAL_CHECKED_OUT_DB_CONNECTIONS.dec() # duration that connection was held by a single web request duration = time.monotonic() - connection_record.info['checkout_at'] DB_CONNECTION_OPEN_DURATION_SECONDS.labels( connection_record.info['request_data']['method'], connection_record.info['request_data']['host'], connection_record.info['request_data']['url_rule'] ).observe(duration) except Exception: current_app.logger.exception("Exception caught for checkin event.")