From 0ade39e63fd870d42c977bf1cc0101917c785c73 Mon Sep 17 00:00:00 2001 From: Adam Shimali Date: Tue, 2 Feb 2016 14:58:25 +0000 Subject: [PATCH] Add job to queue as soon as it in created. Added status to job. --- app/job/rest.py | 18 +++++++++++- app/models.py | 4 +++ config.py | 1 + migrations/versions/0012_add_status_to_job.py | 29 +++++++++++++++++++ tests/app/job/test_job_rest.py | 15 +++++++++- 5 files changed, 65 insertions(+), 2 deletions(-) create mode 100644 migrations/versions/0012_add_status_to_job.py diff --git a/app/job/rest.py b/app/job/rest.py index a1dc1f376..975b2eb9e 100644 --- a/app/job/rest.py +++ b/app/job/rest.py @@ -1,7 +1,11 @@ +import boto3 +import json + from flask import ( Blueprint, jsonify, - request + request, + current_app ) from sqlalchemy.exc import DataError @@ -46,6 +50,18 @@ def create_job(service_id): return jsonify(result="error", message=errors), 400 try: save_job(job) + _enqueue_job(job) except Exception as e: return jsonify(result="error", message=str(e)), 500 return jsonify(data=job_schema.dump(job).data), 201 + + +def _enqueue_job(job): + aws_region = current_app.config['AWS_REGION'] + queue_name = current_app.config['NOTIFY_JOB_QUEUE'] + + queue = boto3.resource('sqs', region_name=aws_region).create_queue(QueueName=queue_name) + job_json = json.dumps({'job_id': str(job.id), 'service_id': str(job.service.id)}) + queue.send_message(MessageBody=job_json, + MessageAttributes={'job_id': {'StringValue': str(job.id), 'DataType': 'String'}, + 'service_id': {'StringValue': str(job.service.id), 'DataType': 'String'}}) diff --git a/app/models.py b/app/models.py index 08115ce96..dc260836b 100644 --- a/app/models.py +++ b/app/models.py @@ -129,6 +129,9 @@ class Template(db.Model): service = db.relationship('Service', backref=db.backref('templates', lazy='dynamic')) +JOB_STATUS_TYPES = ['pending', 'in progress', 'finished'] + + class Job(db.Model): __tablename__ = 'jobs' @@ -152,6 +155,7 @@ class Job(db.Model): unique=False, nullable=True, onupdate=datetime.datetime.now) + status = db.Column(db.Enum(*JOB_STATUS_TYPES, name='job_status_types'), nullable=False, default='pending') VERIFY_CODE_TYPES = ['email', 'sms'] diff --git a/config.py b/config.py index 1dd256571..14d595348 100644 --- a/config.py +++ b/config.py @@ -15,6 +15,7 @@ class Config(object): ADMIN_CLIENT_SECRET = None AWS_REGION = 'eu-west-1' + NOTIFY_JOB_QUEUE = os.getenv('NOTIFY_JOB_QUEUE', 'notify-jobs-queue') class Development(Config): diff --git a/migrations/versions/0012_add_status_to_job.py b/migrations/versions/0012_add_status_to_job.py new file mode 100644 index 000000000..9b22f3caa --- /dev/null +++ b/migrations/versions/0012_add_status_to_job.py @@ -0,0 +1,29 @@ +"""empty message + +Revision ID: 0012_add_status_to_job +Revises: 0011_uuid_service_id +Create Date: 2016-02-02 11:25:34.402864 + +""" + +# revision identifiers, used by Alembic. +revision = '0012_add_status_to_job' +down_revision = '0011_uuid_service_id' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + ### commands auto generated by Alembic - please adjust! ### + job_status_types = sa.Enum('pending', 'in progress', 'finished', name='job_status_types') + job_status_types.create(op.get_bind()) + op.add_column('jobs', sa.Column('status', job_status_types, nullable=False)) + ### end Alembic commands ### + + +def downgrade(): + ### commands auto generated by Alembic - please adjust! ### + op.drop_column('jobs', 'status') + op.execute('DROP TYPE job_status_types') + ### end Alembic commands ### diff --git a/tests/app/job/test_job_rest.py b/tests/app/job/test_job_rest.py index b9b7eb20e..b8af8f7e9 100644 --- a/tests/app/job/test_job_rest.py +++ b/tests/app/job/test_job_rest.py @@ -1,3 +1,5 @@ +import boto3 +import moto import json import uuid from flask import url_for @@ -73,7 +75,8 @@ def test_get_job_by_id(notify_api, notify_db, notify_db_session, assert resp_json['data']['id'] == job_id -def test_post_job(notify_api, notify_db, notify_db_session, sample_template): +@moto.mock_sqs +def test_create_job(notify_api, notify_db, notify_db_session, sample_template): job_id = uuid.uuid4() template_id = sample_template.id service_id = sample_template.service.id @@ -88,6 +91,7 @@ def test_post_job(notify_api, notify_db, notify_db_session, sample_template): 'bucket_name': bucket_name, 'file_name': file_name, } + with notify_api.test_request_context(): with notify_api.test_client() as client: path = url_for('job.create_job', service_id=service_id) @@ -109,6 +113,15 @@ def test_post_job(notify_api, notify_db, notify_db_session, sample_template): assert resp_json['data']['template'] == template_id assert resp_json['data']['original_file_name'] == original_file_name + boto3.setup_default_session(region_name='eu-west-1') + q = boto3.resource('sqs').get_queue_by_name(QueueName='notify-jobs-queue') + messages = q.receive_messages() + assert len(messages) == 1 + + expected_message = json.loads(messages[0].body) + assert expected_message['job_id'] == str(job_id) + assert expected_message['service_id'] == str(service_id) + def _setup_jobs(notify_db, notify_db_session, template, number_of_jobs=5): for i in range(number_of_jobs):