diff --git a/app/aws/__init__.py b/app/aws/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index b0cb57660..c960c3b20 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -3,10 +3,10 @@ from datetime import datetime, timedelta from flask import current_app from sqlalchemy.exc import SQLAlchemyError +from app.aws import s3 from app import notify_celery -from app.clients import STATISTICS_FAILURE from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago -from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job +from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job, dao_get_jobs_older_than from app.dao.notifications_dao import delete_notifications_created_more_than_a_week_ago, get_notifications, \ update_notification_status_by_id from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago @@ -15,6 +15,15 @@ from app.models import JOB_STATUS_PENDING from app.celery.tasks import process_job +@notify_celery.task(name="remove_csv_files") +@statsd(namespace="tasks") +def remove_csv_files(): + jobs = dao_get_jobs_older_than(7) + for job in jobs: + s3.remove_job_from_s3(job.service_id, job.id) + current_app.logger.info("Job ID {} has been removed from s3.".format(job.id)) + + @notify_celery.task(name="run-scheduled-jobs") @statsd(namespace="tasks") def run_scheduled_jobs(): diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 29fffcb42..eee951dfd 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -101,20 +101,11 @@ def process_job(job_id): job.processing_started = start job.processing_finished = finished dao_update_job(job) - remove_job.apply_async((str(job_id),), queue='remove-job') current_app.logger.info( "Job {} created at {} started at {} finished at {}".format(job_id, job.created_at, start, finished) ) -@notify_celery.task(name="remove-job") -@statsd(namespace="tasks") -def remove_job(job_id): - job = dao_get_job_by_id(job_id) - s3.remove_job_from_s3(job.service.id, str(job_id)) - current_app.logger.info("Job {} has been removed from s3.".format(job_id)) - - @notify_celery.task(bind=True, name="send-sms", max_retries=5, default_retry_delay=300) @statsd(namespace="tasks") def send_sms(self, diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py index 790b9cb18..b4cfbde4d 100644 --- a/app/dao/jobs_dao.py +++ b/app/dao/jobs_dao.py @@ -66,3 +66,9 @@ def dao_create_job(job): def dao_update_job(job): db.session.add(job) db.session.commit() + + +def dao_get_jobs_older_than(limit_days): + return Job.query.filter( + cast(Job.created_at, sql_date) < days_ago(limit_days) + ).order_by(desc(Job.created_at)).all() diff --git a/config.py b/config.py index 415487d7f..6b2be3721 100644 --- a/config.py +++ b/config.py @@ -107,6 +107,11 @@ class Config(object): 'task': 'timeout-sending-notifications', 'schedule': crontab(minute=0, hour='0,1,2'), 'options': {'queue': 'periodic'} + }, + 'remove_csv_files': { + 'task': 'remove_csv_files', + 'schedule': crontab(minute=1, hour='0,1,2'), + 'options': {'queue': 'periodic'} } } CELERY_QUEUES = [ diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 893988490..aa5345ff2 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -1,9 +1,11 @@ from datetime import datetime, timedelta from flask import current_app - +from freezegun import freeze_time +from app.celery.scheduled_tasks import s3 from app.celery import scheduled_tasks from app.celery.scheduled_tasks import (delete_verify_codes, + remove_csv_files, delete_successful_notifications, delete_failed_notifications, delete_invitations, @@ -21,6 +23,7 @@ def test_should_have_decorated_tasks_functions(): assert timeout_notifications.__wrapped__.__name__ == 'timeout_notifications' assert delete_invitations.__wrapped__.__name__ == 'delete_invitations' assert run_scheduled_jobs.__wrapped__.__name__ == 'run_scheduled_jobs' + assert remove_csv_files.__wrapped__.__name__ == 'remove_csv_files' def test_should_call_delete_notifications_more_than_week_in_task(notify_api, mocker): @@ -120,3 +123,19 @@ def test_should_update_all_scheduled_jobs_and_put_on_queue(notify_db, notify_db_ call([str(job_2.id)], queue='process-job'), call([str(job_1.id)], queue='process-job') ]) + + +def test_will_remove_csv_files_for_jobs_older_than_seven_days(notify_db, notify_db_session, mocker): + mocker.patch('app.celery.scheduled_tasks.s3.remove_job_from_s3') + + one_millisecond_before_midnight = datetime(2016, 10, 9, 23, 59, 59, 999) + midnight = datetime(2016, 10, 10, 0, 0, 0, 0) + one_millisecond_past_midnight = datetime(2016, 10, 10, 0, 0, 0, 1) + + job_1 = sample_job(notify_db, notify_db_session, created_at=one_millisecond_before_midnight) + sample_job(notify_db, notify_db_session, created_at=midnight) + sample_job(notify_db, notify_db_session, created_at=one_millisecond_past_midnight) + + with freeze_time('2016-10-17T00:00:00'): + remove_csv_files() + s3.remove_job_from_s3.assert_called_once_with(job_1.service_id, job_1.id) diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 8426947a1..aa792776f 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -10,7 +10,7 @@ from sqlalchemy.orm.exc import NoResultFound from app import (encryption, DATETIME_FORMAT) from app.celery import provider_tasks from app.celery import tasks -from app.celery.tasks import s3, remove_job +from app.celery.tasks import s3 from app.celery.tasks import ( send_sms, process_job, @@ -55,13 +55,12 @@ def _notification_json(template, to, personalisation=None, job_id=None, row_numb def test_should_have_decorated_tasks_functions(): assert process_job.__wrapped__.__name__ == 'process_job' - assert remove_job.__wrapped__.__name__ == 'remove_job' assert send_sms.__wrapped__.__name__ == 'send_sms' assert send_email.__wrapped__.__name__ == 'send_email' @freeze_time("2016-01-01 11:09:00.061258") -def test_should_process_sms_job(sample_job, mocker, mock_celery_remove_job): +def test_should_process_sms_job(sample_job, mocker): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms')) mocker.patch('app.celery.tasks.send_sms.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") @@ -91,8 +90,7 @@ def test_should_process_sms_job(sample_job, mocker, mock_celery_remove_job): @freeze_time("2016-01-01 11:09:00.061258") def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db, notify_db_session, - mocker, - mock_celery_remove_job): + mocker): service = sample_service(notify_db, notify_db_session, limit=9) job = sample_job(notify_db, notify_db_session, service=service, notification_count=10) @@ -107,13 +105,11 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db, job = jobs_dao.dao_get_job_by_id(job.id) assert job.status == 'sending limits exceeded' tasks.send_sms.apply_async.assert_not_called() - mock_celery_remove_job.assert_not_called() def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify_db, notify_db_session, - mocker, - mock_celery_remove_job): + mocker): service = sample_service(notify_db, notify_db_session, limit=1) job = sample_job(notify_db, notify_db_session, service=service) @@ -130,7 +126,6 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify assert job.status == 'sending limits exceeded' s3.get_job_from_s3.assert_not_called() tasks.send_sms.apply_async.assert_not_called() - mock_celery_remove_job.assert_not_called() def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(notify_db, notify_db_session, mocker): @@ -175,8 +170,7 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not @freeze_time("2016-01-01 11:09:00.061258") def test_should_process_email_job_if_exactly_on_send_limits(notify_db, notify_db_session, - mocker, - mock_celery_remove_job): + mocker): service = sample_service(notify_db, notify_db_session, limit=10) template = sample_email_template(notify_db, notify_db_session, service=service) job = sample_job(notify_db, notify_db_session, service=service, template=template, notification_count=10) @@ -203,10 +197,9 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db, ), queue="db-email" ) - mock_celery_remove_job.assert_called_once_with((str(job.id),), queue="remove-job") -def test_should_not_create_send_task_for_empty_file(sample_job, mocker, mock_celery_remove_job): +def test_should_not_create_send_task_for_empty_file(sample_job, mocker): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('empty')) mocker.patch('app.celery.tasks.send_sms.apply_async') @@ -218,11 +211,10 @@ def test_should_not_create_send_task_for_empty_file(sample_job, mocker, mock_cel ) job = jobs_dao.dao_get_job_by_id(sample_job.id) assert job.status == 'finished' - tasks.send_sms.apply_async.assert_not_called @freeze_time("2016-01-01 11:09:00.061258") -def test_should_process_email_job(sample_email_job, mocker, mock_celery_remove_job): +def test_should_process_email_job(sample_email_job, mocker): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('email')) mocker.patch('app.celery.tasks.send_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") @@ -249,13 +241,11 @@ def test_should_process_email_job(sample_email_job, mocker, mock_celery_remove_j ) job = jobs_dao.dao_get_job_by_id(sample_email_job.id) assert job.status == 'finished' - mock_celery_remove_job.assert_called_once_with((str(job.id),), queue="remove-job") def test_should_process_all_sms_job(sample_job, sample_job_with_placeholdered_template, - mocker, - mock_celery_remove_job): + mocker): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) mocker.patch('app.celery.tasks.send_sms.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") diff --git a/tests/app/conftest.py b/tests/app/conftest.py index 08784a1dc..5dcdf3be8 100644 --- a/tests/app/conftest.py +++ b/tests/app/conftest.py @@ -479,11 +479,6 @@ def mock_encryption(mocker): return mocker.patch('app.encryption.encrypt', return_value="something_encrypted") -@pytest.fixture(scope='function') -def mock_celery_remove_job(mocker): - return mocker.patch('app.celery.tasks.remove_job.apply_async') - - @pytest.fixture(scope='function') def sample_invited_user(notify_db, notify_db_session, diff --git a/tests/app/dao/test_jobs_dao.py b/tests/app/dao/test_jobs_dao.py index 2b4728be1..ee116c8ca 100644 --- a/tests/app/dao/test_jobs_dao.py +++ b/tests/app/dao/test_jobs_dao.py @@ -1,5 +1,6 @@ from datetime import (datetime, timedelta) import uuid +from freezegun import freeze_time from app.dao.jobs_dao import ( dao_get_job_by_service_id_and_job_id, @@ -8,8 +9,8 @@ from app.dao.jobs_dao import ( dao_get_jobs_by_service_id, dao_get_scheduled_jobs, dao_get_future_scheduled_job_by_id_and_service_id, - dao_get_notification_outcomes_for_job -) + dao_get_notification_outcomes_for_job, + dao_get_jobs_older_than) from app.models import Job from tests.app.conftest import sample_notification, sample_job, sample_service @@ -255,3 +256,18 @@ def test_get_scheduled_jobs_gets_ignores_jobs_scheduled_in_the_future(sample_sch def test_get_future_scheduled_job_gets_a_job_yet_to_send(sample_scheduled_job): result = dao_get_future_scheduled_job_by_id_and_service_id(sample_scheduled_job.id, sample_scheduled_job.service_id) assert result.id == sample_scheduled_job.id + + +def test_should_get_jobs_older_than_seven_days(notify_db, notify_db_session): + one_millisecond_before_midnight = datetime(2016, 10, 9, 23, 59, 59, 999) + midnight = datetime(2016, 10, 10, 0, 0, 0, 0) + one_millisecond_past_midnight = datetime(2016, 10, 10, 0, 0, 0, 1) + + job_1 = sample_job(notify_db, notify_db_session, created_at=one_millisecond_before_midnight) + sample_job(notify_db, notify_db_session, created_at=midnight) + sample_job(notify_db, notify_db_session, created_at=one_millisecond_past_midnight) + + with freeze_time('2016-10-17T00:00:00'): + jobs = dao_get_jobs_older_than(7) + assert len(jobs) == 1 + assert jobs[0].id == job_1.id diff --git a/tests/app/public_contracts/test_POST_notification.py b/tests/app/public_contracts/test_POST_notification.py index ce333cef3..734a4cefe 100644 --- a/tests/app/public_contracts/test_POST_notification.py +++ b/tests/app/public_contracts/test_POST_notification.py @@ -5,7 +5,7 @@ from tests import create_authorization_header def test_post_sms_contract(client, mocker, sample_template): - mocker.patch('app.celery.tasks.send_sms.apply_async') + mocker.patch('app.celery.tasks.send_sms_to_provider.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") data = { @@ -25,7 +25,7 @@ def test_post_sms_contract(client, mocker, sample_template): def test_post_email_contract(client, mocker, sample_email_template): - mocker.patch('app.celery.tasks.send_email.apply_async') + mocker.patch('app.celery.tasks.send_email_to_provider.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") data = {