Add task to process a DVLA response file:

* Currently we do nothing with the parsed response. We will
* update the status of the notifications in a separate PR
This commit is contained in:
Imdad Ahad
2017-05-04 10:31:18 +01:00
parent 4d0b90bbe2
commit f766f90207
4 changed files with 77 additions and 12 deletions

View File

@@ -4,20 +4,24 @@ from flask import current_app
FILE_LOCATION_STRUCTURE = 'service-{}-notify/{}.csv'
def get_s3_job_object(bucket_name, file_location):
def get_s3_object(bucket_name, file_location):
s3 = resource('s3')
return s3.Object(bucket_name, file_location)
s3_object = s3.Object(bucket_name, file_location)
return s3_object.get()['Body'].read()
def get_job_from_s3(service_id, job_id):
bucket_name = current_app.config['CSV_UPLOAD_BUCKET_NAME']
file_location = FILE_LOCATION_STRUCTURE.format(service_id, job_id)
obj = get_s3_job_object(bucket_name, file_location)
return obj.get()['Body'].read().decode('utf-8')
job = _job_from_s3(service_id, job_id)
return job
def remove_job_from_s3(service_id, job_id):
job = _job_from_s3(service_id, job_id)
return job.delete()
def _job_from_s3():
bucket_name = current_app.config['CSV_UPLOAD_BUCKET_NAME']
file_location = FILE_LOCATION_STRUCTURE.format(service_id, job_id)
obj = get_s3_job_object(bucket_name, file_location)
return obj.delete()
obj = get_s3_object(bucket_name, file_location).decode('utf-8')
return obj

View File

@@ -1,6 +1,7 @@
import random
from datetime import (datetime)
from collections import namedtuple
from flask import current_app
from notifications_utils.recipients import (
@@ -23,7 +24,10 @@ from app.dao.jobs_dao import (
all_notifications_are_created_for_job,
dao_get_all_notifications_for_job,
dao_update_job_status)
from app.dao.notifications_dao import get_notification_by_id, dao_update_notifications_sent_to_dvla
from app.dao.notifications_dao import (
get_notification_by_id,
dao_update_notifications_sent_to_dvla
)
from app.dao.provider_details_dao import get_current_provider
from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count
from app.dao.templates_dao import dao_get_template_by_id
@@ -354,3 +358,29 @@ def get_template_class(template_type):
# since we don't need rendering capabilities (we only need to extract placeholders) both email and letter can
# use the same base template
return WithSubjectTemplate
@notify_celery.task(bind=True, name='update-letter-notifications-statuses')
@statsd(namespace="tasks")
def update_letter_notifications_statuses(self, filename):
response_file = s3.get_s3_object('development-notifications-csv-upload', filename).decode('utf-8')
lines = response_file.splitlines()
notification_updates = []
try:
NotificationUpdate = namedtuple('NotificationUpdate', ['reference', 'status', 'page_count', 'cost_threshold'])
for line in lines:
notification_updates.append(NotificationUpdate(*line.split('|')))
except TypeError:
current_app.logger.exception('DVLA response file has an invalid format')
raise
else:
if notification_updates:
for update in notification_updates:
current_app.logger.error(str(update))
# TODO: Update notifications with desired status
return notification_updates
else:
current_app.logger.exception('DVLA response file contained no updates')

View File

@@ -27,7 +27,8 @@ from app.models import (
NOTIFICATION_PERMANENT_FAILURE,
KEY_TYPE_NORMAL, KEY_TYPE_TEST,
LETTER_TYPE,
NOTIFICATION_SENT)
NOTIFICATION_SENT
)
from app.dao.dao_utils import transactional
from app.statsd_decorators import statsd

View File

@@ -12,15 +12,19 @@ from celery.exceptions import Retry
from app import (encryption, DATETIME_FORMAT)
from app.celery import provider_tasks
from app.celery import tasks
from app.celery.tasks import s3, build_dvla_file, create_dvla_file_contents, update_dvla_job_to_error
from app.celery.tasks import (
s3,
build_dvla_file,
create_dvla_file_contents,
update_dvla_job_to_error,
process_job,
process_row,
send_sms,
send_email,
persist_letter,
get_template_class,
update_job_to_sent_to_dvla
update_job_to_sent_to_dvla,
update_letter_notifications_statuses
)
from app.dao import jobs_dao, services_dao
from app.models import (
@@ -1071,3 +1075,29 @@ def test_update_dvla_job_to_error(sample_letter_template, sample_letter_job):
assert not n.sent_by
assert 'error' == Job.query.filter_by(id=sample_letter_job.id).one().job_status
def test_update_letter_notifications_statuses_raises_for_invalid_format(notify_api, mocker):
invalid_file = b'ref-foo|Sent|1|Unsorted\nref-bar|Sent|2'
mocker.patch('app.celery.tasks.s3.get_s3_object', return_value=invalid_file)
with pytest.raises(TypeError):
update_letter_notifications_statuses(filename='foo.txt')
def test_update_letter_notifications_statuses_builds_updates_list(notify_api, mocker):
valid_file = b'ref-foo|Sent|1|Unsorted\nref-bar|Sent|2|Sorted'
mocker.patch('app.celery.tasks.s3.get_s3_object', return_value=valid_file)
updates = update_letter_notifications_statuses(filename='foo.txt')
assert len(updates) == 2
assert updates[0].reference == 'ref-foo'
assert updates[0].status == 'Sent'
assert updates[0].page_count == '1'
assert updates[0].cost_threshold == 'Unsorted'
assert updates[1].reference == 'ref-bar'
assert updates[1].status == 'Sent'
assert updates[1].page_count == '2'
assert updates[1].cost_threshold == 'Sorted'