diff --git a/app/aws_sqs.py b/app/aws_sqs.py deleted file mode 100644 index ea362024c..000000000 --- a/app/aws_sqs.py +++ /dev/null @@ -1,21 +0,0 @@ -import uuid -import boto3 -from itsdangerous import URLSafeSerializer -from flask import current_app - - -def add_notification_to_queue(service_id, template_id, type_, notification): - q = boto3.resource( - 'sqs', region_name=current_app.config['AWS_REGION'] - ).create_queue(QueueName="{}_{}".format( - current_app.config['NOTIFICATION_QUEUE_PREFIX'], - str(service_id))) - notification_id = str(uuid.uuid4()) - serializer = URLSafeSerializer(current_app.config.get('SECRET_KEY')) - encrypted = serializer.dumps(notification, current_app.config.get('DANGEROUS_SALT')) - q.send_message(MessageBody=encrypted, - MessageAttributes={'type': {'StringValue': type_, 'DataType': 'String'}, - 'notification_id': {'StringValue': notification_id, 'DataType': 'String'}, - 'service_id': {'StringValue': str(service_id), 'DataType': 'String'}, - 'template_id': {'StringValue': str(template_id), 'DataType': 'String'}}) - return notification_id diff --git a/app/celery/tasks.py b/app/celery/tasks.py index f507b34b7..b43daaa70 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -1,7 +1,5 @@ import random -import botocore -from boto3 import resource from datetime import (datetime) from flask import current_app @@ -24,8 +22,7 @@ from app.dao.jobs_dao import ( all_notifications_are_created_for_job, dao_get_all_notifications_for_job, dao_update_job_status) -from app.dao.notifications_dao import get_notification_by_id, dao_update_notification, \ - dao_update_notifications_sent_to_dvla +from app.dao.notifications_dao import get_notification_by_id, dao_update_notifications_sent_to_dvla from app.dao.provider_details_dao import get_current_provider from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count from app.dao.templates_dao import dao_get_template_by_id @@ -39,10 +36,11 @@ from app.models import ( JOB_STATUS_IN_PROGRESS, JOB_STATUS_FINISHED, JOB_STATUS_READY_TO_SEND, - JOB_STATUS_SENT_TO_DVLA, NOTIFICATION_SENDING, Notification) + JOB_STATUS_SENT_TO_DVLA) from app.notifications.process_notifications import persist_notification from app.service.utils import service_allowed_to_send_to from app.statsd_decorators import statsd +from notifications_utils.s3 import s3upload @notify_celery.task(name="process-job") @@ -272,7 +270,7 @@ def persist_letter( handle_exception(self, notification, notification_id, e) -@notify_celery.task(bind=True, name="build-dvla-file", countdown=30, max_retries=15, default_retry_delay=300) +@notify_celery.task(bind=True, name="build-dvla-file", countdown=60, max_retries=15, default_retry_delay=300) @statsd(namespace="tasks") def build_dvla_file(self, job_id): try: @@ -285,6 +283,7 @@ def build_dvla_file(self, job_id): file_location="{}-dvla-job.text".format(job_id) ) dao_update_job_status(job_id, JOB_STATUS_READY_TO_SEND) + notify_celery.send_task("aggregrate-dvla-files", ([str(job_id)], ), queue='aggregate-dvla-files') else: current_app.logger.info("All notifications for job {} are not persisted".format(job_id)) self.retry(queue="retry", exc="All notifications for job {} are not persisted".format(job_id)) @@ -322,37 +321,6 @@ def create_dvla_file_contents(job_id): return file_contents -def s3upload(filedata, region, bucket_name, file_location): - # TODO: move this method to utils. Will need to change the filedata from here to send contents in filedata['data'] - _s3 = resource('s3') - # contents = filedata['data'] - contents = filedata - - exists = True - try: - _s3.meta.client.head_bucket( - Bucket=bucket_name) - except botocore.exceptions.ClientError as e: - error_code = int(e.response['Error']['Code']) - if error_code == 404: - exists = False - else: - current_app.logger.error( - "Unable to create s3 bucket {}".format(bucket_name)) - raise e - - if not exists: - _s3.create_bucket(Bucket=bucket_name, - CreateBucketConfiguration={'LocationConstraint': region}) - - upload_id = create_uuid() - upload_file_name = file_location - key = _s3.Object(bucket_name, upload_file_name) - key.put(Body=contents, ServerSideEncryption='AES256') - - return upload_id - - def handle_exception(task, notification, notification_id, exc): if not get_notification_by_id(notification_id): retry_msg = '{task} notification for job {job} row number {row} and notification id {noti}'.format( diff --git a/requirements.txt b/requirements.txt index f047a5d05..0557ed9dc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,6 +29,6 @@ notifications-python-client>=3.1,<3.2 awscli>=1.11,<1.12 awscli-cwlogs>=1.4,<1.5 -git+https://github.com/alphagov/notifications-utils.git@15.0.2#egg=notifications-utils==15.0.2 +git+https://github.com/alphagov/notifications-utils.git@15.0.4#egg=notifications-utils==15.0.4 git+https://github.com/alphagov/boto.git@2.43.0-patch3#egg=boto==2.43.0-patch3 diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 72fd676b2..2dba7e4ca 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -979,6 +979,7 @@ def test_build_dvla_file(sample_letter_template, mocker): create_notification(template=job.template, job=job) create_notification(template=job.template, job=job) mocked_upload = mocker.patch("app.celery.tasks.s3upload") + mocked_send_task = mocker.patch("app.celery.tasks.notify_celery.send_task") mocked_letter_template = mocker.patch("app.celery.tasks.LetterDVLATemplate") mocked_letter_template_instance = mocked_letter_template.return_value mocked_letter_template_instance.__str__.return_value = "dvla|string" @@ -991,6 +992,7 @@ def test_build_dvla_file(sample_letter_template, mocker): file_location="{}-dvla-job.text".format(job.id) ) assert Job.query.get(job.id).job_status == 'ready to send' + mocked_send_task.assert_called_once_with("aggregrate-dvla-files", ([str(job.id)], ), queue='aggregate-dvla-files') def test_build_dvla_file_retries_if_all_notifications_are_not_created(sample_letter_template, mocker): @@ -998,6 +1000,7 @@ def test_build_dvla_file_retries_if_all_notifications_are_not_created(sample_let create_notification(template=job.template, job=job) mocked = mocker.patch("app.celery.tasks.s3upload") + mocked_send_task = mocker.patch("app.celery.tasks.notify_celery.send_task") mocker.patch('app.celery.tasks.build_dvla_file.retry', side_effect=Retry) with pytest.raises(Retry): build_dvla_file(job.id) @@ -1006,6 +1009,7 @@ def test_build_dvla_file_retries_if_all_notifications_are_not_created(sample_let tasks.build_dvla_file.retry.assert_called_with(queue='retry', exc="All notifications for job {} are not persisted".format(job.id)) assert Job.query.get(job.id).job_status == 'in progress' + mocked_send_task.assert_not_called() def test_create_dvla_file_contents(sample_letter_template, mocker):