Merge pull request #452 from alphagov/move-scheduler-tasks

Move scheduler tasks
This commit is contained in:
Rebecca Law
2016-06-21 13:56:40 +01:00
committed by GitHub
6 changed files with 196 additions and 193 deletions

View File

@@ -0,0 +1,94 @@
from datetime import datetime, timedelta
from flask import current_app
from sqlalchemy.exc import SQLAlchemyError
from app import notify_celery
from app.clients import STATISTICS_FAILURE
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
from app.dao.notifications_dao import delete_notifications_created_more_than_a_week_ago, get_notifications, \
update_notification_status_by_id
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
@notify_celery.task(name="delete-verify-codes")
def delete_verify_codes():
try:
start = datetime.utcnow()
deleted = delete_codes_older_created_more_than_a_day_ago()
current_app.logger.info(
"Delete job started {} finished {} deleted {} verify codes".format(start, datetime.utcnow(), deleted)
)
except SQLAlchemyError:
current_app.logger.info("Failed to delete verify codes")
raise
@notify_celery.task(name="delete-successful-notifications")
def delete_successful_notifications():
try:
start = datetime.utcnow()
deleted = delete_notifications_created_more_than_a_week_ago('delivered')
current_app.logger.info(
"Delete job started {} finished {} deleted {} successful notifications".format(
start,
datetime.utcnow(),
deleted
)
)
except SQLAlchemyError:
current_app.logger.info("Failed to delete successful notifications")
raise
@notify_celery.task(name="delete-failed-notifications")
def delete_failed_notifications():
try:
start = datetime.utcnow()
deleted = delete_notifications_created_more_than_a_week_ago('failed')
deleted += delete_notifications_created_more_than_a_week_ago('technical-failure')
deleted += delete_notifications_created_more_than_a_week_ago('temporary-failure')
deleted += delete_notifications_created_more_than_a_week_ago('permanent-failure')
current_app.logger.info(
"Delete job started {} finished {} deleted {} failed notifications".format(
start,
datetime.utcnow(),
deleted
)
)
except SQLAlchemyError:
current_app.logger.info("Failed to delete failed notifications")
raise
@notify_celery.task(name="delete-invitations")
def delete_invitations():
try:
start = datetime.utcnow()
deleted = delete_invitations_created_more_than_two_days_ago()
current_app.logger.info(
"Delete job started {} finished {} deleted {} invitations".format(start, datetime.utcnow(), deleted)
)
except SQLAlchemyError:
current_app.logger.info("Failed to delete invitations")
raise
@notify_celery.task(name='timeout-sending-notifications')
def timeout_notifications():
notifications = get_notifications(filter_dict={'status': 'sending'})
now = datetime.utcnow()
for noti in notifications:
try:
if (now - noti.created_at) > timedelta(
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
):
update_notification_status_by_id(noti.id, 'temporary-failure', STATISTICS_FAILURE)
current_app.logger.info((
"Timeout period reached for notification ({})"
", status has been updated.").format(noti.id))
except Exception as e:
current_app.logger.exception(e)
current_app.logger.error((
"Exception raised trying to timeout notification ({})"
", skipping notification update.").format(noti.id))

View File

