mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-01 07:35:34 -05:00
Move job processing into celery
- brings boto S3 into new AWS folder - CSV processing utils method Rejigs the jobs rest endpoint - removes some now unused endpoints, Calls to the task with the job, job processing in task, delegating SMS calls to the sms task
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import uuid
|
||||
import os
|
||||
import re
|
||||
from flask import request, url_for
|
||||
@@ -50,7 +51,7 @@ def create_app():
|
||||
application.register_blueprint(user_blueprint, url_prefix='/user')
|
||||
application.register_blueprint(template_blueprint)
|
||||
application.register_blueprint(status_blueprint, url_prefix='/status')
|
||||
application.register_blueprint(notifications_blueprint, url_prefix='/notifications')
|
||||
application.register_blueprint(notifications_blueprint)
|
||||
application.register_blueprint(job_blueprint)
|
||||
|
||||
return application
|
||||
@@ -99,3 +100,7 @@ def email_safe(string):
|
||||
character.lower() if character.isalnum() or character == "." else ""
|
||||
for character in re.sub("\s+", ".", string.strip())
|
||||
])
|
||||
|
||||
|
||||
def create_uuid():
|
||||
return str(uuid.uuid4())
|
||||
|
||||
7
app/aws/s3.py
Normal file
7
app/aws/s3.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from boto3 import resource
|
||||
|
||||
|
||||
def get_job_from_s3(bucket_name, job_id):
|
||||
s3 = resource('s3')
|
||||
key = s3.Object(bucket_name, '{}.csv'.format(job_id))
|
||||
return key.get()['Body'].read().decode('utf-8')
|
||||
@@ -1,11 +1,42 @@
|
||||
from app import create_uuid
|
||||
from app import notify_celery, encryption, firetext_client, aws_ses_client
|
||||
from app.clients.email.aws_ses import AwsSesClientException
|
||||
from app.clients.sms.firetext import FiretextClientException
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
from app.dao.notifications_dao import save_notification
|
||||
from app.dao.jobs_dao import dao_update_job, dao_get_job_by_id
|
||||
from app.models import Notification
|
||||
from flask import current_app
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from app.aws import s3
|
||||
from app.csv import get_mobile_numbers_from_csv
|
||||
|
||||
|
||||
@notify_celery.task(name="process-job")
|
||||
def process_job(job_id):
|
||||
job = dao_get_job_by_id(job_id)
|
||||
job.status = 'in progress'
|
||||
dao_update_job(job)
|
||||
|
||||
file = s3.get_job_from_s3(job.bucket_name, job_id)
|
||||
mobile_numbers = get_mobile_numbers_from_csv(file)
|
||||
|
||||
for mobile_number in mobile_numbers:
|
||||
notification = encryption.encrypt({
|
||||
'template': job.template_id,
|
||||
'job': str(job.id),
|
||||
'to': mobile_number
|
||||
})
|
||||
|
||||
send_sms.apply_async((
|
||||
str(job.service_id),
|
||||
str(create_uuid()),
|
||||
notification),
|
||||
queue='sms'
|
||||
)
|
||||
|
||||
job.status = 'finished'
|
||||
dao_update_job(job)
|
||||
|
||||
|
||||
@notify_celery.task(name="send-sms")
|
||||
|
||||
12
app/csv.py
Normal file
12
app/csv.py
Normal file
@@ -0,0 +1,12 @@
|
||||
import csv
|
||||
|
||||
|
||||
def get_mobile_numbers_from_csv(file_data):
|
||||
numbers = []
|
||||
reader = csv.DictReader(
|
||||
file_data.splitlines(),
|
||||
lineterminator='\n',
|
||||
quoting=csv.QUOTE_NONE)
|
||||
for i, row in enumerate(reader):
|
||||
numbers.append(row['phone'].replace(' ', ''))
|
||||
return numbers
|
||||
@@ -2,24 +2,23 @@ from app import db
|
||||
from app.models import Job
|
||||
|
||||
|
||||
def save_job(job, update_dict={}):
|
||||
if update_dict:
|
||||
update_dict.pop('id', None)
|
||||
update_dict.pop('service', None)
|
||||
update_dict.pop('template', None)
|
||||
Job.query.filter_by(id=job.id).update(update_dict)
|
||||
else:
|
||||
db.session.add(job)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
def get_job(service_id, job_id):
|
||||
def dao_get_job_by_service_id_and_job_id(service_id, job_id):
|
||||
return Job.query.filter_by(service_id=service_id, id=job_id).first()
|
||||
|
||||
|
||||
def get_jobs_by_service(service_id):
|
||||
def dao_get_jobs_by_service_id(service_id):
|
||||
return Job.query.filter_by(service_id=service_id).all()
|
||||
|
||||
|
||||
def _get_jobs():
|
||||
return Job.query.all()
|
||||
def dao_get_job_by_id(job_id):
|
||||
return Job.query.filter_by(id=job_id).first()
|
||||
|
||||
|
||||
def dao_create_job(job):
|
||||
db.session.add(job)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
def dao_update_job(job):
|
||||
db.session.add(job)
|
||||
db.session.commit()
|
||||
|
||||
@@ -43,7 +43,7 @@ def register_errors(blueprint):
|
||||
@blueprint.app_errorhandler(DataError)
|
||||
def no_result_found(e):
|
||||
current_app.logger.error(e)
|
||||
return jsonify(error="No result found"), 404
|
||||
return jsonify(result="error", message="No result found"), 404
|
||||
|
||||
@blueprint.app_errorhandler(SQLAlchemyError)
|
||||
def db_error(e):
|
||||
|
||||
159
app/job/rest.py
159
app/job/rest.py
@@ -1,148 +1,81 @@
|
||||
import boto3
|
||||
import json
|
||||
|
||||
from flask import (
|
||||
Blueprint,
|
||||
jsonify,
|
||||
request,
|
||||
current_app
|
||||
request
|
||||
)
|
||||
|
||||
from sqlalchemy.exc import DataError
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from app.dao.jobs_dao import (
|
||||
save_job,
|
||||
get_job,
|
||||
get_jobs_by_service
|
||||
dao_create_job,
|
||||
dao_get_job_by_service_id_and_job_id,
|
||||
dao_get_jobs_by_service_id,
|
||||
dao_update_job
|
||||
)
|
||||
|
||||
from app.dao import notifications_dao
|
||||
from app.dao.services_dao import (
|
||||
dao_fetch_service_by_id
|
||||
)
|
||||
|
||||
from app.schemas import (
|
||||
job_schema,
|
||||
jobs_schema,
|
||||
job_schema_load_json,
|
||||
notification_status_schema,
|
||||
notifications_status_schema,
|
||||
notification_status_schema_load_json
|
||||
jobs_schema
|
||||
)
|
||||
|
||||
from app.celery.tasks import process_job
|
||||
|
||||
job = Blueprint('job', __name__, url_prefix='/service/<service_id>/job')
|
||||
|
||||
|
||||
from app.errors import register_errors
|
||||
|
||||
register_errors(job)
|
||||
|
||||
|
||||
@job.route('/<job_id>', methods=['GET'])
|
||||
def get_job_by_service_and_job_id(service_id, job_id):
|
||||
job = dao_get_job_by_service_id_and_job_id(service_id, job_id)
|
||||
if not job:
|
||||
return jsonify(result="error", message="Job {} not found for service {}".format(job_id, service_id)), 404
|
||||
data, errors = job_schema.dump(job)
|
||||
return jsonify(data=data)
|
||||
|
||||
|
||||
@job.route('', methods=['GET'])
|
||||
def get_job_for_service(service_id, job_id=None):
|
||||
if job_id:
|
||||
try:
|
||||
job = get_job(service_id, job_id)
|
||||
if not job:
|
||||
return jsonify(result="error", message="Job not found"), 404
|
||||
data, errors = job_schema.dump(job)
|
||||
return jsonify(data=data)
|
||||
except DataError:
|
||||
return jsonify(result="error", message="Invalid job id"), 400
|
||||
else:
|
||||
jobs = get_jobs_by_service(service_id)
|
||||
data, errors = jobs_schema.dump(jobs)
|
||||
return jsonify(data=data)
|
||||
def get_jobs_by_service(service_id):
|
||||
jobs = dao_get_jobs_by_service_id(service_id)
|
||||
data, errors = jobs_schema.dump(jobs)
|
||||
return jsonify(data=data)
|
||||
|
||||
|
||||
@job.route('', methods=['POST'])
|
||||
def create_job(service_id):
|
||||
job, errors = job_schema.load(request.get_json())
|
||||
|
||||
service = dao_fetch_service_by_id(service_id)
|
||||
if not service:
|
||||
return jsonify(result="error", message="Service {} not found".format(service_id)), 404
|
||||
|
||||
data = request.get_json()
|
||||
data.update({
|
||||
"service": service_id
|
||||
})
|
||||
job, errors = job_schema.load(data)
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
|
||||
save_job(job)
|
||||
_enqueue_job(job)
|
||||
|
||||
dao_create_job(job)
|
||||
process_job.apply_async([str(job.id)], queue="process-job")
|
||||
return jsonify(data=job_schema.dump(job).data), 201
|
||||
|
||||
|
||||
@job.route('/<job_id>', methods=['PUT'])
|
||||
@job.route('/<job_id>', methods=['POST'])
|
||||
def update_job(service_id, job_id):
|
||||
fetched_job = dao_get_job_by_service_id_and_job_id(service_id, job_id)
|
||||
if not fetched_job:
|
||||
return jsonify(result="error", message="Job {} not found for service {}".format(job_id, service_id)), 404
|
||||
|
||||
job = get_job(service_id, job_id)
|
||||
update_dict, errors = job_schema_load_json.load(request.get_json())
|
||||
current_data = dict(job_schema.dump(fetched_job).data.items())
|
||||
current_data.update(request.get_json())
|
||||
|
||||
update_dict, errors = job_schema.load(current_data)
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
|
||||
save_job(job, update_dict=update_dict)
|
||||
|
||||
return jsonify(data=job_schema.dump(job).data), 200
|
||||
|
||||
|
||||
@job.route('/<job_id>/notification', methods=['POST'])
|
||||
def create_notification_for_job(service_id, job_id):
|
||||
|
||||
# TODO assert service_id == payload service id
|
||||
# and same for job id
|
||||
notification, errors = notification_status_schema.load(request.get_json())
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
|
||||
notifications_dao.save_notification(notification)
|
||||
|
||||
return jsonify(data=notification_status_schema.dump(notification).data), 201
|
||||
|
||||
|
||||
@job.route('/<job_id>/notification', methods=['GET'])
|
||||
@job.route('/<job_id>/notification/<notification_id>')
|
||||
def get_notification_for_job(service_id, job_id, notification_id=None):
|
||||
if notification_id:
|
||||
try:
|
||||
notification = notifications_dao.get_notification_for_job(service_id, job_id, notification_id)
|
||||
data, errors = notification_status_schema.dump(notification)
|
||||
return jsonify(data=data)
|
||||
except DataError:
|
||||
return jsonify(result="error", message="Invalid notification id"), 400
|
||||
except NoResultFound:
|
||||
return jsonify(result="error", message="Notification not found"), 404
|
||||
else:
|
||||
notifications = notifications_dao.get_notifications_for_job(service_id, job_id)
|
||||
data, errors = notifications_status_schema.dump(notifications)
|
||||
return jsonify(data=data)
|
||||
|
||||
|
||||
@job.route('/<job_id>/notification/<notification_id>', methods=['PUT'])
|
||||
def update_notification_for_job(service_id, job_id, notification_id):
|
||||
|
||||
notification = notifications_dao.get_notification_for_job(service_id, job_id, notification_id)
|
||||
update_dict, errors = notification_status_schema_load_json.load(request.get_json())
|
||||
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
|
||||
notifications_dao.save_notification(notification, update_dict=update_dict)
|
||||
|
||||
return jsonify(data=job_schema.dump(notification).data), 200
|
||||
|
||||
|
||||
def _enqueue_job(job):
|
||||
aws_region = current_app.config['AWS_REGION']
|
||||
queue_name = current_app.config['NOTIFY_JOB_QUEUE']
|
||||
|
||||
queue = boto3.resource('sqs', region_name=aws_region).create_queue(QueueName=queue_name)
|
||||
data = {
|
||||
'id': str(job.id),
|
||||
'service': str(job.service.id),
|
||||
'template': job.template.id,
|
||||
'bucket_name': job.bucket_name,
|
||||
'file_name': job.file_name,
|
||||
'original_file_name': job.original_file_name
|
||||
}
|
||||
job_json = json.dumps(data)
|
||||
queue.send_message(MessageBody=job_json,
|
||||
MessageAttributes={'id': {'StringValue': str(job.id), 'DataType': 'String'},
|
||||
'service': {'StringValue': str(job.service.id), 'DataType': 'String'},
|
||||
'template': {'StringValue': str(job.template.id), 'DataType': 'String'},
|
||||
'bucket_name': {'StringValue': job.bucket_name, 'DataType': 'String'},
|
||||
'file_name': {'StringValue': job.file_name, 'DataType': 'String'},
|
||||
'original_file_name': {'StringValue': job.original_file_name,
|
||||
'DataType': 'String'}})
|
||||
dao_update_job(update_dict)
|
||||
return jsonify(data=job_schema.dump(update_dict).data), 200
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import uuid
|
||||
|
||||
from flask import (
|
||||
Blueprint,
|
||||
jsonify,
|
||||
@@ -7,7 +5,7 @@ from flask import (
|
||||
current_app
|
||||
)
|
||||
|
||||
from app import api_user, encryption
|
||||
from app import api_user, encryption, create_uuid
|
||||
from app.dao import (
|
||||
templates_dao,
|
||||
services_dao,
|
||||
@@ -34,11 +32,7 @@ SMS_NOTIFICATION = 'sms'
|
||||
EMAIL_NOTIFICATION = 'email'
|
||||
|
||||
|
||||
def create_notification_id():
|
||||
return str(uuid.uuid4())
|
||||
|
||||
|
||||
@notifications.route('/<string:notification_id>', methods=['GET'])
|
||||
@notifications.route('/notifications/<string:notification_id>', methods=['GET'])
|
||||
def get_notifications(notification_id):
|
||||
try:
|
||||
notification = notifications_dao.get_notification(api_user['client'], notification_id)
|
||||
@@ -47,22 +41,22 @@ def get_notifications(notification_id):
|
||||
return jsonify(result="error", message="not found"), 404
|
||||
|
||||
|
||||
@notifications.route('/sms', methods=['POST'])
|
||||
@notifications.route('/notifications/sms', methods=['POST'])
|
||||
def create_sms_notification():
|
||||
return send_notification(notification_type=SMS_NOTIFICATION, expects_job=False)
|
||||
|
||||
|
||||
@notifications.route('/sms/service/<service_id>', methods=['POST'])
|
||||
@notifications.route('/notifications/sms/service/<service_id>', methods=['POST'])
|
||||
def create_sms_for_job(service_id):
|
||||
return send_notification(service_id=service_id, notification_type=SMS_NOTIFICATION, expects_job=True)
|
||||
|
||||
|
||||
@notifications.route('/email', methods=['POST'])
|
||||
@notifications.route('/notifications/email', methods=['POST'])
|
||||
def create_email_notification():
|
||||
return send_notification(notification_type=EMAIL_NOTIFICATION, expects_job=False)
|
||||
|
||||
|
||||
@notifications.route('/email/service/<service_id>', methods=['POST'])
|
||||
@notifications.route('/notifications/email/service/<service_id>', methods=['POST'])
|
||||
def create_email_notification_for_job(service_id):
|
||||
return send_notification(service_id=service_id, notification_type=EMAIL_NOTIFICATION, expects_job=True)
|
||||
|
||||
@@ -98,7 +92,7 @@ def send_notification(notification_type, service_id=None, expects_job=False):
|
||||
), 400
|
||||
|
||||
if expects_job:
|
||||
job = jobs_dao.get_job(service_id, notification['job'])
|
||||
job = jobs_dao.dao_get_job_by_service_id_and_job_id(service_id, notification['job'])
|
||||
|
||||
if not job:
|
||||
return jsonify(result="error", message={'job': ['Job {} not found'.format(notification['job'])]}), 400
|
||||
@@ -115,7 +109,7 @@ def send_notification(notification_type, service_id=None, expects_job=False):
|
||||
return jsonify(
|
||||
result="error", message={'to': ['Email address not permitted for restricted service']}), 400
|
||||
|
||||
notification_id = create_notification_id()
|
||||
notification_id = create_uuid()
|
||||
|
||||
if notification_type is SMS_NOTIFICATION:
|
||||
send_sms.apply_async((
|
||||
|
||||
Reference in New Issue
Block a user