diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py
index 4448079f0..097448ed8 100644
--- a/app/celery/scheduled_tasks.py
+++ b/app/celery/scheduled_tasks.py
@@ -27,17 +27,24 @@ def remove_csv_files():
 @notify_celery.task(name="run-scheduled-jobs")
 @statsd(namespace="tasks")
 def run_scheduled_jobs():
+    from app import db
     try:
         jobs = dao_get_scheduled_jobs()
         for job in jobs:
             job.job_status = JOB_STATUS_PENDING
-            dao_update_job(job)
-            process_job.apply_async([str(job.id)], queue="process-job")
+            from time import sleep
+            sleep(1)
+            print('SCHEDULING' + str(job.id))
+            # dao_update_job(job)
+            # process_job.apply_async([str(job.id)], queue="process-job")
             current_app.logger.info(
                 "Job ID {} added to process job queue".format(job.id)
             )
+        db.session.add_all(jobs)
+        db.session.commit()
     except SQLAlchemyError as e:
         current_app.logger.exception("Failed to run scheduled jobs", e)
+        db.session.rollback()
         raise
 
 
diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py
index 41ba00940..b9b677708 100644
--- a/app/dao/jobs_dao.py
+++ b/app/dao/jobs_dao.py
@@ -52,6 +52,7 @@ def dao_get_scheduled_jobs():
             Job.scheduled_for < datetime.utcnow()
         ) \
         .order_by(asc(Job.scheduled_for)) \
+        .with_for_update() \
         .all()
 
 
diff --git a/lambda_function/README.md b/lambda_function/README.md
new file mode 100644
index 000000000..3a8df5a1a
--- /dev/null
+++ b/lambda_function/README.md
@@ -0,0 +1,9 @@
+# Lambda function
+
+This code is used to set up a lambda function that calls into our API to deliver messages.
+
+The basic expected flow is that a request to GOV.UK Notify to send a text/email will persist in our DB and then invoke an async lambda function.
+
+This function will call /deliver/notification/{id}, which will make the API call to our delivery partners, updating the DB with the response.
+
+## EXPERIMENTAL AT THIS STAGE
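
Reading the scheduled_tasks.py and jobs_dao.py hunks together, the `with_for_update()` call plus the temporary `sleep`/`print` debugging looks like an experiment with row-level locking: a second worker running `run_scheduled_jobs` at the same time should block on the same `SELECT ... FOR UPDATE` until the first transaction commits, so the two workers cannot both claim the same scheduled jobs. A minimal standalone sketch of that pattern, assuming a SQLAlchemy `session`, the existing `Job` model and the `JOB_STATUS_*` constants (the helper name is illustrative), might look like this:

```python
from datetime import datetime

from sqlalchemy import asc


def claim_scheduled_jobs(session):
    # SELECT ... FOR UPDATE: the matched rows stay locked until this
    # transaction commits or rolls back, so a concurrent worker issuing the
    # same query waits here instead of picking up the same jobs.
    jobs = (
        session.query(Job)
        .filter(
            Job.job_status == JOB_STATUS_SCHEDULED,  # assumed filter, as in dao_get_scheduled_jobs
            Job.scheduled_for < datetime.utcnow(),
        )
        .order_by(asc(Job.scheduled_for))
        .with_for_update()
        .all()
    )

    for job in jobs:
        job.job_status = JOB_STATUS_PENDING

    # Committing releases the row locks; on PostgreSQL's default READ COMMITTED
    # isolation the unblocked worker re-checks the WHERE clause against the new
    # row versions, which are now JOB_STATUS_PENDING, so it gets no rows back.
    session.commit()
    return jobs
```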
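
The new lambda_function/README.md describes the function calling back into the API at /deliver/notification/{id}. Purely as an illustration of that flow, a handler might look like the sketch below; only the endpoint path comes from the README, while the event shape, the HTTP method, the environment variable name and the use of urllib are assumptions.

```python
import os
import urllib.request


def lambda_handler(event, context):
    # Assumed payload shape: the API passes the notification id when it invokes the lambda.
    notification_id = event["notification_id"]
    base_url = os.environ["NOTIFY_API_BASE_URL"]  # assumed configuration

    # Call back into the API; the endpoint is expected to contact the delivery
    # partner and update the DB with the response itself.
    url = "{}/deliver/notification/{}".format(base_url, notification_id)
    request = urllib.request.Request(url, method="POST")  # HTTP method is an assumption

    with urllib.request.urlopen(request) as response:
        return {
            "statusCode": response.status,
            "body": response.read().decode("utf-8"),
        }
```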