@@ -1,26 +1,16 @@
import itertools
from datetime import (datetime, timedelta)
from datetime import (datetime)
from flask import current_app
from monotonic import monotonic
from sqlalchemy.exc import SQLAlchemyError
from app import clients, statsd_client
from app.clients import STATISTICS_FAILURE
from app.clients.email import EmailClientException
from app.dao.services_dao import dao_fetch_service_by_id
from app.dao.templates_dao import dao_get_template_by_id
from app.dao.provider_details_dao import get_provider_details_by_notification_type
from app.celery.provider_tasks import send_sms_to_provider
from app.celery.research_mode_tasks import send_email_response
from notifications_utils.template import Template
from notifications_utils.recipients import (
RecipientCSV,
allowed_to_send_to
)
from notifications_utils.template import Template
from sqlalchemy.exc import SQLAlchemyError
from app import clients, statsd_client
from app import (
create_uuid,
DATETIME_FORMAT,
@@ -28,26 +18,23 @@ from app import (
notify_celery,
encryption
)
from app.aws import s3
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
from app.dao.notifications_dao import (
dao_create_notification,
dao_update_notification,
delete_notifications_created_more_than_a_week_ago,
dao_get_notification_statistics_for_service_and_day,
update_provider_stats,
get_notifications,
update_notification_status_by_id
)
from app.celery.provider_tasks import send_sms_to_provider
from app.celery.research_mode_tasks import send_email_response
from app.clients.email import EmailClientException
from app.dao.jobs_dao import (
dao_update_job,
dao_get_job_by_id
)
from app.dao.notifications_dao import (
dao_create_notification,
dao_update_notification,
dao_get_notification_statistics_for_service_and_day,
update_provider_stats
)
from app.dao.provider_details_dao import get_provider_details_by_notification_type
from app.dao.services_dao import dao_fetch_service_by_id
from app.dao.templates_dao import dao_get_template_by_id
from app.models import (
Notification,
TEMPLATE_TYPE_EMAIL,
@@ -55,69 +42,6 @@ from app.models import (
)
@notify_celery.task(name="delete-verify-codes")
def delete_verify_codes():
try:
start = datetime.utcnow()
deleted = delete_codes_older_created_more_than_a_day_ago()
current_app.logger.info(
"Delete job started {} finished {} deleted {} verify codes".format(start, datetime.utcnow(), deleted)
)
except SQLAlchemyError:
current_app.logger.info("Failed to delete verify codes")
raise
@notify_celery.task(name="delete-successful-notifications")
def delete_successful_notifications():
try:
start = datetime.utcnow()
deleted = delete_notifications_created_more_than_a_week_ago('delivered')
current_app.logger.info(
"Delete job started {} finished {} deleted {} successful notifications".format(
start,
datetime.utcnow(),
deleted
)
)
except SQLAlchemyError:
current_app.logger.info("Failed to delete successful notifications")
raise
@notify_celery.task(name="delete-failed-notifications")
def delete_failed_notifications():
try:
start = datetime.utcnow()
deleted = delete_notifications_created_more_than_a_week_ago('failed')
deleted += delete_notifications_created_more_than_a_week_ago('technical-failure')
deleted += delete_notifications_created_more_than_a_week_ago('temporary-failure')
deleted += delete_notifications_created_more_than_a_week_ago('permanent-failure')
current_app.logger.info(
"Delete job started {} finished {} deleted {} failed notifications".format(
start,
datetime.utcnow(),
deleted
)
)
except SQLAlchemyError:
current_app.logger.info("Failed to delete failed notifications")
raise
@notify_celery.task(name="delete-invitations")
def delete_invitations():
try:
start = datetime.utcnow()
deleted = delete_invitations_created_more_than_two_days_ago()
current_app.logger.info(
"Delete job started {} finished {} deleted {} invitations".format(start, datetime.utcnow(), deleted)
)
except SQLAlchemyError:
current_app.logger.info("Failed to delete invitations")
raise
@notify_celery.task(name="process-job")
def process_job(job_id):
task_start = monotonic()
@@ -358,23 +282,3 @@ def provider_to_use(notification_type, notification_id):
raise Exception("No active {} providers".format(notification_type))
return clients.get_client_by_name_and_type(active_providers_in_order[0].identifier, notification_type)
@notify_celery.task(name='timeout-sending-notifications')
def timeout_notifications():
notifications = get_notifications(filter_dict={'status': 'sending'})
now = datetime.utcnow()
for noti in notifications:
try:
if (now - noti.created_at) > timedelta(
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
):
update_notification_status_by_id(noti.id, 'temporary-failure', STATISTICS_FAILURE)
current_app.logger.info((
"Timeout period reached for notification ({})"
", status has been updated.").format(noti.id))
except Exception as e:
current_app.logger.exception(e)
current_app.logger.error((
"Exception raised trying to timeout notification ({})"
", skipping notification update.").format(noti.id))

View File

@@ -45,7 +45,7 @@ class Config(object):
CELERY_TIMEZONE = 'Europe/London'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_IMPORTS = ('app.celery.tasks',)
CELERY_IMPORTS = ('app.celery.tasks', 'app.celery.scheduled_tasks')
CELERYBEAT_SCHEDULE = {
'delete-verify-codes': {
'task': 'delete-verify-codes',

View File

@@ -0,0 +1,74 @@
from datetime import datetime, timedelta
from flask import current_app
from app.celery import scheduled_tasks
from app.celery.scheduled_tasks import (delete_verify_codes,
delete_successful_notifications,
delete_failed_notifications,
delete_invitations,
timeout_notifications)
from tests.app.conftest import sample_notification
def test_should_call_delete_notifications_more_than_week_in_task(notify_api, mocker):
mocked = mocker.patch('app.celery.scheduled_tasksgit .delete_notifications_created_more_than_a_week_ago')
delete_successful_notifications()
assert mocked.assert_called_with('delivered')
assert scheduled_tasks.delete_notifications_created_more_than_a_week_ago.call_count == 1
def test_should_call_delete_notifications_more_than_week_in_task(notify_api, mocker):
mocker.patch('app.celery.scheduled_tasks.delete_notifications_created_more_than_a_week_ago')
delete_failed_notifications()
assert scheduled_tasks.delete_notifications_created_more_than_a_week_ago.call_count == 4
def test_should_call_delete_codes_on_delete_verify_codes_task(notify_api, mocker):
mocker.patch('app.celery.scheduled_tasks.delete_codes_older_created_more_than_a_day_ago')
delete_verify_codes()
assert scheduled_tasks.delete_codes_older_created_more_than_a_day_ago.call_count == 1
def test_should_call_delete_invotations_on_delete_invitations_task(notify_api, mocker):
mocker.patch('app.celery.scheduled_tasks.delete_invitations_created_more_than_two_days_ago')
delete_invitations()
assert scheduled_tasks.delete_invitations_created_more_than_two_days_ago.call_count == 1
def test_update_status_of_notifications_after_timeout(notify_api,
notify_db,
notify_db_session,
sample_service,
sample_template,
mmg_provider):
with notify_api.test_request_context():
not1 = sample_notification(
notify_db,
notify_db_session,
service=sample_service,
template=sample_template,
status='sending',
created_at=datetime.utcnow() - timedelta(
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') + 10))
timeout_notifications()
assert not1.status == 'temporary-failure'
def test_not_update_status_of_notification_before_timeout(notify_api,
notify_db,
notify_db_session,
sample_service,
sample_template,
mmg_provider):
with notify_api.test_request_context():
not1 = sample_notification(
notify_db,
notify_db_session,
service=sample_service,
template=sample_template,
status='sending',
created_at=datetime.utcnow() - timedelta(
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') - 10))
timeout_notifications()
assert not1.status == 'sending'

View File

@@ -1,32 +1,27 @@
import uuid
import pytest
from flask import current_app
from mock import ANY
from datetime import datetime
import pytest
from freezegun import freeze_time
from mock import ANY
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm.exc import NoResultFound
from app import (aws_ses_client, encryption, DATETIME_FORMAT, statsd_client)
from app.celery import provider_tasks
from app.celery import tasks
from app.celery.research_mode_tasks import send_email_response
from app.celery.tasks import s3
from app.celery.tasks import (
send_sms,
process_job,
delete_verify_codes,
delete_invitations,
delete_failed_notifications,
delete_successful_notifications,
provider_to_use,
timeout_notifications,
send_email
)
from app import (aws_ses_client, encryption, DATETIME_FORMAT, statsd_client)
from app.celery.research_mode_tasks import send_email_response
from app.clients.email.aws_ses import AwsSesClientException
from app.dao import notifications_dao, jobs_dao, provider_details_dao
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm.exc import NoResultFound
from app.celery.tasks import s3
from app.celery import tasks
from app.dao.provider_statistics_dao import get_provider_statistics
from tests.app import load_example_csv
from datetime import datetime, timedelta
from freezegun import freeze_time
from tests.app.conftest import (
sample_service,
sample_user,
@@ -91,31 +86,6 @@ def test_should_return_highest_priority_active_provider(notify_db, notify_db_ses
assert provider_to_use('sms', '1234').name == first.identifier
def test_should_call_delete_notifications_more_than_week_in_task(notify_api, mocker):
mocked = mocker.patch('app.celery.tasks.delete_notifications_created_more_than_a_week_ago')
delete_successful_notifications()
assert mocked.assert_called_with('delivered')
assert tasks.delete_notifications_created_more_than_a_week_ago.call_count == 1
def test_should_call_delete_notifications_more_than_week_in_task(notify_api, mocker):
mocker.patch('app.celery.tasks.delete_notifications_created_more_than_a_week_ago')
delete_failed_notifications()
assert tasks.delete_notifications_created_more_than_a_week_ago.call_count == 4
def test_should_call_delete_codes_on_delete_verify_codes_task(notify_api, mocker):
mocker.patch('app.celery.tasks.delete_codes_older_created_more_than_a_day_ago')
delete_verify_codes()
assert tasks.delete_codes_older_created_more_than_a_day_ago.call_count == 1
def test_should_call_delete_invotations_on_delete_invitations_task(notify_api, mocker):
mocker.patch('app.celery.tasks.delete_invitations_created_more_than_two_days_ago')
delete_invitations()
assert tasks.delete_invitations_created_more_than_two_days_ago.call_count == 1
@freeze_time("2016-01-01 11:09:00.061258")
def test_should_process_sms_job(sample_job, mocker, mock_celery_remove_job):
mocker.patch('app.statsd_client.incr')
@@ -909,44 +879,6 @@ def test_should_call_send_not_update_provider_email_stats_if_research_mode(
providers=[ses_provider.identifier]).first()
def test_update_status_of_notifications_after_timeout(notify_api,
notify_db,
notify_db_session,
sample_service,
sample_template,
mmg_provider):
with notify_api.test_request_context():
not1 = sample_notification(
notify_db,
notify_db_session,
service=sample_service,
template=sample_template,
status='sending',
created_at=datetime.utcnow() - timedelta(
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') + 10))
timeout_notifications()
assert not1.status == 'temporary-failure'
def test_not_update_status_of_notification_before_timeout(notify_api,
notify_db,
notify_db_session,
sample_service,
sample_template,
mmg_provider):
with notify_api.test_request_context():
not1 = sample_notification(
notify_db,
notify_db_session,
service=sample_service,
template=sample_template,
status='sending',
created_at=datetime.utcnow() - timedelta(
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') - 10))
timeout_notifications()
assert not1.status == 'sending'
def test_email_template_personalisation_persisted(sample_email_template_with_placeholders, mocker):
encrypted_notification = encryption.encrypt(_notification_json(
sample_email_template_with_placeholders,

View File

@@ -337,7 +337,6 @@ def test_send_user_email_verification(notify_api,
'url': current_app.config['ADMIN_BASE_URL'] + '/verify-email/' + 'the-token'
}
}
print('test message: {}'.format(message))
app.celery.tasks.send_email.apply_async.assert_called_once_with(
(str(current_app.config['NOTIFY_SERVICE_ID']),
'some_uuid',