From 6f6061455c8a98835070c4f296c2fe53278900ee Mon Sep 17 00:00:00 2001 From: Kenneth Kehl Date: Tue, 23 May 2023 08:31:30 -0700 Subject: [PATCH] notify-162 delete incomplete s3 uploads (#276) Co-authored-by: Kenneth Kehl <@kkehl@flexion.us> --- app/aws/s3.py | 11 +++++++++++ app/celery/nightly_tasks.py | 16 ++++++++++++++++ app/commands.py | 7 +++++++ app/config.py | 5 +++++ app/dao/jobs_dao.py | 4 ++++ tests/app/celery/test_nightly_tasks.py | 17 ++++++++++++++++- 6 files changed, 59 insertions(+), 1 deletion(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index d7d7da139..d48cbd083 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -65,3 +65,14 @@ def remove_job_from_s3(service_id, job_id): def remove_s3_object(bucket_name, object_key, access_key, secret_key, region): obj = get_s3_object(bucket_name, object_key, access_key, secret_key, region) return obj.delete() + + +def remove_csv_object(object_key): + obj = get_s3_object( + current_app.config['CSV_UPLOAD_BUCKET']['bucket'], + object_key, + current_app.config['CSV_UPLOAD_BUCKET']['access_key_id'], + current_app.config['CSV_UPLOAD_BUCKET']['secret_access_key'], + current_app.config['CSV_UPLOAD_BUCKET']['region'] + ) + return obj.delete() diff --git a/app/celery/nightly_tasks.py b/app/celery/nightly_tasks.py index ce98dd27c..253291fe2 100644 --- a/app/celery/nightly_tasks.py +++ b/app/celery/nightly_tasks.py @@ -6,6 +6,7 @@ from sqlalchemy.exc import SQLAlchemyError from app import notify_celery from app.aws import s3 +from app.aws.s3 import remove_csv_object from app.celery.process_ses_receipts_tasks import check_and_queue_callback_task from app.config import QueueNames from app.cronitor import cronitor @@ -14,6 +15,7 @@ from app.dao.inbound_sms_dao import delete_inbound_sms_older_than_retention from app.dao.jobs_dao import ( dao_archive_job, dao_get_jobs_older_than_data_retention, + dao_get_unfinished_jobs, ) from app.dao.notifications_dao import ( dao_get_notifications_processing_time_stats, @@ -42,6 +44,19 @@ def 
_remove_csv_files(job_types): current_app.logger.info("Job ID {} has been removed from s3.".format(job.id)) +@notify_celery.task(name="cleanup-unfinished-jobs") +def cleanup_unfinished_jobs(): + now = datetime.utcnow() + jobs = dao_get_unfinished_jobs() + for job in jobs: + # The query already checks that the processing_finished time is null, so here we are saying + # if it started more than 5 minutes ago (see the timedelta below), that's too long + acceptable_finish_time = job.processing_started + timedelta(minutes=5) + if now > acceptable_finish_time: + remove_csv_object(job.original_file_name) + dao_archive_job(job) + + @notify_celery.task(name="delete-notifications-older-than-retention") def delete_notifications_older_than_retention(): delete_email_notifications_older_than_retention.apply_async(queue=QueueNames.REPORTING) @@ -161,6 +176,7 @@ def delete_inbound_sms(): @notify_celery.task(name='save-daily-notification-processing-time') @cronitor("save-daily-notification-processing-time") def save_daily_notification_processing_time(local_date=None): + # local_date is a string in the format of "YYYY-MM-DD" if local_date is None: # if a date is not provided, we run against yesterdays data diff --git a/app/commands.py b/app/commands.py index 7a8c99d7b..18ac0ae6f 100644 --- a/app/commands.py +++ b/app/commands.py @@ -18,6 +18,7 @@ from sqlalchemy.orm.exc import NoResultFound from app import db from app.aws import s3 +from app.celery.nightly_tasks import cleanup_unfinished_jobs from app.celery.tasks import process_row from app.dao.annual_billing_dao import ( dao_create_or_update_annual_billing_for_year, @@ -464,6 +465,12 @@ def fix_billable_units(): print("End fix_billable_units") +@notify_command(name='delete-unfinished-jobs') +def delete_unfinished_jobs(): + cleanup_unfinished_jobs() + print("End cleanup_unfinished_jobs") + + @notify_command(name='process-row-from-job') @click.option('-j', '--job_id', required=True, help='Job id') @click.option('-n', '--job_row_number', type=int, required=True, 
help='Job id') diff --git a/app/config.py b/app/config.py index d1762168b..2e9f8cf69 100644 --- a/app/config.py +++ b/app/config.py @@ -240,6 +240,11 @@ class Config(object): 'schedule': crontab(hour=2, minute=0), 'options': {'queue': QueueNames.PERIODIC} }, + 'cleanup-unfinished-jobs': { + 'task': 'cleanup-unfinished-jobs', + 'schedule': crontab(hour=0, minute=5), + 'options': {'queue': QueueNames.PERIODIC} + }, 'remove_sms_email_jobs': { 'task': 'remove_sms_email_jobs', 'schedule': crontab(hour=4, minute=0), diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py index da829171b..8fd7f22c0 100644 --- a/app/dao/jobs_dao.py +++ b/app/dao/jobs_dao.py @@ -43,6 +43,10 @@ def dao_get_job_by_service_id_and_job_id(service_id, job_id): return Job.query.filter_by(service_id=service_id, id=job_id).one() +def dao_get_unfinished_jobs(): + return Job.query.filter(Job.processing_finished.is_(None)).all() + + def dao_get_jobs_by_service_id( service_id, *, diff --git a/tests/app/celery/test_nightly_tasks.py b/tests/app/celery/test_nightly_tasks.py index 0599799e4..dd1191ae9 100644 --- a/tests/app/celery/test_nightly_tasks.py +++ b/tests/app/celery/test_nightly_tasks.py @@ -7,6 +7,7 @@ from freezegun import freeze_time from app.celery import nightly_tasks from app.celery.nightly_tasks import ( _delete_notifications_older_than_retention_by_type, + cleanup_unfinished_jobs, delete_email_notifications_older_than_retention, delete_inbound_sms, delete_sms_notifications_older_than_retention, @@ -15,7 +16,7 @@ from app.celery.nightly_tasks import ( save_daily_notification_processing_time, timeout_notifications, ) -from app.models import EMAIL_TYPE, SMS_TYPE, FactProcessingTime +from app.models import EMAIL_TYPE, SMS_TYPE, FactProcessingTime, Job from tests.app.db import ( create_job, create_notification, @@ -313,3 +314,17 @@ def test_delete_notifications_task_calls_task_for_services_that_have_sent_notifi 'datetime_to_delete_before': datetime(2021, 3, 26, 4, 0) }), ]) + + +def 
test_cleanup_unfinished_jobs(mocker): + mock_s3 = mocker.patch('app.celery.nightly_tasks.remove_csv_object') + mock_dao_archive = mocker.patch('app.celery.nightly_tasks.dao_archive_job') + mock_dao = mocker.patch('app.celery.nightly_tasks.dao_get_unfinished_jobs') + mock_job_unfinished = Job() + mock_job_unfinished.processing_started = datetime(2023, 1, 1, 0, 0, 0) + mock_job_unfinished.original_file_name = "blah" + + mock_dao.return_value = [mock_job_unfinished] + cleanup_unfinished_jobs() + mock_s3.assert_called_once_with('blah') + mock_dao_archive.assert_called_once_with(mock_job_unfinished)