mirror of
https://github.com/GSA/notifications-api.git
synced 2026-05-24 10:41:35 -04:00
Merge branch 'master' into remove-contested-writes
This commit is contained in:
@@ -1,9 +1,24 @@
|
||||
from datetime import date, timedelta
|
||||
|
||||
from sqlalchemy import desc, cast, Date as sql_date
|
||||
from app import db
|
||||
from app.dao import days_ago
|
||||
from app.models import Job
|
||||
from app.models import Job, NotificationHistory
|
||||
from app.statsd_decorators import statsd
|
||||
from sqlalchemy import func, asc
|
||||
|
||||
|
||||
@statsd(namespace="dao")
|
||||
def dao_get_notification_outcomes_for_job(service_id, job_id):
|
||||
query = db.session.query(
|
||||
func.count(NotificationHistory.status).label('count'),
|
||||
NotificationHistory.status.label('status')
|
||||
)
|
||||
|
||||
return query \
|
||||
.filter(NotificationHistory.service_id == service_id) \
|
||||
.filter(NotificationHistory.job_id == job_id)\
|
||||
.group_by(NotificationHistory.status) \
|
||||
.order_by(asc(NotificationHistory.status)) \
|
||||
.all()
|
||||
|
||||
|
||||
def dao_get_job_by_service_id_and_job_id(service_id, job_id):
|
||||
|
||||
@@ -8,7 +8,8 @@ from flask import (
|
||||
from app.dao.jobs_dao import (
|
||||
dao_create_job,
|
||||
dao_get_job_by_service_id_and_job_id,
|
||||
dao_get_jobs_by_service_id
|
||||
dao_get_jobs_by_service_id,
|
||||
dao_get_notification_outcomes_for_job
|
||||
)
|
||||
|
||||
from app.dao.services_dao import (
|
||||
@@ -42,7 +43,11 @@ register_errors(job)
|
||||
@job.route('/<job_id>', methods=['GET'])
|
||||
def get_job_by_service_and_job_id(service_id, job_id):
|
||||
job = dao_get_job_by_service_id_and_job_id(service_id, job_id)
|
||||
statistics = dao_get_notification_outcomes_for_job(service_id, job_id)
|
||||
data = job_schema.dump(job).data
|
||||
|
||||
data['statistics'] = [{'status': statistic[1], 'count': statistic[0]} for statistic in statistics]
|
||||
|
||||
return jsonify(data=data)
|
||||
|
||||
|
||||
|
||||
@@ -303,6 +303,19 @@ class ProviderDetails(db.Model):
|
||||
JOB_STATUS_TYPES = ['pending', 'in progress', 'finished', 'sending limits exceeded']
|
||||
|
||||
|
||||
JOB_STATUS_PENDING = 'pending'
|
||||
JOB_STATUS_IN_PROGRESS = 'in progress'
|
||||
JOB_STATUS_FINISHED = 'finished'
|
||||
JOB_STATUS_SENDING_LIMITS_EXCEEDED = 'sending limits exceeded'
|
||||
JOB_STATUS_SCHEDULED = 'scheduled'
|
||||
|
||||
|
||||
class JobStatusTypes(db.Model):
|
||||
__tablename__ = 'job_status'
|
||||
|
||||
name = db.Column(db.String(255), primary_key=True)
|
||||
|
||||
|
||||
class Job(db.Model):
|
||||
__tablename__ = 'jobs'
|
||||
|
||||
@@ -343,6 +356,13 @@ class Job(db.Model):
|
||||
nullable=True)
|
||||
created_by = db.relationship('User')
|
||||
created_by_id = db.Column(UUID(as_uuid=True), db.ForeignKey('users.id'), index=True, nullable=False)
|
||||
scheduled_for = db.Column(
|
||||
db.DateTime,
|
||||
index=True,
|
||||
unique=False,
|
||||
nullable=True)
|
||||
job_status = db.Column(
|
||||
db.String(255), db.ForeignKey('job_status.name'), index=True, nullable=True)
|
||||
|
||||
|
||||
VERIFY_CODE_TYPES = [EMAIL_TYPE, SMS_TYPE]
|
||||
|
||||
@@ -210,7 +210,11 @@ class JobSchema(BaseSchema):
|
||||
|
||||
class Meta:
|
||||
model = models.Job
|
||||
exclude = ('notifications',)
|
||||
exclude = (
|
||||
'notifications',
|
||||
'notifications_sent',
|
||||
'notifications_delivered',
|
||||
'notifications_failed')
|
||||
strict = True
|
||||
|
||||
|
||||
|
||||
@@ -20,59 +20,68 @@ from app.models import Job, Template, NotificationHistory
|
||||
|
||||
|
||||
def upgrade():
|
||||
session = Session(bind=op.get_bind())
|
||||
|
||||
go_live = datetime.datetime.strptime('2016-05-18', '%Y-%m-%d')
|
||||
notifications_history_start_date = datetime.datetime.strptime('2016-06-26 23:21:55', '%Y-%m-%d %H:%M:%S')
|
||||
jobs = session.query(Job).join(Template).filter(Job.service_id == '95316ff0-e555-462d-a6e7-95d26fbfd091',
|
||||
Job.created_at >= go_live,
|
||||
Job.created_at < notifications_history_start_date).all()
|
||||
|
||||
for job in jobs:
|
||||
for i in range(0, job.notifications_delivered):
|
||||
notification = NotificationHistory(id=uuid.uuid4(),
|
||||
job_id=job.id,
|
||||
service_id=job.service_id,
|
||||
template_id=job.template.id,
|
||||
template_version=job.template_version,
|
||||
key_type='normal',
|
||||
content_char_count=len(job.template.content),
|
||||
notification_type=job.template.template_type,
|
||||
created_at=job.created_at,
|
||||
sent_at=job.processing_finished,
|
||||
sent_by='ses' if job.template.template_type == 'email' else 'mmg',
|
||||
status='delivered')
|
||||
|
||||
session.add(notification)
|
||||
|
||||
for i in range(0, job.notifications_failed):
|
||||
notification = NotificationHistory(id=uuid.uuid4(),
|
||||
job_id=job.id,
|
||||
service_id=job.service_id,
|
||||
template_id=job.template.id,
|
||||
template_version=job.template_version,
|
||||
key_type='normal',
|
||||
content_char_count=len(job.template.content),
|
||||
notification_type=job.template.template_type,
|
||||
created_at=job.created_at,
|
||||
sent_at=job.processing_finished,
|
||||
sent_by='ses' if job.template.template_type == 'email' else 'mmg',
|
||||
status='permanent-failure')
|
||||
session.add(notification)
|
||||
session.commit()
|
||||
#
|
||||
# REMOVED
|
||||
# This script has been applied and doesn't need to be re-applied
|
||||
# note that by referencing the model objects in migration files, any subsequent alteration of the model and thus
|
||||
# the database causes all previous migration scripts to fail as the model and DB will be inconsistent in this
|
||||
# past state.
|
||||
#
|
||||
# session = Session(bind=op.get_bind())
|
||||
#
|
||||
# go_live = datetime.datetime.strptime('2016-05-18', '%Y-%m-%d')
|
||||
# notifications_history_start_date = datetime.datetime.strptime('2016-06-26 23:21:55', '%Y-%m-%d %H:%M:%S')
|
||||
# jobs = session.query(Job).join(Template).filter(Job.service_id == '95316ff0-e555-462d-a6e7-95d26fbfd091',
|
||||
# Job.created_at >= go_live,
|
||||
# Job.created_at < notifications_history_start_date).all()
|
||||
#
|
||||
# for job in jobs:
|
||||
# for i in range(0, job.notifications_delivered):
|
||||
# notification = NotificationHistory(id=uuid.uuid4(),
|
||||
# job_id=job.id,
|
||||
# service_id=job.service_id,
|
||||
# template_id=job.template.id,
|
||||
# template_version=job.template_version,
|
||||
# key_type='normal',
|
||||
# content_char_count=len(job.template.content),
|
||||
# notification_type=job.template.template_type,
|
||||
# created_at=job.created_at,
|
||||
# sent_at=job.processing_finished,
|
||||
# sent_by='ses' if job.template.template_type == 'email' else 'mmg',
|
||||
# status='delivered')
|
||||
#
|
||||
# session.add(notification)
|
||||
#
|
||||
# for i in range(0, job.notifications_failed):
|
||||
# notification = NotificationHistory(id=uuid.uuid4(),
|
||||
# job_id=job.id,
|
||||
# service_id=job.service_id,
|
||||
# template_id=job.template.id,
|
||||
# template_version=job.template_version,
|
||||
# key_type='normal',
|
||||
# content_char_count=len(job.template.content),
|
||||
# notification_type=job.template.template_type,
|
||||
# created_at=job.created_at,
|
||||
# sent_at=job.processing_finished,
|
||||
# sent_by='ses' if job.template.template_type == 'email' else 'mmg',
|
||||
# status='permanent-failure')
|
||||
# session.add(notification)
|
||||
# session.commit()
|
||||
pass
|
||||
|
||||
|
||||
def downgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
session = Session(bind=op.get_bind())
|
||||
|
||||
go_live = datetime.datetime.strptime('2016-05-18', '%Y-%m-%d')
|
||||
notifications_history_start_date = datetime.datetime.strptime('2016-06-26 23:21:55', '%Y-%m-%d %H:%M:%S')
|
||||
|
||||
session.query(NotificationHistory).filter(
|
||||
NotificationHistory.created_at >= go_live,
|
||||
NotificationHistory.service_id == '95316ff0-e555-462d-a6e7-95d26fbfd091',
|
||||
NotificationHistory.created_at < notifications_history_start_date).delete()
|
||||
|
||||
session.commit()
|
||||
### end Alembic commands ###
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
# session = Session(bind=op.get_bind())
|
||||
#
|
||||
# go_live = datetime.datetime.strptime('2016-05-18', '%Y-%m-%d')
|
||||
# notifications_history_start_date = datetime.datetime.strptime('2016-06-26 23:21:55', '%Y-%m-%d %H:%M:%S')
|
||||
#
|
||||
# session.query(NotificationHistory).filter(
|
||||
# NotificationHistory.created_at >= go_live,
|
||||
# NotificationHistory.service_id == '95316ff0-e555-462d-a6e7-95d26fbfd091',
|
||||
# NotificationHistory.created_at < notifications_history_start_date).delete()
|
||||
#
|
||||
# session.commit()
|
||||
# ### end Alembic commands ###
|
||||
pass
|
||||
|
||||
41
migrations/versions/0048_job_scheduled_time.py
Normal file
41
migrations/versions/0048_job_scheduled_time.py
Normal file
@@ -0,0 +1,41 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 0048_job_scheduled_time
|
||||
Revises: 0047_ukvi_spelling
|
||||
Create Date: 2016-08-24 13:21:51.744526
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '0048_job_scheduled_time'
|
||||
down_revision = '0047_ukvi_spelling'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table('job_status',
|
||||
sa.Column('name', sa.String(length=255), nullable=False),
|
||||
sa.PrimaryKeyConstraint('name')
|
||||
)
|
||||
op.add_column('jobs', sa.Column('job_status', sa.String(length=255), nullable=True))
|
||||
op.add_column('jobs', sa.Column('scheduled_for', sa.DateTime(), nullable=True))
|
||||
op.create_index(op.f('ix_jobs_job_status'), 'jobs', ['job_status'], unique=False)
|
||||
op.create_index(op.f('ix_jobs_scheduled_for'), 'jobs', ['scheduled_for'], unique=False)
|
||||
op.create_foreign_key(None, 'jobs', 'job_status', ['job_status'], ['name'])
|
||||
|
||||
op.execute("insert into job_status values ('pending')")
|
||||
op.execute("insert into job_status values ('in progress')")
|
||||
op.execute("insert into job_status values ('finished')")
|
||||
op.execute("insert into job_status values ('sending limits exceeded')")
|
||||
op.execute("insert into job_status values ('scheduled')")
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_constraint('jobs_job_status_fkey', 'jobs', type_='foreignkey')
|
||||
op.drop_index(op.f('ix_jobs_scheduled_for'), table_name='jobs')
|
||||
op.drop_index(op.f('ix_jobs_job_status'), table_name='jobs')
|
||||
op.drop_column('jobs', 'scheduled_for')
|
||||
op.drop_column('jobs', 'job_status')
|
||||
op.drop_table('job_status')
|
||||
@@ -5,10 +5,95 @@ from app.dao.jobs_dao import (
|
||||
dao_get_job_by_service_id_and_job_id,
|
||||
dao_create_job,
|
||||
dao_update_job,
|
||||
dao_get_jobs_by_service_id
|
||||
)
|
||||
dao_get_jobs_by_service_id,
|
||||
dao_get_notification_outcomes_for_job)
|
||||
|
||||
from app.models import Job
|
||||
from tests.app.conftest import sample_notification, sample_job, sample_service
|
||||
|
||||
|
||||
def test_should_have_decorated_notifications_dao_functions():
|
||||
assert dao_get_notification_outcomes_for_job.__wrapped__.__name__ == 'dao_get_notification_outcomes_for_job' # noqa
|
||||
|
||||
|
||||
def test_should_get_all_statuses_for_notifications_associated_with_job(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service,
|
||||
sample_job):
|
||||
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='created')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='sending')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='delivered')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='pending')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='failed')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='technical-failure') # noqa
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='temporary-failure') # noqa
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='permanent-failure') # noqa
|
||||
|
||||
results = dao_get_notification_outcomes_for_job(sample_service.id, sample_job.id)
|
||||
assert [(row.count, row.status) for row in results] == [
|
||||
(1, 'created'),
|
||||
(1, 'sending'),
|
||||
(1, 'delivered'),
|
||||
(1, 'pending'),
|
||||
(1, 'failed'),
|
||||
(1, 'technical-failure'),
|
||||
(1, 'temporary-failure'),
|
||||
(1, 'permanent-failure')
|
||||
]
|
||||
|
||||
|
||||
def test_should_count_of_statuses_for_notifications_associated_with_job(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service,
|
||||
sample_job):
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='created')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='created')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='sending')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='sending')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='sending')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='sending')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='delivered')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=sample_job, status='delivered')
|
||||
|
||||
results = dao_get_notification_outcomes_for_job(sample_service.id, sample_job.id)
|
||||
assert [(row.count, row.status) for row in results] == [
|
||||
(2, 'created'),
|
||||
(4, 'sending'),
|
||||
(2, 'delivered')
|
||||
]
|
||||
|
||||
|
||||
def test_should_return_zero_length_array_if_no_notifications_for_job(sample_service, sample_job):
|
||||
assert len(dao_get_notification_outcomes_for_job(sample_job.id, sample_service.id)) == 0
|
||||
|
||||
|
||||
def test_should_return_notifications_only_for_this_job(notify_db, notify_db_session, sample_service):
|
||||
job_1 = sample_job(notify_db, notify_db_session, service=sample_service)
|
||||
job_2 = sample_job(notify_db, notify_db_session, service=sample_service)
|
||||
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=job_1, status='created')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_service, job=job_2, status='created')
|
||||
|
||||
results = dao_get_notification_outcomes_for_job(sample_service.id, job_1.id)
|
||||
assert [(row.count, row.status) for row in results] == [
|
||||
(1, 'created')
|
||||
]
|
||||
|
||||
|
||||
def test_should_return_notifications_only_for_this_service(notify_db, notify_db_session):
|
||||
service_1 = sample_service(notify_db, notify_db_session, service_name="one", email_from="one")
|
||||
service_2 = sample_service(notify_db, notify_db_session, service_name="two", email_from="two")
|
||||
|
||||
job_1 = sample_job(notify_db, notify_db_session, service=service_1)
|
||||
job_2 = sample_job(notify_db, notify_db_session, service=service_2)
|
||||
|
||||
sample_notification(notify_db, notify_db_session, service=service_1, job=job_1, status='created')
|
||||
sample_notification(notify_db, notify_db_session, service=service_2, job=job_2, status='created')
|
||||
|
||||
assert len(dao_get_notification_outcomes_for_job(service_1.id, job_2.id)) == 0
|
||||
|
||||
|
||||
def test_create_job(sample_template):
|
||||
|
||||
@@ -9,7 +9,7 @@ import app.celery.tasks
|
||||
from tests import create_authorization_header
|
||||
from tests.app.conftest import (
|
||||
sample_job as create_job,
|
||||
sample_notification as create_sample_notification)
|
||||
sample_notification as create_sample_notification, sample_notification)
|
||||
from app.dao.templates_dao import dao_update_template
|
||||
from app.models import NOTIFICATION_STATUS_TYPES
|
||||
|
||||
@@ -94,20 +94,6 @@ def test_get_job_with_unknown_id_returns404(notify_api, sample_template, fake_uu
|
||||
}
|
||||
|
||||
|
||||
def test_get_job_by_id(notify_api, sample_job):
|
||||
job_id = str(sample_job.id)
|
||||
service_id = sample_job.service.id
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
path = '/service/{}/job/{}'.format(service_id, job_id)
|
||||
auth_header = create_authorization_header(service_id=sample_job.service.id)
|
||||
response = client.get(path, headers=[auth_header])
|
||||
assert response.status_code == 200
|
||||
resp_json = json.loads(response.get_data(as_text=True))
|
||||
assert resp_json['data']['id'] == job_id
|
||||
assert resp_json['data']['created_by']['name'] == 'Test User'
|
||||
|
||||
|
||||
def test_create_job(notify_api, sample_template, mocker, fake_uuid):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
@@ -123,6 +109,7 @@ def test_create_job(notify_api, sample_template, mocker, fake_uuid):
|
||||
path = '/service/{}/job'.format(sample_template.service.id)
|
||||
auth_header = create_authorization_header(service_id=sample_template.service.id)
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
|
||||
response = client.post(
|
||||
path,
|
||||
data=json.dumps(data),
|
||||
@@ -350,3 +337,83 @@ def test_get_all_notifications_for_job_filtered_by_status(
|
||||
resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(resp['notifications']) == expected_notification_count
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_get_job_by_id(notify_api, sample_job):
|
||||
job_id = str(sample_job.id)
|
||||
service_id = sample_job.service.id
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
path = '/service/{}/job/{}'.format(service_id, job_id)
|
||||
auth_header = create_authorization_header(service_id=sample_job.service.id)
|
||||
response = client.get(path, headers=[auth_header])
|
||||
assert response.status_code == 200
|
||||
resp_json = json.loads(response.get_data(as_text=True))
|
||||
assert resp_json['data']['id'] == job_id
|
||||
assert resp_json['data']['statistics'] == []
|
||||
assert resp_json['data']['created_by']['name'] == 'Test User'
|
||||
|
||||
|
||||
def test_get_job_by_id_should_return_statistics(notify_db, notify_db_session, notify_api, sample_job):
|
||||
job_id = str(sample_job.id)
|
||||
service_id = sample_job.service.id
|
||||
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='created')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='sending')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='delivered')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='pending')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='failed')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='technical-failure') # noqa
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='temporary-failure') # noqa
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='permanent-failure') # noqa
|
||||
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
path = '/service/{}/job/{}'.format(service_id, job_id)
|
||||
auth_header = create_authorization_header(service_id=sample_job.service.id)
|
||||
response = client.get(path, headers=[auth_header])
|
||||
assert response.status_code == 200
|
||||
resp_json = json.loads(response.get_data(as_text=True))
|
||||
print(resp_json)
|
||||
assert resp_json['data']['id'] == job_id
|
||||
assert {'status': 'created', 'count': 1} in resp_json['data']['statistics']
|
||||
assert {'status': 'sending', 'count': 1} in resp_json['data']['statistics']
|
||||
assert {'status': 'delivered', 'count': 1} in resp_json['data']['statistics']
|
||||
assert {'status': 'pending', 'count': 1} in resp_json['data']['statistics']
|
||||
assert {'status': 'failed', 'count': 1} in resp_json['data']['statistics']
|
||||
assert {'status': 'technical-failure', 'count': 1} in resp_json['data']['statistics']
|
||||
assert {'status': 'temporary-failure', 'count': 1} in resp_json['data']['statistics']
|
||||
assert {'status': 'permanent-failure', 'count': 1} in resp_json['data']['statistics']
|
||||
assert resp_json['data']['created_by']['name'] == 'Test User'
|
||||
|
||||
|
||||
def test_get_job_by_id_should_return_summed_statistics(notify_db, notify_db_session, notify_api, sample_job):
|
||||
job_id = str(sample_job.id)
|
||||
service_id = sample_job.service.id
|
||||
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='created')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='created')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='created')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='sending')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='failed')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='failed')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='failed')
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='technical-failure') # noqa
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='temporary-failure') # noqa
|
||||
sample_notification(notify_db, notify_db_session, service=sample_job.service, job=sample_job, status='temporary-failure') # noqa
|
||||
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
path = '/service/{}/job/{}'.format(service_id, job_id)
|
||||
auth_header = create_authorization_header(service_id=sample_job.service.id)
|
||||
response = client.get(path, headers=[auth_header])
|
||||
assert response.status_code == 200
|
||||
resp_json = json.loads(response.get_data(as_text=True))
|
||||
print(resp_json)
|
||||
assert resp_json['data']['id'] == job_id
|
||||
assert {'status': 'created', 'count': 3} in resp_json['data']['statistics']
|
||||
assert {'status': 'sending', 'count': 1} in resp_json['data']['statistics']
|
||||
assert {'status': 'failed', 'count': 3} in resp_json['data']['statistics']
|
||||
assert {'status': 'technical-failure', 'count': 1} in resp_json['data']['statistics']
|
||||
assert {'status': 'temporary-failure', 'count': 2} in resp_json['data']['statistics']
|
||||
assert resp_json['data']['created_by']['name'] == 'Test User'
|
||||
|
||||
Reference in New Issue
Block a user