Updates to the delete-CSV-files job to reduce the number of eligible jobs in any run.

- previously this was unbounded, so it got all jobs older than 7 days: in excess of 75,000 🔥
- this meant that the job (a) took a long time, (b) used a lot of memory, and (c) did the same work every day

These changes give the job a 2-day eligibility window, minimising the number of eligible jobs in a run whilst still retaining some leeway in the event of it failing one night.

In principle the job runs early in the morning on a given day. The previous 7 days are left alone, and then the previous 2 days' worth of files are deleted. So, for a run on the 31st (see the sketch after this list):

- the 30th, 29th, 28th, 27th, 26th, 25th and 24th are ignored
- jobs on the 23rd and 22nd have their files deleted
- the 21st and earlier are ignored
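
As a minimal, standalone sketch of that window arithmetic (the `days_ago` helper and `eligible_window` function here are hypothetical stand-ins, not the app's code):

```python
from datetime import date, timedelta

def days_ago(n, today):
    # Hypothetical stand-in for the app's helper: the calendar date n days before today.
    return today - timedelta(days=n)

def eligible_window(today, older_than=7, limit_days=2):
    # Half-open window: eligible jobs satisfy
    #   created_at < days_ago(older_than)  AND  created_at >= days_ago(older_than + limit_days)
    return days_ago(older_than + limit_days, today), days_ago(older_than, today)

# A run on the 31st ignores the 24th to 30th, deletes files for the 22nd and 23rd,
# and ignores the 21st and earlier.
start, end = eligible_window(date(2016, 10, 31))
assert start == date(2016, 10, 22)  # first eligible day (inclusive)
assert end == date(2016, 10, 24)    # first too-new day (exclusive)
```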
```diff
@@ -10,7 +10,7 @@ from app.aws import s3
 from app import notify_celery
 from app import performance_platform_client
 from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
-from app.dao.jobs_dao import dao_set_scheduled_jobs_to_pending, dao_get_jobs_older_than
+from app.dao.jobs_dao import dao_set_scheduled_jobs_to_pending, dao_get_jobs_older_than_limited_by
 from app.dao.notifications_dao import (
     delete_notifications_created_more_than_a_week_ago,
     dao_timeout_notifications,
```
```diff
@@ -28,7 +28,7 @@ from app.celery.tasks import process_job
 @notify_celery.task(name="remove_csv_files")
 @statsd(namespace="tasks")
 def remove_csv_files():
-    jobs = dao_get_jobs_older_than(7)
+    jobs = dao_get_jobs_older_than_limited_by()
     for job in jobs:
         s3.remove_job_from_s3(job.service_id, job.id)
         current_app.logger.info("Job ID {} has been removed from s3.".format(job.id))
```
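
Since the task is registered by name, it can be driven from a Celery beat schedule. A purely hypothetical wiring, for illustration only (the app's actual config key, schedule and run time may differ):

```python
# Hypothetical beat schedule entry (illustrative, not the app's actual config).
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'remove-csv-files': {
        'task': 'remove_csv_files',             # matches name= on the task decorator
        'schedule': crontab(hour=4, minute=0),  # "early morning", per the commit message
    },
}
```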
```diff
@@ -116,7 +116,8 @@ def dao_update_job(job):
     db.session.commit()
 
 
-def dao_get_jobs_older_than(limit_days):
+def dao_get_jobs_older_than_limited_by(older_than=7, limit_days=2):
     return Job.query.filter(
-        cast(Job.created_at, sql_date) < days_ago(limit_days)
+        cast(Job.created_at, sql_date) < days_ago(older_than),
+        cast(Job.created_at, sql_date) >= days_ago(older_than + limit_days)
     ).order_by(desc(Job.created_at)).all()
```
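
To see the half-open window behave end to end, here is a self-contained sketch against an in-memory SQLite database. The `Job` model and `days_ago` helper are illustrative stand-ins for the app's own (the real `created_at` is a datetime, hence the `cast()` to a date in the diff above):

```python
from datetime import date, timedelta

from sqlalchemy import Column, Date, Integer, create_engine, desc
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Job(Base):
    # Illustrative stand-in for app.models.Job
    __tablename__ = 'jobs'
    id = Column(Integer, primary_key=True)
    created_at = Column(Date)

def days_ago(n, today):
    return today - timedelta(days=n)

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

with Session(engine) as session:
    today = date(2016, 10, 31)
    session.add_all([Job(created_at=days_ago(n, today)) for n in range(1, 12)])
    session.commit()

    older_than, limit_days = 7, 2
    jobs = session.query(Job).filter(
        # passing two clauses to filter() ANDs them together
        Job.created_at < days_ago(older_than, today),
        Job.created_at >= days_ago(older_than + limit_days, today),
    ).order_by(desc(Job.created_at)).all()

    # Only the jobs created 8 and 9 days ago (the 23rd and 22nd) are returned.
    assert [j.created_at for j in jobs] == [date(2016, 10, 23), date(2016, 10, 22)]
```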
```diff
@@ -203,17 +203,19 @@ def test_should_update_all_scheduled_jobs_and_put_on_queue(notify_db, notify_db_
 def test_will_remove_csv_files_for_jobs_older_than_seven_days(notify_db, notify_db_session, mocker):
     mocker.patch('app.celery.scheduled_tasks.s3.remove_job_from_s3')
 
-    one_millisecond_before_midnight = datetime(2016, 10, 9, 23, 59, 59, 999)
-    midnight = datetime(2016, 10, 10, 0, 0, 0, 0)
-    one_millisecond_past_midnight = datetime(2016, 10, 10, 0, 0, 0, 1)
+    eligible_job_1 = datetime(2016, 10, 10, 23, 59, 59, 000)
+    eligible_job_2 = datetime(2016, 10, 9, 00, 00, 00, 000)
+    in_eligible_job_too_new = datetime(2016, 10, 11, 00, 00, 00, 000)
+    in_eligible_job_too_old = datetime(2016, 10, 8, 23, 59, 59, 999)
 
-    job_1 = create_sample_job(notify_db, notify_db_session, created_at=one_millisecond_before_midnight)
-    create_sample_job(notify_db, notify_db_session, created_at=midnight)
-    create_sample_job(notify_db, notify_db_session, created_at=one_millisecond_past_midnight)
+    job_1 = create_sample_job(notify_db, notify_db_session, created_at=eligible_job_1)
+    job_2 = create_sample_job(notify_db, notify_db_session, created_at=eligible_job_2)
+    create_sample_job(notify_db, notify_db_session, created_at=in_eligible_job_too_new)
+    create_sample_job(notify_db, notify_db_session, created_at=in_eligible_job_too_old)
 
-    with freeze_time('2016-10-17T00:00:00'):
+    with freeze_time('2016-10-18T10:00:00'):
         remove_csv_files()
-    s3.remove_job_from_s3.assert_called_once_with(job_1.service_id, job_1.id)
+    s3.remove_job_from_s3.assert_has_calls([call(job_1.service_id, job_1.id), call(job_2.service_id, job_2.id)])
 
 
 def test_send_daily_performance_stats_calls_does_not_send_if_inactive(
```
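
The assertion change is worth a note: with two eligible jobs, `remove_job_from_s3` is now called twice, so `assert_called_once_with` would fail. `assert_has_calls` asserts that the given calls were made in that order; a quick standard-library illustration:

```python
from unittest.mock import MagicMock, call

remove = MagicMock()
remove('service-a', 'job-1')
remove('service-a', 'job-2')

# Passes: both expected calls were made, in this order.
remove.assert_has_calls([call('service-a', 'job-1'), call('service-a', 'job-2')])
```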
```diff
@@ -12,9 +12,8 @@ from app.dao.jobs_dao import (
     dao_set_scheduled_jobs_to_pending,
     dao_get_future_scheduled_job_by_id_and_service_id,
     dao_get_notification_outcomes_for_job,
-    dao_get_jobs_older_than,
     all_notifications_are_created_for_job,
-    dao_get_all_notifications_for_job)
+    dao_get_all_notifications_for_job, dao_get_jobs_older_than_limited_by)
 from app.models import Job
 
 from tests.app.conftest import sample_notification as create_notification
```
```diff
@@ -268,19 +267,33 @@ def test_get_future_scheduled_job_gets_a_job_yet_to_send(sample_scheduled_job):
     assert result.id == sample_scheduled_job.id
 
 
-def test_should_get_jobs_older_than_seven_days(notify_db, notify_db_session):
-    one_millisecond_before_midnight = datetime(2016, 10, 9, 23, 59, 59, 999)
-    midnight = datetime(2016, 10, 10, 0, 0, 0, 0)
-    one_millisecond_past_midnight = datetime(2016, 10, 10, 0, 0, 0, 1)
+def test_should_get_jobs_seven_days_old(notify_db, notify_db_session):
+    # job runs at some point on each day
+    # shouldn't matter when, we are deleting things 7 days ago
+    job_run_time = '2016-10-31T10:00:00'
 
-    job_1 = create_job(notify_db, notify_db_session, created_at=one_millisecond_before_midnight)
-    create_job(notify_db, notify_db_session, created_at=midnight)
-    create_job(notify_db, notify_db_session, created_at=one_millisecond_past_midnight)
+    # running on the 31st means the previous 7 days are ignored
 
-    with freeze_time('2016-10-17T00:00:00'):
-        jobs = dao_get_jobs_older_than(7)
-    assert len(jobs) == 1
+    # 2 day window for delete jobs
+    # 7 days of files to skip includes the 30,29,28,27,26,25,24th, so the....
+    last_possible_time_for_eligible_job = '2016-10-23T23:59:59'
+    first_possible_time_for_eligible_job = '2016-10-22T00:00:00'
+
+    job_1 = create_job(notify_db, notify_db_session, created_at=last_possible_time_for_eligible_job)
+    job_2 = create_job(notify_db, notify_db_session, created_at=first_possible_time_for_eligible_job)
+
+    # bookmarks for jobs that should be ignored
+    last_possible_time_for_ineligible_job = '2016-10-24T00:00:00'
+    create_job(notify_db, notify_db_session, created_at=last_possible_time_for_ineligible_job)
+
+    first_possible_time_for_ineligible_job = '2016-10-21T23:59:59'
+    create_job(notify_db, notify_db_session, created_at=first_possible_time_for_ineligible_job)
+
+    with freeze_time(job_run_time):
+        jobs = dao_get_jobs_older_than_limited_by()
+    assert len(jobs) == 2
+    assert jobs[0].id == job_1.id
+    assert jobs[1].id == job_2.id
 
 
 def test_get_jobs_for_service_is_paginated(notify_db, notify_db_session, sample_service, sample_template):
```