Add job to queue as soon as it in created.

Added status to job.
This commit is contained in:
Adam Shimali
2016-02-02 14:58:25 +00:00
parent b5c662eca8
commit 0ade39e63f
5 changed files with 65 additions and 2 deletions

View File

@@ -1,7 +1,11 @@
import boto3
import json
from flask import ( from flask import (
Blueprint, Blueprint,
jsonify, jsonify,
request request,
current_app
) )
from sqlalchemy.exc import DataError from sqlalchemy.exc import DataError
@@ -46,6 +50,18 @@ def create_job(service_id):
return jsonify(result="error", message=errors), 400 return jsonify(result="error", message=errors), 400
try: try:
save_job(job) save_job(job)
_enqueue_job(job)
except Exception as e: except Exception as e:
return jsonify(result="error", message=str(e)), 500 return jsonify(result="error", message=str(e)), 500
return jsonify(data=job_schema.dump(job).data), 201 return jsonify(data=job_schema.dump(job).data), 201
def _enqueue_job(job):
aws_region = current_app.config['AWS_REGION']
queue_name = current_app.config['NOTIFY_JOB_QUEUE']
queue = boto3.resource('sqs', region_name=aws_region).create_queue(QueueName=queue_name)
job_json = json.dumps({'job_id': str(job.id), 'service_id': str(job.service.id)})
queue.send_message(MessageBody=job_json,
MessageAttributes={'job_id': {'StringValue': str(job.id), 'DataType': 'String'},
'service_id': {'StringValue': str(job.service.id), 'DataType': 'String'}})

View File

@@ -129,6 +129,9 @@ class Template(db.Model):
service = db.relationship('Service', backref=db.backref('templates', lazy='dynamic')) service = db.relationship('Service', backref=db.backref('templates', lazy='dynamic'))
JOB_STATUS_TYPES = ['pending', 'in progress', 'finished']
class Job(db.Model): class Job(db.Model):
__tablename__ = 'jobs' __tablename__ = 'jobs'
@@ -152,6 +155,7 @@ class Job(db.Model):
unique=False, unique=False,
nullable=True, nullable=True,
onupdate=datetime.datetime.now) onupdate=datetime.datetime.now)
status = db.Column(db.Enum(*JOB_STATUS_TYPES, name='job_status_types'), nullable=False, default='pending')
VERIFY_CODE_TYPES = ['email', 'sms'] VERIFY_CODE_TYPES = ['email', 'sms']

View File

@@ -15,6 +15,7 @@ class Config(object):
ADMIN_CLIENT_SECRET = None ADMIN_CLIENT_SECRET = None
AWS_REGION = 'eu-west-1' AWS_REGION = 'eu-west-1'
NOTIFY_JOB_QUEUE = os.getenv('NOTIFY_JOB_QUEUE', 'notify-jobs-queue')
class Development(Config): class Development(Config):

View File

@@ -0,0 +1,29 @@
"""empty message
Revision ID: 0012_add_status_to_job
Revises: 0011_uuid_service_id
Create Date: 2016-02-02 11:25:34.402864
"""
# revision identifiers, used by Alembic.
revision = '0012_add_status_to_job'
down_revision = '0011_uuid_service_id'
from alembic import op
import sqlalchemy as sa
def upgrade():
### commands auto generated by Alembic - please adjust! ###
job_status_types = sa.Enum('pending', 'in progress', 'finished', name='job_status_types')
job_status_types.create(op.get_bind())
op.add_column('jobs', sa.Column('status', job_status_types, nullable=False))
### end Alembic commands ###
def downgrade():
### commands auto generated by Alembic - please adjust! ###
op.drop_column('jobs', 'status')
op.execute('DROP TYPE job_status_types')
### end Alembic commands ###

View File

@@ -1,3 +1,5 @@
import boto3
import moto
import json import json
import uuid import uuid
from flask import url_for from flask import url_for
@@ -73,7 +75,8 @@ def test_get_job_by_id(notify_api, notify_db, notify_db_session,
assert resp_json['data']['id'] == job_id assert resp_json['data']['id'] == job_id
def test_post_job(notify_api, notify_db, notify_db_session, sample_template): @moto.mock_sqs
def test_create_job(notify_api, notify_db, notify_db_session, sample_template):
job_id = uuid.uuid4() job_id = uuid.uuid4()
template_id = sample_template.id template_id = sample_template.id
service_id = sample_template.service.id service_id = sample_template.service.id
@@ -88,6 +91,7 @@ def test_post_job(notify_api, notify_db, notify_db_session, sample_template):
'bucket_name': bucket_name, 'bucket_name': bucket_name,
'file_name': file_name, 'file_name': file_name,
} }
with notify_api.test_request_context(): with notify_api.test_request_context():
with notify_api.test_client() as client: with notify_api.test_client() as client:
path = url_for('job.create_job', service_id=service_id) path = url_for('job.create_job', service_id=service_id)
@@ -109,6 +113,15 @@ def test_post_job(notify_api, notify_db, notify_db_session, sample_template):
assert resp_json['data']['template'] == template_id assert resp_json['data']['template'] == template_id
assert resp_json['data']['original_file_name'] == original_file_name assert resp_json['data']['original_file_name'] == original_file_name
boto3.setup_default_session(region_name='eu-west-1')
q = boto3.resource('sqs').get_queue_by_name(QueueName='notify-jobs-queue')
messages = q.receive_messages()
assert len(messages) == 1
expected_message = json.loads(messages[0].body)
assert expected_message['job_id'] == str(job_id)
assert expected_message['service_id'] == str(service_id)
def _setup_jobs(notify_db, notify_db_session, template, number_of_jobs=5): def _setup_jobs(notify_db, notify_db_session, template, number_of_jobs=5):
for i in range(number_of_jobs): for i in range(number_of_jobs):