Added a command to process a row from a job.

This commit is contained in:
Rebecca Law
2019-09-26 14:19:09 +01:00
parent 3f35c634cd
commit 44b7b36acd
2 changed files with 27 additions and 2 deletions

View File

@@ -153,15 +153,17 @@ def process_row(row, template, job, service, sender_id=None):
if sender_id:
task_kwargs['sender_id'] = sender_id
notification_id = create_uuid()
send_fn.apply_async(
(
str(service.id),
create_uuid(),
notification_id,
encrypted,
),
task_kwargs,
queue=QueueNames.DATABASE if not service.research_mode else QueueNames.RESEARCH_MODE
)
return notification_id
def __sending_limits_for_job_exceeded(service, job, job_id):

View File

@@ -9,6 +9,7 @@ import flask
import itertools
from click_datetime import Datetime as click_dt
from flask import current_app, json
from notifications_utils.recipients import RecipientCSV
from notifications_utils.template import SMSMessageTemplate
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound
@@ -16,7 +17,7 @@ from notifications_utils.statsd_decorators import statsd
from app import db, DATETIME_FORMAT, encryption
from app.aws import s3
from app.celery.tasks import record_daily_sorted_counts
from app.celery.tasks import record_daily_sorted_counts, get_template_class, process_row
from app.celery.nightly_tasks import send_total_sent_notifications_to_performance_platform
from app.celery.service_callback_tasks import send_delivery_status_to_service
from app.celery.letters_pdf_tasks import create_letters_pdf
@@ -28,6 +29,7 @@ from app.dao.fact_billing_dao import (
get_service_ids_that_need_billing_populated,
update_fact_billing,
)
from app.dao.jobs_dao import dao_get_job_by_id
from app.dao.organisation_dao import dao_get_organisation_by_email_address, dao_add_service_to_organisation
from app.dao.provider_rates_dao import create_provider_rates as dao_create_provider_rates
@@ -899,3 +901,24 @@ def fix_billable_units():
)
db.session.commit()
print("End fix_billable_units")
@notify_command(name='process-row-from-job')
@click.option('-j', '--job_id', required=True, help='Job id')
@click.option('-n', '--job_row_number', type=int, required=True, help='Job id')
def process_row_from_job(job_id, job_row_number):
job = dao_get_job_by_id(job_id)
db_template = dao_get_template_by_id(job.template_id, job.template_version)
TemplateClass = get_template_class(db_template.template_type)
template = TemplateClass(db_template.__dict__)
for row in RecipientCSV(
s3.get_job_from_s3(str(job.service_id), str(job.id)),
template_type=template.template_type,
placeholders=template.placeholders
).get_rows():
if row.index == job_row_number:
notification_id = process_row(row, template, job, job.service)
current_app.logger.info("Process row {} for job {} created notification_id: {}".format(
job_row_number, job_id, notification_id))