diff --git a/app/aws/s3.py b/app/aws/s3.py index 48a54e709..927eb0ad1 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -72,7 +72,7 @@ def remove_s3_object(bucket_name, object_key): def remove_transformed_dvla_file(job_id): - bucket_name = current_app.config['DVLA_UPLOAD_BUCKET_NAME'] + bucket_name = current_app.config['DVLA_BUCKETS']['job'] file_location = '{}-dvla-job.text'.format(job_id) obj = get_s3_object(bucket_name, file_location) return obj.delete() diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index d23eaee0a..ca429a2c0 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -28,7 +28,9 @@ from app.dao.notifications_dao import ( is_delivery_slow_for_provider, delete_notifications_created_more_than_a_week_ago_by_type, dao_get_scheduled_notifications, - set_scheduled_notification_to_processed) + set_scheduled_notification_to_processed, + dao_set_created_live_letter_api_notifications_to_pending, +) from app.dao.statistics_dao import dao_timeout_job_statistics from app.dao.provider_details_dao import ( get_current_provider, @@ -325,21 +327,28 @@ def run_letter_jobs(): @notify_celery.task(name="run-letter-notifications") @statsd(namespace="tasks") def run_letter_notifications(): - notifications = dao_get_created_letter_api_notifications_that_dont_belong_to_jobs() + current_time = datetime.utcnow().isoformat() + + notifications = dao_set_created_live_letter_api_notifications_to_pending() file_contents = create_dvla_file_contents_for_notifications(notifications) + + file_location = '{}-dvla-notifications.txt'.format(current_time) s3upload( filedata=file_contents + '\n', region=current_app.config['AWS_REGION'], - bucket_name=current_app.config['DVLA_UPLOAD_BUCKET_NAME'], - file_location='2017-09-12-dvla-notifications.txt' + bucket_name=current_app.config['DVLA_BUCKETS']['notification'], + file_location=file_location ) - # set noti statuses to pending or something - notify_celery.send_task( name=TaskNames.DVLA_NOTIFICATIONS, - kwargs={'date': '2017-09-12'}, + kwargs={'filename': file_location}, queue=QueueNames.PROCESS_FTP ) - current_app.logger.info("Queued {} ready letter job ids onto {}".format(len(notifications), QueueNames.PROCESS_FTP)) + current_app.logger.info( + "Queued {} ready letter api notifications onto {}".format( + len(notifications), + QueueNames.PROCESS_FTP + ) + ) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 71896224b..95d0c4a57 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -27,7 +27,7 @@ from app.dao.jobs_dao import ( all_notifications_are_created_for_job, dao_get_all_notifications_for_job, dao_update_job_status) -from app.dao.notifications_dao import get_notification_by_id, dao_update_notifications_sent_to_dvla +from app.dao.notifications_dao import get_notification_by_id, dao_update_notifications_for_job_to_sent_to_dvla from app.dao.provider_details_dao import get_current_provider from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count @@ -279,11 +279,11 @@ def persist_letter( def build_dvla_file(self, job_id): try: if all_notifications_are_created_for_job(job_id): - file_contents = create_dvla_file_contents(job_id) + file_contents = create_dvla_file_contents_for_job(job_id) s3upload( filedata=file_contents + '\n', region=current_app.config['AWS_REGION'], - bucket_name=current_app.config['DVLA_UPLOAD_BUCKET_NAME'], + bucket_name=current_app.config['DVLA_BUCKETS']['job'], file_location="{}-dvla-job.text".format(job_id) ) dao_update_job_status(job_id, JOB_STATUS_READY_TO_SEND) @@ -302,15 +302,22 @@ def update_job_to_sent_to_dvla(self, job_id): # and update all notifications for this job to sending, provider = DVLA provider = get_current_provider(LETTER_TYPE) - updated_count = dao_update_notifications_sent_to_dvla(job_id, provider.identifier) + updated_count = dao_update_notifications_for_job_to_sent_to_dvla(job_id, provider.identifier) dao_update_job_status(job_id, JOB_STATUS_SENT_TO_DVLA) current_app.logger.info("Updated {} letter notifications to sending. " "Updated {} job to {}".format(updated_count, job_id, JOB_STATUS_SENT_TO_DVLA)) +@notify_celery.task(bind=True, name='update-notifications-to-sent') +@statsd(namespace="tasks") def update_notifications_to_sent_to_dvla(self, notification_references): + # This task will be called by the FTP app to update notifications as sent to DVLA + provider = get_current_provider(LETTER_TYPE) + updated_count = dao_update_notifications_for_job_to_sent_to_dvla(references, provider.identifier) + + current_app.logger.info("Updated {} letter notifications to sending. ".format(updated_count)) @notify_celery.task(bind=True, name='update-letter-job-to-error') diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 17e388422..b390d5abf 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -460,7 +460,22 @@ def is_delivery_slow_for_provider( @statsd(namespace="dao") @transactional -def dao_update_notifications_sent_to_dvla(job_id, provider): +def dao_update_notifications_for_job_to_sent_to_dvla(job_id, provider): + now = datetime.utcnow() + updated_count = db.session.query( + Notification).filter(Notification.job_id == job_id).update( + {'status': NOTIFICATION_SENDING, "sent_by": provider, "sent_at": now}) + + db.session.query( + NotificationHistory).filter(NotificationHistory.job_id == job_id).update( + {'status': NOTIFICATION_SENDING, "sent_by": provider, "sent_at": now, "updated_at": now}) + + return updated_count + + +@statsd(namespace="dao") +@transactional +def dao_update_notifications_by_reference_sent_to_dvla(notification_references, provider): now = datetime.utcnow() updated_count = db.session.query( Notification).filter(Notification.job_id == job_id).update( @@ -555,3 +570,29 @@ def dao_get_total_notifications_sent_per_day_for_performance_platform(start_date NotificationHistory.key_type != KEY_TYPE_TEST, NotificationHistory.notification_type != LETTER_TYPE ).one() + + +def dao_set_created_live_letter_api_notifications_to_pending(): + """ + Sets all past scheduled jobs to pending, and then returns them for further processing. + + this is used in the run_scheduled_jobs task, so we put a FOR UPDATE lock on the job table for the duration of + the transaction so that if the task is run more than once concurrently, one task will block the other select + from completing until it commits. + """ + return db.session.query( + Notification.id + ).filter( + Notification.notification_type == LETTER_TYPE, + Notification.status == NOTIFICATION_CREATED, + Notification.key_type == KEY_TYPE_NORMAL, + ).with_for_update( + ).all() + + for notification in notifications: + notification.notification_status = NOTIFICATION_PENDING + + db.session.add_all(notifications) + db.session.commit() + + return notifications diff --git a/tests/app/aws/test_s3.py b/tests/app/aws/test_s3.py index b44d66fc8..d6e4c1b75 100644 --- a/tests/app/aws/test_s3.py +++ b/tests/app/aws/test_s3.py @@ -39,7 +39,7 @@ def test_remove_transformed_dvla_file_makes_correct_call(notify_api, mocker): remove_transformed_dvla_file(fake_uuid) s3_mock.assert_has_calls([ - call(current_app.config['DVLA_UPLOAD_BUCKET_NAME'], '{}-dvla-job.text'.format(fake_uuid)), + call(current_app.config['DVLA_BUCKETS']['job'], '{}-dvla-job.text'.format(fake_uuid)), call().delete() ]) diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index e17b1e05a..48b4ac3b7 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -1058,7 +1058,7 @@ def test_build_dvla_file(sample_letter_template, mocker): mocked_upload.assert_called_once_with( filedata="dvla|string\ndvla|string\n", region=current_app.config['AWS_REGION'], - bucket_name=current_app.config['DVLA_UPLOAD_BUCKET_NAME'], + bucket_name=current_app.config['DVLA_BUCKETS']['job'], file_location="{}-dvla-job.text".format(job.id) ) assert Job.query.get(job.id).job_status == 'ready to send' diff --git a/tests/app/dao/notification_dao/test_notification_dao.py b/tests/app/dao/notification_dao/test_notification_dao.py index 7d191da56..2b4e9479c 100644 --- a/tests/app/dao/notification_dao/test_notification_dao.py +++ b/tests/app/dao/notification_dao/test_notification_dao.py @@ -40,7 +40,7 @@ from app.dao.notifications_dao import ( dao_delete_notifications_and_history_by_id, dao_timeout_notifications, is_delivery_slow_for_provider, - dao_update_notifications_sent_to_dvla, + dao_update_notifications_for_job_to_sent_to_dvla, dao_get_notifications_by_to_field, dao_created_scheduled_notification, dao_get_scheduled_notifications, set_scheduled_notification_to_processed) @@ -1713,11 +1713,11 @@ def test_slow_provider_delivery_does_not_return_for_standard_delivery_time( assert not slow_delivery -def test_dao_update_notifications_sent_to_dvla(notify_db, notify_db_session, sample_letter_template): +def test_dao_update_notifications_for_job_to_sent_to_dvla(notify_db, notify_db_session, sample_letter_template): job = sample_job(notify_db=notify_db, notify_db_session=notify_db_session, template=sample_letter_template) notification = create_notification(template=sample_letter_template, job=job) - updated_count = dao_update_notifications_sent_to_dvla(job_id=job.id, provider='some provider') + updated_count = dao_update_notifications_for_job_to_sent_to_dvla(job_id=job.id, provider='some provider') assert updated_count == 1 updated_notification = Notification.query.get(notification.id) @@ -1732,7 +1732,7 @@ def test_dao_update_notifications_sent_to_dvla(notify_db, notify_db_session, sam assert history.updated_at -def test_dao_update_notifications_sent_to_dvla_does_update_history_if_test_key(sample_letter_job): +def test_dao_update_notifications_for_job_to_sent_to_dvla_does_update_history_if_test_key(sample_letter_job): api_key = create_api_key(sample_letter_job.service, key_type=KEY_TYPE_TEST) notification = create_notification( sample_letter_job.template, @@ -1740,7 +1740,10 @@ def test_dao_update_notifications_sent_to_dvla_does_update_history_if_test_key(s api_key=api_key ) - updated_count = dao_update_notifications_sent_to_dvla(job_id=sample_letter_job.id, provider='some provider') + updated_count = dao_update_notifications_for_job_to_sent_to_dvla( + job_id=sample_letter_job.id, + provider='some provider' + ) assert updated_count == 1 updated_notification = Notification.query.get(notification.id)