rename callback_tasks.py to process_ses_receipts.py

create service_callback_tasks.py for tasks to send delivery statuses to services
This commit is contained in:
venusbb
2017-12-01 16:15:21 +00:00
parent 988b22391b
commit 489f43a2c9
7 changed files with 290 additions and 235 deletions

View File

@@ -5,7 +5,7 @@ from requests import request, RequestException, HTTPError
from app.models import SMS_TYPE
from app.config import QueueNames
from app.celery.callback_tasks import process_ses_results
from app.celery.process_ses_receipts_tasks import process_ses_results
temp_fail = "7700900003"
perm_fail = "7700900002"

View File

@@ -0,0 +1,71 @@
import json
from app import (
DATETIME_FORMAT,
notify_celery,
)
from app.dao.notifications_dao import (
get_notification_by_id,
)
from app.statsd_decorators import statsd
from app.dao.service_callback_api_dao import get_service_callback_api_for_service
from requests import (
HTTPError,
request,
RequestException
)
from flask import current_app
from app.config import QueueNames
@notify_celery.task(bind=True, name="send-delivery-status", max_retries=5, default_retry_delay=300)
@statsd(namespace="tasks")
def send_delivery_status_to_service(self, notification_id):
# TODO: do we need to do rate limit this?
notification = get_notification_by_id(notification_id)
service_callback_api = get_service_callback_api_for_service(service_id=notification.service_id)
if not service_callback_api:
# No delivery receipt API info set
return
data = {
"id": str(notification_id),
"reference": str(notification.client_reference),
"to": notification.to,
"status": notification.status,
"created_at": notification.created_at.strftime(DATETIME_FORMAT), # the time GOV.UK email sent the request
"updated_at": notification.updated_at.strftime(DATETIME_FORMAT), # the last time the status was updated
"sent_at": notification.sent_at.strftime(DATETIME_FORMAT), # the time the email was sent
"notification_type": notification.notification_type
}
try:
response = request(
method="POST",
url=service_callback_api.url,
data=json.dumps(data),
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer {}'.format(service_callback_api.bearer_token)
},
timeout=60
)
current_app.logger.info('send_delivery_status_to_service sending {} to {}, response {}'.format(
notification_id,
service_callback_api.url,
response.status_code
))
response.raise_for_status()
except RequestException as e:
current_app.logger.warning(
"send_inbound_sms_to_service request failed for service_id: {} and url: {}. exc: {}".format(
notification_id,
service_callback_api.url,
e
)
)
if not isinstance(e, HTTPError) or e.response.status_code >= 500:
try:
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
current_app.logger.exception('Retry: send_inbound_sms_to_service has retried the max number of times')

View File

@@ -68,7 +68,6 @@ from app.notifications.process_notifications import persist_notification
from app.service.utils import service_allowed_to_send_to
from app.statsd_decorators import statsd
from notifications_utils.s3 import s3upload
from app.dao.service_callback_api_dao import get_service_callback_api_for_service
@worker_process_shutdown.connect
@@ -500,31 +499,36 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id):
"date_received": inbound_sms.provider_date.strftime(DATETIME_FORMAT)
}
_post_status_update(self, inbound_api, data, 'send_inbound_sms_to_service')
@notify_celery.task(bind=True, name="send-delivery-status", max_retries=5, default_retry_delay=300)
@statsd(namespace="tasks")
def send_delivery_status_to_service(self, notification_id):
# TODO: do we need to do rate limit this?
notification = get_notification_by_id(notification_id)
service_callback_api = get_service_callback_api_for_service(service_id=notification.service_id)
if not service_callback_api:
# No delivery receipt API info set
return
data = {
"id": str(notification_id),
"reference": str(notification.client_reference),
"to": notification.to,
"status": notification.status,
"created_at": notification.created_at.strftime(DATETIME_FORMAT), # the time GOV.UK email sent the request
"updated_at": notification.updated_at.strftime(DATETIME_FORMAT), # the last time the status was updated
"sent_at": notification.sent_at.strftime(DATETIME_FORMAT), # the time the email was sent
"notification_type": notification.notification_type
}
_post_status_update(self, service_callback_api, data, 'send_delivery_receipt_to_service')
try:
response = request(
method="POST",
url=inbound_api.url,
data=json.dumps(data),
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer {}'.format(inbound_api.bearer_token)
},
timeout=60
)
current_app.logger.info('send_inbound_sms_to_service sending {} to {}, response {}'.format(
inbound_sms_id,
inbound_api.url,
response.status_code
))
response.raise_for_status()
except RequestException as e:
current_app.logger.warning(
"send_inbound_sms_to_service request failed for service_id: {} and url: {}. exc: {}".format(
service_id,
inbound_api.url,
e
)
)
if not isinstance(e, HTTPError) or e.response.status_code >= 500:
try:
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
current_app.logger.exception('Retry: send_inbound_sms_to_service has retried the max number of times')
@notify_celery.task(name='process-incomplete-jobs')
@@ -562,41 +566,3 @@ def process_incomplete_job(job_id):
process_row(row_number, recipient, personalisation, template, job, job.service)
job_complete(job, job.service, template, resumed=True)
def _post_status_update(self, callback_api, data, callback_name):
try:
response = request(
method="POST",
url=callback_api.url,
data=json.dumps(data),
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer {}'.format(callback_api.bearer_token)
},
timeout=60
)
current_app.logger.info('{} sending {} to {}, response {}'.format(
callback_name,
callback_api.service_id,
callback_api.url,
response.status_code
))
response.raise_for_status()
except RequestException as e:
current_app.logger.warning(
"service_name request failed for service_id: {} and url: {}. exc: {}".format(
callback_api.service_id,
callback_api.url,
e
)
)
if not isinstance(e, HTTPError) or e.response.status_code >= 500:
try:
self.retry(queue=QueueNames.RETRY,
exc='Unable to {} for service_id: {} and url: {}. \n{}'.format(
callback_name, callback_api.service_id, callback_api.url, e))
except self.MaxRetriesExceededError:
current_app.logger.exception('Retry: {} has retried the max number of times', callback_name)

