set letter notifications to pending while notify-ftp does its stuff

this means that if the task is accidentally run twice (e.g. we autoscale
notify-celery-worker-beat to 2), it won't send letters twice.

Additionally, update some function names and config variables to make
it clear that they are referring to letter jobs, rather than all letter
content.
Leo Hemsted
2017-09-15 17:46:08 +01:00
parent f61ccd8ff0
commit 7dd3c1df5a
7 changed files with 81 additions and 21 deletions
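
The idea in the commit message, reduced to a minimal sketch (the `Notification` model, session handling, and status strings here are assumptions for illustration, not taken from this diff): each run atomically claims the 'created' rows by flipping them to 'pending' under a row lock, so a duplicate run has nothing left to claim.

```python
# Minimal claim-then-send sketch, assuming a SQLAlchemy session and a
# hypothetical Notification model. Not the project's actual code.
def claim_created_letters(session):
    letters = (
        session.query(Notification)
        .filter(Notification.status == 'created')
        .with_for_update()  # a concurrent run blocks here until we commit
        .all()
    )
    for letter in letters:
        letter.status = 'pending'
    session.commit()  # releases the lock; the rows no longer match 'created'
    return letters    # a second, previously blocked run now claims nothing
```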

View File

@@ -72,7 +72,7 @@ def remove_s3_object(bucket_name, object_key):
 def remove_transformed_dvla_file(job_id):
-    bucket_name = current_app.config['DVLA_UPLOAD_BUCKET_NAME']
+    bucket_name = current_app.config['DVLA_BUCKETS']['job']
     file_location = '{}-dvla-job.text'.format(job_id)
     obj = get_s3_object(bucket_name, file_location)
     return obj.delete()
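
For context, the diff reads `DVLA_BUCKETS['job']` here and `DVLA_BUCKETS['notification']` further down, which implies the old single `DVLA_UPLOAD_BUCKET_NAME` setting became a small mapping. A plausible shape (the bucket names themselves are invented):

```python
# Hypothetical config sketch: only the 'job' and 'notification' keys are
# evidenced by this diff; the bucket names are made up.
DVLA_BUCKETS = {
    'job': 'notify-dvla-letter-jobs',
    'notification': 'notify-dvla-letter-api-notifications',
}
```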

View File

@@ -28,7 +28,9 @@ from app.dao.notifications_dao import (
     is_delivery_slow_for_provider,
     delete_notifications_created_more_than_a_week_ago_by_type,
     dao_get_scheduled_notifications,
-    set_scheduled_notification_to_processed)
+    set_scheduled_notification_to_processed,
+    dao_set_created_live_letter_api_notifications_to_pending,
+)
 from app.dao.statistics_dao import dao_timeout_job_statistics
 from app.dao.provider_details_dao import (
     get_current_provider,
@@ -325,21 +327,28 @@ def run_letter_jobs():
 @notify_celery.task(name="run-letter-notifications")
 @statsd(namespace="tasks")
 def run_letter_notifications():
-    notifications = dao_get_created_letter_api_notifications_that_dont_belong_to_jobs()
+    current_time = datetime.utcnow().isoformat()
+    notifications = dao_set_created_live_letter_api_notifications_to_pending()
     file_contents = create_dvla_file_contents_for_notifications(notifications)
+    file_location = '{}-dvla-notifications.txt'.format(current_time)
     s3upload(
         filedata=file_contents + '\n',
         region=current_app.config['AWS_REGION'],
-        bucket_name=current_app.config['DVLA_UPLOAD_BUCKET_NAME'],
-        file_location='2017-09-12-dvla-notifications.txt'
+        bucket_name=current_app.config['DVLA_BUCKETS']['notification'],
+        file_location=file_location
     )
-    # set noti statuses to pending or something
     notify_celery.send_task(
         name=TaskNames.DVLA_NOTIFICATIONS,
-        kwargs={'date': '2017-09-12'},
+        kwargs={'filename': file_location},
         queue=QueueNames.PROCESS_FTP
     )
-    current_app.logger.info("Queued {} ready letter job ids onto {}".format(len(notifications), QueueNames.PROCESS_FTP))
+    current_app.logger.info(
+        "Queued {} ready letter api notifications onto {}".format(
+            len(notifications),
+            QueueNames.PROCESS_FTP
+        )
+    )
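
A quick illustration of the new key naming (not from the diff itself): the ISO-formatted timestamp replaces the hard-coded `2017-09-12` date, so each run writes a distinct S3 object and passes that same name on to the FTP task.

```python
from datetime import datetime

# Reproduces the file naming used by run_letter_notifications above.
current_time = datetime.utcnow().isoformat()
file_location = '{}-dvla-notifications.txt'.format(current_time)
print(file_location)  # e.g. '2017-09-15T16:46:08.123456-dvla-notifications.txt'
```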

View File

@@ -27,7 +27,7 @@ from app.dao.jobs_dao import (
     all_notifications_are_created_for_job,
     dao_get_all_notifications_for_job,
     dao_update_job_status)
-from app.dao.notifications_dao import get_notification_by_id, dao_update_notifications_sent_to_dvla
+from app.dao.notifications_dao import get_notification_by_id, dao_update_notifications_for_job_to_sent_to_dvla
 from app.dao.provider_details_dao import get_current_provider
 from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service
 from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count
@@ -279,11 +279,11 @@ def persist_letter(
 def build_dvla_file(self, job_id):
     try:
         if all_notifications_are_created_for_job(job_id):
-            file_contents = create_dvla_file_contents(job_id)
+            file_contents = create_dvla_file_contents_for_job(job_id)
             s3upload(
                 filedata=file_contents + '\n',
                 region=current_app.config['AWS_REGION'],
-                bucket_name=current_app.config['DVLA_UPLOAD_BUCKET_NAME'],
+                bucket_name=current_app.config['DVLA_BUCKETS']['job'],
                 file_location="{}-dvla-job.text".format(job_id)
             )
             dao_update_job_status(job_id, JOB_STATUS_READY_TO_SEND)
@@ -302,15 +302,22 @@ def update_job_to_sent_to_dvla(self, job_id):
     # and update all notifications for this job to sending, provider = DVLA
     provider = get_current_provider(LETTER_TYPE)
-    updated_count = dao_update_notifications_sent_to_dvla(job_id, provider.identifier)
+    updated_count = dao_update_notifications_for_job_to_sent_to_dvla(job_id, provider.identifier)
     dao_update_job_status(job_id, JOB_STATUS_SENT_TO_DVLA)
     current_app.logger.info("Updated {} letter notifications to sending. "
                             "Updated {} job to {}".format(updated_count, job_id, JOB_STATUS_SENT_TO_DVLA))


+@notify_celery.task(bind=True, name='update-notifications-to-sent')
+@statsd(namespace="tasks")
+def update_notifications_to_sent_to_dvla(self, notification_references):
+    # This task will be called by the FTP app to update notifications as sent to DVLA
+    provider = get_current_provider(LETTER_TYPE)
+    updated_count = dao_update_notifications_by_reference_sent_to_dvla(notification_references, provider.identifier)
+    current_app.logger.info("Updated {} letter notifications to sending.".format(updated_count))
+
+
 @notify_celery.task(bind=True, name='update-letter-job-to-error')
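
For the comment above ("called by the FTP app"): a hedged sketch of what that caller might look like. Only the task name `update-notifications-to-sent` is established by this commit; the queue name and the example references are assumptions.

```python
# Hypothetical caller in the FTP app, sending the task by name so the two
# apps don't need to share code. Queue and references are made up.
notify_celery.send_task(
    name='update-notifications-to-sent',
    kwargs={'notification_references': ['ABC1234567890', 'DEF1234567890']},
    queue='notify-internal-tasks',  # assumed queue name
)
```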

View File

@@ -460,7 +460,22 @@ def is_delivery_slow_for_provider(
 @statsd(namespace="dao")
 @transactional
-def dao_update_notifications_sent_to_dvla(job_id, provider):
+def dao_update_notifications_for_job_to_sent_to_dvla(job_id, provider):
     now = datetime.utcnow()
     updated_count = db.session.query(
         Notification).filter(Notification.job_id == job_id).update(
         {'status': NOTIFICATION_SENDING, "sent_by": provider, "sent_at": now})
     db.session.query(
         NotificationHistory).filter(NotificationHistory.job_id == job_id).update(
         {'status': NOTIFICATION_SENDING, "sent_by": provider, "sent_at": now, "updated_at": now})
     return updated_count


+@statsd(namespace="dao")
+@transactional
+def dao_update_notifications_by_reference_sent_to_dvla(notification_references, provider):
+    now = datetime.utcnow()
+    updated_count = db.session.query(
+        Notification).filter(Notification.reference.in_(notification_references)).update(
+        {'status': NOTIFICATION_SENDING, "sent_by": provider, "sent_at": now},
+        synchronize_session=False)
+    return updated_count
@@ -555,3 +570,29 @@ def dao_get_total_notifications_sent_per_day_for_performance_platform(start_date
         NotificationHistory.key_type != KEY_TYPE_TEST,
         NotificationHistory.notification_type != LETTER_TYPE
     ).one()
+
+
+def dao_set_created_live_letter_api_notifications_to_pending():
+    """
+    Sets all created live letter api notifications to pending, and then returns them for further processing.
+
+    This is used in the run_letter_notifications task, so we put a FOR UPDATE lock on the notification table
+    for the duration of the transaction, so that if the task is run more than once concurrently, one task will
+    block the other's select from completing until it commits.
+    """
+    notifications = db.session.query(
+        Notification
+    ).filter(
+        Notification.notification_type == LETTER_TYPE,
+        Notification.status == NOTIFICATION_CREATED,
+        Notification.key_type == KEY_TYPE_NORMAL,
+    ).with_for_update(
+    ).all()
+
+    for notification in notifications:
+        notification.status = NOTIFICATION_PENDING
+
+    db.session.add_all(notifications)
+    db.session.commit()
+    return notifications
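
As a rough sanity check on the docstring's locking claim (assuming a PostgreSQL backend and the `notification_status` column name hinted at elsewhere in the codebase; the DSN and table name are invented), `with_for_update()` makes the session emit a `SELECT ... FOR UPDATE`, which blocks a second concurrent worker:

```python
from sqlalchemy import create_engine, text

# Standalone illustration of SELECT ... FOR UPDATE blocking; not the
# project's code.
engine = create_engine('postgresql:///notification_api')  # assumed DSN

with engine.begin() as conn:  # one transaction; locks release on commit
    # A second worker running this same statement blocks here until the
    # first transaction commits; under READ COMMITTED the locked rows are
    # then re-checked, no longer match 'created', and it selects nothing.
    rows = conn.execute(text(
        "SELECT id FROM notifications "
        "WHERE notification_status = 'created' FOR UPDATE"
    )).fetchall()
```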