mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-03 01:41:05 -05:00
move get_all_notifications_for_service and get_all_notifications_for_job
moved from notifications/rest -> service/rest and job/rest respectively endpoint routes not affected removed requires_admin decorator - that should be set by nginx config as opposed to python code
This commit is contained in:
@@ -59,14 +59,3 @@ def fetch_client(client):
|
||||
"client": client,
|
||||
"secret": get_unsigned_secrets(client)
|
||||
}
|
||||
|
||||
|
||||
def require_admin():
|
||||
def wrap(func):
|
||||
@wraps(func)
|
||||
def wrap_func(*args, **kwargs):
|
||||
if not api_user['client'] == current_app.config.get('ADMIN_CLIENT_USER_NAME'):
|
||||
abort(403)
|
||||
return func(*args, **kwargs)
|
||||
return wrap_func
|
||||
return wrap
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
from flask import (
|
||||
Blueprint,
|
||||
jsonify,
|
||||
request
|
||||
request,
|
||||
current_app
|
||||
)
|
||||
|
||||
from app.dao.jobs_dao import (
|
||||
@@ -15,11 +16,14 @@ from app.dao.services_dao import (
|
||||
)
|
||||
|
||||
from app.dao.templates_dao import (dao_get_template_by_id)
|
||||
from app.dao.notifications_dao import get_notifications_for_job
|
||||
|
||||
from app.schemas import job_schema, unarchived_template_schema
|
||||
from app.schemas import job_schema, unarchived_template_schema, notifications_filter_schema, notification_status_schema
|
||||
|
||||
from app.celery.tasks import process_job
|
||||
|
||||
from app.utils import pagination_links
|
||||
|
||||
job = Blueprint('job', __name__, url_prefix='/service/<uuid:service_id>/job')
|
||||
|
||||
from app.errors import (
|
||||
@@ -37,6 +41,33 @@ def get_job_by_service_and_job_id(service_id, job_id):
|
||||
return jsonify(data=data)
|
||||
|
||||
|
||||
@job.route('/<job_id>/notifications', methods=['GET'])
|
||||
def get_all_notifications_for_service_job(service_id, job_id):
|
||||
data = notifications_filter_schema.load(request.args).data
|
||||
page = data['page'] if 'page' in data else 1
|
||||
page_size = data['page_size'] if 'page_size' in data else current_app.config.get('PAGE_SIZE')
|
||||
|
||||
pagination = get_notifications_for_job(
|
||||
service_id,
|
||||
job_id,
|
||||
filter_dict=data,
|
||||
page=page,
|
||||
page_size=page_size)
|
||||
kwargs = request.args.to_dict()
|
||||
kwargs['service_id'] = service_id
|
||||
kwargs['job_id'] = job_id
|
||||
return jsonify(
|
||||
notifications=notification_status_schema.dump(pagination.items, many=True).data,
|
||||
page_size=page_size,
|
||||
total=pagination.total,
|
||||
links=pagination_links(
|
||||
pagination,
|
||||
'.get_all_notifications_for_service_job',
|
||||
**kwargs
|
||||
)
|
||||
), 200
|
||||
|
||||
|
||||
@job.route('', methods=['GET'])
|
||||
def get_jobs_by_service(service_id):
|
||||
if request.args.get('limit_days'):
|
||||
|
||||
@@ -5,14 +5,12 @@ from flask import (
|
||||
jsonify,
|
||||
request,
|
||||
current_app,
|
||||
url_for,
|
||||
json
|
||||
)
|
||||
from notifications_utils.recipients import allowed_to_send_to, first_column_heading
|
||||
from notifications_utils.template import Template
|
||||
from app.clients.email.aws_ses import get_aws_responses
|
||||
from app import api_user, encryption, create_uuid, DATETIME_FORMAT, DATE_FORMAT, statsd_client
|
||||
from app.authentication.auth import require_admin
|
||||
from app.dao import (
|
||||
templates_dao,
|
||||
services_dao,
|
||||
@@ -32,6 +30,7 @@ from app.schemas import (
|
||||
unarchived_template_schema
|
||||
)
|
||||
from app.celery.tasks import send_sms, send_email
|
||||
from app.utils import pagination_links
|
||||
|
||||
notifications = Blueprint('notifications', __name__)
|
||||
|
||||
@@ -199,74 +198,6 @@ def get_all_notifications():
|
||||
), 200
|
||||
|
||||
|
||||
@notifications.route('/service/<service_id>/notifications', methods=['GET'])
|
||||
@require_admin()
|
||||
def get_all_notifications_for_service(service_id):
|
||||
data = notifications_filter_schema.load(request.args).data
|
||||
page = data['page'] if 'page' in data else 1
|
||||
page_size = data['page_size'] if 'page_size' in data else current_app.config.get('PAGE_SIZE')
|
||||
limit_days = data.get('limit_days')
|
||||
|
||||
pagination = notifications_dao.get_notifications_for_service(
|
||||
service_id,
|
||||
filter_dict=data,
|
||||
page=page,
|
||||
page_size=page_size,
|
||||
limit_days=limit_days)
|
||||
kwargs = request.args.to_dict()
|
||||
kwargs['service_id'] = service_id
|
||||
return jsonify(
|
||||
notifications=notification_status_schema.dump(pagination.items, many=True).data,
|
||||
page_size=page_size,
|
||||
total=pagination.total,
|
||||
links=pagination_links(
|
||||
pagination,
|
||||
'.get_all_notifications_for_service',
|
||||
**kwargs
|
||||
)
|
||||
), 200
|
||||
|
||||
|
||||
@notifications.route('/service/<service_id>/job/<job_id>/notifications', methods=['GET'])
|
||||
@require_admin()
|
||||
def get_all_notifications_for_service_job(service_id, job_id):
|
||||
data = notifications_filter_schema.load(request.args).data
|
||||
page = data['page'] if 'page' in data else 1
|
||||
page_size = data['page_size'] if 'page_size' in data else current_app.config.get('PAGE_SIZE')
|
||||
|
||||
pagination = notifications_dao.get_notifications_for_job(
|
||||
service_id,
|
||||
job_id,
|
||||
filter_dict=data,
|
||||
page=page,
|
||||
page_size=page_size)
|
||||
kwargs = request.args.to_dict()
|
||||
kwargs['service_id'] = service_id
|
||||
kwargs['job_id'] = job_id
|
||||
return jsonify(
|
||||
notifications=notification_status_schema.dump(pagination.items, many=True).data,
|
||||
page_size=page_size,
|
||||
total=pagination.total,
|
||||
links=pagination_links(
|
||||
pagination,
|
||||
'.get_all_notifications_for_service_job',
|
||||
**kwargs
|
||||
)
|
||||
), 200
|
||||
|
||||
|
||||
def pagination_links(pagination, endpoint, **kwargs):
|
||||
if 'page' in kwargs:
|
||||
kwargs.pop('page', None)
|
||||
links = dict()
|
||||
if pagination.has_prev:
|
||||
links['prev'] = url_for(endpoint, page=pagination.prev_num, **kwargs)
|
||||
if pagination.has_next:
|
||||
links['next'] = url_for(endpoint, page=pagination.next_num, **kwargs)
|
||||
links['last'] = url_for(endpoint, page=pagination.pages, **kwargs)
|
||||
return links
|
||||
|
||||
|
||||
@notifications.route('/notifications/<string:notification_type>', methods=['POST'])
|
||||
def send_notification(notification_type):
|
||||
if notification_type not in ['sms', 'email']:
|
||||
|
||||
@@ -1,14 +1,11 @@
|
||||
from datetime import (
|
||||
datetime,
|
||||
date
|
||||
)
|
||||
from datetime import date
|
||||
|
||||
from flask import (
|
||||
jsonify,
|
||||
request,
|
||||
Blueprint
|
||||
Blueprint,
|
||||
current_app
|
||||
)
|
||||
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from app.dao.api_key_dao import (
|
||||
@@ -26,19 +23,19 @@ from app.dao.services_dao import (
|
||||
dao_add_user_to_service,
|
||||
dao_remove_user_from_service
|
||||
)
|
||||
|
||||
from app.dao import notifications_dao
|
||||
from app.dao.provider_statistics_dao import get_fragment_count
|
||||
|
||||
from app.dao.users_dao import get_model_users
|
||||
|
||||
from app.schemas import (
|
||||
service_schema,
|
||||
api_key_schema,
|
||||
user_schema,
|
||||
from_to_date_schema,
|
||||
permission_schema
|
||||
permission_schema,
|
||||
notification_status_schema,
|
||||
notifications_filter_schema,
|
||||
)
|
||||
|
||||
from app.utils import pagination_links
|
||||
from app.errors import (
|
||||
register_errors,
|
||||
InvalidRequest
|
||||
@@ -208,3 +205,30 @@ def get_service_history(service_id):
|
||||
'events': events_data}
|
||||
|
||||
return jsonify(data=data)
|
||||
|
||||
|
||||
@service.route('/<uuid:service_id>/notifications', methods=['GET'])
|
||||
def get_all_notifications_for_service(service_id):
|
||||
data = notifications_filter_schema.load(request.args).data
|
||||
page = data['page'] if 'page' in data else 1
|
||||
page_size = data['page_size'] if 'page_size' in data else current_app.config.get('PAGE_SIZE')
|
||||
limit_days = data.get('limit_days')
|
||||
|
||||
pagination = notifications_dao.get_notifications_for_service(
|
||||
service_id,
|
||||
filter_dict=data,
|
||||
page=page,
|
||||
page_size=page_size,
|
||||
limit_days=limit_days)
|
||||
kwargs = request.args.to_dict()
|
||||
kwargs['service_id'] = service_id
|
||||
return jsonify(
|
||||
notifications=notification_status_schema.dump(pagination.items, many=True).data,
|
||||
page_size=page_size,
|
||||
total=pagination.total,
|
||||
links=pagination_links(
|
||||
pagination,
|
||||
'.get_all_notifications_for_service',
|
||||
**kwargs
|
||||
)
|
||||
), 200
|
||||
|
||||
13
app/utils.py
Normal file
13
app/utils.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from flask import url_for
|
||||
|
||||
|
||||
def pagination_links(pagination, endpoint, **kwargs):
|
||||
if 'page' in kwargs:
|
||||
kwargs.pop('page', None)
|
||||
links = dict()
|
||||
if pagination.has_prev:
|
||||
links['prev'] = url_for(endpoint, page=pagination.prev_num, **kwargs)
|
||||
if pagination.has_next:
|
||||
links['next'] = url_for(endpoint, page=pagination.next_num, **kwargs)
|
||||
links['last'] = url_for(endpoint, page=pagination.pages, **kwargs)
|
||||
return links
|
||||
@@ -2,11 +2,16 @@ import json
|
||||
import uuid
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import pytest
|
||||
|
||||
import app.celery.tasks
|
||||
|
||||
from tests import create_authorization_header
|
||||
from tests.app.conftest import sample_job as create_job
|
||||
from tests.app.conftest import (
|
||||
sample_job as create_job,
|
||||
sample_notification as create_sample_notification)
|
||||
from app.dao.templates_dao import dao_update_template
|
||||
from app.models import NOTIFICATION_STATUS_TYPES
|
||||
|
||||
|
||||
def test_get_jobs(notify_api, notify_db, notify_db_session, sample_template):
|
||||
@@ -239,3 +244,109 @@ def _setup_jobs(notify_db, notify_db_session, template, number_of_jobs=5):
|
||||
notify_db_session,
|
||||
service=template.service,
|
||||
template=template)
|
||||
|
||||
|
||||
def test_get_all_notifications_for_job_in_order_of_job_number(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service):
|
||||
with notify_api.test_request_context(), notify_api.test_client() as client:
|
||||
main_job = create_job(notify_db, notify_db_session, service=sample_service)
|
||||
another_job = create_job(notify_db, notify_db_session, service=sample_service)
|
||||
|
||||
notification_1 = create_sample_notification(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
job=main_job,
|
||||
to_field="1",
|
||||
created_at=datetime.utcnow(),
|
||||
job_row_number=1
|
||||
)
|
||||
notification_2 = create_sample_notification(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
job=main_job,
|
||||
to_field="2",
|
||||
created_at=datetime.utcnow(),
|
||||
job_row_number=2
|
||||
)
|
||||
notification_3 = create_sample_notification(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
job=main_job,
|
||||
to_field="3",
|
||||
created_at=datetime.utcnow(),
|
||||
job_row_number=3
|
||||
)
|
||||
create_sample_notification(notify_db, notify_db_session, job=another_job)
|
||||
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
response = client.get(
|
||||
path='/service/{}/job/{}/notifications'.format(sample_service.id, main_job.id),
|
||||
headers=[auth_header])
|
||||
|
||||
resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(resp['notifications']) == 3
|
||||
assert resp['notifications'][0]['to'] == notification_1.to
|
||||
assert resp['notifications'][0]['job_row_number'] == notification_1.job_row_number
|
||||
assert resp['notifications'][1]['to'] == notification_2.to
|
||||
assert resp['notifications'][1]['job_row_number'] == notification_2.job_row_number
|
||||
assert resp['notifications'][2]['to'] == notification_3.to
|
||||
assert resp['notifications'][2]['job_row_number'] == notification_3.job_row_number
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"expected_notification_count, status_args",
|
||||
[
|
||||
(
|
||||
1,
|
||||
'?status={}'.format(NOTIFICATION_STATUS_TYPES[0])
|
||||
),
|
||||
(
|
||||
0,
|
||||
'?status={}'.format(NOTIFICATION_STATUS_TYPES[1])
|
||||
),
|
||||
(
|
||||
1,
|
||||
'?status={}&status={}&status={}'.format(
|
||||
*NOTIFICATION_STATUS_TYPES[0:3]
|
||||
)
|
||||
),
|
||||
(
|
||||
0,
|
||||
'?status={}&status={}&status={}'.format(
|
||||
*NOTIFICATION_STATUS_TYPES[3:6]
|
||||
)
|
||||
),
|
||||
]
|
||||
)
|
||||
def test_get_all_notifications_for_job_filtered_by_status(
|
||||
notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service,
|
||||
expected_notification_count,
|
||||
status_args
|
||||
):
|
||||
with notify_api.test_request_context(), notify_api.test_client() as client:
|
||||
job = create_job(notify_db, notify_db_session, service=sample_service)
|
||||
|
||||
create_sample_notification(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
job=job,
|
||||
to_field="1",
|
||||
created_at=datetime.utcnow(),
|
||||
status=NOTIFICATION_STATUS_TYPES[0],
|
||||
job_row_number=1
|
||||
)
|
||||
|
||||
response = client.get(
|
||||
path='/service/{}/job/{}/notifications{}'.format(sample_service.id, job.id, status_args),
|
||||
headers=[create_authorization_header()]
|
||||
)
|
||||
resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(resp['notifications']) == expected_notification_count
|
||||
assert response.status_code == 200
|
||||
|
||||
@@ -1,15 +1,12 @@
|
||||
from datetime import datetime, timedelta
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from flask import json
|
||||
|
||||
import app.celery.tasks
|
||||
from app.models import NOTIFICATION_STATUS_TYPES
|
||||
from app.dao.notifications_dao import get_notification_by_id, dao_get_notification_statistics_for_service
|
||||
from tests import create_authorization_header
|
||||
from tests.app.conftest import sample_notification as create_sample_notification
|
||||
from tests.app.conftest import sample_job as create_sample_job
|
||||
from tests.app.conftest import sample_service as create_sample_service
|
||||
|
||||
|
||||
@@ -137,170 +134,6 @@ def test_get_all_notifications_newest_first(notify_api, notify_db, notify_db_ses
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_get_all_notifications_for_service_in_order(notify_api, notify_db, notify_db_session):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
service_1 = create_sample_service(notify_db, notify_db_session, service_name="1", email_from='1')
|
||||
service_2 = create_sample_service(notify_db, notify_db_session, service_name="2", email_from='2')
|
||||
|
||||
create_sample_notification(notify_db, notify_db_session, service=service_2)
|
||||
|
||||
notification_1 = create_sample_notification(notify_db, notify_db_session, service=service_1)
|
||||
notification_2 = create_sample_notification(notify_db, notify_db_session, service=service_1)
|
||||
notification_3 = create_sample_notification(notify_db, notify_db_session, service=service_1)
|
||||
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
response = client.get(
|
||||
path='/service/{}/notifications'.format(service_1.id),
|
||||
headers=[auth_header])
|
||||
|
||||
resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(resp['notifications']) == 3
|
||||
assert resp['notifications'][0]['to'] == notification_3.to
|
||||
assert resp['notifications'][1]['to'] == notification_2.to
|
||||
assert resp['notifications'][2]['to'] == notification_1.to
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_get_all_notifications_for_job_in_order_of_job_number(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
main_job = create_sample_job(notify_db, notify_db_session, service=sample_service)
|
||||
another_job = create_sample_job(notify_db, notify_db_session, service=sample_service)
|
||||
|
||||
notification_1 = create_sample_notification(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
job=main_job,
|
||||
to_field="1",
|
||||
created_at=datetime.utcnow(),
|
||||
job_row_number=1
|
||||
)
|
||||
notification_2 = create_sample_notification(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
job=main_job,
|
||||
to_field="2",
|
||||
created_at=datetime.utcnow(),
|
||||
job_row_number=2
|
||||
)
|
||||
notification_3 = create_sample_notification(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
job=main_job,
|
||||
to_field="3",
|
||||
created_at=datetime.utcnow(),
|
||||
job_row_number=3
|
||||
)
|
||||
create_sample_notification(notify_db, notify_db_session, job=another_job)
|
||||
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
response = client.get(
|
||||
path='/service/{}/job/{}/notifications'.format(sample_service.id, main_job.id),
|
||||
headers=[auth_header])
|
||||
|
||||
resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(resp['notifications']) == 3
|
||||
assert resp['notifications'][0]['to'] == notification_1.to
|
||||
assert resp['notifications'][0]['job_row_number'] == notification_1.job_row_number
|
||||
assert resp['notifications'][1]['to'] == notification_2.to
|
||||
assert resp['notifications'][1]['job_row_number'] == notification_2.job_row_number
|
||||
assert resp['notifications'][2]['to'] == notification_3.to
|
||||
assert resp['notifications'][2]['job_row_number'] == notification_3.job_row_number
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"expected_notification_count, status_args",
|
||||
[
|
||||
(
|
||||
1,
|
||||
'?status={}'.format(NOTIFICATION_STATUS_TYPES[0])
|
||||
),
|
||||
(
|
||||
0,
|
||||
'?status={}'.format(NOTIFICATION_STATUS_TYPES[1])
|
||||
),
|
||||
(
|
||||
1,
|
||||
'?status={}&status={}&status={}'.format(
|
||||
*NOTIFICATION_STATUS_TYPES[0:3]
|
||||
)
|
||||
),
|
||||
(
|
||||
0,
|
||||
'?status={}&status={}&status={}'.format(
|
||||
*NOTIFICATION_STATUS_TYPES[3:6]
|
||||
)
|
||||
),
|
||||
]
|
||||
)
|
||||
def test_get_all_notifications_for_job_filtered_by_status(
|
||||
notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service,
|
||||
expected_notification_count,
|
||||
status_args
|
||||
):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
job = create_sample_job(notify_db, notify_db_session, service=sample_service)
|
||||
|
||||
create_sample_notification(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
job=job,
|
||||
to_field="1",
|
||||
created_at=datetime.utcnow(),
|
||||
status=NOTIFICATION_STATUS_TYPES[0],
|
||||
job_row_number=1
|
||||
)
|
||||
|
||||
response = client.get(
|
||||
path='/service/{}/job/{}/notifications{}'.format(sample_service.id, job.id, status_args),
|
||||
headers=[create_authorization_header()]
|
||||
)
|
||||
resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(resp['notifications']) == expected_notification_count
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_should_not_get_notifications_by_service_with_client_credentials(notify_api, sample_api_key):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
auth_header = create_authorization_header(service_id=sample_api_key.service.id)
|
||||
|
||||
response = client.get(
|
||||
'/service/{}/notifications'.format(sample_api_key.service.id),
|
||||
headers=[auth_header])
|
||||
|
||||
resp = json.loads(response.get_data(as_text=True))
|
||||
assert response.status_code == 403
|
||||
assert resp['result'] == 'error'
|
||||
assert resp['message'] == 'Forbidden, invalid authentication token provided'
|
||||
|
||||
|
||||
def test_should_not_get_notifications_by_job_and_service_with_client_credentials(notify_api, sample_job):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
auth_header = create_authorization_header(service_id=sample_job.service.id)
|
||||
|
||||
response = client.get(
|
||||
'/service/{}/job/{}/notifications'.format(sample_job.service.id, sample_job.id),
|
||||
headers=[auth_header])
|
||||
|
||||
resp = json.loads(response.get_data(as_text=True))
|
||||
assert response.status_code == 403
|
||||
assert resp['result'] == 'error'
|
||||
assert resp['message'] == 'Forbidden, invalid authentication token provided'
|
||||
|
||||
|
||||
def test_should_reject_invalid_page_param(notify_api, sample_email_template):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
|
||||
@@ -6,9 +6,12 @@ from app.dao.users_dao import save_model_user
|
||||
from app.dao.services_dao import dao_remove_user_from_service
|
||||
from app.models import User
|
||||
from tests import create_authorization_header
|
||||
from tests.app.conftest import sample_service as create_sample_service
|
||||
from tests.app.conftest import sample_service_permission as create_sample_service_permission
|
||||
from tests.app.conftest import sample_user as create_sample_user
|
||||
from tests.app.conftest import (
|
||||
sample_service as create_sample_service,
|
||||
sample_service_permission as create_sample_service_permission,
|
||||
sample_user as create_sample_user,
|
||||
sample_notification as create_sample_notification
|
||||
)
|
||||
|
||||
|
||||
def test_get_service_list(notify_api, service_factory):
|
||||
@@ -1001,3 +1004,28 @@ def test_set_reply_to_email_for_service(notify_api, sample_service):
|
||||
result = json.loads(resp.get_data(as_text=True))
|
||||
assert resp.status_code == 200
|
||||
assert result['data']['reply_to_email_address'] == 'reply_test@service.gov.uk'
|
||||
|
||||
|
||||
def test_get_all_notifications_for_service_in_order(notify_api, notify_db, notify_db_session):
|
||||
with notify_api.test_request_context(), notify_api.test_client() as client:
|
||||
service_1 = create_sample_service(notify_db, notify_db_session, service_name="1", email_from='1')
|
||||
service_2 = create_sample_service(notify_db, notify_db_session, service_name="2", email_from='2')
|
||||
|
||||
create_sample_notification(notify_db, notify_db_session, service=service_2)
|
||||
|
||||
notification_1 = create_sample_notification(notify_db, notify_db_session, service=service_1)
|
||||
notification_2 = create_sample_notification(notify_db, notify_db_session, service=service_1)
|
||||
notification_3 = create_sample_notification(notify_db, notify_db_session, service=service_1)
|
||||
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
response = client.get(
|
||||
path='/service/{}/notifications'.format(service_1.id),
|
||||
headers=[auth_header])
|
||||
|
||||
resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(resp['notifications']) == 3
|
||||
assert resp['notifications'][0]['to'] == notification_3.to
|
||||
assert resp['notifications'][1]['to'] == notification_2.to
|
||||
assert resp['notifications'][2]['to'] == notification_1.to
|
||||
assert response.status_code == 200
|
||||
|
||||
Reference in New Issue
Block a user