mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-16 02:02:13 -05:00
Move scheduled tasks into their own module.
There is no change to the functionality - only moving the code around.
This commit is contained in:
94
app/celery/scheduled_tasks.py
Normal file
94
app/celery/scheduled_tasks.py
Normal file
@@ -0,0 +1,94 @@
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from flask import current_app
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from app import notify_celery
|
||||
from app.clients import STATISTICS_FAILURE
|
||||
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
|
||||
from app.dao.notifications_dao import delete_notifications_created_more_than_a_week_ago, get_notifications, \
|
||||
update_notification_status_by_id
|
||||
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-verify-codes")
|
||||
def delete_verify_codes():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_codes_older_created_more_than_a_day_ago()
|
||||
current_app.logger.info(
|
||||
"Delete job started {} finished {} deleted {} verify codes".format(start, datetime.utcnow(), deleted)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.info("Failed to delete verify codes")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-successful-notifications")
|
||||
def delete_successful_notifications():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_notifications_created_more_than_a_week_ago('delivered')
|
||||
current_app.logger.info(
|
||||
"Delete job started {} finished {} deleted {} successful notifications".format(
|
||||
start,
|
||||
datetime.utcnow(),
|
||||
deleted
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.info("Failed to delete successful notifications")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-failed-notifications")
|
||||
def delete_failed_notifications():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_notifications_created_more_than_a_week_ago('failed')
|
||||
deleted += delete_notifications_created_more_than_a_week_ago('technical-failure')
|
||||
deleted += delete_notifications_created_more_than_a_week_ago('temporary-failure')
|
||||
deleted += delete_notifications_created_more_than_a_week_ago('permanent-failure')
|
||||
current_app.logger.info(
|
||||
"Delete job started {} finished {} deleted {} failed notifications".format(
|
||||
start,
|
||||
datetime.utcnow(),
|
||||
deleted
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.info("Failed to delete failed notifications")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-invitations")
|
||||
def delete_invitations():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_invitations_created_more_than_two_days_ago()
|
||||
current_app.logger.info(
|
||||
"Delete job started {} finished {} deleted {} invitations".format(start, datetime.utcnow(), deleted)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.info("Failed to delete invitations")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name='timeout-sending-notifications')
|
||||
def timeout_notifications():
|
||||
notifications = get_notifications(filter_dict={'status': 'sending'})
|
||||
now = datetime.utcnow()
|
||||
for noti in notifications:
|
||||
try:
|
||||
if (now - noti.created_at) > timedelta(
|
||||
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
|
||||
):
|
||||
update_notification_status_by_id(noti.id, 'temporary-failure', STATISTICS_FAILURE)
|
||||
current_app.logger.info((
|
||||
"Timeout period reached for notification ({})"
|
||||
", status has been updated.").format(noti.id))
|
||||
except Exception as e:
|
||||
current_app.logger.exception(e)
|
||||
current_app.logger.error((
|
||||
"Exception raised trying to timeout notification ({})"
|
||||
", skipping notification update.").format(noti.id))
|
||||
@@ -1,27 +1,16 @@
|
||||
import itertools
|
||||
from datetime import (datetime, timedelta)
|
||||
from datetime import (datetime)
|
||||
|
||||
from flask import current_app
|
||||
from monotonic import monotonic
|
||||
from notifications_utils.recipients import (
|
||||
RecipientCSV,
|
||||
allowed_to_send_to
|
||||
)
|
||||
from notifications_utils.template import Template
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from app import clients, statsd_client
|
||||
from app.clients import STATISTICS_FAILURE
|
||||
from app.clients.email import EmailClientException
|
||||
from app.dao.services_dao import dao_fetch_service_by_id
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
from app.dao.provider_details_dao import get_provider_details_by_notification_type
|
||||
from app.celery.provider_tasks import send_sms_to_provider
|
||||
from app.celery.research_mode_tasks import send_email_response
|
||||
|
||||
from notifications_utils.template import Template
|
||||
|
||||
from notifications_utils.recipients import (
|
||||
RecipientCSV,
|
||||
validate_and_format_phone_number,
|
||||
allowed_to_send_to
|
||||
)
|
||||
|
||||
from app import (
|
||||
create_uuid,
|
||||
DATETIME_FORMAT,
|
||||
@@ -29,26 +18,23 @@ from app import (
|
||||
notify_celery,
|
||||
encryption
|
||||
)
|
||||
|
||||
from app.aws import s3
|
||||
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
||||
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
|
||||
|
||||
from app.dao.notifications_dao import (
|
||||
dao_create_notification,
|
||||
dao_update_notification,
|
||||
delete_notifications_created_more_than_a_week_ago,
|
||||
dao_get_notification_statistics_for_service_and_day,
|
||||
update_provider_stats,
|
||||
get_notifications,
|
||||
update_notification_status_by_id
|
||||
)
|
||||
|
||||
from app.celery.provider_tasks import send_sms_to_provider
|
||||
from app.celery.research_mode_tasks import send_email_response
|
||||
from app.clients.email import EmailClientException
|
||||
from app.dao.jobs_dao import (
|
||||
dao_update_job,
|
||||
dao_get_job_by_id
|
||||
)
|
||||
|
||||
from app.dao.notifications_dao import (
|
||||
dao_create_notification,
|
||||
dao_update_notification,
|
||||
dao_get_notification_statistics_for_service_and_day,
|
||||
update_provider_stats
|
||||
)
|
||||
from app.dao.provider_details_dao import get_provider_details_by_notification_type
|
||||
from app.dao.services_dao import dao_fetch_service_by_id
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
from app.models import (
|
||||
Notification,
|
||||
TEMPLATE_TYPE_EMAIL,
|
||||
@@ -56,69 +42,6 @@ from app.models import (
|
||||
)
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-verify-codes")
|
||||
def delete_verify_codes():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_codes_older_created_more_than_a_day_ago()
|
||||
current_app.logger.info(
|
||||
"Delete job started {} finished {} deleted {} verify codes".format(start, datetime.utcnow(), deleted)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.info("Failed to delete verify codes")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-successful-notifications")
|
||||
def delete_successful_notifications():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_notifications_created_more_than_a_week_ago('delivered')
|
||||
current_app.logger.info(
|
||||
"Delete job started {} finished {} deleted {} successful notifications".format(
|
||||
start,
|
||||
datetime.utcnow(),
|
||||
deleted
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.info("Failed to delete successful notifications")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-failed-notifications")
|
||||
def delete_failed_notifications():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_notifications_created_more_than_a_week_ago('failed')
|
||||
deleted += delete_notifications_created_more_than_a_week_ago('technical-failure')
|
||||
deleted += delete_notifications_created_more_than_a_week_ago('temporary-failure')
|
||||
deleted += delete_notifications_created_more_than_a_week_ago('permanent-failure')
|
||||
current_app.logger.info(
|
||||
"Delete job started {} finished {} deleted {} failed notifications".format(
|
||||
start,
|
||||
datetime.utcnow(),
|
||||
deleted
|
||||
)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.info("Failed to delete failed notifications")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-invitations")
|
||||
def delete_invitations():
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
deleted = delete_invitations_created_more_than_two_days_ago()
|
||||
current_app.logger.info(
|
||||
"Delete job started {} finished {} deleted {} invitations".format(start, datetime.utcnow(), deleted)
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.info("Failed to delete invitations")
|
||||
raise
|
||||
|
||||
|
||||
@notify_celery.task(name="process-job")
|
||||
def process_job(job_id):
|
||||
task_start = monotonic()
|
||||
@@ -357,23 +280,3 @@ def provider_to_use(notification_type, notification_id):
|
||||
raise Exception("No active {} providers".format(notification_type))
|
||||
|
||||
return clients.get_client_by_name_and_type(active_providers_in_order[0].identifier, notification_type)
|
||||
|
||||
|
||||
@notify_celery.task(name='timeout-sending-notifications')
|
||||
def timeout_notifications():
|
||||
notifications = get_notifications(filter_dict={'status': 'sending'})
|
||||
now = datetime.utcnow()
|
||||
for noti in notifications:
|
||||
try:
|
||||
if (now - noti.created_at) > timedelta(
|
||||
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
|
||||
):
|
||||
update_notification_status_by_id(noti.id, 'temporary-failure', STATISTICS_FAILURE)
|
||||
current_app.logger.info((
|
||||
"Timeout period reached for notification ({})"
|
||||
", status has been updated.").format(noti.id))
|
||||
except Exception as e:
|
||||
current_app.logger.exception(e)
|
||||
current_app.logger.error((
|
||||
"Exception raised trying to timeout notification ({})"
|
||||
", skipping notification update.").format(noti.id))
|
||||
|
||||
74
tests/app/celery/test_scheduled_tasks.py
Normal file
74
tests/app/celery/test_scheduled_tasks.py
Normal file
@@ -0,0 +1,74 @@
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from flask import current_app
|
||||
|
||||
from app.celery import scheduled_tasks
|
||||
from app.celery.scheduled_tasks import (delete_verify_codes,
|
||||
delete_successful_notifications,
|
||||
delete_failed_notifications,
|
||||
delete_invitations,
|
||||
timeout_notifications)
|
||||
from tests.app.conftest import sample_notification
|
||||
|
||||
|
||||
def test_should_call_delete_notifications_more_than_week_in_task(notify_api, mocker):
|
||||
mocked = mocker.patch('app.celery.scheduled_tasksgit .delete_notifications_created_more_than_a_week_ago')
|
||||
delete_successful_notifications()
|
||||
assert mocked.assert_called_with('delivered')
|
||||
assert scheduled_tasks.delete_notifications_created_more_than_a_week_ago.call_count == 1
|
||||
|
||||
|
||||
def test_should_call_delete_notifications_more_than_week_in_task(notify_api, mocker):
|
||||
mocker.patch('app.celery.scheduled_tasks.delete_notifications_created_more_than_a_week_ago')
|
||||
delete_failed_notifications()
|
||||
assert scheduled_tasks.delete_notifications_created_more_than_a_week_ago.call_count == 4
|
||||
|
||||
|
||||
def test_should_call_delete_codes_on_delete_verify_codes_task(notify_api, mocker):
|
||||
mocker.patch('app.celery.scheduled_tasks.delete_codes_older_created_more_than_a_day_ago')
|
||||
delete_verify_codes()
|
||||
assert scheduled_tasks.delete_codes_older_created_more_than_a_day_ago.call_count == 1
|
||||
|
||||
|
||||
def test_should_call_delete_invotations_on_delete_invitations_task(notify_api, mocker):
|
||||
mocker.patch('app.celery.scheduled_tasks.delete_invitations_created_more_than_two_days_ago')
|
||||
delete_invitations()
|
||||
assert scheduled_tasks.delete_invitations_created_more_than_two_days_ago.call_count == 1
|
||||
|
||||
|
||||
def test_update_status_of_notifications_after_timeout(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service,
|
||||
sample_template,
|
||||
mmg_provider):
|
||||
with notify_api.test_request_context():
|
||||
not1 = sample_notification(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
service=sample_service,
|
||||
template=sample_template,
|
||||
status='sending',
|
||||
created_at=datetime.utcnow() - timedelta(
|
||||
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') + 10))
|
||||
timeout_notifications()
|
||||
assert not1.status == 'temporary-failure'
|
||||
|
||||
|
||||
def test_not_update_status_of_notification_before_timeout(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service,
|
||||
sample_template,
|
||||
mmg_provider):
|
||||
with notify_api.test_request_context():
|
||||
not1 = sample_notification(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
service=sample_service,
|
||||
template=sample_template,
|
||||
status='sending',
|
||||
created_at=datetime.utcnow() - timedelta(
|
||||
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') - 10))
|
||||
timeout_notifications()
|
||||
assert not1.status == 'sending'
|
||||
@@ -1,32 +1,27 @@
|
||||
import uuid
|
||||
import pytest
|
||||
from flask import current_app
|
||||
from mock import ANY
|
||||
from datetime import datetime
|
||||
|
||||
import pytest
|
||||
from freezegun import freeze_time
|
||||
from mock import ANY
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from app import (aws_ses_client, encryption, DATETIME_FORMAT, statsd_client)
|
||||
from app.celery import provider_tasks
|
||||
from app.celery import tasks
|
||||
from app.celery.research_mode_tasks import send_email_response
|
||||
from app.celery.tasks import s3
|
||||
from app.celery.tasks import (
|
||||
send_sms,
|
||||
process_job,
|
||||
delete_verify_codes,
|
||||
delete_invitations,
|
||||
delete_failed_notifications,
|
||||
delete_successful_notifications,
|
||||
provider_to_use,
|
||||
timeout_notifications,
|
||||
send_email
|
||||
)
|
||||
from app import (aws_ses_client, encryption, DATETIME_FORMAT, statsd_client)
|
||||
from app.celery.research_mode_tasks import send_email_response
|
||||
from app.clients.email.aws_ses import AwsSesClientException
|
||||
from app.dao import notifications_dao, jobs_dao, provider_details_dao
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
from app.celery.tasks import s3
|
||||
from app.celery import tasks
|
||||
from app.dao.provider_statistics_dao import get_provider_statistics
|
||||
from tests.app import load_example_csv
|
||||
from datetime import datetime, timedelta
|
||||
from freezegun import freeze_time
|
||||
from tests.app.conftest import (
|
||||
sample_service,
|
||||
sample_user,
|
||||
@@ -76,31 +71,6 @@ def test_should_return_highest_priority_active_provider(notify_db, notify_db_ses
|
||||
assert provider_to_use('sms', '1234').name == first.identifier
|
||||
|
||||
|
||||
def test_should_call_delete_notifications_more_than_week_in_task(notify_api, mocker):
|
||||
mocked = mocker.patch('app.celery.tasks.delete_notifications_created_more_than_a_week_ago')
|
||||
delete_successful_notifications()
|
||||
assert mocked.assert_called_with('delivered')
|
||||
assert tasks.delete_notifications_created_more_than_a_week_ago.call_count == 1
|
||||
|
||||
|
||||
def test_should_call_delete_notifications_more_than_week_in_task(notify_api, mocker):
|
||||
mocker.patch('app.celery.tasks.delete_notifications_created_more_than_a_week_ago')
|
||||
delete_failed_notifications()
|
||||
assert tasks.delete_notifications_created_more_than_a_week_ago.call_count == 4
|
||||
|
||||
|
||||
def test_should_call_delete_codes_on_delete_verify_codes_task(notify_api, mocker):
|
||||
mocker.patch('app.celery.tasks.delete_codes_older_created_more_than_a_day_ago')
|
||||
delete_verify_codes()
|
||||
assert tasks.delete_codes_older_created_more_than_a_day_ago.call_count == 1
|
||||
|
||||
|
||||
def test_should_call_delete_invotations_on_delete_invitations_task(notify_api, mocker):
|
||||
mocker.patch('app.celery.tasks.delete_invitations_created_more_than_two_days_ago')
|
||||
delete_invitations()
|
||||
assert tasks.delete_invitations_created_more_than_two_days_ago.call_count == 1
|
||||
|
||||
|
||||
@freeze_time("2016-01-01 11:09:00.061258")
|
||||
def test_should_process_sms_job(sample_job, mocker, mock_celery_remove_job):
|
||||
mocker.patch('app.statsd_client.incr')
|
||||
@@ -907,41 +877,3 @@ def _notification_json(template, to, personalisation=None, job_id=None, row_numb
|
||||
if row_number:
|
||||
notification['row_number'] = row_number
|
||||
return notification
|
||||
|
||||
|
||||
def test_update_status_of_notifications_after_timeout(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service,
|
||||
sample_template,
|
||||
mmg_provider):
|
||||
with notify_api.test_request_context():
|
||||
not1 = sample_notification(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
service=sample_service,
|
||||
template=sample_template,
|
||||
status='sending',
|
||||
created_at=datetime.utcnow() - timedelta(
|
||||
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') + 10))
|
||||
timeout_notifications()
|
||||
assert not1.status == 'temporary-failure'
|
||||
|
||||
|
||||
def test_not_update_status_of_notification_before_timeout(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service,
|
||||
sample_template,
|
||||
mmg_provider):
|
||||
with notify_api.test_request_context():
|
||||
not1 = sample_notification(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
service=sample_service,
|
||||
template=sample_template,
|
||||
status='sending',
|
||||
created_at=datetime.utcnow() - timedelta(
|
||||
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') - 10))
|
||||
timeout_notifications()
|
||||
assert not1.status == 'sending'
|
||||
|
||||
Reference in New Issue
Block a user