mirror of
https://github.com/GSA/notifications-api.git
synced 2026-01-02 05:41:57 -05:00
Merge pull request #894 from alphagov/refactor-s3-upload
Refactor s3 upload
This commit is contained in:
@@ -1,21 +0,0 @@
|
||||
import uuid
|
||||
import boto3
|
||||
from itsdangerous import URLSafeSerializer
|
||||
from flask import current_app
|
||||
|
||||
|
||||
def add_notification_to_queue(service_id, template_id, type_, notification):
|
||||
q = boto3.resource(
|
||||
'sqs', region_name=current_app.config['AWS_REGION']
|
||||
).create_queue(QueueName="{}_{}".format(
|
||||
current_app.config['NOTIFICATION_QUEUE_PREFIX'],
|
||||
str(service_id)))
|
||||
notification_id = str(uuid.uuid4())
|
||||
serializer = URLSafeSerializer(current_app.config.get('SECRET_KEY'))
|
||||
encrypted = serializer.dumps(notification, current_app.config.get('DANGEROUS_SALT'))
|
||||
q.send_message(MessageBody=encrypted,
|
||||
MessageAttributes={'type': {'StringValue': type_, 'DataType': 'String'},
|
||||
'notification_id': {'StringValue': notification_id, 'DataType': 'String'},
|
||||
'service_id': {'StringValue': str(service_id), 'DataType': 'String'},
|
||||
'template_id': {'StringValue': str(template_id), 'DataType': 'String'}})
|
||||
return notification_id
|
||||
@@ -1,7 +1,5 @@
|
||||
import random
|
||||
|
||||
import botocore
|
||||
from boto3 import resource
|
||||
from datetime import (datetime)
|
||||
|
||||
from flask import current_app
|
||||
@@ -24,8 +22,7 @@ from app.dao.jobs_dao import (
|
||||
all_notifications_are_created_for_job,
|
||||
dao_get_all_notifications_for_job,
|
||||
dao_update_job_status)
|
||||
from app.dao.notifications_dao import get_notification_by_id, dao_update_notification, \
|
||||
dao_update_notifications_sent_to_dvla
|
||||
from app.dao.notifications_dao import get_notification_by_id, dao_update_notifications_sent_to_dvla
|
||||
from app.dao.provider_details_dao import get_current_provider
|
||||
from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
@@ -39,10 +36,11 @@ from app.models import (
|
||||
JOB_STATUS_IN_PROGRESS,
|
||||
JOB_STATUS_FINISHED,
|
||||
JOB_STATUS_READY_TO_SEND,
|
||||
JOB_STATUS_SENT_TO_DVLA, NOTIFICATION_SENDING, Notification)
|
||||
JOB_STATUS_SENT_TO_DVLA)
|
||||
from app.notifications.process_notifications import persist_notification
|
||||
from app.service.utils import service_allowed_to_send_to
|
||||
from app.statsd_decorators import statsd
|
||||
from notifications_utils.s3 import s3upload
|
||||
|
||||
|
||||
@notify_celery.task(name="process-job")
|
||||
@@ -272,7 +270,7 @@ def persist_letter(
|
||||
handle_exception(self, notification, notification_id, e)
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name="build-dvla-file", countdown=30, max_retries=15, default_retry_delay=300)
|
||||
@notify_celery.task(bind=True, name="build-dvla-file", countdown=60, max_retries=15, default_retry_delay=300)
|
||||
@statsd(namespace="tasks")
|
||||
def build_dvla_file(self, job_id):
|
||||
try:
|
||||
@@ -285,6 +283,7 @@ def build_dvla_file(self, job_id):
|
||||
file_location="{}-dvla-job.text".format(job_id)
|
||||
)
|
||||
dao_update_job_status(job_id, JOB_STATUS_READY_TO_SEND)
|
||||
notify_celery.send_task("aggregrate-dvla-files", ([str(job_id)], ), queue='aggregate-dvla-files')
|
||||
else:
|
||||
current_app.logger.info("All notifications for job {} are not persisted".format(job_id))
|
||||
self.retry(queue="retry", exc="All notifications for job {} are not persisted".format(job_id))
|
||||
@@ -322,37 +321,6 @@ def create_dvla_file_contents(job_id):
|
||||
return file_contents
|
||||
|
||||
|
||||
def s3upload(filedata, region, bucket_name, file_location):
|
||||
# TODO: move this method to utils. Will need to change the filedata from here to send contents in filedata['data']
|
||||
_s3 = resource('s3')
|
||||
# contents = filedata['data']
|
||||
contents = filedata
|
||||
|
||||
exists = True
|
||||
try:
|
||||
_s3.meta.client.head_bucket(
|
||||
Bucket=bucket_name)
|
||||
except botocore.exceptions.ClientError as e:
|
||||
error_code = int(e.response['Error']['Code'])
|
||||
if error_code == 404:
|
||||
exists = False
|
||||
else:
|
||||
current_app.logger.error(
|
||||
"Unable to create s3 bucket {}".format(bucket_name))
|
||||
raise e
|
||||
|
||||
if not exists:
|
||||
_s3.create_bucket(Bucket=bucket_name,
|
||||
CreateBucketConfiguration={'LocationConstraint': region})
|
||||
|
||||
upload_id = create_uuid()
|
||||
upload_file_name = file_location
|
||||
key = _s3.Object(bucket_name, upload_file_name)
|
||||
key.put(Body=contents, ServerSideEncryption='AES256')
|
||||
|
||||
return upload_id
|
||||
|
||||
|
||||
def handle_exception(task, notification, notification_id, exc):
|
||||
if not get_notification_by_id(notification_id):
|
||||
retry_msg = '{task} notification for job {job} row number {row} and notification id {noti}'.format(
|
||||
|
||||
@@ -29,6 +29,6 @@ notifications-python-client>=3.1,<3.2
|
||||
awscli>=1.11,<1.12
|
||||
awscli-cwlogs>=1.4,<1.5
|
||||
|
||||
git+https://github.com/alphagov/notifications-utils.git@15.0.2#egg=notifications-utils==15.0.2
|
||||
git+https://github.com/alphagov/notifications-utils.git@15.0.4#egg=notifications-utils==15.0.4
|
||||
|
||||
git+https://github.com/alphagov/boto.git@2.43.0-patch3#egg=boto==2.43.0-patch3
|
||||
|
||||
@@ -979,6 +979,7 @@ def test_build_dvla_file(sample_letter_template, mocker):
|
||||
create_notification(template=job.template, job=job)
|
||||
create_notification(template=job.template, job=job)
|
||||
mocked_upload = mocker.patch("app.celery.tasks.s3upload")
|
||||
mocked_send_task = mocker.patch("app.celery.tasks.notify_celery.send_task")
|
||||
mocked_letter_template = mocker.patch("app.celery.tasks.LetterDVLATemplate")
|
||||
mocked_letter_template_instance = mocked_letter_template.return_value
|
||||
mocked_letter_template_instance.__str__.return_value = "dvla|string"
|
||||
@@ -991,6 +992,7 @@ def test_build_dvla_file(sample_letter_template, mocker):
|
||||
file_location="{}-dvla-job.text".format(job.id)
|
||||
)
|
||||
assert Job.query.get(job.id).job_status == 'ready to send'
|
||||
mocked_send_task.assert_called_once_with("aggregrate-dvla-files", ([str(job.id)], ), queue='aggregate-dvla-files')
|
||||
|
||||
|
||||
def test_build_dvla_file_retries_if_all_notifications_are_not_created(sample_letter_template, mocker):
|
||||
@@ -998,6 +1000,7 @@ def test_build_dvla_file_retries_if_all_notifications_are_not_created(sample_let
|
||||
create_notification(template=job.template, job=job)
|
||||
|
||||
mocked = mocker.patch("app.celery.tasks.s3upload")
|
||||
mocked_send_task = mocker.patch("app.celery.tasks.notify_celery.send_task")
|
||||
mocker.patch('app.celery.tasks.build_dvla_file.retry', side_effect=Retry)
|
||||
with pytest.raises(Retry):
|
||||
build_dvla_file(job.id)
|
||||
@@ -1006,6 +1009,7 @@ def test_build_dvla_file_retries_if_all_notifications_are_not_created(sample_let
|
||||
tasks.build_dvla_file.retry.assert_called_with(queue='retry',
|
||||
exc="All notifications for job {} are not persisted".format(job.id))
|
||||
assert Job.query.get(job.id).job_status == 'in progress'
|
||||
mocked_send_task.assert_not_called()
|
||||
|
||||
|
||||
def test_create_dvla_file_contents(sample_letter_template, mocker):
|
||||
|
||||
Reference in New Issue
Block a user