From 44b7b36acdef9458837d59de75ee9ba051b67018 Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Thu, 26 Sep 2019 14:19:09 +0100 Subject: [PATCH] Added a command to process a row from a job. --- app/celery/tasks.py | 4 +++- app/commands.py | 25 ++++++++++++++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 47b7ff7d0..ded705142 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -153,15 +153,17 @@ def process_row(row, template, job, service, sender_id=None): if sender_id: task_kwargs['sender_id'] = sender_id + notification_id = create_uuid() send_fn.apply_async( ( str(service.id), - create_uuid(), + notification_id, encrypted, ), task_kwargs, queue=QueueNames.DATABASE if not service.research_mode else QueueNames.RESEARCH_MODE ) + return notification_id def __sending_limits_for_job_exceeded(service, job, job_id): diff --git a/app/commands.py b/app/commands.py index 8f83767b3..093339f46 100644 --- a/app/commands.py +++ b/app/commands.py @@ -9,6 +9,7 @@ import flask import itertools from click_datetime import Datetime as click_dt from flask import current_app, json +from notifications_utils.recipients import RecipientCSV from notifications_utils.template import SMSMessageTemplate from sqlalchemy.exc import IntegrityError from sqlalchemy.orm.exc import NoResultFound @@ -16,7 +17,7 @@ from notifications_utils.statsd_decorators import statsd from app import db, DATETIME_FORMAT, encryption from app.aws import s3 -from app.celery.tasks import record_daily_sorted_counts +from app.celery.tasks import record_daily_sorted_counts, get_template_class, process_row from app.celery.nightly_tasks import send_total_sent_notifications_to_performance_platform from app.celery.service_callback_tasks import send_delivery_status_to_service from app.celery.letters_pdf_tasks import create_letters_pdf @@ -28,6 +29,7 @@ from app.dao.fact_billing_dao import ( get_service_ids_that_need_billing_populated, update_fact_billing, ) +from app.dao.jobs_dao import dao_get_job_by_id from app.dao.organisation_dao import dao_get_organisation_by_email_address, dao_add_service_to_organisation from app.dao.provider_rates_dao import create_provider_rates as dao_create_provider_rates @@ -899,3 +901,24 @@ def fix_billable_units(): ) db.session.commit() print("End fix_billable_units") + + +@notify_command(name='process-row-from-job') +@click.option('-j', '--job_id', required=True, help='Job id') +@click.option('-n', '--job_row_number', type=int, required=True, help='Job id') +def process_row_from_job(job_id, job_row_number): + job = dao_get_job_by_id(job_id) + db_template = dao_get_template_by_id(job.template_id, job.template_version) + + TemplateClass = get_template_class(db_template.template_type) + template = TemplateClass(db_template.__dict__) + + for row in RecipientCSV( + s3.get_job_from_s3(str(job.service_id), str(job.id)), + template_type=template.template_type, + placeholders=template.placeholders + ).get_rows(): + if row.index == job_row_number: + notification_id = process_row(row, template, job, job.service) + current_app.logger.info("Process row {} for job {} created notification_id: {}".format( + job_row_number, job_id, notification_id))