Merge pull request #635 from alphagov/scheduled-delivery-of-jobs

Scheduled delivery of jobs
This commit is contained in:
minglis
2016-08-30 12:48:48 +01:00
committed by GitHub
13 changed files with 312 additions and 58 deletions

View File

@@ -6,10 +6,30 @@ from sqlalchemy.exc import SQLAlchemyError
from app import notify_celery
from app.clients import STATISTICS_FAILURE
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job
from app.dao.notifications_dao import delete_notifications_created_more_than_a_week_ago, get_notifications, \
update_notification_status_by_id
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
from app.statsd_decorators import statsd
from app.models import JOB_STATUS_PENDING
from app.celery.tasks import process_job
@notify_celery.task(name="run-scheduled-jobs")
@statsd(namespace="tasks")
def run_scheduled_jobs():
try:
jobs = dao_get_scheduled_jobs()
for job in jobs:
job.job_status = JOB_STATUS_PENDING
dao_update_job(job)
process_job.apply_async([str(job.id)], queue="process-job")
current_app.logger.info(
"Job ID {} added to process job queue".format(job.id)
)
except SQLAlchemyError as e:
current_app.logger.exception("Failed to run scheduled jobs", e)
raise
@notify_celery.task(name="delete-verify-codes")
@@ -21,8 +41,8 @@ def delete_verify_codes():
current_app.logger.info(
"Delete job started {} finished {} deleted {} verify codes".format(start, datetime.utcnow(), deleted)
)
except SQLAlchemyError:
current_app.logger.info("Failed to delete verify codes")
except SQLAlchemyError as e:
current_app.logger.exception("Failed to delete verify codes", e)
raise
@@ -39,8 +59,8 @@ def delete_successful_notifications():
deleted
)
)
except SQLAlchemyError:
current_app.logger.info("Failed to delete successful notifications")
except SQLAlchemyError as e:
current_app.logger.exception("Failed to delete successful notifications", e)
raise
@@ -60,8 +80,8 @@ def delete_failed_notifications():
deleted
)
)
except SQLAlchemyError:
current_app.logger.info("Failed to delete failed notifications")
except SQLAlchemyError as e:
current_app.logger.exception("Failed to delete failed notifications", e)
raise
@@ -74,8 +94,8 @@ def delete_invitations():
current_app.logger.info(
"Delete job started {} finished {} deleted {} invitations".format(start, datetime.utcnow(), deleted)
)
except SQLAlchemyError:
current_app.logger.info("Failed to delete invitations")
except SQLAlchemyError as e:
current_app.logger.exception("Failed to delete invitations", e)
raise
@@ -88,7 +108,7 @@ def timeout_notifications():
for noti in notifications:
try:
if (now - noti.created_at) > timedelta(
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
):
# TODO: think about making this a bulk update rather than one at a time.
updated = update_notification_status_by_id(noti.id, 'temporary-failure')
@@ -97,6 +117,6 @@ def timeout_notifications():
"Timeout period reached for notification ({}), status has been updated.".format(noti.id))
except Exception as e:
current_app.logger.exception(e)
current_app.logger.error((
"Exception raised trying to timeout notification ({})"
", skipping notification update.").format(noti.id))
current_app.logger.error(
"Exception raised trying to timeout notification ({}) skipping notification update.".format(noti.id)
)

View File

@@ -1,4 +1,5 @@
from sqlalchemy import desc, cast, Date as sql_date
from datetime import date, timedelta, datetime
from sqlalchemy import desc, asc, cast, Date as sql_date
from app import db
from app.dao import days_ago
from app.models import Job, NotificationHistory
@@ -36,6 +37,13 @@ def dao_get_job_by_id(job_id):
return Job.query.filter_by(id=job_id).one()
def dao_get_scheduled_jobs():
return Job.query \
.filter(Job.job_status == 'scheduled', Job.scheduled_for < datetime.utcnow()) \
.order_by(asc(Job.scheduled_for)) \
.all()
def dao_create_job(job):
db.session.add(job)
db.session.commit()

View File

@@ -28,6 +28,8 @@ from app.schemas import (
from app.celery.tasks import process_job
from app.models import JOB_STATUS_SCHEDULED, JOB_STATUS_PENDING
from app.utils import pagination_links
job = Blueprint('job', __name__, url_prefix='/service/<uuid:service_id>/job')
@@ -104,6 +106,7 @@ def create_job(service_id):
dao_fetch_service_by_id(service_id)
data = request.get_json()
data.update({
"service": service_id
})
@@ -115,7 +118,15 @@ def create_job(service_id):
raise InvalidRequest(errors, status_code=400)
data.update({"template_version": template.version})
job = job_schema.load(data).data
if job.scheduled_for:
job.job_status = JOB_STATUS_SCHEDULED
dao_create_job(job)
process_job.apply_async([str(job.id)], queue="process-job")
if job.job_status == JOB_STATUS_PENDING:
process_job.apply_async([str(job.id)], queue="process-job")
return jsonify(data=job_schema.dump(job).data), 201

View File

@@ -310,7 +310,7 @@ JOB_STATUS_SENDING_LIMITS_EXCEEDED = 'sending limits exceeded'
JOB_STATUS_SCHEDULED = 'scheduled'
class JobStatusTypes(db.Model):
class JobStatus(db.Model):
__tablename__ = 'job_status'
name = db.Column(db.String(255), primary_key=True)
@@ -362,7 +362,8 @@ class Job(db.Model):
unique=False,
nullable=True)
job_status = db.Column(
db.String(255), db.ForeignKey('job_status.name'), index=True, nullable=True)
db.String(255), db.ForeignKey('job_status.name'), index=True, nullable=True, default='pending'
)
VERIFY_CODE_TYPES = [EMAIL_TYPE, SMS_TYPE]

View File

@@ -1,10 +1,9 @@
import re
from datetime import (
datetime,
date
)
date,
timedelta)
from flask_marshmallow.fields import fields
from marshmallow import (
post_load,
ValidationError,
@@ -40,11 +39,31 @@ def _validate_positive_number(value, msg="Not a positive integer"):
raise ValidationError(msg)
def _validate_datetime_not_more_than_24_hours_in_future(dte, msg="Date cannot be more than 24hrs in the future"):
if dte > datetime.utcnow() + timedelta(hours=24):
raise ValidationError(msg)
def _validate_not_in_future(dte, msg="Date cannot be in the future"):
if dte > date.today():
raise ValidationError(msg)
def _validate_not_in_past(dte, msg="Date cannot be in the past"):
if dte < date.today():
raise ValidationError(msg)
def _validate_datetime_not_in_future(dte, msg="Date cannot be in the future"):
if dte > datetime.utcnow():
raise ValidationError(msg)
def _validate_datetime_not_in_past(dte, msg="Date cannot be in the past"):
if dte < datetime.utcnow():
raise ValidationError(msg)
# TODO I think marshmallow provides a better integration and error handling.
# Would be better to replace functionality in dao with the marshmallow supported
# functionality.
@@ -208,6 +227,15 @@ class JobSchema(BaseSchema):
dump_to="created_by", only=["id", "name"], dump_only=True)
created_by = field_for(models.Job, 'created_by', required=True, load_only=True)
job_status = field_for(models.JobStatus, 'name', required=False)
scheduled_for = fields.DateTime()
@validates('scheduled_for')
def validate_scheduled_for(self, value):
_validate_datetime_not_in_past(value)
_validate_datetime_not_more_than_24_hours_in_future(value)
class Meta:
model = models.Job
exclude = (