mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-23 08:51:30 -05:00
Merge pull request #1019 from alphagov/imdad-feat-remove-dvla-files-on-schedule
Remove letter jobs separately
This commit is contained in:
@@ -32,8 +32,8 @@ from app.config import QueueNames
|
|||||||
|
|
||||||
@notify_celery.task(name="remove_csv_files")
|
@notify_celery.task(name="remove_csv_files")
|
||||||
@statsd(namespace="tasks")
|
@statsd(namespace="tasks")
|
||||||
def remove_csv_files():
|
def remove_csv_files(job_types):
|
||||||
jobs = dao_get_jobs_older_than_limited_by()
|
jobs = dao_get_jobs_older_than_limited_by(job_types=job_types)
|
||||||
for job in jobs:
|
for job in jobs:
|
||||||
s3.remove_job_from_s3(job.service_id, job.id)
|
s3.remove_job_from_s3(job.service_id, job.id)
|
||||||
current_app.logger.info("Job ID {} has been removed from s3.".format(job.id))
|
current_app.logger.info("Job ID {} has been removed from s3.".format(job.id))
|
||||||
|
|||||||
@@ -3,7 +3,10 @@ from celery.schedules import crontab
|
|||||||
from kombu import Exchange, Queue
|
from kombu import Exchange, Queue
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from app.models import KEY_TYPE_NORMAL, KEY_TYPE_TEAM, KEY_TYPE_TEST
|
from app.models import (
|
||||||
|
EMAIL_TYPE, SMS_TYPE, LETTER_TYPE,
|
||||||
|
KEY_TYPE_NORMAL, KEY_TYPE_TEAM, KEY_TYPE_TEST
|
||||||
|
)
|
||||||
|
|
||||||
if os.environ.get('VCAP_SERVICES'):
|
if os.environ.get('VCAP_SERVICES'):
|
||||||
# on cloudfoundry, config is a json blob in VCAP_SERVICES - unpack it, and populate
|
# on cloudfoundry, config is a json blob in VCAP_SERVICES - unpack it, and populate
|
||||||
@@ -189,10 +192,17 @@ class Config(object):
|
|||||||
'schedule': crontab(minute=0, hour=3),
|
'schedule': crontab(minute=0, hour=3),
|
||||||
'options': {'queue': QueueNames.PERIODIC}
|
'options': {'queue': QueueNames.PERIODIC}
|
||||||
},
|
},
|
||||||
'remove_csv_files': {
|
'remove_sms_email_jobs': {
|
||||||
'task': 'remove_csv_files',
|
'task': 'remove_csv_files',
|
||||||
'schedule': crontab(minute=0, hour=4),
|
'schedule': crontab(minute=0, hour=4),
|
||||||
'options': {'queue': QueueNames.PERIODIC}
|
'options': {'queue': QueueNames.PERIODIC},
|
||||||
|
'kwargs': {'job_types': [EMAIL_TYPE, SMS_TYPE]}
|
||||||
|
},
|
||||||
|
'remove_letter_jobs': {
|
||||||
|
'task': 'remove_csv_files',
|
||||||
|
'schedule': crontab(minute=20, hour=4),
|
||||||
|
'options': {'queue': QueueNames.PERIODIC},
|
||||||
|
'kwargs': {'job_types': [LETTER_TYPE]}
|
||||||
},
|
},
|
||||||
'timeout-job-statistics': {
|
'timeout-job-statistics': {
|
||||||
'task': 'timeout-job-statistics',
|
'task': 'timeout-job-statistics',
|
||||||
|
|||||||
@@ -1,17 +1,15 @@
|
|||||||
from datetime import datetime
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
from sqlalchemy import func, desc, asc, cast, Date as sql_date
|
from sqlalchemy import func, desc, asc, cast, Date as sql_date
|
||||||
|
|
||||||
from app import db
|
from app import db
|
||||||
from app.dao import days_ago
|
from app.dao import days_ago
|
||||||
from app.models import (Job,
|
from app.models import (
|
||||||
Notification,
|
Job, JobStatistics, Notification, NotificationHistory, Template,
|
||||||
NotificationHistory,
|
JOB_STATUS_SCHEDULED, JOB_STATUS_PENDING,
|
||||||
Template,
|
EMAIL_TYPE, SMS_TYPE, LETTER_TYPE
|
||||||
JOB_STATUS_SCHEDULED,
|
)
|
||||||
JOB_STATUS_PENDING,
|
|
||||||
LETTER_TYPE, JobStatistics)
|
|
||||||
from app.statsd_decorators import statsd
|
from app.statsd_decorators import statsd
|
||||||
|
|
||||||
|
|
||||||
@@ -129,10 +127,14 @@ def dao_update_job_status(job_id, status):
|
|||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
def dao_get_jobs_older_than_limited_by(older_than=7, limit_days=2):
|
def dao_get_jobs_older_than_limited_by(job_types, older_than=7, limit_days=2):
|
||||||
return Job.query.filter(
|
end_date = datetime.utcnow() - timedelta(days=older_than)
|
||||||
cast(Job.created_at, sql_date) < days_ago(older_than),
|
start_date = end_date - timedelta(days=limit_days)
|
||||||
cast(Job.created_at, sql_date) >= days_ago(older_than + limit_days)
|
|
||||||
|
return Job.query.join(Template).filter(
|
||||||
|
Job.created_at < end_date,
|
||||||
|
Job.created_at >= start_date,
|
||||||
|
Template.template_type.in_(job_types)
|
||||||
).order_by(desc(Job.created_at)).all()
|
).order_by(desc(Job.created_at)).all()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -30,9 +30,12 @@ from app.dao.provider_details_dao import (
|
|||||||
dao_update_provider_details,
|
dao_update_provider_details,
|
||||||
get_current_provider
|
get_current_provider
|
||||||
)
|
)
|
||||||
from app.models import Service, Template
|
from app.models import (
|
||||||
|
Service, Template,
|
||||||
|
SMS_TYPE, LETTER_TYPE
|
||||||
|
)
|
||||||
from app.utils import get_london_midnight_in_utc
|
from app.utils import get_london_midnight_in_utc
|
||||||
from tests.app.db import create_notification, create_service
|
from tests.app.db import create_notification, create_service, create_template, create_job
|
||||||
from tests.app.conftest import (
|
from tests.app.conftest import (
|
||||||
sample_job as create_sample_job,
|
sample_job as create_sample_job,
|
||||||
sample_notification_history as create_notification_history,
|
sample_notification_history as create_notification_history,
|
||||||
@@ -214,22 +217,33 @@ def test_should_update_all_scheduled_jobs_and_put_on_queue(notify_db, notify_db_
|
|||||||
])
|
])
|
||||||
|
|
||||||
|
|
||||||
def test_will_remove_csv_files_for_jobs_older_than_seven_days(notify_db, notify_db_session, mocker):
|
@freeze_time('2016-10-18T10:00:00')
|
||||||
|
def test_will_remove_csv_files_for_jobs_older_than_seven_days(
|
||||||
|
notify_db, notify_db_session, mocker, sample_template
|
||||||
|
):
|
||||||
mocker.patch('app.celery.scheduled_tasks.s3.remove_job_from_s3')
|
mocker.patch('app.celery.scheduled_tasks.s3.remove_job_from_s3')
|
||||||
|
"""
|
||||||
|
Jobs older than seven days are deleted, but only two day's worth (two-day window)
|
||||||
|
"""
|
||||||
|
seven_days_ago = datetime.utcnow() - timedelta(days=7)
|
||||||
|
just_under_seven_days = seven_days_ago + timedelta(seconds=1)
|
||||||
|
eight_days_ago = seven_days_ago - timedelta(days=1)
|
||||||
|
nine_days_ago = eight_days_ago - timedelta(days=1)
|
||||||
|
just_under_nine_days = nine_days_ago + timedelta(seconds=1)
|
||||||
|
nine_days_one_second_ago = nine_days_ago - timedelta(seconds=1)
|
||||||
|
|
||||||
eligible_job_1 = datetime(2016, 10, 10, 23, 59, 59, 000)
|
create_sample_job(notify_db, notify_db_session, created_at=nine_days_one_second_ago)
|
||||||
eligible_job_2 = datetime(2016, 10, 9, 00, 00, 00, 000)
|
job1_to_delete = create_sample_job(notify_db, notify_db_session, created_at=eight_days_ago)
|
||||||
in_eligible_job_too_new = datetime(2016, 10, 11, 00, 00, 00, 000)
|
job2_to_delete = create_sample_job(notify_db, notify_db_session, created_at=just_under_nine_days)
|
||||||
in_eligible_job_too_old = datetime(2016, 10, 8, 23, 59, 59, 999)
|
create_sample_job(notify_db, notify_db_session, created_at=seven_days_ago)
|
||||||
|
create_sample_job(notify_db, notify_db_session, created_at=just_under_seven_days)
|
||||||
|
|
||||||
job_1 = create_sample_job(notify_db, notify_db_session, created_at=eligible_job_1)
|
remove_csv_files(job_types=[sample_template.template_type])
|
||||||
job_2 = create_sample_job(notify_db, notify_db_session, created_at=eligible_job_2)
|
|
||||||
create_sample_job(notify_db, notify_db_session, created_at=in_eligible_job_too_new)
|
|
||||||
create_sample_job(notify_db, notify_db_session, created_at=in_eligible_job_too_old)
|
|
||||||
|
|
||||||
with freeze_time('2016-10-18T10:00:00'):
|
assert s3.remove_job_from_s3.call_args_list == [
|
||||||
remove_csv_files()
|
call(job1_to_delete.service_id, job1_to_delete.id),
|
||||||
assert s3.remove_job_from_s3.call_args_list == [call(job_1.service_id, job_1.id), call(job_2.service_id, job_2.id)]
|
call(job2_to_delete.service_id, job2_to_delete.id)
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def test_send_daily_performance_stats_calls_does_not_send_if_inactive(
|
def test_send_daily_performance_stats_calls_does_not_send_if_inactive(
|
||||||
@@ -453,3 +467,24 @@ def test_should_call_delete_inbound_sms_older_than_seven_days(notify_api, mocker
|
|||||||
mocker.patch('app.celery.scheduled_tasks.delete_inbound_sms_created_more_than_a_week_ago')
|
mocker.patch('app.celery.scheduled_tasks.delete_inbound_sms_created_more_than_a_week_ago')
|
||||||
delete_inbound_sms_older_than_seven_days()
|
delete_inbound_sms_older_than_seven_days()
|
||||||
assert scheduled_tasks.delete_inbound_sms_created_more_than_a_week_ago.call_count == 1
|
assert scheduled_tasks.delete_inbound_sms_created_more_than_a_week_ago.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
@freeze_time('2017-01-01 10:00:00')
|
||||||
|
def test_remove_csv_files_filters_by_type(mocker, sample_service):
|
||||||
|
mocker.patch('app.celery.scheduled_tasks.s3.remove_job_from_s3')
|
||||||
|
"""
|
||||||
|
Jobs older than seven days are deleted, but only two day's worth (two-day window)
|
||||||
|
"""
|
||||||
|
letter_template = create_template(service=sample_service, template_type=LETTER_TYPE)
|
||||||
|
sms_template = create_template(service=sample_service, template_type=SMS_TYPE)
|
||||||
|
|
||||||
|
eight_days_ago = datetime.utcnow() - timedelta(days=8)
|
||||||
|
|
||||||
|
job_to_delete = create_job(template=letter_template, created_at=eight_days_ago)
|
||||||
|
create_job(template=sms_template, created_at=eight_days_ago)
|
||||||
|
|
||||||
|
remove_csv_files(job_types=[LETTER_TYPE])
|
||||||
|
|
||||||
|
assert s3.remove_job_from_s3.call_args_list == [
|
||||||
|
call(job_to_delete.service_id, job_to_delete.id),
|
||||||
|
]
|
||||||
|
|||||||
@@ -280,16 +280,18 @@ def sample_team_api_key(notify_db, notify_db_session, service=None):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='function')
|
@pytest.fixture(scope='function')
|
||||||
def sample_job(notify_db,
|
def sample_job(
|
||||||
notify_db_session,
|
notify_db,
|
||||||
service=None,
|
notify_db_session,
|
||||||
template=None,
|
service=None,
|
||||||
notification_count=1,
|
template=None,
|
||||||
created_at=None,
|
notification_count=1,
|
||||||
job_status='pending',
|
created_at=None,
|
||||||
scheduled_for=None,
|
job_status='pending',
|
||||||
processing_started=None,
|
scheduled_for=None,
|
||||||
original_file_name='some.csv'):
|
processing_started=None,
|
||||||
|
original_file_name='some.csv'
|
||||||
|
):
|
||||||
if service is None:
|
if service is None:
|
||||||
service = sample_service(notify_db, notify_db_session)
|
service = sample_service(notify_db, notify_db_session)
|
||||||
if template is None:
|
if template is None:
|
||||||
|
|||||||
@@ -17,7 +17,10 @@ from app.dao.jobs_dao import (
|
|||||||
dao_update_job_status,
|
dao_update_job_status,
|
||||||
dao_get_all_notifications_for_job,
|
dao_get_all_notifications_for_job,
|
||||||
dao_get_jobs_older_than_limited_by)
|
dao_get_jobs_older_than_limited_by)
|
||||||
from app.models import Job, JobStatistics
|
from app.models import (
|
||||||
|
Job, JobStatistics,
|
||||||
|
EMAIL_TYPE, SMS_TYPE, LETTER_TYPE
|
||||||
|
)
|
||||||
|
|
||||||
from tests.app.conftest import sample_notification as create_notification
|
from tests.app.conftest import sample_notification as create_notification
|
||||||
from tests.app.conftest import sample_job as create_job
|
from tests.app.conftest import sample_job as create_job
|
||||||
@@ -285,33 +288,30 @@ def test_get_future_scheduled_job_gets_a_job_yet_to_send(sample_scheduled_job):
|
|||||||
assert result.id == sample_scheduled_job.id
|
assert result.id == sample_scheduled_job.id
|
||||||
|
|
||||||
|
|
||||||
def test_should_get_jobs_seven_days_old(notify_db, notify_db_session):
|
@freeze_time('2016-10-31 10:00:00')
|
||||||
# job runs at some point on each day
|
def test_should_get_jobs_seven_days_old(notify_db, notify_db_session, sample_template):
|
||||||
# shouldn't matter when, we are deleting things 7 days ago
|
"""
|
||||||
job_run_time = '2016-10-31T10:00:00'
|
Jobs older than seven days are deleted, but only two day's worth (two-day window)
|
||||||
|
"""
|
||||||
|
seven_days_ago = datetime.utcnow() - timedelta(days=7)
|
||||||
|
within_seven_days = seven_days_ago + timedelta(seconds=1)
|
||||||
|
|
||||||
# running on the 31st means the previous 7 days are ignored
|
eight_days_ago = seven_days_ago - timedelta(days=1)
|
||||||
|
|
||||||
# 2 day window for delete jobs
|
nine_days_ago = eight_days_ago - timedelta(days=2)
|
||||||
# 7 days of files to skip includes the 30,29,28,27,26,25,24th, so the....
|
nine_days_one_second_ago = nine_days_ago - timedelta(seconds=1)
|
||||||
last_possible_time_for_eligible_job = '2016-10-23T23:59:59'
|
|
||||||
first_possible_time_for_eligible_job = '2016-10-22T00:00:00'
|
|
||||||
|
|
||||||
job_1 = create_job(notify_db, notify_db_session, created_at=last_possible_time_for_eligible_job)
|
job = partial(create_job, notify_db, notify_db_session)
|
||||||
job_2 = create_job(notify_db, notify_db_session, created_at=first_possible_time_for_eligible_job)
|
job(created_at=seven_days_ago)
|
||||||
|
job(created_at=within_seven_days)
|
||||||
|
job_to_delete = job(created_at=eight_days_ago)
|
||||||
|
job(created_at=nine_days_ago)
|
||||||
|
job(created_at=nine_days_one_second_ago)
|
||||||
|
|
||||||
# bookmarks for jobs that should be ignored
|
jobs = dao_get_jobs_older_than_limited_by(job_types=[sample_template.template_type])
|
||||||
last_possible_time_for_ineligible_job = '2016-10-24T00:00:00'
|
|
||||||
create_job(notify_db, notify_db_session, created_at=last_possible_time_for_ineligible_job)
|
|
||||||
|
|
||||||
first_possible_time_for_ineligible_job = '2016-10-21T23:59:59'
|
assert len(jobs) == 1
|
||||||
create_job(notify_db, notify_db_session, created_at=first_possible_time_for_ineligible_job)
|
assert jobs[0].id == job_to_delete.id
|
||||||
|
|
||||||
with freeze_time(job_run_time):
|
|
||||||
jobs = dao_get_jobs_older_than_limited_by()
|
|
||||||
assert len(jobs) == 2
|
|
||||||
assert jobs[0].id == job_1.id
|
|
||||||
assert jobs[1].id == job_2.id
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_jobs_for_service_is_paginated(notify_db, notify_db_session, sample_service, sample_template):
|
def test_get_jobs_for_service_is_paginated(notify_db, notify_db_session, sample_service, sample_template):
|
||||||
@@ -391,3 +391,23 @@ def test_dao_update_job_status(sample_job):
|
|||||||
updated_job = Job.query.get(sample_job.id)
|
updated_job = Job.query.get(sample_job.id)
|
||||||
assert updated_job.job_status == 'sent to dvla'
|
assert updated_job.job_status == 'sent to dvla'
|
||||||
assert updated_job.updated_at
|
assert updated_job.updated_at
|
||||||
|
|
||||||
|
|
||||||
|
@freeze_time('2016-10-31 10:00:00')
|
||||||
|
def test_should_get_jobs_seven_days_old_filters_type(notify_db, notify_db_session):
|
||||||
|
eight_days_ago = datetime.utcnow() - timedelta(days=8)
|
||||||
|
letter_template = create_template(notify_db, notify_db_session, template_type=LETTER_TYPE)
|
||||||
|
sms_template = create_template(notify_db, notify_db_session, template_type=SMS_TYPE)
|
||||||
|
email_template = create_template(notify_db, notify_db_session, template_type=EMAIL_TYPE)
|
||||||
|
|
||||||
|
job = partial(create_job, notify_db, notify_db_session, created_at=eight_days_ago)
|
||||||
|
job_to_remain = job(template=letter_template)
|
||||||
|
job(template=sms_template)
|
||||||
|
job(template=email_template)
|
||||||
|
|
||||||
|
jobs = dao_get_jobs_older_than_limited_by(
|
||||||
|
job_types=[EMAIL_TYPE, SMS_TYPE]
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(jobs) == 2
|
||||||
|
assert job_to_remain.id not in [job.id for job in jobs]
|
||||||
|
|||||||
Reference in New Issue
Block a user