From 7dd3c1df5a51ed77adf64b4d801b138b4cf6b13b Mon Sep 17 00:00:00 2001 From: Leo Hemsted Date: Fri, 15 Sep 2017 17:46:08 +0100 Subject: [PATCH] set letter notifications to pending while notify-ftp does its stuff this means that if the task is accidentally run twice (eg we autoscale notify-celery-worker-beat to 2), it won't send letters twice. Additionally, update some function names and config variables to make it clear that they are referring to letter jobs, rather than all letter content --- app/aws/s3.py | 2 +- app/celery/scheduled_tasks.py | 25 +++++++---- app/celery/tasks.py | 15 +++++-- app/dao/notifications_dao.py | 43 ++++++++++++++++++- tests/app/aws/test_s3.py | 2 +- tests/app/celery/test_tasks.py | 2 +- .../notification_dao/test_notification_dao.py | 13 +++--- 7 files changed, 81 insertions(+), 21 deletions(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index 48a54e709..927eb0ad1 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -72,7 +72,7 @@ def remove_s3_object(bucket_name, object_key): def remove_transformed_dvla_file(job_id): - bucket_name = current_app.config['DVLA_UPLOAD_BUCKET_NAME'] + bucket_name = current_app.config['DVLA_BUCKETS']['job'] file_location = '{}-dvla-job.text'.format(job_id) obj = get_s3_object(bucket_name, file_location) return obj.delete() diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index d23eaee0a..ca429a2c0 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -28,7 +28,9 @@ from app.dao.notifications_dao import ( is_delivery_slow_for_provider, delete_notifications_created_more_than_a_week_ago_by_type, dao_get_scheduled_notifications, - set_scheduled_notification_to_processed) + set_scheduled_notification_to_processed, + dao_set_created_live_letter_api_notifications_to_pending, +) from app.dao.statistics_dao import dao_timeout_job_statistics from app.dao.provider_details_dao import ( get_current_provider, @@ -325,21 +327,28 @@ def run_letter_jobs(): 
@notify_celery.task(name="run-letter-notifications") @statsd(namespace="tasks") def run_letter_notifications(): - notifications = dao_get_created_letter_api_notifications_that_dont_belong_to_jobs() + current_time = datetime.utcnow().isoformat() + + notifications = dao_set_created_live_letter_api_notifications_to_pending() file_contents = create_dvla_file_contents_for_notifications(notifications) + + file_location = '{}-dvla-notifications.txt'.format(current_time) s3upload( filedata=file_contents + '\n', region=current_app.config['AWS_REGION'], - bucket_name=current_app.config['DVLA_UPLOAD_BUCKET_NAME'], - file_location='2017-09-12-dvla-notifications.txt' + bucket_name=current_app.config['DVLA_BUCKETS']['notification'], + file_location=file_location ) - # set noti statuses to pending or something - notify_celery.send_task( name=TaskNames.DVLA_NOTIFICATIONS, - kwargs={'date': '2017-09-12'}, + kwargs={'filename': file_location}, queue=QueueNames.PROCESS_FTP ) - current_app.logger.info("Queued {} ready letter job ids onto {}".format(len(notifications), QueueNames.PROCESS_FTP)) + current_app.logger.info( + "Queued {} ready letter api notifications onto {}".format( + len(notifications), + QueueNames.PROCESS_FTP + ) + ) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 71896224b..95d0c4a57 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -27,7 +27,7 @@ from app.dao.jobs_dao import ( all_notifications_are_created_for_job, dao_get_all_notifications_for_job, dao_update_job_status) -from app.dao.notifications_dao import get_notification_by_id, dao_update_notifications_sent_to_dvla +from app.dao.notifications_dao import get_notification_by_id, dao_update_notifications_for_job_to_sent_to_dvla from app.dao.provider_details_dao import get_current_provider from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count @@ -279,11 +279,11 @@ def 
persist_letter( def build_dvla_file(self, job_id): try: if all_notifications_are_created_for_job(job_id): - file_contents = create_dvla_file_contents(job_id) + file_contents = create_dvla_file_contents_for_job(job_id) s3upload( filedata=file_contents + '\n', region=current_app.config['AWS_REGION'], - bucket_name=current_app.config['DVLA_UPLOAD_BUCKET_NAME'], + bucket_name=current_app.config['DVLA_BUCKETS']['job'], file_location="{}-dvla-job.text".format(job_id) ) dao_update_job_status(job_id, JOB_STATUS_READY_TO_SEND) @@ -302,15 +302,22 @@ def update_job_to_sent_to_dvla(self, job_id): # and update all notifications for this job to sending, provider = DVLA provider = get_current_provider(LETTER_TYPE) - updated_count = dao_update_notifications_sent_to_dvla(job_id, provider.identifier) + updated_count = dao_update_notifications_for_job_to_sent_to_dvla(job_id, provider.identifier) dao_update_job_status(job_id, JOB_STATUS_SENT_TO_DVLA) current_app.logger.info("Updated {} letter notifications to sending. " "Updated {} job to {}".format(updated_count, job_id, JOB_STATUS_SENT_TO_DVLA)) +@notify_celery.task(bind=True, name='update-notifications-to-sent') +@statsd(namespace="tasks") def update_notifications_to_sent_to_dvla(self, notification_references): + # This task will be called by the FTP app to update notifications as sent to DVLA + provider = get_current_provider(LETTER_TYPE) + updated_count = dao_update_notifications_by_reference_sent_to_dvla(notification_references, provider.identifier) + + current_app.logger.info("Updated {} letter notifications to sending. 
".format(updated_count)) @notify_celery.task(bind=True, name='update-letter-job-to-error') diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 17e388422..b390d5abf 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -460,7 +460,22 @@ def is_delivery_slow_for_provider( @statsd(namespace="dao") @transactional -def dao_update_notifications_sent_to_dvla(job_id, provider): +def dao_update_notifications_for_job_to_sent_to_dvla(job_id, provider): + now = datetime.utcnow() + updated_count = db.session.query( + Notification).filter(Notification.job_id == job_id).update( + {'status': NOTIFICATION_SENDING, "sent_by": provider, "sent_at": now}) + + db.session.query( + NotificationHistory).filter(NotificationHistory.job_id == job_id).update( + {'status': NOTIFICATION_SENDING, "sent_by": provider, "sent_at": now, "updated_at": now}) + + return updated_count + + +@statsd(namespace="dao") +@transactional +def dao_update_notifications_by_reference_sent_to_dvla(notification_references, provider): now = datetime.utcnow() updated_count = db.session.query( Notification).filter(Notification.reference.in_(notification_references)).update( {'status': NOTIFICATION_SENDING, "sent_by": provider, "sent_at": now}, synchronize_session=False) @@ -555,3 +570,29 @@ dao_get_total_notifications_sent_per_day_for_performance_platform(start_date NotificationHistory.key_type != KEY_TYPE_TEST, NotificationHistory.notification_type != LETTER_TYPE ).one() + + +def dao_set_created_live_letter_api_notifications_to_pending(): + """ + Sets all created live letter API notifications to pending, and then returns them for further processing. + + this is used in the run_letter_notifications task, so we put a FOR UPDATE lock on the notification table for the duration of + the transaction so that if the task is run more than once concurrently, one task will block the other select + from completing until it commits. 
+ """ + notifications = db.session.query( + Notification + ).filter( + Notification.notification_type == LETTER_TYPE, + Notification.status == NOTIFICATION_CREATED, + Notification.key_type == KEY_TYPE_NORMAL, + ).with_for_update( + ).all() + + for notification in notifications: + notification.status = NOTIFICATION_PENDING + + db.session.add_all(notifications) + db.session.commit() + + return notifications diff --git a/tests/app/aws/test_s3.py b/tests/app/aws/test_s3.py index b44d66fc8..d6e4c1b75 100644 --- a/tests/app/aws/test_s3.py +++ b/tests/app/aws/test_s3.py @@ -39,7 +39,7 @@ def test_remove_transformed_dvla_file_makes_correct_call(notify_api, mocker): remove_transformed_dvla_file(fake_uuid) s3_mock.assert_has_calls([ - call(current_app.config['DVLA_UPLOAD_BUCKET_NAME'], '{}-dvla-job.text'.format(fake_uuid)), + call(current_app.config['DVLA_BUCKETS']['job'], '{}-dvla-job.text'.format(fake_uuid)), call().delete() ]) diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index e17b1e05a..48b4ac3b7 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -1058,7 +1058,7 @@ def test_build_dvla_file(sample_letter_template, mocker): mocked_upload.assert_called_once_with( filedata="dvla|string\ndvla|string\n", region=current_app.config['AWS_REGION'], - bucket_name=current_app.config['DVLA_UPLOAD_BUCKET_NAME'], + bucket_name=current_app.config['DVLA_BUCKETS']['job'], file_location="{}-dvla-job.text".format(job.id) ) assert Job.query.get(job.id).job_status == 'ready to send' diff --git a/tests/app/dao/notification_dao/test_notification_dao.py b/tests/app/dao/notification_dao/test_notification_dao.py index 7d191da56..2b4e9479c 100644 --- a/tests/app/dao/notification_dao/test_notification_dao.py +++ b/tests/app/dao/notification_dao/test_notification_dao.py @@ -40,7 +40,7 @@ from app.dao.notifications_dao import ( dao_delete_notifications_and_history_by_id, dao_timeout_notifications, is_delivery_slow_for_provider, - 
dao_update_notifications_sent_to_dvla, + dao_update_notifications_for_job_to_sent_to_dvla, dao_get_notifications_by_to_field, dao_created_scheduled_notification, dao_get_scheduled_notifications, set_scheduled_notification_to_processed) @@ -1713,11 +1713,11 @@ def test_slow_provider_delivery_does_not_return_for_standard_delivery_time( assert not slow_delivery -def test_dao_update_notifications_sent_to_dvla(notify_db, notify_db_session, sample_letter_template): +def test_dao_update_notifications_for_job_to_sent_to_dvla(notify_db, notify_db_session, sample_letter_template): job = sample_job(notify_db=notify_db, notify_db_session=notify_db_session, template=sample_letter_template) notification = create_notification(template=sample_letter_template, job=job) - updated_count = dao_update_notifications_sent_to_dvla(job_id=job.id, provider='some provider') + updated_count = dao_update_notifications_for_job_to_sent_to_dvla(job_id=job.id, provider='some provider') assert updated_count == 1 updated_notification = Notification.query.get(notification.id) @@ -1732,7 +1732,7 @@ def test_dao_update_notifications_sent_to_dvla(notify_db, notify_db_session, sam assert history.updated_at -def test_dao_update_notifications_sent_to_dvla_does_update_history_if_test_key(sample_letter_job): +def test_dao_update_notifications_for_job_to_sent_to_dvla_does_update_history_if_test_key(sample_letter_job): api_key = create_api_key(sample_letter_job.service, key_type=KEY_TYPE_TEST) notification = create_notification( sample_letter_job.template, @@ -1740,7 +1740,10 @@ def test_dao_update_notifications_sent_to_dvla_does_update_history_if_test_key(s api_key=api_key ) - updated_count = dao_update_notifications_sent_to_dvla(job_id=sample_letter_job.id, provider='some provider') + updated_count = dao_update_notifications_for_job_to_sent_to_dvla( + job_id=sample_letter_job.id, + provider='some provider' + ) assert updated_count == 1 updated_notification = Notification.query.get(notification.id)