diff --git a/app/celery/tasks.py b/app/celery/tasks.py index bb9967ee4..9e67fed92 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -1,3 +1,4 @@ +import json from datetime import (datetime) from collections import namedtuple @@ -6,6 +7,8 @@ from notifications_utils.recipients import ( RecipientCSV ) from notifications_utils.template import SMSMessageTemplate, WithSubjectTemplate, LetterDVLATemplate +from requests import HTTPError +from requests import request from sqlalchemy.exc import SQLAlchemyError from app import ( create_uuid, @@ -17,6 +20,7 @@ from app import ( from app.aws import s3 from app.celery import provider_tasks from app.config import QueueNames +from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id from app.dao.jobs_dao import ( dao_update_job, dao_get_job_by_id, @@ -25,6 +29,7 @@ from app.dao.jobs_dao import ( dao_update_job_status) from app.dao.notifications_dao import get_notification_by_id, dao_update_notifications_sent_to_dvla from app.dao.provider_details_dao import get_current_provider +from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count from app.dao.templates_dao import dao_get_template_by_id from app.models import ( @@ -371,3 +376,44 @@ def process_updates_from_file(response_file): NotificationUpdate = namedtuple('NotificationUpdate', ['reference', 'status', 'page_count', 'cost_threshold']) notification_updates = [NotificationUpdate(*line.split('|')) for line in response_file.splitlines()] return notification_updates + + +@notify_celery.task(bind=True, name="send-inbound-sms", max_retries=5, default_retry_delay=300) +@statsd(namespace="tasks") +def send_inbound_sms_to_service(self, inbound_sms_id, service_id): + inbound_api = get_service_inbound_api_for_service(service_id=service_id) + if not inbound_api: + # No API data has been set for this service + return + + inbound_sms = dao_get_inbound_sms_by_id(service_id=service_id, + inbound_id=inbound_sms_id) + data = { + "id": str(inbound_sms.id), + "from_number": inbound_sms.user_number, + "content": inbound_sms.content, + "date_received": inbound_sms.provider_date.strftime(DATETIME_FORMAT) + } + + response = request( + method="POST", + url=inbound_api.url, + data=json.dumps(data), + headers={ + 'Content-Type': 'application/json', + 'Authorization': 'Bearer {}'.format(inbound_api.bearer_token) + }, + timeout=60 + ) + try: + response.raise_for_status() + except HTTPError as e: + current_app.logger.exception("Exception raised in send_inbound_sms_to_service for service_id: {} and url: {}. " + "\n{}".format(service_id, inbound_api.url, e)) + if e.response.status_code >= 500: + try: + self.retry(queue=QueueNames.RETRY, + exc='Unable to send_inbound_sms_to_service for service_id: {} and url: {}. \n{}'.format( + service_id, inbound_api.url, e)) + except self.MaxRetriesExceededError: + current_app.logger.exception('Retry: send_inbound_sms_to_service has retried the max number of times') diff --git a/app/dao/service_inbound_api_dao.py b/app/dao/service_inbound_api_dao.py index bc26db584..15dad1415 100644 --- a/app/dao/service_inbound_api_dao.py +++ b/app/dao/service_inbound_api_dao.py @@ -30,3 +30,7 @@ def reset_service_inbound_api(service_inbound_api, updated_by_id, url=None, bear def get_service_inbound_api(service_inbound_api_id, service_id): return ServiceInboundApi.query.filter_by(id=service_inbound_api_id, service_id=service_id).first() + + +def get_service_inbound_api_for_service(service_id): + return ServiceInboundApi.query.filter_by(service_id=service_id).first() diff --git a/app/notifications/receive_notifications.py b/app/notifications/receive_notifications.py index fdd1f95ec..c4005b5e0 100644 --- a/app/notifications/receive_notifications.py +++ b/app/notifications/receive_notifications.py @@ -5,6 +5,8 @@ from flask import jsonify, Blueprint, current_app, request from notifications_utils.recipients import validate_and_format_phone_number from app import statsd_client, firetext_client, mmg_client +from app.celery import tasks +from app.config import QueueNames from app.dao.services_dao import dao_fetch_services_by_sms_sender from app.dao.inbound_sms_dao import dao_create_inbound_sms from app.models import InboundSms @@ -30,28 +32,61 @@ def receive_mmg_sms(): inbound_number = strip_leading_forty_four(post_data['Number']) - potential_services = dao_fetch_services_by_sms_sender(inbound_number) - - if len(potential_services) != 1: - current_app.logger.error('Inbound number "{}" from MMG not associated with exactly one service'.format( - post_data['Number'] - )) - statsd_client.incr('inbound.mmg.failed') + potential_services = fetch_potential_services(inbound_number, 'mmg') + if not potential_services: # since this is an issue with our service <-> number mapping, we should still tell MMG that we received - # succesfully + # successfully return 'RECEIVED', 200 statsd_client.incr('inbound.mmg.successful') service = potential_services[0] - inbound = create_inbound_mmg_sms_object(service, post_data) + inbound = create_inbound_sms_object(service, + content=format_mmg_message(post_data["Message"]), + from_number=post_data['MSISDN'], + provider_ref=post_data["ID"], + date_received=post_data.get('DateRecieved'), + provider_name="mmg") - current_app.logger.info('{} received inbound SMS with reference {}'.format(service.id, inbound.provider_reference)) + tasks.send_inbound_sms_to_service.apply_async([str(inbound.id), str(service.id)], queue=QueueNames.NOTIFY) + current_app.logger.info( + '{} received inbound SMS with reference {} from MMG'.format(service.id, inbound.provider_reference)) return 'RECEIVED', 200 +@receive_notifications_blueprint.route('/notifications/sms/receive/firetext', methods=['POST']) +def receive_firetext_sms(): + post_data = request.form + + inbound_number = strip_leading_forty_four(post_data['destination']) + + potential_services = fetch_potential_services(inbound_number, 'firetext') + if not potential_services: + return jsonify({ + "status": "ok" + }), 200 + + service = potential_services[0] + + inbound = create_inbound_sms_object(service=service, + content=post_data["message"], + from_number=post_data['source'], + provider_ref=None, + date_received=post_data['time'], + provider_name="firetext") + + statsd_client.incr('inbound.firetext.successful') + + tasks.send_inbound_sms_to_service.apply_async([str(inbound.id), str(service.id)], queue=QueueNames.NOTIFY) + current_app.logger.info( + '{} received inbound SMS with reference {} from Firetext'.format(service.id, inbound.provider_reference)) + return jsonify({ + "status": "ok" + }), 200 + + def format_mmg_message(message): return unquote(message.replace('+', ' ')) @@ -66,11 +101,10 @@ def format_mmg_datetime(date): return convert_bst_to_utc(parsed_datetime) -def create_inbound_mmg_sms_object(service, json): - message = format_mmg_message(json['Message']) - user_number = validate_and_format_phone_number(json['MSISDN'], international=True) +def create_inbound_sms_object(service, content, from_number, provider_ref, date_received, provider_name): + user_number = validate_and_format_phone_number(from_number, international=True) - provider_date = json.get('DateRecieved') + provider_date = date_received if provider_date: provider_date = format_mmg_datetime(provider_date) @@ -79,52 +113,24 @@ def create_inbound_mmg_sms_object(service, json): notify_number=service.sms_sender, user_number=user_number, provider_date=provider_date, - provider_reference=json.get('ID'), - content=message, - provider=mmg_client.name + provider_reference=provider_ref, + content=content, + provider=provider_name ) dao_create_inbound_sms(inbound) return inbound -@receive_notifications_blueprint.route('/notifications/sms/receive/firetext', methods=['POST']) -def receive_firetext_sms(): - post_data = request.form - - inbound_number = strip_leading_forty_four(post_data['destination']) - +def fetch_potential_services(inbound_number, provider_name): potential_services = dao_fetch_services_by_sms_sender(inbound_number) + if len(potential_services) != 1: - current_app.logger.error('Inbound number "{}" from firetext not associated with exactly one service'.format( - post_data['destination'] + current_app.logger.error('Inbound number "{}" from {} not associated with exactly one service'.format( + inbound_number, provider_name )) - statsd_client.incr('inbound.firetext.failed') - return jsonify({ - "status": "ok" - }), 200 - - service = potential_services[0] - - user_number = validate_and_format_phone_number(post_data['source'], international=True) - message = post_data['message'] - timestamp = post_data['time'] - - dao_create_inbound_sms( - InboundSms( - service=service, - notify_number=service.sms_sender, - user_number=user_number, - provider_date=timestamp, - content=message, - provider=firetext_client.name - ) - ) - - statsd_client.incr('inbound.firetext.successful') - - return jsonify({ - "status": "ok" - }), 200 + statsd_client.incr('inbound.{}.failed'.format(provider_name)) + return False + return potential_services def strip_leading_forty_four(number): diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 8cb5a13d4..b49d86c41 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -1,8 +1,10 @@ +import json import uuid from datetime import datetime from unittest.mock import Mock import pytest +import requests_mock from flask import current_app from freezegun import freeze_time from sqlalchemy.exc import SQLAlchemyError @@ -25,8 +27,8 @@ from app.celery.tasks import ( get_template_class, update_job_to_sent_to_dvla, update_letter_notifications_statuses, - process_updates_from_file -) + process_updates_from_file, + send_inbound_sms_to_service) from app.dao import jobs_dao, services_dao from app.models import ( Notification, @@ -47,7 +49,7 @@ from tests.app.conftest import ( sample_email_template, sample_notification ) -from tests.app.db import create_user, create_notification, create_job +from tests.app.db import create_user, create_notification, create_job, create_service_inbound_api, create_inbound_sms class AnyStringWith(str): @@ -1119,3 +1121,97 @@ def test_update_letter_notifications_statuses_builds_updates_list(notify_api, mo assert updates[1].status == 'Sent' assert updates[1].page_count == '2' assert updates[1].cost_threshold == 'Sorted' + + +def test_send_inbound_sms_to_service_post_https_request_to_service(notify_api, sample_service): + inbound_api = create_service_inbound_api(service=sample_service, url="https://some.service.gov.uk/", + bearer_token="something_unique") + inbound_sms = create_inbound_sms(service=sample_service, notify_number="0751421", user_number="447700900111", + provider_date=datetime(2017, 6, 20), content="Here is some content") + data = { + "id": str(inbound_sms.id), + "from_number": inbound_sms.user_number, + "content": inbound_sms.content, + "date_received": inbound_sms.provider_date.strftime(DATETIME_FORMAT) + } + + with requests_mock.Mocker() as request_mock: + request_mock.post(inbound_api.url, + json={}, + status_code=200) + send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id) + assert request_mock.call_count == 1 + assert request_mock.request_history[0].url == inbound_api.url + assert request_mock.request_history[0].method == 'POST' + assert request_mock.request_history[0].text == json.dumps(data) + assert request_mock.request_history[0].headers["Content-type"] == "application/json" + assert request_mock.request_history[0].headers["Authorization"] == "Bearer {}".format(inbound_api.bearer_token) + + +def test_send_inbound_sms_to_service_does_not_send_request_when_inbound_sms_does_not_exist(notify_api, sample_service): + inbound_api = create_service_inbound_api(service=sample_service) + with requests_mock.Mocker() as request_mock: + request_mock.post(inbound_api.url, + json={}, + status_code=200) + with pytest.raises(SQLAlchemyError): + send_inbound_sms_to_service(inbound_sms_id=uuid.uuid4(), service_id=sample_service.id) + + assert request_mock.call_count == 0 + + +def test_send_inbound_sms_to_service_does_not_sent_request_when_inbound_api_does_not_exist( + notify_api, sample_service, mocker): + inbound_sms = create_inbound_sms(service=sample_service, notify_number="0751421", user_number="447700900111", + provider_date=datetime(2017, 6, 20), content="Here is some content") + mocked = mocker.patch("requests.request") + send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id) + + mocked.call_count == 0 + + +def test_send_inbound_sms_to_service_retries_if_request_returns_500(notify_api, sample_service, mocker): + inbound_api = create_service_inbound_api(service=sample_service, url="https://some.service.gov.uk/", + bearer_token="something_unique") + inbound_sms = create_inbound_sms(service=sample_service, notify_number="0751421", user_number="447700900111", + provider_date=datetime(2017, 6, 20), content="Here is some content") + data = { + "id": str(inbound_sms.id), + "from_number": inbound_sms.user_number, + "content": inbound_sms.content, + "date_received": inbound_sms.provider_date.strftime(DATETIME_FORMAT) + } + + mocked = mocker.patch('app.celery.tasks.send_inbound_sms_to_service.retry') + with requests_mock.Mocker() as request_mock: + request_mock.post(inbound_api.url, + json={}, + status_code=500) + send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id) + + mocked.assert_called_with( + exc='Unable to send_inbound_sms_to_service for service_id: {} ' + 'and url: {}. \n500 Server Error: None'.format(sample_service.id, inbound_api.url), + queue="retry-tasks") + + +def test_send_inbound_sms_to_service_does_not_retries_if_request_returns_404(notify_api, sample_service, mocker): + inbound_api = create_service_inbound_api(service=sample_service, url="https://some.service.gov.uk/", + bearer_token="something_unique") + inbound_sms = create_inbound_sms(service=sample_service, notify_number="0751421", user_number="447700900111", + provider_date=datetime(2017, 6, 20), content="Here is some content") + data = { + "id": str(inbound_sms.id), + "from_number": inbound_sms.user_number, + "content": inbound_sms.content, + "date_received": inbound_sms.provider_date.strftime(DATETIME_FORMAT) + } + + mocked = mocker.patch('app.celery.tasks.send_inbound_sms_to_service.retry') + with requests_mock.Mocker() as request_mock: + request_mock.post(inbound_api.url, + json={}, + status_code=404) + send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id) + + mocked.call_count == 0 diff --git a/tests/app/dao/test_service_inbound_api_dao.py b/tests/app/dao/test_service_inbound_api_dao.py index 68b804d23..d924c2a70 100644 --- a/tests/app/dao/test_service_inbound_api_dao.py +++ b/tests/app/dao/test_service_inbound_api_dao.py @@ -7,9 +7,10 @@ from app import encryption from app.dao.service_inbound_api_dao import ( save_service_inbound_api, reset_service_inbound_api, - get_service_inbound_api -) + get_service_inbound_api, + get_service_inbound_api_for_service) from app.models import ServiceInboundApi +from tests.app.db import create_service_inbound_api def test_save_service_inbound_api(sample_service): @@ -115,3 +116,14 @@ def test_get_service_inbound_api(sample_service): assert inbound_api.bearer_token == "some_unique_string" assert inbound_api._bearer_token != "some_unique_string" assert inbound_api.updated_at is None + + +def test_get_service_inbound_api_for_service(sample_service): + service_inbound_api = create_service_inbound_api(service=sample_service) + result = get_service_inbound_api_for_service(sample_service.id) + assert result.id == service_inbound_api.id + assert result.url == service_inbound_api.url + assert result.bearer_token == service_inbound_api.bearer_token + assert result.created_at == service_inbound_api.created_at + assert result.updated_at == service_inbound_api.updated_at + assert result.updated_by_id == service_inbound_api.updated_by_id diff --git a/tests/app/notifications/test_receive_notification.py b/tests/app/notifications/test_receive_notification.py index 66f743be4..49c78553a 100644 --- a/tests/app/notifications/test_receive_notification.py +++ b/tests/app/notifications/test_receive_notification.py @@ -1,3 +1,4 @@ +import uuid from datetime import datetime from unittest.mock import call @@ -7,7 +8,7 @@ from flask import json from app.notifications.receive_notifications import ( format_mmg_message, format_mmg_datetime, - create_inbound_mmg_sms_object, + create_inbound_sms_object, strip_leading_forty_four ) @@ -15,7 +16,8 @@ from app.models import InboundSms from tests.app.db import create_service -def test_receive_notification_returns_received_to_mmg(client, sample_service): +def test_receive_notification_returns_received_to_mmg(client, sample_service, mocker): + mocked = mocker.patch("app.notifications.receive_notifications.tasks.send_inbound_sms_to_service.apply_async") data = {"ID": "1234", "MSISDN": "447700900855", "Message": "Some message to notify", @@ -30,6 +32,8 @@ def test_receive_notification_returns_received_to_mmg(client, sample_service): assert response.status_code == 200 assert response.get_data(as_text=True) == 'RECEIVED' + inbound_sms_id = InboundSms.query.all()[0].id + mocked.assert_called_once_with([str(inbound_sms_id), str(sample_service.id)], queue="notify-internal-tasks") @pytest.mark.parametrize('message, expected_output', [ @@ -61,7 +65,8 @@ def test_create_inbound_mmg_sms_object(sample_service): 'ID': 'bar', } - inbound_sms = create_inbound_mmg_sms_object(sample_service, data) + inbound_sms = create_inbound_sms_object(sample_service, format_mmg_message(data["Message"]), + data["MSISDN"], data["ID"], data["DateRecieved"], "mmg") assert inbound_sms.service_id == sample_service.id assert inbound_sms.notify_number == 'foo' @@ -96,9 +101,10 @@ def test_receive_notification_error_if_not_single_matching_service(client, notif def test_receive_notification_returns_received_to_firetext(notify_db_session, client, mocker): + mocked = mocker.patch("app.notifications.receive_notifications.tasks.send_inbound_sms_to_service.apply_async") mock = mocker.patch('app.notifications.receive_notifications.statsd_client.incr') - create_service(service_name='b', sms_sender='07111111111') + service = create_service(service_name='b', sms_sender='07111111111') data = "source=07999999999&destination=07111111111&message=this is a message&time=2017-01-01 12:00:00" @@ -113,9 +119,12 @@ def test_receive_notification_returns_received_to_firetext(notify_db_session, cl mock.assert_has_calls([call('inbound.firetext.successful')]) assert result['status'] == 'ok' + inbound_sms_id = InboundSms.query.all()[0].id + mocked.assert_called_once_with([str(inbound_sms_id), str(service.id)], queue="notify-internal-tasks") def test_receive_notification_from_firetext_persists_message(notify_db_session, client, mocker): + mocked = mocker.patch("app.notifications.receive_notifications.tasks.send_inbound_sms_to_service.apply_async") mocker.patch('app.notifications.receive_notifications.statsd_client.incr') service = create_service(service_name='b', sms_sender='07111111111') @@ -139,9 +148,11 @@ def test_receive_notification_from_firetext_persists_message(notify_db_session, assert persisted.content == 'this is a message' assert persisted.provider == 'firetext' assert persisted.provider_date == datetime(2017, 1, 1, 12, 0, 0, 0) + mocked.assert_called_once_with([str(persisted.id), str(service.id)], queue="notify-internal-tasks") def test_receive_notification_from_firetext_persists_message_with_normalized_phone(notify_db_session, client, mocker): + mocker.patch("app.notifications.receive_notifications.tasks.send_inbound_sms_to_service.apply_async") mock = mocker.patch('app.notifications.receive_notifications.statsd_client.incr') create_service(service_name='b', sms_sender='07111111111') @@ -163,6 +174,7 @@ def test_receive_notification_from_firetext_persists_message_with_normalized_pho def test_returns_ok_to_firetext_if_mismatched_sms_sender(notify_db_session, client, mocker): + mocked = mocker.patch("app.notifications.receive_notifications.tasks.send_inbound_sms_to_service.apply_async") mock = mocker.patch('app.notifications.receive_notifications.statsd_client.incr') create_service(service_name='b', sms_sender='07111111199') @@ -180,6 +192,7 @@ def test_returns_ok_to_firetext_if_mismatched_sms_sender(notify_db_session, clie assert not InboundSms.query.all() assert result['status'] == 'ok' mock.assert_has_calls([call('inbound.firetext.failed')]) + mocked.call_count == 0 @pytest.mark.parametrize(