notify-162 delete incomplete s3 uploads (#276)

Co-authored-by: Kenneth Kehl <@kkehl@flexion.us>
This commit is contained in:
Kenneth Kehl
2023-05-23 08:31:30 -07:00
committed by GitHub
parent 49a73a2238
commit 6f6061455c
6 changed files with 59 additions and 1 deletions

View File

@@ -65,3 +65,14 @@ def remove_job_from_s3(service_id, job_id):
def remove_s3_object(bucket_name, object_key, access_key, secret_key, region):
    """Delete a single object from S3.

    Resolves the object with get_s3_object using the supplied credentials,
    then issues the delete; the boto3 delete response is returned as-is.
    """
    s3_object = get_s3_object(bucket_name, object_key, access_key, secret_key, region)
    return s3_object.delete()
def remove_csv_object(object_key):
    """Delete an uploaded CSV object from the configured CSV upload bucket.

    Args:
        object_key: key of the CSV object within the bucket.

    Returns:
        The boto3 delete() response for the object.
    """
    # Read the bucket settings once instead of four separate config lookups.
    bucket = current_app.config['CSV_UPLOAD_BUCKET']
    obj = get_s3_object(
        bucket['bucket'],
        object_key,
        bucket['access_key_id'],
        bucket['secret_access_key'],
        bucket['region'],
    )
    return obj.delete()

View File

@@ -6,6 +6,7 @@ from sqlalchemy.exc import SQLAlchemyError
from app import notify_celery
from app.aws import s3
from app.aws.s3 import remove_csv_object
from app.celery.process_ses_receipts_tasks import check_and_queue_callback_task
from app.config import QueueNames
from app.cronitor import cronitor
@@ -14,6 +15,7 @@ from app.dao.inbound_sms_dao import delete_inbound_sms_older_than_retention
from app.dao.jobs_dao import (
dao_archive_job,
dao_get_jobs_older_than_data_retention,
dao_get_unfinished_jobs,
)
from app.dao.notifications_dao import (
dao_get_notifications_processing_time_stats,
@@ -42,6 +44,19 @@ def _remove_csv_files(job_types):
current_app.logger.info("Job ID {} has been removed from s3.".format(job.id))
@notify_celery.task(name="cleanup-unfinished-jobs")
def cleanup_unfinished_jobs():
    """Archive jobs that started processing but never finished.

    dao_get_unfinished_jobs only returns jobs whose processing_finished is
    null; any such job that started more than five minutes ago is treated as
    stuck, so its uploaded CSV is removed from S3 and the job is archived.
    """
    now = datetime.utcnow()
    for job in dao_get_unfinished_jobs():
        # An unfinished job may never have started processing at all; skip it
        # rather than raising TypeError on None + timedelta.
        if job.processing_started is None:
            continue
        # The query already checks processing_finished is null, so here we are
        # saying: if it started more than 5 minutes ago, that's too long.
        acceptable_finish_time = job.processing_started + timedelta(minutes=5)
        if now > acceptable_finish_time:
            remove_csv_object(job.original_file_name)
            dao_archive_job(job)
@notify_celery.task(name="delete-notifications-older-than-retention")
def delete_notifications_older_than_retention():
delete_email_notifications_older_than_retention.apply_async(queue=QueueNames.REPORTING)
@@ -161,6 +176,7 @@ def delete_inbound_sms():
@notify_celery.task(name='save-daily-notification-processing-time')
@cronitor("save-daily-notification-processing-time")
def save_daily_notification_processing_time(local_date=None):
# local_date is a string in the format of "YYYY-MM-DD"
if local_date is None:
# if a date is not provided, we run against yesterday's data

View File

@@ -18,6 +18,7 @@ from sqlalchemy.orm.exc import NoResultFound
from app import db
from app.aws import s3
from app.celery.nightly_tasks import cleanup_unfinished_jobs
from app.celery.tasks import process_row
from app.dao.annual_billing_dao import (
dao_create_or_update_annual_billing_for_year,
@@ -464,6 +465,12 @@ def fix_billable_units():
print("End fix_billable_units")
@notify_command(name='delete-unfinished-jobs')
def delete_unfinished_jobs():
    """CLI entry point that runs the cleanup-unfinished-jobs nightly task."""
    cleanup_unfinished_jobs()
    print("End cleanup_unfinished_jobs")
@notify_command(name='process-row-from-job')
@click.option('-j', '--job_id', required=True, help='Job id')
@click.option('-n', '--job_row_number', type=int, required=True, help='Job id')

View File

@@ -240,6 +240,11 @@ class Config(object):
'schedule': crontab(hour=2, minute=0),
'options': {'queue': QueueNames.PERIODIC}
},
'cleanup-unfinished-jobs': {
'task': 'cleanup-unfinished-jobs',
'schedule': crontab(hour=0, minute=5),
'options': {'queue': QueueNames.PERIODIC}
},
'remove_sms_email_jobs': {
'task': 'remove_sms_email_jobs',
'schedule': crontab(hour=4, minute=0),

View File

@@ -43,6 +43,10 @@ def dao_get_job_by_service_id_and_job_id(service_id, job_id):
return Job.query.filter_by(service_id=service_id, id=job_id).one()
def dao_get_unfinished_jobs():
    """Return every Job whose processing_finished timestamp is still null."""
    unfinished = Job.query.filter(Job.processing_finished.is_(None))
    return unfinished.all()
def dao_get_jobs_by_service_id(
service_id,
*,

View File

@@ -7,6 +7,7 @@ from freezegun import freeze_time
from app.celery import nightly_tasks
from app.celery.nightly_tasks import (
_delete_notifications_older_than_retention_by_type,
cleanup_unfinished_jobs,
delete_email_notifications_older_than_retention,
delete_inbound_sms,
delete_sms_notifications_older_than_retention,
@@ -15,7 +16,7 @@ from app.celery.nightly_tasks import (
save_daily_notification_processing_time,
timeout_notifications,
)
from app.models import EMAIL_TYPE, SMS_TYPE, FactProcessingTime
from app.models import EMAIL_TYPE, SMS_TYPE, FactProcessingTime, Job
from tests.app.db import (
create_job,
create_notification,
@@ -313,3 +314,17 @@ def test_delete_notifications_task_calls_task_for_services_that_have_sent_notifi
'datetime_to_delete_before': datetime(2021, 3, 26, 4, 0)
}),
])
def test_cleanup_unfinished_jobs(mocker):
    """cleanup_unfinished_jobs archives stale jobs and leaves fresh ones alone."""
    mock_s3 = mocker.patch('app.celery.nightly_tasks.remove_csv_object')
    mock_dao_archive = mocker.patch('app.celery.nightly_tasks.dao_archive_job')
    mock_dao = mocker.patch('app.celery.nightly_tasks.dao_get_unfinished_jobs')

    # Started long ago and never finished -> should be cleaned up.
    stale_job = Job()
    stale_job.processing_started = datetime(2023, 1, 1, 0, 0, 0)
    stale_job.original_file_name = "blah"

    # Started just now -> still inside the grace period, must be untouched.
    fresh_job = Job()
    fresh_job.processing_started = datetime.utcnow()
    fresh_job.original_file_name = "fresh"

    mock_dao.return_value = [stale_job, fresh_job]
    cleanup_unfinished_jobs()

    # assert_called_once_with doubles as the negative check: the fresh job
    # must not have triggered a second call to either collaborator.
    mock_s3.assert_called_once_with('blah')
    mock_dao_archive.assert_called_once_with(stale_job)