View File

@@ -1,7 +1,7 @@
import json
from datetime import datetime
from app.celery.callback_tasks import process_ses_results
from app.celery.process_ses_receipts_tasks import process_ses_results
from tests.app.db import create_notification
@@ -18,7 +18,7 @@ def test_process_ses_results(sample_email_template):
def test_process_ses_results_does_not_retry_if_errors(notify_db, mocker):
mocked = mocker.patch('app.celery.callback_tasks.process_ses_results.retry')
mocked = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry')
response = json.loads(ses_notification_callback())
process_ses_results(response=response)
assert mocked.call_count == 0
@@ -26,7 +26,7 @@ def test_process_ses_results_does_not_retry_if_errors(notify_db, mocker):
def test_process_ses_results_retry_called(notify_db, mocker):
mocker.patch("app.dao.notifications_dao.update_notification_status_by_reference", side_effect=Exception("EXPECTED"))
mocked = mocker.patch('app.celery.callback_tasks.process_ses_results.retry')
mocked = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry')
response = json.loads(ses_notification_callback())
process_ses_results(response=response)
assert mocked.call_count != 0

View File

@@ -0,0 +1,185 @@
import json
from datetime import datetime
import pytest
import requests_mock
from requests import RequestException
from app import (DATETIME_FORMAT)
from tests.app.conftest import (
sample_service as create_sample_service,
sample_template as create_sample_template,
)
from tests.app.db import (
create_notification,
create_user,
create_service_callback_api
)
from app.celery.service_callback_tasks import send_delivery_status_to_service
@pytest.mark.parametrize("notification_type",
["email", "letter", "sms"])
def test_send_delivery_status_to_service_post_https_request_to_service(notify_db,
notify_db_session,
notification_type):
user = create_user()
service = create_sample_service(notify_db, notify_db_session, user=user, restricted=True)
callback_api = create_service_callback_api(service=service, url="https://some.service.gov.uk/",
bearer_token="something_unique")
template = create_sample_template(
notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello'
)
datestr = datetime(2017, 6, 20)
notification = create_notification(template=template,
created_at=datestr,
updated_at=datestr,
sent_at=datestr,
status='sent'
)
with requests_mock.Mocker() as request_mock:
request_mock.post(callback_api.url,
json={},
status_code=200)
send_delivery_status_to_service(notification.id)
mock_data = {
"id": str(notification.id),
"reference": str(notification.client_reference),
"to": notification.to,
"status": notification.status,
"created_at": datestr.strftime(DATETIME_FORMAT), # the time GOV.UK email sent the request
"updated_at": datestr.strftime(DATETIME_FORMAT), # the last time the status was updated
"sent_at": datestr.strftime(DATETIME_FORMAT), # the time the email was sent
"notification_type": notification_type
}
assert request_mock.call_count == 1
assert request_mock.request_history[0].url == callback_api.url
assert request_mock.request_history[0].method == 'POST'
assert request_mock.request_history[0].text == json.dumps(mock_data)
assert request_mock.request_history[0].headers["Content-type"] == "application/json"
assert request_mock.request_history[0].headers["Authorization"] == "Bearer {}".format(callback_api.bearer_token)
@pytest.mark.parametrize("notification_type",
["email", "letter", "sms"])
def test_send_delivery_status_to_service_does_not_sent_request_when_service_callback_api_does_not_exist(
notify_db, notify_db_session, mocker, notification_type):
service = create_sample_service(notify_db, notify_db_session, restricted=True)
template = create_sample_template(
notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello'
)
datestr = datetime(2017, 6, 20)
notification = create_notification(template=template,
created_at=datestr,
updated_at=datestr,
sent_at=datestr,
status='sent'
)
mocked = mocker.patch("requests.request")
send_delivery_status_to_service(notification.id)
mocked.call_count == 0
@pytest.mark.parametrize("notification_type",
["email", "letter", "sms"])
def test_send_delivery_status_to_service_retries_if_request_returns_500(notify_db,
notify_db_session,
mocker,
notification_type):
user = create_user()
service = create_sample_service(notify_db, notify_db_session, user=user, restricted=True)
template = create_sample_template(
notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello'
)
callback_api = create_service_callback_api(service=service, url="https://some.service.gov.uk/",
bearer_token="something_unique")
datestr = datetime(2017, 6, 20)
notification = create_notification(template=template,
created_at=datestr,
updated_at=datestr,
sent_at=datestr,
status='sent'
)
mocked = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.retry')
with requests_mock.Mocker() as request_mock:
request_mock.post(callback_api.url,
json={},
status_code=500)
send_delivery_status_to_service(notification.id)
assert mocked.call_count == 1
assert mocked.call_args[1]['queue'] == 'retry-tasks'
@pytest.mark.parametrize("notification_type",
["email", "letter", "sms"])
def test_send_delivery_status_to_service_retries_if_request_throws_unknown(notify_db,
notify_db_session,
mocker,
notification_type):
user = create_user()
service = create_sample_service(notify_db, notify_db_session, user=user, restricted=True)
template = create_sample_template(
notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello'
)
create_service_callback_api(service=service, url="https://some.service.gov.uk/",
bearer_token="something_unique")
datestr = datetime(2017, 6, 20)
notification = create_notification(template=template,
created_at=datestr,
updated_at=datestr,
sent_at=datestr,
status='sent'
)
mocked = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.retry')
mocker.patch("app.celery.tasks.request", side_effect=RequestException())
send_delivery_status_to_service(notification.id)
assert mocked.call_count == 1
assert mocked.call_args[1]['queue'] == 'retry-tasks'
@pytest.mark.parametrize("notification_type",
["email", "letter", "sms"])
def test_send_delivery_status_to_service_does_not_retries_if_request_returns_404(notify_db,
notify_db_session,
mocker,
notification_type):
user = create_user()
service = create_sample_service(notify_db, notify_db_session, user=user, restricted=True)
template = create_sample_template(
notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello'
)
callback_api = create_service_callback_api(service=service, url="https://some.service.gov.uk/",
bearer_token="something_unique")
datestr = datetime(2017, 6, 20)
notification = create_notification(template=template,
created_at=datestr,
updated_at=datestr,
sent_at=datestr,
status='sent'
)
mocked = mocker.patch('app.celery.service_callback_tasks.send_delivery_status_to_service.retry')
with requests_mock.Mocker() as request_mock:
request_mock.post(callback_api.url,
json={},
status_code=404)
send_delivery_status_to_service(notification.id)
mocked.call_count == 0

