Adds new scheduled task to delete CSV files.

Deletes files for jobs older than 7 days, matching delete process for the actual notifications

Runs 1 minutes past the hour at midnight, 1 and 2 am.
This commit is contained in:
Martyn Inglis
2016-09-07 15:36:59 +01:00
parent c3657839e4
commit 852e207478
3 changed files with 36 additions and 3 deletions

View File

@@ -3,10 +3,10 @@ from datetime import datetime, timedelta
from flask import current_app from flask import current_app
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
from app.aws import s3
from app import notify_celery from app import notify_celery
from app.clients import STATISTICS_FAILURE
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job, dao_get_jobs_older_than
from app.dao.notifications_dao import delete_notifications_created_more_than_a_week_ago, get_notifications, \ from app.dao.notifications_dao import delete_notifications_created_more_than_a_week_ago, get_notifications, \
update_notification_status_by_id update_notification_status_by_id
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
@@ -15,6 +15,15 @@ from app.models import JOB_STATUS_PENDING
from app.celery.tasks import process_job from app.celery.tasks import process_job
@notify_celery.task(name="remove_csv_files")
@statsd(namespace="tasks")
def remove_csv_files():
jobs = dao_get_jobs_older_than(7)
for job in jobs:
s3.remove_job_from_s3(job.service_id, job.id)
current_app.logger.info("Job ID {} has been removed from s3.".format(job.id))
@notify_celery.task(name="run-scheduled-jobs") @notify_celery.task(name="run-scheduled-jobs")
@statsd(namespace="tasks") @statsd(namespace="tasks")
def run_scheduled_jobs(): def run_scheduled_jobs():

View File

@@ -77,6 +77,11 @@ class Config(object):
'task': 'timeout-sending-notifications', 'task': 'timeout-sending-notifications',
'schedule': crontab(minute=0, hour='0,1,2'), 'schedule': crontab(minute=0, hour='0,1,2'),
'options': {'queue': 'periodic'} 'options': {'queue': 'periodic'}
},
'remove_csv_files': {
'task': 'remove_csv_files',
'schedule': crontab(minute=1, hour='0,1,2'),
'options': {'queue': 'periodic'}
} }
} }
CELERY_QUEUES = [ CELERY_QUEUES = [

View File

@@ -1,9 +1,11 @@
from datetime import datetime, timedelta from datetime import datetime, timedelta
from flask import current_app from flask import current_app
from freezegun import freeze_time
from app.celery.scheduled_tasks import s3
from app.celery import scheduled_tasks from app.celery import scheduled_tasks
from app.celery.scheduled_tasks import (delete_verify_codes, from app.celery.scheduled_tasks import (delete_verify_codes,
remove_csv_files,
delete_successful_notifications, delete_successful_notifications,
delete_failed_notifications, delete_failed_notifications,
delete_invitations, delete_invitations,
@@ -21,6 +23,7 @@ def test_should_have_decorated_tasks_functions():
assert timeout_notifications.__wrapped__.__name__ == 'timeout_notifications' assert timeout_notifications.__wrapped__.__name__ == 'timeout_notifications'
assert delete_invitations.__wrapped__.__name__ == 'delete_invitations' assert delete_invitations.__wrapped__.__name__ == 'delete_invitations'
assert run_scheduled_jobs.__wrapped__.__name__ == 'run_scheduled_jobs' assert run_scheduled_jobs.__wrapped__.__name__ == 'run_scheduled_jobs'
assert remove_csv_files.__wrapped__.__name__ == 'remove_csv_files'
def test_should_call_delete_notifications_more_than_week_in_task(notify_api, mocker): def test_should_call_delete_notifications_more_than_week_in_task(notify_api, mocker):
@@ -120,3 +123,19 @@ def test_should_update_all_scheduled_jobs_and_put_on_queue(notify_db, notify_db_
call([str(job_2.id)], queue='process-job'), call([str(job_2.id)], queue='process-job'),
call([str(job_1.id)], queue='process-job') call([str(job_1.id)], queue='process-job')
]) ])
def test_will_remove_csv_files_for_jobs_older_than_seven_days(notify_db, notify_db_session, mocker):
mocker.patch('app.celery.scheduled_tasks.s3.remove_job_from_s3')
one_millisecond_before_midnight = datetime(2016, 10, 9, 23, 59, 59, 999)
midnight = datetime(2016, 10, 10, 0, 0, 0, 0)
one_millisecond_past_midnight = datetime(2016, 10, 10, 0, 0, 0, 1)
job_1 = sample_job(notify_db, notify_db_session, created_at=one_millisecond_before_midnight)
sample_job(notify_db, notify_db_session, created_at=midnight)
sample_job(notify_db, notify_db_session, created_at=one_millisecond_past_midnight)
with freeze_time('2016-10-17T00:00:00'):
remove_csv_files()
s3.remove_job_from_s3.assert_called_once_with(job_1.service_id, job_1.id)