Merge pull request #2216 from alphagov/rearrange_nightly_tasks

Rearrange nightly tasks and update ft_notification_status table
Pea (Malgorzata Tyczynska)
2018-11-08 14:30:56 +00:00
committed by GitHub
5 changed files with 62 additions and 67 deletions

View File

@@ -15,7 +15,7 @@ from app.dao.fact_notification_status_dao import fetch_notification_status_for_d
 @statsd(namespace="tasks")
 def create_nightly_billing(day_start=None):
     # day_start is a datetime.date() object. e.g.
-    # 3 days of data counting back from day_start is consolidated
+    # up to 10 days of data counting back from day_start is consolidated
     if day_start is None:
         day_start = datetime.today() - timedelta(days=1)
     else:
@@ -37,13 +37,13 @@ def create_nightly_billing(day_start=None):
 @statsd(namespace="tasks")
 def create_nightly_notification_status(day_start=None):
     # day_start is a datetime.date() object. e.g.
-    # 3 days of data counting back from day_start is consolidated
+    # 4 days of data counting back from day_start is consolidated
     if day_start is None:
         day_start = datetime.today() - timedelta(days=1)
     else:
         # When calling the task its a string in the format of "YYYY-MM-DD"
         day_start = datetime.strptime(day_start, "%Y-%m-%d")
-    for i in range(0, 3):
+    for i in range(0, 4):
         process_day = day_start - timedelta(days=i)
         transit_data = fetch_notification_status_for_day(process_day=process_day)
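Reviewer note: with the loop widened to range(0, 4), each nightly run re-consolidates the last four calendar days, so delivery statuses that arrive late for recent days are picked up on a later pass. A small self-contained sketch of that day window (days_to_consolidate is a hypothetical helper written for illustration, not part of the codebase):

from datetime import datetime, timedelta

def days_to_consolidate(day_start=None, days_back=4):
    # mirrors the task above: default to yesterday, or accept a "YYYY-MM-DD" string
    if day_start is None:
        day_start = datetime.today() - timedelta(days=1)
    else:
        day_start = datetime.strptime(day_start, "%Y-%m-%d")
    for i in range(0, days_back):
        yield (day_start - timedelta(days=i)).date()

# a run on the night of 2018-11-08 (day_start = the 7th) reprocesses the 7th back to the 4th
print(list(days_to_consolidate("2018-11-07")))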

View File

@@ -527,6 +527,12 @@ def migrate_data_to_ft_notification_status(start_date, end_date):
     while process_date < end_date:
         start_time = datetime.now()
         # migrate data into ft_notification_status and update if record already exists
+        db.session.execute(
+            'delete from ft_notification_status where bst_date = :process_date',
+            {"process_date": process_date}
+        )
         sql = \
             """
             insert into ft_notification_status (bst_date, template_id, service_id, job_id, notification_type, key_type,
@@ -546,9 +552,6 @@ def migrate_data_to_ft_notification_status(start_date, end_date):
             and n.created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
             group by bst_date, template_id, service_id, job_id, notification_type, key_type, notification_status
             order by bst_date
-            on conflict on constraint ft_notification_status_pkey do update set
-                notification_count = excluded.notification_count,
-                updated_at = now()
             """
         result = db.session.execute(sql, {"start": process_date, "end": process_date + timedelta(days=1)})
         db.session.commit()
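Reviewer note: the migration command now clears the day's rows before inserting, which is what lets the "on conflict ... do update" clause above be dropped. A toy, runnable version of that delete-then-insert pattern, using an in-memory SQLite database through SQLAlchemy with a cut-down two-column stand-in for ft_notification_status (the real schema and SQL are as in the hunk above):

from sqlalchemy import create_engine, text

engine = create_engine('sqlite://')
with engine.begin() as conn:
    conn.execute(text(
        'create table ft_notification_status (bst_date date, notification_count int)'
    ))

def rebuild_day(process_date, count):
    # deleting the day's rows first makes a re-run idempotent, so the insert
    # no longer needs an upsert clause
    with engine.begin() as conn:
        conn.execute(
            text('delete from ft_notification_status where bst_date = :process_date'),
            {"process_date": process_date}
        )
        conn.execute(
            text('insert into ft_notification_status values (:process_date, :count)'),
            {"process_date": process_date, "count": count}
        )

rebuild_day('2018-11-07', 10)
rebuild_day('2018-11-07', 12)  # a second run replaces the day's row instead of duplicating it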

View File

@@ -175,49 +175,66 @@ class Config(object):
             'schedule': timedelta(minutes=66),
             'options': {'queue': QueueNames.PERIODIC}
         },
-        'delete-sms-notifications': {
-            'task': 'delete-sms-notifications',
-            'schedule': crontab(hour=0, minute=0),
-            'options': {'queue': QueueNames.PERIODIC}
-        },
-        'delete-email-notifications': {
-            'task': 'delete-email-notifications',
-            'schedule': crontab(hour=0, minute=20),
-            'options': {'queue': QueueNames.PERIODIC}
-        },
-        'delete-letter-notifications': {
-            'task': 'delete-letter-notifications',
-            'schedule': crontab(hour=0, minute=40),
-            'options': {'queue': QueueNames.PERIODIC}
-        },
-        'delete-inbound-sms': {
-            'task': 'delete-inbound-sms',
-            'schedule': crontab(hour=1, minute=0),
-            'options': {'queue': QueueNames.PERIODIC}
-        },
-        'send-daily-performance-platform-stats': {
-            'task': 'send-daily-performance-platform-stats',
-            'schedule': crontab(hour=2, minute=0),
-            'options': {'queue': QueueNames.PERIODIC}
-        },
         'switch-current-sms-provider-on-slow-delivery': {
             'task': 'switch-current-sms-provider-on-slow-delivery',
             'schedule': crontab(), # Every minute
             'options': {'queue': QueueNames.PERIODIC}
         },
+        'check-job-status': {
+            'task': 'check-job-status',
+            'schedule': crontab(),
+            'options': {'queue': QueueNames.PERIODIC}
+        },
+        'replay-created-notifications': {
+            'task': 'replay-created-notifications',
+            'schedule': crontab(minute='0, 15, 30, 45'),
+            'options': {'queue': QueueNames.PERIODIC}
+        },
+        # nightly tasks:
         'timeout-sending-notifications': {
             'task': 'timeout-sending-notifications',
-            'schedule': crontab(hour=3, minute=0),
+            'schedule': crontab(hour=0, minute=5),
             'options': {'queue': QueueNames.PERIODIC}
         },
+        'daily-stats-template-usage-by-month': {
+            'task': 'daily-stats-template-usage-by-month',
+            'schedule': crontab(hour=0, minute=10),
+            'options': {'queue': QueueNames.PERIODIC}
+        },
         'create-nightly-billing': {
             'task': 'create-nightly-billing',
-            'schedule': crontab(hour=3, minute=30),
+            'schedule': crontab(hour=0, minute=15),
             'options': {'queue': QueueNames.PERIODIC}
         },
         'create-nightly-notification-status': {
             'task': 'create-nightly-notification-status',
-            'schedule': crontab(hour=4, minute=30),
+            'schedule': crontab(hour=0, minute=30), # after 'timeout-sending-notifications'
             'options': {'queue': QueueNames.PERIODIC}
         },
+        'delete-sms-notifications': {
+            'task': 'delete-sms-notifications',
+            'schedule': crontab(hour=0, minute=45), # after 'create-nightly-notification-status'
+            'options': {'queue': QueueNames.PERIODIC}
+        },
+        'delete-email-notifications': {
+            'task': 'delete-email-notifications',
+            'schedule': crontab(hour=1, minute=0), # after 'create-nightly-notification-status'
+            'options': {'queue': QueueNames.PERIODIC}
+        },
+        'delete-letter-notifications': {
+            'task': 'delete-letter-notifications',
+            'schedule': crontab(hour=1, minute=20), # after 'create-nightly-notification-status'
+            'options': {'queue': QueueNames.PERIODIC}
+        },
+        'delete-inbound-sms': {
+            'task': 'delete-inbound-sms',
+            'schedule': crontab(hour=1, minute=40),
+            'options': {'queue': QueueNames.PERIODIC}
+        },
+        'send-daily-performance-platform-stats': {
+            'task': 'send-daily-performance-platform-stats',
+            'schedule': crontab(hour=2, minute=0),
+            'options': {'queue': QueueNames.PERIODIC}
+        },
         'remove_sms_email_jobs': {
@@ -254,21 +271,6 @@ class Config(object):
             'schedule': crontab(hour=23, minute=00),
             'options': {'queue': QueueNames.PERIODIC}
         },
-        'check-job-status': {
-            'task': 'check-job-status',
-            'schedule': crontab(),
-            'options': {'queue': QueueNames.PERIODIC}
-        },
-        'daily-stats-template-usage-by-month': {
-            'task': 'daily-stats-template-usage-by-month',
-            'schedule': crontab(hour=0, minute=5),
-            'options': {'queue': QueueNames.PERIODIC}
-        },
-        'replay-created-notifications': {
-            'task': 'replay-created-notifications',
-            'schedule': crontab(minute='0, 15, 30, 45'),
-            'options': {'queue': QueueNames.PERIODIC}
-        }
     }
     CELERY_QUEUES = []
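Reviewer note: after the rearrangement, the nightly dependency order is expressed purely through crontab offsets (timeout at 00:05, the stats builders through 00:30, the deletes from 00:45, performance-platform stats at 02:00). A quick way to eyeball that ordering, assuming the schedule is exposed as Config.CELERYBEAT_SCHEDULE in app/config.py (the attribute name is not shown in this diff) and relying on celery's crontab parsing its hour/minute specs into sets of ints:

from celery.schedules import crontab

from app.config import Config  # assumed import path

entries = []
for name, entry in Config.CELERYBEAT_SCHEDULE.items():
    schedule = entry['schedule']
    if isinstance(schedule, crontab):
        # take the earliest firing time for each cron-style entry
        entries.append((min(schedule.hour), min(schedule.minute), name))

for hour, minute, name in sorted(entries):
    print('{:02d}:{:02d}  {}'.format(hour, minute, name))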

View File

@@ -49,12 +49,9 @@ def fetch_notification_status_for_day(process_day, service_id=None):
 def update_fact_notification_status(data, process_day):
     table = FactNotificationStatus.__table__
-    '''
-    This uses the Postgres upsert to avoid race conditions when two threads try to insert
-    at the same row. The excluded object refers to values that we tried to insert but were
-    rejected.
-    http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#insert-on-conflict-upsert
-    '''
+    FactNotificationStatus.query.filter(
+        FactNotificationStatus.bst_date == process_day.date()
+    ).delete()
     for row in data:
         stmt = insert(table).values(
             bst_date=process_day.date(),
@@ -66,13 +63,6 @@ def update_fact_notification_status(data, process_day):
             notification_status=row.status,
             notification_count=row.notification_count,
         )
-        stmt = stmt.on_conflict_do_update(
-            constraint="ft_notification_status_pkey",
-            set_={"notification_count": stmt.excluded.notification_count,
-                  "updated_at": datetime.utcnow()
-                  }
-        )
         db.session.connection().execute(stmt)
     db.session.commit()
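Reviewer note: for comparison, this is roughly what the removed upsert compiled to. The sketch builds the statement against a cut-down stand-in table and only prints the generated SQL, so it runs without a database; the real code targets the named constraint ft_notification_status_pkey rather than index columns:

from sqlalchemy import Column, Date, Integer, MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()
# stand-in for ft_notification_status with just enough columns to show the shape
fact = Table(
    'ft_notification_status', metadata,
    Column('bst_date', Date, primary_key=True),
    Column('notification_count', Integer),
)

stmt = insert(fact).values(bst_date='2018-11-07', notification_count=10)
stmt = stmt.on_conflict_do_update(
    index_elements=['bst_date'],  # the removed code names the constraint instead
    set_={'notification_count': stmt.excluded.notification_count},
)
print(stmt.compile(dialect=postgresql.dialect()))

With the per-day delete now done up front, a plain insert is enough and this clause is no longer needed.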