View File

@@ -30,7 +30,6 @@ from app.celery.tasks import (
get_template_class,
s3,
send_inbound_sms_to_service,
send_delivery_status_to_service
)
from app.config import QueueNames
from app.dao import jobs_dao, services_dao
@@ -67,7 +66,6 @@ from tests.app.db import (
create_user,
create_reply_to_email,
create_service_with_defined_sms_sender,
create_service_callback_api
)
@@ -1250,171 +1248,6 @@ def test_send_inbound_sms_to_service_does_not_retries_if_request_returns_404(not
mocked.call_count == 0
@pytest.mark.parametrize("notification_type",
["email", "letter", "sms"])
def test_send_delivery_status_to_service_post_https_request_to_service(notify_db,
notify_db_session,
notification_type):
user = create_user()
service = create_sample_service(notify_db, notify_db_session, user=user, restricted=True)
callback_api = create_service_callback_api(service=service, url="https://some.service.gov.uk/",
bearer_token="something_unique")
template = create_sample_template(
notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello'
)
datestr = datetime(2017, 6, 20)
notification = create_notification(template=template,
created_at=datestr,
updated_at=datestr,
sent_at=datestr,
status='sent'
)
with requests_mock.Mocker() as request_mock:
request_mock.post(callback_api.url,
json={},
status_code=200)
send_delivery_status_to_service(notification.id)
mock_data = {
"id": str(notification.id),
"reference": str(notification.client_reference),
"to": notification.to,
"status": notification.status,
"created_at": datestr.strftime(DATETIME_FORMAT), # the time GOV.UK email sent the request
"updated_at": datestr.strftime(DATETIME_FORMAT), # the last time the status was updated
"sent_at": datestr.strftime(DATETIME_FORMAT), # the time the email was sent
"notification_type": notification_type
}
assert request_mock.call_count == 1
assert request_mock.request_history[0].url == callback_api.url
assert request_mock.request_history[0].method == 'POST'
assert request_mock.request_history[0].text == json.dumps(mock_data)
assert request_mock.request_history[0].headers["Content-type"] == "application/json"
assert request_mock.request_history[0].headers["Authorization"] == "Bearer {}".format(callback_api.bearer_token)
@pytest.mark.parametrize("notification_type",
["email", "letter", "sms"])
def test_send_delivery_status_to_service_does_not_sent_request_when_service_callback_api_does_not_exist(
notify_db, notify_db_session, mocker, notification_type):
service = create_sample_service(notify_db, notify_db_session, restricted=True)
template = create_sample_template(
notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello'
)
datestr = datetime(2017, 6, 20)
notification = create_notification(template=template,
created_at=datestr,
updated_at=datestr,
sent_at=datestr,
status='sent'
)
mocked = mocker.patch("requests.request")
send_delivery_status_to_service(notification.id)
mocked.call_count == 0
@pytest.mark.parametrize("notification_type",
["email", "letter", "sms"])
def test_send_delivery_status_to_service_retries_if_request_returns_500(notify_db,
notify_db_session,
mocker,
notification_type):
user = create_user()
service = create_sample_service(notify_db, notify_db_session, user=user, restricted=True)
template = create_sample_template(
notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello'
)
callback_api = create_service_callback_api(service=service, url="https://some.service.gov.uk/",
bearer_token="something_unique")
datestr = datetime(2017, 6, 20)
notification = create_notification(template=template,
created_at=datestr,
updated_at=datestr,
sent_at=datestr,
status='sent'
)
mocked = mocker.patch('app.celery.tasks.send_delivery_status_to_service.retry')
with requests_mock.Mocker() as request_mock:
request_mock.post(callback_api.url,
json={},
status_code=500)
send_delivery_status_to_service(notification.id)
assert mocked.call_count == 1
assert mocked.call_args[1]['queue'] == 'retry-tasks'
@pytest.mark.parametrize("notification_type",
["email", "letter", "sms"])
def test_send_delivery_status_to_service_retries_if_request_throws_unknown(notify_db,
notify_db_session,
mocker,
notification_type):
user = create_user()
service = create_sample_service(notify_db, notify_db_session, user=user, restricted=True)
template = create_sample_template(
notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello'
)
create_service_callback_api(service=service, url="https://some.service.gov.uk/",
bearer_token="something_unique")
datestr = datetime(2017, 6, 20)
notification = create_notification(template=template,
created_at=datestr,
updated_at=datestr,
sent_at=datestr,
status='sent'
)
mocked = mocker.patch('app.celery.tasks.send_delivery_status_to_service.retry')
mocker.patch("app.celery.tasks.request", side_effect=RequestException())
send_delivery_status_to_service(notification.id)
assert mocked.call_count == 1
assert mocked.call_args[1]['queue'] == 'retry-tasks'
@pytest.mark.parametrize("notification_type",
["email", "letter", "sms"])
def test_send_delivery_status_to_service_does_not_retries_if_request_returns_404(notify_db,
notify_db_session,
mocker,
notification_type):
user = create_user()
service = create_sample_service(notify_db, notify_db_session, user=user, restricted=True)
template = create_sample_template(
notify_db, notify_db_session, service=service, template_type=notification_type, subject_line='Hello'
)
callback_api = create_service_callback_api(service=service, url="https://some.service.gov.uk/",
bearer_token="something_unique")
datestr = datetime(2017, 6, 20)
notification = create_notification(template=template,
created_at=datestr,
updated_at=datestr,
sent_at=datestr,
status='sent'
)
mocked = mocker.patch('app.celery.tasks.send_inbound_sms_to_service.retry')
with requests_mock.Mocker() as request_mock:
request_mock.post(callback_api.url,
json={},
status_code=404)
send_delivery_status_to_service(notification.id)
mocked.call_count == 0
def test_check_job_status_task_does_not_raise_error(sample_template):
create_job(
template=sample_template,