diff --git a/app/aws/s3.py b/app/aws/s3.py
index d98529d8a..c2680624d 100644
--- a/app/aws/s3.py
+++ b/app/aws/s3.py
@@ -11,6 +11,8 @@ from flask import current_app
 
 from app import job_cache, job_cache_lock
 from app.clients import AWS_CLIENT_CONFIG
+
+# from app.service.rest import get_service_by_id
 from notifications_utils import aware_utcnow
 
 FILE_LOCATION_STRUCTURE = "service-{}-notify/{}.csv"
@@ -162,6 +164,34 @@ def cleanup_old_s3_objects():
         current_app.logger.exception(
             "#delete-old-s3-objects An error occurred while cleaning up old s3 objects",
         )
+    try:
+        response = s3_client.list_objects_v2(Bucket=bucket_name)
+
+        service_ids = set()
+        while True:
+            for obj in response.get("Contents", []):
+                # Get the service id out of the upload key
+                key = obj["Key"]
+                object_arr = key.split("/")
+                service_id = object_arr[0]
+                service_id = service_id.replace("service-", "").replace("-notify", "")
+                service_ids.add(service_id)
+            if "NextContinuationToken" in response:
+                response = s3_client.list_objects_v2(
+                    Bucket=bucket_name,
+                    ContinuationToken=response["NextContinuationToken"],
+                )
+            else:
+                break
+        retained_services = []
+        for service_id in service_ids:
+            retained_services.append(service_id)
+
+        return service_ids
+    except Exception as error:
+        current_app.logger.exception(
+            f"#delete-old-s3-objects An error occurred while cleaning up old s3 objects: {str(error)}"
+        )
 
 
 def get_job_id_from_s3_object_key(key):
diff --git a/app/celery/tasks.py b/app/celery/tasks.py
index 4a5311c50..61f7b5a20 100644
--- a/app/celery/tasks.py
+++ b/app/celery/tasks.py
@@ -19,6 +19,7 @@ from app.dao.notifications_dao import (
 from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id
 from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service
 from app.dao.service_sms_sender_dao import dao_get_service_sms_senders_by_id
+from app.dao.services_dao import dao_fetch_service_by_id
 from app.dao.templates_dao import dao_get_template_by_id
 from app.enums import JobStatus, KeyType, NotificationType
 from app.errors import TotalRequestsError
@@ -496,7 +497,15 @@ def clean_job_cache():
 
 @notify_celery.task(name="delete-old-s3-objects")
 def delete_old_s3_objects():
-    s3.cleanup_old_s3_objects()
+
+    existing_service_ids = s3.cleanup_old_s3_objects() or set()
+    service_names = []
+    for service_id in existing_service_ids:
+        service = dao_fetch_service_by_id(service_id)
+        service_names.append(service.name)
+    current_app.logger.info(
+        f"#delete-old-s3-objects Services with retained csvs: {service_names}"
+    )
 
 
 @notify_celery.task(name="process-incomplete-jobs")
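
For reference, a minimal sketch (not part of this diff) of the same key parsing and bucket walk, using a boto3 list_objects_v2 paginator instead of manual NextContinuationToken handling. The key layout comes from FILE_LOCATION_STRUCTURE = "service-{}-notify/{}.csv"; the function names, bucket/client wiring, and the use of Python 3.9+ removeprefix/removesuffix are illustrative assumptions, not code from this PR.

import boto3


def parse_service_id(key):
    # "service-<service_id>-notify/<upload_id>.csv" -> "<service_id>"
    prefix = key.split("/", 1)[0]
    return prefix.removeprefix("service-").removesuffix("-notify")


def collect_service_ids(bucket_name, s3_client=None):
    # Walk every object in the bucket; the paginator handles continuation tokens.
    s3_client = s3_client or boto3.client("s3")
    service_ids = set()
    for page in s3_client.get_paginator("list_objects_v2").paginate(Bucket=bucket_name):
        for obj in page.get("Contents", []):
            service_ids.add(parse_service_id(obj["Key"]))
    return service_ids


# Usage example:
# parse_service_id("service-1234-abcd-notify/upload-1.csv") == "1234-abcd"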
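
Similarly, a sketch (also not part of the diff) of how the task side could resolve the returned ids to names without failing the whole task when an id parsed from S3 no longer maps to a service row; that dao_fetch_service_by_id raises on a missing row is an assumption about this codebase's DAO layer, and resolve_service_names is a hypothetical helper.

from app.dao.services_dao import dao_fetch_service_by_id


def resolve_service_names(service_ids):
    # Hypothetical helper: best-effort lookup that keeps going past stale or
    # malformed ids instead of letting one bad lookup abort the whole task.
    names = []
    for service_id in service_ids:
        try:
            names.append(dao_fetch_service_by_id(service_id).name)
        except Exception:  # e.g. NoResultFound for an id with no matching service
            names.append(f"unknown-service-{service_id}")
    return names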