diff --git a/app/aws/s3.py b/app/aws/s3.py index b23cca71d..a440745da 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -4,6 +4,7 @@ from flask import current_app import pytz from boto3 import client, resource +import botocore FILE_LOCATION_STRUCTURE = 'service-{}-notify/{}.csv' @@ -18,6 +19,17 @@ def get_s3_object(bucket_name, file_location): return s3.Object(bucket_name, file_location) +def file_exists(bucket_name, file_location): + try: + # try and access metadata of object + get_s3_object(bucket_name, file_location).metadata + return True + except botocore.exceptions.ClientError as e: + if e.response['ResponseMetadata']['HTTPStatusCode'] == 404: + return False + raise + + def get_job_location(service_id, job_id): return ( current_app.config['CSV_UPLOAD_BUCKET_NAME'], diff --git a/app/celery/celery.py b/app/celery/celery.py index 4a9792cf0..56c7f72b2 100644 --- a/app/celery/celery.py +++ b/app/celery/celery.py @@ -1,6 +1,13 @@ import time from celery import Celery, Task +from celery.signals import worker_process_shutdown +from flask import current_app + + +@worker_process_shutdown.connect +def worker_process_shutdown(sender, signal, pid, exitcode, **kwargs): + current_app.logger.info('worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode)) def make_task(app): diff --git a/app/celery/provider_tasks.py b/app/celery/provider_tasks.py index 9a543ea23..113e79559 100644 --- a/app/celery/provider_tasks.py +++ b/app/celery/provider_tasks.py @@ -1,4 +1,3 @@ -from celery.signals import worker_process_shutdown from flask import current_app from notifications_utils.recipients import InvalidEmailError from notifications_utils.statsd_decorators import statsd @@ -13,11 +12,6 @@ from app.exceptions import NotificationTechnicalFailureException from app.models import NOTIFICATION_TECHNICAL_FAILURE -@worker_process_shutdown.connect -def worker_process_shutdown(sender, signal, pid, exitcode): - current_app.logger.info('Provider worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode)) - - @notify_celery.task(bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300) @statsd(namespace="tasks") def deliver_sms(self, notification_id): diff --git a/app/celery/research_mode_tasks.py b/app/celery/research_mode_tasks.py index ac2e1e8dd..b03d8ee69 100644 --- a/app/celery/research_mode_tasks.py +++ b/app/celery/research_mode_tasks.py @@ -1,4 +1,5 @@ -from datetime import datetime +import random +from datetime import datetime, timedelta import json from flask import current_app @@ -7,6 +8,7 @@ from requests import request, RequestException, HTTPError from notifications_utils.s3 import s3upload from app import notify_celery +from app.aws.s3 import file_exists from app.models import SMS_TYPE from app.config import QueueNames from app.celery.process_ses_receipts_tasks import process_ses_results @@ -123,7 +125,18 @@ def firetext_callback(notification_id, to): def create_fake_letter_response_file(self, reference): now = datetime.utcnow() dvla_response_data = '{}|Sent|0|Sorted'.format(reference) - upload_file_name = 'NOTIFY-{}-RSP.TXT'.format(now.strftime('%Y%m%d%H%M%S')) + + # try and find a filename that hasn't been taken yet - from a random time within the last 30 seconds + for i in sorted(range(30), key=lambda _: random.random()): + upload_file_name = 'NOTIFY-{}-RSP.TXT'.format((now - timedelta(seconds=i)).strftime('%Y%m%d%H%M%S')) + if not file_exists(current_app.config['DVLA_RESPONSE_BUCKET_NAME'], upload_file_name): + break + else: + raise ValueError( + 'cant create fake letter response file for {} - too many files for that time already exist on s3'.format( + reference + ) + ) s3upload( filedata=dvla_response_data, diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 53b57aff5..dd15735a2 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -5,7 +5,6 @@ from datetime import ( ) import pytz -from celery.signals import worker_process_shutdown from flask import current_app from notifications_utils.statsd_decorators import statsd from sqlalchemy import and_, func @@ -74,11 +73,6 @@ from app.utils import ( from app.v2.errors import JobIncompleteError -@worker_process_shutdown.connect -def worker_process_shutdown(sender, signal, pid, exitcode): - current_app.logger.info('Scheduled tasks worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode)) - - @notify_celery.task(name="remove_csv_files") @statsd(namespace="tasks") def remove_csv_files(job_types): diff --git a/app/celery/tasks.py b/app/celery/tasks.py index de626a733..dd6d88495 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -2,9 +2,7 @@ import json from datetime import datetime from collections import namedtuple, defaultdict -from celery.signals import worker_process_shutdown from flask import current_app - from notifications_utils.recipients import ( RecipientCSV ) @@ -74,11 +72,6 @@ from app.service.utils import service_allowed_to_send_to from app.utils import convert_utc_to_bst -@worker_process_shutdown.connect -def worker_process_shutdown(sender, signal, pid, exitcode): - current_app.logger.info('Tasks worker shutdown: PID: {} Exitcode: {}'.format(pid, exitcode)) - - @notify_celery.task(name="process-job") @statsd(namespace="tasks") def process_job(job_id): diff --git a/tests/app/celery/test_research_mode_tasks.py b/tests/app/celery/test_research_mode_tasks.py index 77656a62e..7a00f7163 100644 --- a/tests/app/celery/test_research_mode_tasks.py +++ b/tests/app/celery/test_research_mode_tasks.py @@ -1,5 +1,5 @@ import uuid -from unittest.mock import ANY +from unittest.mock import ANY, call from flask import current_app, json from freezegun import freeze_time @@ -15,7 +15,13 @@ from app.celery.research_mode_tasks import ( ses_notification_callback, create_fake_letter_response_file, ) -from tests.conftest import set_config_values +from tests.conftest import set_config_values, Matcher + + +dvla_response_file_matcher = Matcher( + 'dvla_response_file', + lambda x: 'NOTIFY-20180125140000-RSP.TXT' < x <= 'NOTIFY-20180125140030-RSP.TXT' +) def test_make_mmg_callback(notify_api, rmock): @@ -108,11 +114,11 @@ def test_failure_firetext_callback(phone_number): } -@freeze_time("2018-01-25 14:00:00") +@freeze_time("2018-01-25 14:00:30") def test_create_fake_letter_response_file_uploads_response_file_s3( notify_api, mocker): + mocker.patch('app.celery.research_mode_tasks.file_exists', return_value=False) mock_s3upload = mocker.patch('app.celery.research_mode_tasks.s3upload') - filename = 'NOTIFY-20180125140000-RSP.TXT' with requests_mock.Mocker() as request_mock: request_mock.post( @@ -127,15 +133,15 @@ def test_create_fake_letter_response_file_uploads_response_file_s3( filedata='random-ref|Sent|0|Sorted', region=current_app.config['AWS_REGION'], bucket_name=current_app.config['DVLA_RESPONSE_BUCKET_NAME'], - file_location=filename + file_location=dvla_response_file_matcher ) -@freeze_time("2018-01-25 14:00:00") +@freeze_time("2018-01-25 14:00:30") def test_create_fake_letter_response_file_calls_dvla_callback_on_development( notify_api, mocker): + mocker.patch('app.celery.research_mode_tasks.file_exists', return_value=False) mocker.patch('app.celery.research_mode_tasks.s3upload') - filename = 'NOTIFY-20180125140000-RSP.TXT' with set_config_values(notify_api, { 'NOTIFY_ENVIRONMENT': 'development' @@ -152,13 +158,25 @@ def test_create_fake_letter_response_file_calls_dvla_callback_on_development( assert request_mock.last_request.json() == { "Type": "Notification", "MessageId": "some-message-id", - "Message": '{"Records":[{"s3":{"object":{"key":"' + filename + '"}}}]}' + "Message": ANY + } + assert json.loads(request_mock.last_request.json()['Message']) == { + "Records": [ + { + "s3": { + "object": { + "key": dvla_response_file_matcher + } + } + } + ] } -@freeze_time("2018-01-25 14:00:00") +@freeze_time("2018-01-25 14:00:30") def test_create_fake_letter_response_file_does_not_call_dvla_callback_on_preview( notify_api, mocker): + mocker.patch('app.celery.research_mode_tasks.file_exists', return_value=False) mocker.patch('app.celery.research_mode_tasks.s3upload') with set_config_values(notify_api, { @@ -168,3 +186,35 @@ def test_create_fake_letter_response_file_does_not_call_dvla_callback_on_preview create_fake_letter_response_file('random-ref') assert request_mock.last_request is None + + +@freeze_time("2018-01-25 14:00:30") +def test_create_fake_letter_response_file_tries_to_create_files_with_other_filenames(notify_api, mocker): + mock_file_exists = mocker.patch('app.celery.research_mode_tasks.file_exists', side_effect=[True, True, False]) + mock_s3upload = mocker.patch('app.celery.research_mode_tasks.s3upload') + + create_fake_letter_response_file('random-ref') + + assert mock_file_exists.mock_calls == [ + call('test.notify.com-ftp', dvla_response_file_matcher), + call('test.notify.com-ftp', dvla_response_file_matcher), + call('test.notify.com-ftp', dvla_response_file_matcher), + ] + mock_s3upload.assert_called_once_with( + filedata=ANY, + region=ANY, + bucket_name=ANY, + file_location=dvla_response_file_matcher + ) + + +@freeze_time("2018-01-25 14:00:30") +def test_create_fake_letter_response_file_gives_up_after_thirty_times(notify_api, mocker): + mock_file_exists = mocker.patch('app.celery.research_mode_tasks.file_exists', return_value=True) + mock_s3upload = mocker.patch('app.celery.research_mode_tasks.s3upload') + + with pytest.raises(ValueError): + create_fake_letter_response_file('random-ref') + + assert len(mock_file_exists.mock_calls) == 30 + assert not mock_s3upload.called diff --git a/tests/conftest.py b/tests/conftest.py index ee47643ce..e3762afd9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -162,3 +162,15 @@ def set_config_values(app, dict): finally: for key in dict: app.config[key] = old_values[key] + + +class Matcher: + def __init__(self, description, key): + self.description = description + self.key = key + + def __eq__(self, other): + return self.key(other) + + def __repr__(self): + return ''.format(self.description)