mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-16 10:12:32 -05:00
Added new scheduled task
- runs 1 min past the hour, every hour - looks up all scheduled jobs that have a scheduled date in the past and adds them to the normal process job queue - these are then processed as normal
This commit is contained in:
@@ -6,10 +6,30 @@ from sqlalchemy.exc import SQLAlchemyError
|
|||||||
from app import notify_celery
|
from app import notify_celery
|
||||||
from app.clients import STATISTICS_FAILURE
|
from app.clients import STATISTICS_FAILURE
|
||||||
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
|
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
|
||||||
|
from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job
|
||||||
from app.dao.notifications_dao import delete_notifications_created_more_than_a_week_ago, get_notifications, \
|
from app.dao.notifications_dao import delete_notifications_created_more_than_a_week_ago, get_notifications, \
|
||||||
update_notification_status_by_id
|
update_notification_status_by_id
|
||||||
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
||||||
from app.statsd_decorators import statsd
|
from app.statsd_decorators import statsd
|
||||||
|
from app.models import JOB_STATUS_PENDING
|
||||||
|
from app.celery.tasks import process_job
|
||||||
|
|
||||||
|
|
||||||
|
@notify_celery.task(name="run-scheduled-jobs")
|
||||||
|
@statsd(namespace="tasks")
|
||||||
|
def run_scheduled_jobs():
|
||||||
|
try:
|
||||||
|
jobs = dao_get_scheduled_jobs()
|
||||||
|
for job in jobs:
|
||||||
|
job.job_status = JOB_STATUS_PENDING
|
||||||
|
dao_update_job(job)
|
||||||
|
process_job.apply_async([str(job.id)], queue="process-job")
|
||||||
|
current_app.logger.info(
|
||||||
|
"Job ID {} added to process job queue".format(job.id)
|
||||||
|
)
|
||||||
|
except SQLAlchemyError as e:
|
||||||
|
current_app.logger.exception("Failed to run scheduled jobs", e)
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
@notify_celery.task(name="delete-verify-codes")
|
@notify_celery.task(name="delete-verify-codes")
|
||||||
@@ -21,8 +41,8 @@ def delete_verify_codes():
|
|||||||
current_app.logger.info(
|
current_app.logger.info(
|
||||||
"Delete job started {} finished {} deleted {} verify codes".format(start, datetime.utcnow(), deleted)
|
"Delete job started {} finished {} deleted {} verify codes".format(start, datetime.utcnow(), deleted)
|
||||||
)
|
)
|
||||||
except SQLAlchemyError:
|
except SQLAlchemyError as e:
|
||||||
current_app.logger.info("Failed to delete verify codes")
|
current_app.logger.exception("Failed to delete verify codes", e)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
@@ -39,8 +59,8 @@ def delete_successful_notifications():
|
|||||||
deleted
|
deleted
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
except SQLAlchemyError:
|
except SQLAlchemyError as e:
|
||||||
current_app.logger.info("Failed to delete successful notifications")
|
current_app.logger.exception("Failed to delete successful notifications", e)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
@@ -60,8 +80,8 @@ def delete_failed_notifications():
|
|||||||
deleted
|
deleted
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
except SQLAlchemyError:
|
except SQLAlchemyError as e:
|
||||||
current_app.logger.info("Failed to delete failed notifications")
|
current_app.logger.exception("Failed to delete failed notifications", e)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
@@ -74,8 +94,8 @@ def delete_invitations():
|
|||||||
current_app.logger.info(
|
current_app.logger.info(
|
||||||
"Delete job started {} finished {} deleted {} invitations".format(start, datetime.utcnow(), deleted)
|
"Delete job started {} finished {} deleted {} invitations".format(start, datetime.utcnow(), deleted)
|
||||||
)
|
)
|
||||||
except SQLAlchemyError:
|
except SQLAlchemyError as e:
|
||||||
current_app.logger.info("Failed to delete invitations")
|
current_app.logger.exception("Failed to delete invitations", e)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
@@ -88,7 +108,7 @@ def timeout_notifications():
|
|||||||
for noti in notifications:
|
for noti in notifications:
|
||||||
try:
|
try:
|
||||||
if (now - noti.created_at) > timedelta(
|
if (now - noti.created_at) > timedelta(
|
||||||
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
|
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
|
||||||
):
|
):
|
||||||
# TODO: think about making this a bulk update rather than one at a time.
|
# TODO: think about making this a bulk update rather than one at a time.
|
||||||
updated = update_notification_status_by_id(noti.id, 'temporary-failure', STATISTICS_FAILURE)
|
updated = update_notification_status_by_id(noti.id, 'temporary-failure', STATISTICS_FAILURE)
|
||||||
@@ -98,5 +118,5 @@ def timeout_notifications():
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
current_app.logger.exception(e)
|
current_app.logger.exception(e)
|
||||||
current_app.logger.error((
|
current_app.logger.error((
|
||||||
"Exception raised trying to timeout notification ({})"
|
"Exception raised trying to timeout notification ({})"
|
||||||
", skipping notification update.").format(noti.id))
|
", skipping notification update.").format(noti.id))
|
||||||
|
|||||||
@@ -50,6 +50,11 @@ class Config(object):
|
|||||||
CELERY_TASK_SERIALIZER = 'json'
|
CELERY_TASK_SERIALIZER = 'json'
|
||||||
CELERY_IMPORTS = ('app.celery.tasks', 'app.celery.scheduled_tasks')
|
CELERY_IMPORTS = ('app.celery.tasks', 'app.celery.scheduled_tasks')
|
||||||
CELERYBEAT_SCHEDULE = {
|
CELERYBEAT_SCHEDULE = {
|
||||||
|
'run-scheduled-jobs': {
|
||||||
|
'task': 'run-scheduled-jobs',
|
||||||
|
'schedule': crontab(minutes=1),
|
||||||
|
'options': {'queue': 'periodic'}
|
||||||
|
},
|
||||||
'delete-verify-codes': {
|
'delete-verify-codes': {
|
||||||
'task': 'delete-verify-codes',
|
'task': 'delete-verify-codes',
|
||||||
'schedule': timedelta(minutes=63),
|
'schedule': timedelta(minutes=63),
|
||||||
|
|||||||
@@ -7,8 +7,11 @@ from app.celery.scheduled_tasks import (delete_verify_codes,
|
|||||||
delete_successful_notifications,
|
delete_successful_notifications,
|
||||||
delete_failed_notifications,
|
delete_failed_notifications,
|
||||||
delete_invitations,
|
delete_invitations,
|
||||||
timeout_notifications)
|
timeout_notifications,
|
||||||
from tests.app.conftest import sample_notification
|
run_scheduled_jobs)
|
||||||
|
from app.dao.jobs_dao import dao_get_job_by_id
|
||||||
|
from tests.app.conftest import sample_notification, sample_job
|
||||||
|
from mock import call
|
||||||
|
|
||||||
|
|
||||||
def test_should_have_decorated_tasks_functions():
|
def test_should_have_decorated_tasks_functions():
|
||||||
@@ -17,10 +20,11 @@ def test_should_have_decorated_tasks_functions():
|
|||||||
assert delete_failed_notifications.__wrapped__.__name__ == 'delete_failed_notifications'
|
assert delete_failed_notifications.__wrapped__.__name__ == 'delete_failed_notifications'
|
||||||
assert timeout_notifications.__wrapped__.__name__ == 'timeout_notifications'
|
assert timeout_notifications.__wrapped__.__name__ == 'timeout_notifications'
|
||||||
assert delete_invitations.__wrapped__.__name__ == 'delete_invitations'
|
assert delete_invitations.__wrapped__.__name__ == 'delete_invitations'
|
||||||
|
assert run_scheduled_jobs.__wrapped__.__name__ == 'run_scheduled_jobs'
|
||||||
|
|
||||||
|
|
||||||
def test_should_call_delete_notifications_more_than_week_in_task(notify_api, mocker):
|
def test_should_call_delete_notifications_more_than_week_in_task(notify_api, mocker):
|
||||||
mocked = mocker.patch('app.celery.scheduled_tasksgit .delete_notifications_created_more_than_a_week_ago')
|
mocked = mocker.patch('app.celery.scheduled_tasks.delete_notifications_created_more_than_a_week_ago')
|
||||||
delete_successful_notifications()
|
delete_successful_notifications()
|
||||||
assert mocked.assert_called_with('delivered')
|
assert mocked.assert_called_with('delivered')
|
||||||
assert scheduled_tasks.delete_notifications_created_more_than_a_week_ago.call_count == 1
|
assert scheduled_tasks.delete_notifications_created_more_than_a_week_ago.call_count == 1
|
||||||
@@ -80,3 +84,39 @@ def test_not_update_status_of_notification_before_timeout(notify_api,
|
|||||||
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') - 10))
|
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') - 10))
|
||||||
timeout_notifications()
|
timeout_notifications()
|
||||||
assert not1.status == 'sending'
|
assert not1.status == 'sending'
|
||||||
|
|
||||||
|
|
||||||
|
def test_should_update_scheduled_jobs_and_put_on_queue(notify_db, notify_db_session, mocker):
|
||||||
|
mocked = mocker.patch('app.celery.tasks.process_job.apply_async')
|
||||||
|
|
||||||
|
one_minute_in_the_past = datetime.utcnow() - timedelta(minutes=1)
|
||||||
|
job = sample_job(notify_db, notify_db_session, scheduled_for=one_minute_in_the_past, job_status='scheduled')
|
||||||
|
|
||||||
|
run_scheduled_jobs()
|
||||||
|
|
||||||
|
updated_job = dao_get_job_by_id(job.id)
|
||||||
|
assert updated_job.job_status == 'pending'
|
||||||
|
mocked.assert_called_with([str(job.id)], queue='process-job')
|
||||||
|
|
||||||
|
|
||||||
|
def test_should_update_all_scheduled_jobs_and_put_on_queue(notify_db, notify_db_session, mocker):
|
||||||
|
mocked = mocker.patch('app.celery.tasks.process_job.apply_async')
|
||||||
|
|
||||||
|
one_minute_in_the_past = datetime.utcnow() - timedelta(minutes=1)
|
||||||
|
ten_minutes_in_the_past = datetime.utcnow() - timedelta(minutes=10)
|
||||||
|
twenty_minutes_in_the_past = datetime.utcnow() - timedelta(minutes=20)
|
||||||
|
job_1 = sample_job(notify_db, notify_db_session, scheduled_for=one_minute_in_the_past, job_status='scheduled')
|
||||||
|
job_2 = sample_job(notify_db, notify_db_session, scheduled_for=ten_minutes_in_the_past, job_status='scheduled')
|
||||||
|
job_3 = sample_job(notify_db, notify_db_session, scheduled_for=twenty_minutes_in_the_past, job_status='scheduled')
|
||||||
|
|
||||||
|
run_scheduled_jobs()
|
||||||
|
|
||||||
|
assert dao_get_job_by_id(job_1.id).job_status == 'pending'
|
||||||
|
assert dao_get_job_by_id(job_2.id).job_status == 'pending'
|
||||||
|
assert dao_get_job_by_id(job_2.id).job_status == 'pending'
|
||||||
|
|
||||||
|
mocked.assert_has_calls([
|
||||||
|
call([str(job_3.id)], queue='process-job'),
|
||||||
|
call([str(job_2.id)], queue='process-job'),
|
||||||
|
call([str(job_1.id)], queue='process-job')
|
||||||
|
])
|
||||||
|
|||||||
Reference in New Issue
Block a user