import os import urllib from datetime import datetime, timedelta, timezone from time import monotonic import itertools import ago from flask import ( Flask, session, render_template, make_response, current_app, request, g, url_for ) from flask._compat import string_types from flask.globals import _lookup_req_object, _request_ctx_stack from flask_login import LoginManager from flask_wtf import CSRFProtect from flask_wtf.csrf import CSRFError from functools import partial from notifications_python_client.errors import HTTPError from notifications_utils import logging, request_id, formatters from notifications_utils.clients.statsd.statsd_client import StatsdClient from notifications_utils.recipients import ( validate_phone_number, InvalidPhoneError, format_phone_number_human_readable, ) from notifications_utils.formatters import formatted_list from werkzeug.exceptions import abort from werkzeug.local import LocalProxy from app import proxy_fix from app.asset_fingerprinter import AssetFingerprinter from app.its_dangerous_session import ItsdangerousSessionInterface from app.notify_client.service_api_client import ServiceAPIClient from app.notify_client.api_key_api_client import ApiKeyApiClient from app.notify_client.invite_api_client import InviteApiClient from app.notify_client.job_api_client import JobApiClient from app.notify_client.notification_api_client import NotificationApiClient from app.notify_client.status_api_client import StatusApiClient from app.notify_client.template_statistics_api_client import TemplateStatisticsApiClient from app.notify_client.user_api_client import UserApiClient from app.notify_client.events_api_client import EventsApiClient from app.notify_client.provider_client import ProviderClient from app.notify_client.organisations_client import OrganisationsClient from app.notify_client.models import AnonymousUser from app.notify_client.letter_jobs_client import LetterJobsClient from app.notify_client.inbound_number_client import InboundNumberClient from app.notify_client.billing_api_client import BillingAPIClient from app.utils import get_cdn_domain from app.utils import gmt_timezones login_manager = LoginManager() csrf = CSRFProtect() service_api_client = ServiceAPIClient() user_api_client = UserApiClient() api_key_api_client = ApiKeyApiClient() job_api_client = JobApiClient() notification_api_client = NotificationApiClient() status_api_client = StatusApiClient() invite_api_client = InviteApiClient() template_statistics_client = TemplateStatisticsApiClient() events_api_client = EventsApiClient() provider_client = ProviderClient() organisations_client = OrganisationsClient() asset_fingerprinter = AssetFingerprinter() statsd_client = StatsdClient() letter_jobs_client = LetterJobsClient() inbound_number_client = InboundNumberClient() billing_api_client = BillingAPIClient() # The current service attached to the request stack. current_service = LocalProxy(partial(_lookup_req_object, 'service')) def create_app(): from app.config import configs application = Flask(__name__) notify_environment = os.environ['NOTIFY_ENVIRONMENT'] application.config.from_object(configs[notify_environment]) init_app(application) statsd_client.init_app(application) logging.init_app(application, statsd_client) init_csrf(application) request_id.init_app(application) service_api_client.init_app(application) user_api_client.init_app(application) api_key_api_client.init_app(application) job_api_client.init_app(application) notification_api_client.init_app(application) status_api_client.init_app(application) invite_api_client.init_app(application) template_statistics_client.init_app(application) events_api_client.init_app(application) provider_client.init_app(application) organisations_client.init_app(application) letter_jobs_client.init_app(application) inbound_number_client.init_app(application) billing_api_client.init_app(application) login_manager.init_app(application) login_manager.login_view = 'main.sign_in' login_manager.login_message_category = 'default' login_manager.session_protection = None login_manager.anonymous_user = AnonymousUser from app.main import main as main_blueprint application.register_blueprint(main_blueprint) from .status import status as status_blueprint application.register_blueprint(status_blueprint) proxy_fix.init_app(application) application.session_interface = ItsdangerousSessionInterface() application.add_template_filter(format_datetime) application.add_template_filter(format_datetime_24h) application.add_template_filter(format_datetime_normal) application.add_template_filter(format_datetime_short) application.add_template_filter(format_time) application.add_template_filter(valid_phone_number) application.add_template_filter(linkable_name) application.add_template_filter(format_date) application.add_template_filter(format_date_normal) application.add_template_filter(format_date_short) application.add_template_filter(format_datetime_relative) application.add_template_filter(format_delta) application.add_template_filter(format_notification_status) application.add_template_filter(format_notification_status_as_time) application.add_template_filter(format_notification_status_as_field_status) application.add_template_filter(format_notification_status_as_url) application.add_template_filter(formatted_list) application.add_template_filter(nl2br) application.add_template_filter(format_phone_number_human_readable) application.after_request(useful_headers_after_request) application.after_request(save_service_after_request) application.before_request(load_service_before_request) @application.context_processor def _attach_current_service(): return {'current_service': current_service} register_errorhandlers(application) setup_event_handlers() return application def init_csrf(application): csrf.init_app(application) @application.errorhandler(CSRFError) def csrf_handler(reason): application.logger.warning('csrf.error_message: {}'.format(reason)) if 'user_id' not in session: application.logger.warning( u'csrf.session_expired: Redirecting user to log in page' ) return application.login_manager.unauthorized() application.logger.warning( u'csrf.invalid_token: Aborting request, user_id: {user_id}', extra={'user_id': session['user_id']}) abort(400, reason) def init_app(application): @application.before_request def record_start_time(): g.start = monotonic() g.endpoint = request.endpoint @application.context_processor def inject_global_template_variables(): return { 'asset_path': '/static/', 'header_colour': application.config['HEADER_COLOUR'], 'asset_url': asset_fingerprinter.get_url } def convert_to_boolean(value): if isinstance(value, string_types): if value.lower() in ['t', 'true', 'on', 'yes', '1']: return True elif value.lower() in ['f', 'false', 'off', 'no', '0']: return False return value def linkable_name(value): return urllib.parse.quote_plus(value) def format_datetime(date): return '{} at {}'.format( format_date(date), format_time(date) ) def format_datetime_24h(date): return '{} at {}'.format( format_date(date), format_time_24h(date), ) def format_datetime_normal(date): return '{} at {}'.format( format_date_normal(date), format_time(date) ) def format_datetime_short(date): return '{} at {}'.format( format_date_short(date), format_time(date) ) def format_datetime_relative(date): return '{} at {}'.format( get_human_day(date), format_time(date) ) def format_datetime_numeric(date): return '{} {}'.format( format_date_numeric(date), format_time_24h(date), ) def format_date_numeric(date): return gmt_timezones(date).strftime('%Y-%m-%d') def format_time_24h(date): return gmt_timezones(date).strftime('%H:%M') def get_human_day(time): # Add 1 hour to get ‘midnight today’ instead of ‘midnight tomorrow’ time_as_day = (gmt_timezones(time) - timedelta(hours=1)).strftime('%A') six_days_ago = gmt_timezones((datetime.utcnow() + timedelta(days=-6)).isoformat()) if gmt_timezones(time) < six_days_ago: return format_date_short(time) if time_as_day == (datetime.utcnow() + timedelta(days=1)).strftime('%A'): return 'tomorrow' if time_as_day == datetime.utcnow().strftime('%A'): return 'today' if time_as_day == (datetime.utcnow() + timedelta(days=-1)).strftime('%A'): return 'yesterday' return format_date_short(time) def format_time(date): return { '12:00AM': 'Midnight', '12:00PM': 'Midday' }.get( gmt_timezones(date).strftime('%-I:%M%p'), gmt_timezones(date).strftime('%-I:%M%p') ).lower() def format_date(date): return gmt_timezones(date).strftime('%A %d %B %Y') def format_date_normal(date): return gmt_timezones(date).strftime('%d %B %Y').lstrip('0') def format_date_short(date): return gmt_timezones(date).strftime('%d %B').lstrip('0') def format_delta(date): delta = ( datetime.now(timezone.utc) ) - ( gmt_timezones(date) ) if delta < timedelta(seconds=30): return "just now" if delta < timedelta(seconds=60): return "in the last minute" return ago.human( delta, future_tense='{} from now', # No-one should ever see this past_tense='{} ago', precision=1 ) def valid_phone_number(phone_number): try: validate_phone_number(phone_number) return True except InvalidPhoneError: return False def format_notification_status(status, template_type): return { 'email': { 'failed': 'Failed', 'technical-failure': 'Technical failure', 'temporary-failure': 'Inbox not accepting messages right now', 'permanent-failure': 'Email address doesn’t exist', 'delivered': 'Delivered', 'sending': 'Sending', 'created': 'Sending', 'sent': 'Delivered' }, 'sms': { 'failed': 'Failed', 'technical-failure': 'Technical failure', 'temporary-failure': 'Phone not accepting messages right now', 'permanent-failure': 'Phone number doesn’t exist', 'delivered': 'Delivered', 'sending': 'Sending', 'created': 'Sending', 'sent': 'Sent internationally' }, 'letter': { 'failed': 'Failed', 'technical-failure': 'Technical failure', 'temporary-failure': 'Temporary failure', 'permanent-failure': 'Permanent failure', 'delivered': 'Delivered', 'sending': 'Sending', 'created': 'Sending', 'sent': 'Delivered' } }[template_type].get(status, status) def format_notification_status_as_time(status, created, updated): return { 'sending': ' since {}'.format(created), 'created': ' since {}'.format(created) }.get(status, updated) def format_notification_status_as_field_status(status): return { 'failed': 'error', 'technical-failure': 'error', 'temporary-failure': 'error', 'permanent-failure': 'error', 'delivered': None, 'sent': None, 'sending': 'default', 'created': 'default' }.get(status, 'error') def format_notification_status_as_url(status): url = partial(url_for, "main.using_notify") return { 'technical-failure': url(_anchor='technical-failure'), 'temporary-failure': url(_anchor='not-accepting-messages'), 'permanent-failure': url(_anchor='does-not-exist') }.get(status) def nl2br(value): return formatters.nl2br(value) if value else '' @login_manager.user_loader def load_user(user_id): return user_api_client.get_user(user_id) def load_service_before_request(): if '/static/' in request.url: _request_ctx_stack.top.service = None return service_id = request.view_args.get('service_id', session.get('service_id')) if request.view_args \ else session.get('service_id') if _request_ctx_stack.top is not None: _request_ctx_stack.top.service = service_api_client.get_service(service_id)['data'] if service_id else None def save_service_after_request(response): # Only save the current session if the request is 200 service_id = request.view_args.get('service_id', None) if request.view_args else None if response.status_code == 200 and service_id: session['service_id'] = service_id return response # https://www.owasp.org/index.php/List_of_useful_HTTP_headers def useful_headers_after_request(response): response.headers.add('X-Frame-Options', 'deny') response.headers.add('X-Content-Type-Options', 'nosniff') response.headers.add('X-XSS-Protection', '1; mode=block') response.headers.add('Content-Security-Policy', ( "default-src 'self' 'unsafe-inline';" "script-src 'self' *.google-analytics.com 'unsafe-inline' 'unsafe-eval' data:;" "object-src 'self';" "font-src 'self' data:;" "img-src 'self' *.google-analytics.com *.notifications.service.gov.uk {} data:;" "frame-src www.youtube.com;".format(get_cdn_domain()) )) if 'Cache-Control' in response.headers: del response.headers['Cache-Control'] response.headers.add( 'Cache-Control', 'no-store, no-cache, private, must-revalidate') return response def register_errorhandlers(application): def _error_response(error_code): application.logger.exception('Admin app errored with %s', error_code) resp = make_response(render_template("error/{0}.html".format(error_code)), error_code) return useful_headers_after_request(resp) @application.errorhandler(HTTPError) def render_http_error(error): application.logger.error("API {} failed with status {} message {}".format( error.response.url if error.response else 'unknown', error.status_code, error.message )) error_code = error.status_code if error_code == 400: if isinstance(error.message, str): msg = [error.message] else: msg = list(itertools.chain(*[error.message[x] for x in error.message.keys()])) resp = make_response(render_template("error/400.html", message=msg)) return useful_headers_after_request(resp) elif error_code not in [401, 404, 403, 410, 500]: error_code = 500 return _error_response(error_code) @application.errorhandler(410) def handle_gone(error): return _error_response(410) @application.errorhandler(404) def handle_not_found(error): return _error_response(404) @application.errorhandler(403) def handle_not_authorized(error): return _error_response(403) @application.errorhandler(401) def handle_no_permissions(error): return _error_response(401) @application.errorhandler(500) def handle_exception(error): if current_app.config.get('DEBUG', None): raise error return _error_response(500) @application.errorhandler(Exception) def handle_bad_request(error): # We want the Flask in browser stacktrace if current_app.config.get('DEBUG', None): raise error return _error_response(500) def setup_event_handlers(): from flask_login import user_logged_in from app.event_handlers import on_user_logged_in user_logged_in.connect(on_user_logged_in)