From 832005efef733ed1d485535888c7b12964495c6c Mon Sep 17 00:00:00 2001
From: Martyn Inglis
Date: Wed, 5 Apr 2017 16:23:41 +0100
Subject: [PATCH] Updates to the delete CSV file job to reduce the number of
 eligible jobs in any run
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- previously this was unbounded, so it got all jobs older than 7 days. In
  excess of 75,000 🔥
- this meant that the job took (a) a long time and (b) a lot of memory and
  (c) did the same thing every day

These changes mean that the job has a 2 day eligible window for jobs,
minimising the number of eligible jobs in a run, whilst still retaining some
leeway in the event of it failing one night.

In principle the job runs early in the morning on a given day. The previous 7
days are left alone, and then the 2 days' worth of files before that are
deleted:

so: runs on the 31st
    30, 29, 28, 27, 26, 25, 24 are ignored
    23, 22: jobs from these days have their files deleted
    21 and earlier are ignored.
---
 app/celery/scheduled_tasks.py            |  4 +--
 app/dao/jobs_dao.py                      |  5 ++--
 tests/app/celery/test_scheduled_tasks.py | 18 +++++++-----
 tests/app/dao/test_jobs_dao.py           | 37 ++++++++++++++++--------
 4 files changed, 40 insertions(+), 24 deletions(-)

diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py
index b8e4eb3e8..b76122819 100644
--- a/app/celery/scheduled_tasks.py
+++ b/app/celery/scheduled_tasks.py
@@ -10,7 +10,7 @@ from app.aws import s3
 from app import notify_celery
 from app import performance_platform_client
 from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
-from app.dao.jobs_dao import dao_set_scheduled_jobs_to_pending, dao_get_jobs_older_than
+from app.dao.jobs_dao import dao_set_scheduled_jobs_to_pending, dao_get_jobs_older_than_limited_by
 from app.dao.notifications_dao import (
     delete_notifications_created_more_than_a_week_ago,
     dao_timeout_notifications,
@@ -28,7 +28,7 @@ from app.celery.tasks import process_job
 @notify_celery.task(name="remove_csv_files")
 @statsd(namespace="tasks")
 def remove_csv_files():
-    jobs = dao_get_jobs_older_than(7)
+    jobs = dao_get_jobs_older_than_limited_by()
     for job in jobs:
         s3.remove_job_from_s3(job.service_id, job.id)
         current_app.logger.info("Job ID {} has been removed from s3.".format(job.id))
diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py
index 1387bb65f..fa30fb41a 100644
--- a/app/dao/jobs_dao.py
+++ b/app/dao/jobs_dao.py
@@ -116,7 +116,8 @@ def dao_update_job(job):
     db.session.commit()
 
 
-def dao_get_jobs_older_than(limit_days):
+def dao_get_jobs_older_than_limited_by(older_than=7, limit_days=2):
     return Job.query.filter(
-        cast(Job.created_at, sql_date) < days_ago(limit_days)
+        cast(Job.created_at, sql_date) < days_ago(older_than),
+        cast(Job.created_at, sql_date) >= days_ago(older_than + limit_days)
     ).order_by(desc(Job.created_at)).all()
diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py
index eb94839aa..b534e7df0 100644
--- a/tests/app/celery/test_scheduled_tasks.py
+++ b/tests/app/celery/test_scheduled_tasks.py
@@ -203,17 +203,19 @@ def test_should_update_all_scheduled_jobs_and_put_on_queue(notify_db, notify_db_
 def test_will_remove_csv_files_for_jobs_older_than_seven_days(notify_db, notify_db_session, mocker):
     mocker.patch('app.celery.scheduled_tasks.s3.remove_job_from_s3')
 
-    one_millisecond_before_midnight = datetime(2016, 10, 9, 23, 59, 59, 999)
-    midnight = datetime(2016, 10, 10, 0, 0, 0, 0)
-    one_millisecond_past_midnight = datetime(2016, 10, 10, 0, 0, 0, 1)
+    eligible_job_1 = datetime(2016, 10, 10, 23, 59, 59, 000)
+    eligible_job_2 = datetime(2016, 10, 9, 00, 00, 00, 000)
+    in_eligible_job_too_new = datetime(2016, 10, 11, 00, 00, 00, 000)
+    in_eligible_job_too_old = datetime(2016, 10, 8, 23, 59, 59, 999)
 
-    job_1 = create_sample_job(notify_db, notify_db_session, created_at=one_millisecond_before_midnight)
-    create_sample_job(notify_db, notify_db_session, created_at=midnight)
-    create_sample_job(notify_db, notify_db_session, created_at=one_millisecond_past_midnight)
+    job_1 = create_sample_job(notify_db, notify_db_session, created_at=eligible_job_1)
+    job_2 = create_sample_job(notify_db, notify_db_session, created_at=eligible_job_2)
+    create_sample_job(notify_db, notify_db_session, created_at=in_eligible_job_too_new)
+    create_sample_job(notify_db, notify_db_session, created_at=in_eligible_job_too_old)
 
-    with freeze_time('2016-10-17T00:00:00'):
+    with freeze_time('2016-10-18T10:00:00'):
         remove_csv_files()
-    s3.remove_job_from_s3.assert_called_once_with(job_1.service_id, job_1.id)
+    s3.remove_job_from_s3.assert_has_calls([call(job_1.service_id, job_1.id), call(job_2.service_id, job_2.id)])
 
 
 def test_send_daily_performance_stats_calls_does_not_send_if_inactive(
diff --git a/tests/app/dao/test_jobs_dao.py b/tests/app/dao/test_jobs_dao.py
index b51480237..34b92b1d5 100644
--- a/tests/app/dao/test_jobs_dao.py
+++ b/tests/app/dao/test_jobs_dao.py
@@ -12,9 +12,8 @@ from app.dao.jobs_dao import (
     dao_set_scheduled_jobs_to_pending,
     dao_get_future_scheduled_job_by_id_and_service_id,
     dao_get_notification_outcomes_for_job,
-    dao_get_jobs_older_than,
     all_notifications_are_created_for_job,
-    dao_get_all_notifications_for_job)
+    dao_get_all_notifications_for_job, dao_get_jobs_older_than_limited_by)
 
 from app.models import Job
 from tests.app.conftest import sample_notification as create_notification
@@ -268,19 +267,33 @@ def test_get_future_scheduled_job_gets_a_job_yet_to_send(sample_scheduled_job):
     assert result.id == sample_scheduled_job.id
 
 
-def test_should_get_jobs_older_than_seven_days(notify_db, notify_db_session):
-    one_millisecond_before_midnight = datetime(2016, 10, 9, 23, 59, 59, 999)
-    midnight = datetime(2016, 10, 10, 0, 0, 0, 0)
-    one_millisecond_past_midnight = datetime(2016, 10, 10, 0, 0, 0, 1)
+def test_should_get_jobs_seven_days_old(notify_db, notify_db_session):
+    # job runs at some point on each day
+    # shouldn't matter when, we are deleting things 7 days ago
+    job_run_time = '2016-10-31T10:00:00'
 
-    job_1 = create_job(notify_db, notify_db_session, created_at=one_millisecond_before_midnight)
-    create_job(notify_db, notify_db_session, created_at=midnight)
-    create_job(notify_db, notify_db_session, created_at=one_millisecond_past_midnight)
+    # running on the 31st means the previous 7 days are ignored
+
+    # 2 day window for delete jobs
+    # 7 days of files to skip includes the 30,29,28,27,26,25,24th, so the....
+    last_possible_time_for_eligible_job = '2016-10-23T23:59:59'
+    first_possible_time_for_eligible_job = '2016-10-22T00:00:00'
+
+    job_1 = create_job(notify_db, notify_db_session, created_at=last_possible_time_for_eligible_job)
+    job_2 = create_job(notify_db, notify_db_session, created_at=first_possible_time_for_eligible_job)
+
+    # bookmarks for jobs that should be ignored
+    last_possible_time_for_ineligible_job = '2016-10-24T00:00:00'
+    create_job(notify_db, notify_db_session, created_at=last_possible_time_for_ineligible_job)
+
+    first_possible_time_for_ineligible_job = '2016-10-21T23:59:59'
+    create_job(notify_db, notify_db_session, created_at=first_possible_time_for_ineligible_job)
 
-    with freeze_time('2016-10-17T00:00:00'):
-        jobs = dao_get_jobs_older_than(7)
-        assert len(jobs) == 1
+    with freeze_time(job_run_time):
+        jobs = dao_get_jobs_older_than_limited_by()
+        assert len(jobs) == 2
         assert jobs[0].id == job_1.id
+        assert jobs[1].id == job_2.id
 
 
 def test_get_jobs_for_service_is_paginated(notify_db, notify_db_session, sample_service, sample_template):
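
Note for reviewers: below is a minimal standalone sketch of the window the new
defaults select. It assumes a simplified days_ago helper that just subtracts
whole days from the run date (the real helper in app/dao/jobs_dao.py is not
shown in this patch), so treat it as an illustration of the intended window
rather than the production logic:

    from datetime import date, timedelta

    def days_ago(n, run_date=date(2016, 10, 31)):
        # assumed stand-in for the app's days_ago helper:
        # the calendar date n whole days before the run date
        return run_date - timedelta(days=n)

    older_than, limit_days = 7, 2  # new defaults on dao_get_jobs_older_than_limited_by

    # eligible jobs satisfy:
    #   days_ago(older_than + limit_days) <= created_at date < days_ago(older_than)
    window_start = days_ago(older_than + limit_days)  # 2016-10-22, inclusive
    window_end = days_ago(older_than)                 # 2016-10-24, exclusive

    assert (window_start, window_end) == (date(2016, 10, 22), date(2016, 10, 24))
    # running on the 31st: the 24th-30th are ignored, the 22nd and 23rd have
    # their files deleted, and the 21st or earlier is ignored, matching the
    # example in the commit message above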