From 0ecd6f27379d41dbdfe80e28db5054b85ec4eeb4 Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Wed, 7 Sep 2016 15:35:12 +0100 Subject: [PATCH 1/4] Removed the 'delete job' tasks from the tasks.py file - this was called after processing a job. - now to be called on a schedule. --- app/aws/__init__.py | 0 app/celery/tasks.py | 9 --------- tests/app/celery/test_tasks.py | 26 ++++++++------------------ tests/app/conftest.py | 5 ----- 4 files changed, 8 insertions(+), 32 deletions(-) create mode 100644 app/aws/__init__.py diff --git a/app/aws/__init__.py b/app/aws/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/app/celery/tasks.py b/app/celery/tasks.py index d655c28ce..f60ae45c9 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -101,20 +101,11 @@ def process_job(job_id): job.processing_started = start job.processing_finished = finished dao_update_job(job) - remove_job.apply_async((str(job_id),), queue='remove-job') current_app.logger.info( "Job {} created at {} started at {} finished at {}".format(job_id, job.created_at, start, finished) ) -@notify_celery.task(name="remove-job") -@statsd(namespace="tasks") -def remove_job(job_id): - job = dao_get_job_by_id(job_id) - s3.remove_job_from_s3(job.service.id, str(job_id)) - current_app.logger.info("Job {} has been removed from s3.".format(job_id)) - - @notify_celery.task(bind=True, name="send-sms", max_retries=5, default_retry_delay=300) @statsd(namespace="tasks") def send_sms(self, diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 8426947a1..aa792776f 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -10,7 +10,7 @@ from sqlalchemy.orm.exc import NoResultFound from app import (encryption, DATETIME_FORMAT) from app.celery import provider_tasks from app.celery import tasks -from app.celery.tasks import s3, remove_job +from app.celery.tasks import s3 from app.celery.tasks import ( send_sms, process_job, @@ -55,13 +55,12 @@ 
def _notification_json(template, to, personalisation=None, job_id=None, row_numb def test_should_have_decorated_tasks_functions(): assert process_job.__wrapped__.__name__ == 'process_job' - assert remove_job.__wrapped__.__name__ == 'remove_job' assert send_sms.__wrapped__.__name__ == 'send_sms' assert send_email.__wrapped__.__name__ == 'send_email' @freeze_time("2016-01-01 11:09:00.061258") -def test_should_process_sms_job(sample_job, mocker, mock_celery_remove_job): +def test_should_process_sms_job(sample_job, mocker): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms')) mocker.patch('app.celery.tasks.send_sms.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") @@ -91,8 +90,7 @@ def test_should_process_sms_job(sample_job, mocker, mock_celery_remove_job): @freeze_time("2016-01-01 11:09:00.061258") def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db, notify_db_session, - mocker, - mock_celery_remove_job): + mocker): service = sample_service(notify_db, notify_db_session, limit=9) job = sample_job(notify_db, notify_db_session, service=service, notification_count=10) @@ -107,13 +105,11 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db, job = jobs_dao.dao_get_job_by_id(job.id) assert job.status == 'sending limits exceeded' tasks.send_sms.apply_async.assert_not_called() - mock_celery_remove_job.assert_not_called() def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify_db, notify_db_session, - mocker, - mock_celery_remove_job): + mocker): service = sample_service(notify_db, notify_db_session, limit=1) job = sample_job(notify_db, notify_db_session, service=service) @@ -130,7 +126,6 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify assert job.status == 'sending limits exceeded' s3.get_job_from_s3.assert_not_called() tasks.send_sms.apply_async.assert_not_called() - 
mock_celery_remove_job.assert_not_called() def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(notify_db, notify_db_session, mocker): @@ -175,8 +170,7 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not @freeze_time("2016-01-01 11:09:00.061258") def test_should_process_email_job_if_exactly_on_send_limits(notify_db, notify_db_session, - mocker, - mock_celery_remove_job): + mocker): service = sample_service(notify_db, notify_db_session, limit=10) template = sample_email_template(notify_db, notify_db_session, service=service) job = sample_job(notify_db, notify_db_session, service=service, template=template, notification_count=10) @@ -203,10 +197,9 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db, ), queue="db-email" ) - mock_celery_remove_job.assert_called_once_with((str(job.id),), queue="remove-job") -def test_should_not_create_send_task_for_empty_file(sample_job, mocker, mock_celery_remove_job): +def test_should_not_create_send_task_for_empty_file(sample_job, mocker): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('empty')) mocker.patch('app.celery.tasks.send_sms.apply_async') @@ -218,11 +211,10 @@ def test_should_not_create_send_task_for_empty_file(sample_job, mocker, mock_cel ) job = jobs_dao.dao_get_job_by_id(sample_job.id) assert job.status == 'finished' - tasks.send_sms.apply_async.assert_not_called @freeze_time("2016-01-01 11:09:00.061258") -def test_should_process_email_job(sample_email_job, mocker, mock_celery_remove_job): +def test_should_process_email_job(sample_email_job, mocker): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('email')) mocker.patch('app.celery.tasks.send_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") @@ -249,13 +241,11 @@ def test_should_process_email_job(sample_email_job, mocker, mock_celery_remove_j ) job = 
jobs_dao.dao_get_job_by_id(sample_email_job.id) assert job.status == 'finished' - mock_celery_remove_job.assert_called_once_with((str(job.id),), queue="remove-job") def test_should_process_all_sms_job(sample_job, sample_job_with_placeholdered_template, - mocker, - mock_celery_remove_job): + mocker): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) mocker.patch('app.celery.tasks.send_sms.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") diff --git a/tests/app/conftest.py b/tests/app/conftest.py index 08784a1dc..5dcdf3be8 100644 --- a/tests/app/conftest.py +++ b/tests/app/conftest.py @@ -479,11 +479,6 @@ def mock_encryption(mocker): return mocker.patch('app.encryption.encrypt', return_value="something_encrypted") -@pytest.fixture(scope='function') -def mock_celery_remove_job(mocker): - return mocker.patch('app.celery.tasks.remove_job.apply_async') - - @pytest.fixture(scope='function') def sample_invited_user(notify_db, notify_db_session, From f506f6751e011172e0b0790fbc668321203d5e93 Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Wed, 7 Sep 2016 15:35:25 +0100 Subject: [PATCH 2/4] Removed unused import --- app/notifications/rest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/notifications/rest.py b/app/notifications/rest.py index f6abd950d..86b423e5c 100644 --- a/app/notifications/rest.py +++ b/app/notifications/rest.py @@ -13,7 +13,7 @@ from notifications_utils.recipients import allowed_to_send_to, first_column_head from notifications_utils.template import Template from notifications_utils.renderers import PassThrough from app.clients.email.aws_ses import get_aws_responses -from app import api_user, encryption, create_uuid, DATETIME_FORMAT, DATE_FORMAT, statsd_client +from app import api_user, encryption, create_uuid, DATETIME_FORMAT, statsd_client from app.dao.services_dao import dao_fetch_todays_stats_for_service from app.models import 
KEY_TYPE_TEAM, KEY_TYPE_TEST from app.dao import ( From c3657839e4268a4eff26da1cfb801dea6111b485 Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Wed, 7 Sep 2016 15:36:07 +0100 Subject: [PATCH 3/4] New jobs dao method to get jobs that are older than a certain number of days. Used in deleting CSV files scheduled task --- app/dao/jobs_dao.py | 6 ++++++ tests/app/dao/test_jobs_dao.py | 20 ++++++++++++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py index 790b9cb18..b4cfbde4d 100644 --- a/app/dao/jobs_dao.py +++ b/app/dao/jobs_dao.py @@ -66,3 +66,9 @@ def dao_create_job(job): def dao_update_job(job): db.session.add(job) db.session.commit() + + +def dao_get_jobs_older_than(limit_days): + return Job.query.filter( + cast(Job.created_at, sql_date) < days_ago(limit_days) + ).order_by(desc(Job.created_at)).all() diff --git a/tests/app/dao/test_jobs_dao.py b/tests/app/dao/test_jobs_dao.py index 2b4728be1..ee116c8ca 100644 --- a/tests/app/dao/test_jobs_dao.py +++ b/tests/app/dao/test_jobs_dao.py @@ -1,5 +1,6 @@ from datetime import (datetime, timedelta) import uuid +from freezegun import freeze_time from app.dao.jobs_dao import ( dao_get_job_by_service_id_and_job_id, @@ -8,8 +9,8 @@ from app.dao.jobs_dao import ( dao_get_jobs_by_service_id, dao_get_scheduled_jobs, dao_get_future_scheduled_job_by_id_and_service_id, - dao_get_notification_outcomes_for_job -) + dao_get_notification_outcomes_for_job, + dao_get_jobs_older_than) from app.models import Job from tests.app.conftest import sample_notification, sample_job, sample_service @@ -255,3 +256,18 @@ def test_get_scheduled_jobs_gets_ignores_jobs_scheduled_in_the_future(sample_sch def test_get_future_scheduled_job_gets_a_job_yet_to_send(sample_scheduled_job): result = dao_get_future_scheduled_job_by_id_and_service_id(sample_scheduled_job.id, sample_scheduled_job.service_id) assert result.id == sample_scheduled_job.id + + +def 
test_should_get_jobs_older_than_seven_days(notify_db, notify_db_session): + one_millisecond_before_midnight = datetime(2016, 10, 9, 23, 59, 59, 999) + midnight = datetime(2016, 10, 10, 0, 0, 0, 0) + one_millisecond_past_midnight = datetime(2016, 10, 10, 0, 0, 0, 1) + + job_1 = sample_job(notify_db, notify_db_session, created_at=one_millisecond_before_midnight) + sample_job(notify_db, notify_db_session, created_at=midnight) + sample_job(notify_db, notify_db_session, created_at=one_millisecond_past_midnight) + + with freeze_time('2016-10-17T00:00:00'): + jobs = dao_get_jobs_older_than(7) + assert len(jobs) == 1 + assert jobs[0].id == job_1.id From 852e207478e02281bb281d86eb16c85cf55b6859 Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Wed, 7 Sep 2016 15:36:59 +0100 Subject: [PATCH 4/4] Adds new scheduled task to delete CSV files. Deletes files for jobs older than 7 days, matching delete process for the actual notifications. Runs 1 minute past the hour at midnight, 1 and 2 am. --- app/celery/scheduled_tasks.py | 13 +++++++++++-- config.py | 5 +++++ tests/app/celery/test_scheduled_tasks.py | 21 ++++++++++++++++++++- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index b0cb57660..c960c3b20 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -3,10 +3,10 @@ from datetime import datetime, timedelta from flask import current_app from sqlalchemy.exc import SQLAlchemyError +from app.aws import s3 from app import notify_celery -from app.clients import STATISTICS_FAILURE from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago -from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job +from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job, dao_get_jobs_older_than from app.dao.notifications_dao import delete_notifications_created_more_than_a_week_ago, get_notifications, \ update_notification_status_by_id from 
app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago @@ -15,6 +15,15 @@ from app.models import JOB_STATUS_PENDING from app.celery.tasks import process_job +@notify_celery.task(name="remove_csv_files") +@statsd(namespace="tasks") +def remove_csv_files(): + jobs = dao_get_jobs_older_than(7) + for job in jobs: + s3.remove_job_from_s3(job.service_id, job.id) + current_app.logger.info("Job ID {} has been removed from s3.".format(job.id)) + + @notify_celery.task(name="run-scheduled-jobs") @statsd(namespace="tasks") def run_scheduled_jobs(): diff --git a/config.py b/config.py index 44a0b88d1..1df96acf4 100644 --- a/config.py +++ b/config.py @@ -77,6 +77,11 @@ class Config(object): 'task': 'timeout-sending-notifications', 'schedule': crontab(minute=0, hour='0,1,2'), 'options': {'queue': 'periodic'} + }, + 'remove_csv_files': { + 'task': 'remove_csv_files', + 'schedule': crontab(minute=1, hour='0,1,2'), + 'options': {'queue': 'periodic'} } } CELERY_QUEUES = [ diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 893988490..aa5345ff2 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -1,9 +1,11 @@ from datetime import datetime, timedelta from flask import current_app - +from freezegun import freeze_time +from app.celery.scheduled_tasks import s3 from app.celery import scheduled_tasks from app.celery.scheduled_tasks import (delete_verify_codes, + remove_csv_files, delete_successful_notifications, delete_failed_notifications, delete_invitations, @@ -21,6 +23,7 @@ def test_should_have_decorated_tasks_functions(): assert timeout_notifications.__wrapped__.__name__ == 'timeout_notifications' assert delete_invitations.__wrapped__.__name__ == 'delete_invitations' assert run_scheduled_jobs.__wrapped__.__name__ == 'run_scheduled_jobs' + assert remove_csv_files.__wrapped__.__name__ == 'remove_csv_files' def 
test_should_call_delete_notifications_more_than_week_in_task(notify_api, mocker): @@ -120,3 +123,19 @@ def test_should_update_all_scheduled_jobs_and_put_on_queue(notify_db, notify_db_ call([str(job_2.id)], queue='process-job'), call([str(job_1.id)], queue='process-job') ]) + + +def test_will_remove_csv_files_for_jobs_older_than_seven_days(notify_db, notify_db_session, mocker): + mocker.patch('app.celery.scheduled_tasks.s3.remove_job_from_s3') + + one_millisecond_before_midnight = datetime(2016, 10, 9, 23, 59, 59, 999) + midnight = datetime(2016, 10, 10, 0, 0, 0, 0) + one_millisecond_past_midnight = datetime(2016, 10, 10, 0, 0, 0, 1) + + job_1 = sample_job(notify_db, notify_db_session, created_at=one_millisecond_before_midnight) + sample_job(notify_db, notify_db_session, created_at=midnight) + sample_job(notify_db, notify_db_session, created_at=one_millisecond_past_midnight) + + with freeze_time('2016-10-17T00:00:00'): + remove_csv_files() + s3.remove_job_from_s3.assert_called_once_with(job_1.service_id, job_1.id)