mirror of
https://github.com/GSA/notifications-api.git
synced 2026-04-05 09:59:17 -04:00
Merge with master.
This commit is contained in:
@@ -1,7 +1,10 @@
|
||||
from flask import request, jsonify, _request_ctx_stack, current_app
|
||||
from notifications_python_client.authentication import decode_jwt_token, get_token_issuer
|
||||
from notifications_python_client.errors import TokenDecodeError, TokenRequestError, TokenExpiredError, TokenPayloadError
|
||||
from werkzeug.exceptions import abort
|
||||
from app.dao.api_key_dao import get_unsigned_secrets
|
||||
from app import api_user
|
||||
from functools import wraps
|
||||
|
||||
|
||||
def authentication_response(message, code):
|
||||
@@ -68,3 +71,14 @@ def fetch_client(client):
|
||||
"client": client,
|
||||
"secret": get_unsigned_secrets(client)
|
||||
}
|
||||
|
||||
|
||||
def require_admin():
|
||||
def wrap(func):
|
||||
@wraps(func)
|
||||
def wrap_func(*args, **kwargs):
|
||||
if not api_user['client'] == current_app.config.get('ADMIN_CLIENT_USER_NAME'):
|
||||
abort(403)
|
||||
return func(*args, **kwargs)
|
||||
return wrap_func
|
||||
return wrap
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
from flask import current_app
|
||||
from app import db
|
||||
from app.models import Notification
|
||||
from sqlalchemy import desc
|
||||
|
||||
|
||||
def dao_create_notification(notification):
|
||||
@@ -16,9 +18,23 @@ def get_notification_for_job(service_id, job_id, notification_id):
|
||||
return Notification.query.filter_by(service_id=service_id, job_id=job_id, id=notification_id).one()
|
||||
|
||||
|
||||
def get_notifications_for_job(service_id, job_id):
|
||||
return Notification.query.filter_by(service_id=service_id, job_id=job_id).all()
|
||||
def get_notifications_for_job(service_id, job_id, page=1):
|
||||
query = Notification.query.filter_by(service_id=service_id, job_id=job_id)\
|
||||
.order_by(desc(Notification.created_at))\
|
||||
.paginate(
|
||||
page=page,
|
||||
per_page=current_app.config['PAGE_SIZE']
|
||||
)
|
||||
return query
|
||||
|
||||
|
||||
def get_notification(service_id, notification_id):
|
||||
return Notification.query.filter_by(service_id=service_id, id=notification_id).one()
|
||||
|
||||
|
||||
def get_notifications_for_service(service_id, page=1):
|
||||
query = Notification.query.filter_by(service_id=service_id).order_by(desc(Notification.created_at)).paginate(
|
||||
page=page,
|
||||
per_page=current_app.config['PAGE_SIZE']
|
||||
)
|
||||
return query
|
||||
|
||||
@@ -18,7 +18,6 @@ default_service_permissions = [
|
||||
MANAGE_API_KEYS,
|
||||
MANAGE_TEMPLATES]
|
||||
|
||||
|
||||
class PermissionDAO(DAOClass):
|
||||
|
||||
class Meta:
|
||||
|
||||
@@ -42,14 +42,33 @@ def get_invited_users_by_service(service_id):
|
||||
|
||||
@invite.route('/<invited_user_id>', methods=['GET'])
|
||||
def get_invited_user_by_service_and_id(service_id, invited_user_id):
|
||||
invited_user = get_invited_user(service_id, invited_user_id)
|
||||
invited_user = get_invited_user(service_id=service_id, invited_user_id=invited_user_id)
|
||||
if not invited_user:
|
||||
message = 'Invited user not found for service id: {} and invited user id: {}'.format(service_id,
|
||||
invited_user_id)
|
||||
return jsonify(result='error', message=message), 404
|
||||
return _invited_user_not_found(service_id, invited_user_id)
|
||||
return jsonify(data=invited_user_schema.dump(invited_user).data), 200
|
||||
|
||||
|
||||
@invite.route('/<invited_user_id>', methods=['POST'])
|
||||
def update_invited_user(service_id, invited_user_id):
|
||||
fetched = get_invited_user(service_id=service_id, invited_user_id=invited_user_id)
|
||||
if not fetched:
|
||||
return _invited_user_not_found(service_id=service_id, invited_user_id=invited_user_id)
|
||||
|
||||
current_data = dict(invited_user_schema.dump(fetched).data.items())
|
||||
current_data.update(request.get_json())
|
||||
update_dict, errors = invited_user_schema.load(current_data)
|
||||
if errors:
|
||||
return jsonify(result='error', message=errors), 400
|
||||
save_invited_user(update_dict)
|
||||
return jsonify(data=invited_user_schema.dump(fetched).data), 200
|
||||
|
||||
|
||||
def _invited_user_not_found(service_id, invited_user_id):
|
||||
message = 'Invited user not found for service id: {} and invited user id: {}'.format(service_id,
|
||||
invited_user_id)
|
||||
return jsonify(result='error', message=message), 404
|
||||
|
||||
|
||||
def _create_invitation(invited_user):
|
||||
from utils.url_safe_token import generate_token
|
||||
token = generate_token(str(invited_user.id), current_app.config['SECRET_KEY'], current_app.config['DANGEROUS_SALT'])
|
||||
|
||||
@@ -277,13 +277,17 @@ MANAGE_SERVICE = 'manage_service'
|
||||
SEND_MESSAGES = 'send_messages'
|
||||
MANAGE_API_KEYS = 'manage_api_keys'
|
||||
MANAGE_TEMPLATES = 'manage_templates'
|
||||
MANAGE_TEAM = 'manage_team'
|
||||
VIEW_ACTIVITY = 'view_activity'
|
||||
|
||||
# List of permissions
|
||||
PERMISSION_LIST = [
|
||||
MANAGE_SERVICE,
|
||||
SEND_MESSAGES,
|
||||
MANAGE_API_KEYS,
|
||||
MANAGE_TEMPLATES]
|
||||
MANAGE_TEMPLATES,
|
||||
MANAGE_TEAM,
|
||||
VIEW_ACTIVITY]
|
||||
|
||||
|
||||
class Permission(db.Model):
|
||||
@@ -295,10 +299,11 @@ class Permission(db.Model):
|
||||
service = db.relationship('Service')
|
||||
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), index=True, nullable=False)
|
||||
user = db.relationship('User')
|
||||
permission = db.Column(db.Enum(*PERMISSION_LIST, name='permission_types'),
|
||||
index=False,
|
||||
unique=False,
|
||||
nullable=False)
|
||||
permission = db.Column(
|
||||
db.Enum(*PERMISSION_LIST, name='permission_types'),
|
||||
index=False,
|
||||
unique=False,
|
||||
nullable=False)
|
||||
created_at = db.Column(
|
||||
db.DateTime,
|
||||
index=False,
|
||||
|
||||
@@ -4,10 +4,12 @@ from flask import (
|
||||
Blueprint,
|
||||
jsonify,
|
||||
request,
|
||||
current_app
|
||||
current_app,
|
||||
url_for
|
||||
)
|
||||
|
||||
from app import api_user, encryption, create_uuid
|
||||
from app.authentication.auth import require_admin
|
||||
from app.dao import (
|
||||
templates_dao,
|
||||
services_dao,
|
||||
@@ -40,6 +42,86 @@ def get_notifications(notification_id):
|
||||
return jsonify(result="error", message="not found"), 404
|
||||
|
||||
|
||||
@notifications.route('/notifications', methods=['GET'])
|
||||
def get_all_notifications():
|
||||
page = get_page_from_request()
|
||||
|
||||
if not page:
|
||||
return jsonify(result="error", message="Invalid page"), 400
|
||||
|
||||
all_notifications = notifications_dao.get_notifications_for_service(api_user['client'], page)
|
||||
|
||||
return jsonify(
|
||||
notifications=notification_status_schema.dump(all_notifications.items, many=True).data,
|
||||
links=pagination_links(
|
||||
all_notifications,
|
||||
'.get_all_notifications',
|
||||
request.args
|
||||
)
|
||||
), 200
|
||||
|
||||
|
||||
@notifications.route('/service/<service_id>/notifications', methods=['GET'])
|
||||
@require_admin()
|
||||
def get_all_notifications_for_service(service_id):
|
||||
page = get_page_from_request()
|
||||
|
||||
if not page:
|
||||
return jsonify(result="error", message="Invalid page"), 400
|
||||
|
||||
all_notifications = notifications_dao.get_notifications_for_service(service_id, page)
|
||||
|
||||
return jsonify(
|
||||
notifications=notification_status_schema.dump(all_notifications.items, many=True).data,
|
||||
links=pagination_links(
|
||||
all_notifications,
|
||||
'.get_all_notifications_for_service',
|
||||
request.args
|
||||
)
|
||||
), 200
|
||||
|
||||
|
||||
@notifications.route('/service/<service_id>/job/<job_id>/notifications', methods=['GET'])
|
||||
@require_admin()
|
||||
def get_all_notifications_for_service_job(service_id, job_id):
|
||||
page = get_page_from_request()
|
||||
|
||||
if not page:
|
||||
return jsonify(result="error", message="Invalid page"), 400
|
||||
|
||||
all_notifications = notifications_dao.get_notifications_for_job(service_id, job_id, page)
|
||||
|
||||
return jsonify(
|
||||
notifications=notification_status_schema.dump(all_notifications.items, many=True).data,
|
||||
links=pagination_links(
|
||||
all_notifications,
|
||||
'.get_all_notifications_for_service_job',
|
||||
request.args
|
||||
)
|
||||
), 200
|
||||
|
||||
|
||||
def get_page_from_request():
|
||||
if 'page' in request.args:
|
||||
try:
|
||||
return int(request.args['page'])
|
||||
|
||||
except ValueError:
|
||||
return None
|
||||
else:
|
||||
return 1
|
||||
|
||||
|
||||
def pagination_links(pagination, endpoint, args):
|
||||
links = dict()
|
||||
if pagination.has_prev:
|
||||
links['prev'] = url_for(endpoint, **dict(list(args.items()) + [('page', pagination.prev_num)]))
|
||||
if pagination.has_next:
|
||||
links['next'] = url_for(endpoint, **dict(list(args.items()) + [('page', pagination.next_num)]))
|
||||
links['last'] = url_for(endpoint, **dict(list(args.items()) + [('page', pagination.pages)]))
|
||||
return links
|
||||
|
||||
|
||||
@notifications.route('/notifications/sms', methods=['POST'])
|
||||
def create_sms_notification():
|
||||
return send_notification(notification_type=SMS_NOTIFICATION)
|
||||
|
||||
@@ -23,6 +23,7 @@ class Config(object):
|
||||
SQLALCHEMY_RECORD_QUERIES = True
|
||||
VERIFY_CODE_FROM_EMAIL_ADDRESS = os.environ['VERIFY_CODE_FROM_EMAIL_ADDRESS']
|
||||
NOTIFY_EMAIL_DOMAIN = os.environ['NOTIFY_EMAIL_DOMAIN']
|
||||
PAGE_SIZE = 50
|
||||
|
||||
BROKER_URL = 'sqs://'
|
||||
BROKER_TRANSPORT_OPTIONS = {
|
||||
|
||||
@@ -31,6 +31,6 @@ def upgrade():
|
||||
def downgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
conn = op.get_bind()
|
||||
conn.execute("DELETE FROM permissions where permission='manage_templates")
|
||||
conn.execute("DELETE FROM permissions where permission='manage_templates'")
|
||||
|
||||
### end Alembic commands ###
|
||||
41
migrations/versions/0031_add_manage_team_permission.py
Normal file
41
migrations/versions/0031_add_manage_team_permission.py
Normal file
@@ -0,0 +1,41 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 0031_add_manage_team_permission
|
||||
Revises: 0030_add_template_permission
|
||||
Create Date: 2016-02-26 10:33:20.536362
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '0031_add_manage_team_permission'
|
||||
down_revision = '0030_add_template_permission'
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
def upgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
conn = op.get_bind()
|
||||
user_services = conn.execute("SELECT * FROM user_to_service").fetchall()
|
||||
for entry in user_services:
|
||||
id_ = uuid.uuid4()
|
||||
created_at = datetime.now().isoformat().replace('T', ' ')
|
||||
conn.execute((
|
||||
"INSERT INTO permissions (id, user_id, service_id, permission, created_at)"
|
||||
" VALUES ('{}', '{}', '{}', 'manage_team', '{}')").format(id_, entry[0], entry[1], created_at))
|
||||
id_ = uuid.uuid4()
|
||||
conn.execute((
|
||||
"INSERT INTO permissions (id, user_id, service_id, permission, created_at)"
|
||||
" VALUES ('{}', '{}', '{}', 'view_activity', '{}')").format(id_, entry[0], entry[1], created_at))
|
||||
### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
conn = op.get_bind()
|
||||
conn.execute("DELETE FROM permissions where permission='manage_team'")
|
||||
conn.execute("DELETE FROM permissions where permission='view_activity'")
|
||||
|
||||
### end Alembic commands ###
|
||||
41
migrations/versions/0032_update_permission_to_enum.py
Normal file
41
migrations/versions/0032_update_permission_to_enum.py
Normal file
@@ -0,0 +1,41 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 49380ad07c88
|
||||
Revises: 0031_add_manage_team_permission
|
||||
Create Date: 2016-03-01 17:08:12.184393
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '49380ad07c88'
|
||||
down_revision = '0031_add_manage_team_permission'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
conn = op.get_bind()
|
||||
permissions = conn.execute("SELECT (id, permission) FROM permissions").fetchall()
|
||||
print(permissions)
|
||||
op.drop_column('permissions', 'permission')
|
||||
op.drop_constraint('uix_service_user_permission', 'permissions', type_='unique')
|
||||
op.add_column('permissions', sa.Column('permission', sa.Enum('manage_service', 'send_messages', 'manage_api_keys', 'manage_templates', 'manage_team', 'view_activity', name='permission_types'), nullable=False))
|
||||
for p in permissions:
|
||||
conn.execute("UPDATE permissions SET permission='{}' WHERE ID='{}'".format(p[1], p[0]))
|
||||
op.create_unique_constraint('uix_service_user_permission', 'permissions', ['service_id', 'user_id', '_permission'])
|
||||
### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
conn = op.get_bind()
|
||||
permissions = conn.execute("SELECT (id, permission) FROM permissions").fetchall()
|
||||
op.drop_column('permissions', 'permission')
|
||||
op.drop_constraint('uix_service_user_permission', 'permissions', type_='unique')
|
||||
op.add_column('permissions', sa.Column('permission', sa.VARCHAR(length=255), autoincrement=False, nullable=False))
|
||||
for p in permissions:
|
||||
conn.execute("UPDATE permissions SET permission='{}' WHERE ID='{}'".format(p[1], p[0]))
|
||||
op.create_unique_constraint('uix_service_user_permission', 'permissions', ['service_id', 'user_id', 'permission'])
|
||||
### end Alembic commands ###
|
||||
@@ -251,7 +251,8 @@ def sample_notification(notify_db,
|
||||
notify_db_session,
|
||||
service=None,
|
||||
template=None,
|
||||
job=None):
|
||||
job=None,
|
||||
to_field=None):
|
||||
if service is None:
|
||||
service = sample_service(notify_db, notify_db_session)
|
||||
if template is None:
|
||||
@@ -259,11 +260,15 @@ def sample_notification(notify_db,
|
||||
if job is None:
|
||||
job = sample_job(notify_db, notify_db_session, service=service, template=template)
|
||||
|
||||
notificaton_id = uuid.uuid4()
|
||||
to = '+44709123456'
|
||||
notification_id = uuid.uuid4()
|
||||
|
||||
if to_field:
|
||||
to = to_field
|
||||
else:
|
||||
to = '+44709123456'
|
||||
|
||||
data = {
|
||||
'id': notificaton_id,
|
||||
'id': notification_id,
|
||||
'to': to,
|
||||
'job': job,
|
||||
'service': service,
|
||||
|
||||
@@ -68,3 +68,14 @@ def test_get_invited_users_for_service_that_has_no_invites(notify_db, notify_db_
|
||||
|
||||
invites = get_invited_users_for_service(sample_service.id)
|
||||
assert len(invites) == 0
|
||||
|
||||
|
||||
def test_save_invited_user_sets_status_to_cancelled(notify_db, notify_db_session, sample_invited_user):
|
||||
assert InvitedUser.query.count() == 1
|
||||
saved = InvitedUser.query.get(sample_invited_user.id)
|
||||
assert saved.status == 'pending'
|
||||
saved.status = 'cancelled'
|
||||
save_invited_user(saved)
|
||||
assert InvitedUser.query.count() == 1
|
||||
cancelled_invited_user = InvitedUser.query.get(sample_invited_user.id)
|
||||
assert cancelled_invited_user.status == 'cancelled'
|
||||
|
||||
@@ -82,7 +82,7 @@ def test_get_all_notifications_for_job(notify_db, notify_db_session, sample_job)
|
||||
template=sample_job.template,
|
||||
job=sample_job)
|
||||
|
||||
notifcations_from_db = get_notifications_for_job(sample_job.service.id, sample_job.id)
|
||||
notifcations_from_db = get_notifications_for_job(sample_job.service.id, sample_job.id).items
|
||||
assert len(notifcations_from_db) == 5
|
||||
|
||||
|
||||
|
||||
@@ -196,3 +196,58 @@ def test_get_invited_user_by_service_but_unknown_invite_id_returns_404(notify_ap
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
)
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_update_invited_user_set_status_to_cancelled(notify_api, sample_invited_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
|
||||
data = {'status': 'cancelled'}
|
||||
url = '/service/{0}/invite/{1}'.format(sample_invited_user.service_id, sample_invited_user.id)
|
||||
auth_header = create_authorization_header(
|
||||
path=url,
|
||||
method='POST',
|
||||
request_body=json.dumps(data)
|
||||
)
|
||||
response = client.post(url,
|
||||
data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
assert response.status_code == 200
|
||||
json_resp = json.loads(response.get_data(as_text=True))['data']
|
||||
print(json_resp)
|
||||
assert json_resp['status'] == 'cancelled'
|
||||
|
||||
|
||||
def test_update_invited_user_for_wrong_service_returns_404(notify_api, sample_invited_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = {'status': 'cancelled'}
|
||||
bad_service_id = uuid.uuid4()
|
||||
url = '/service/{0}/invite/{1}'.format(bad_service_id, sample_invited_user.id)
|
||||
auth_header = create_authorization_header(
|
||||
path=url,
|
||||
method='POST',
|
||||
request_body=json.dumps(data)
|
||||
)
|
||||
response = client.post(url, data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert response.status_code == 404
|
||||
json_response = json.loads(response.get_data(as_text=True))['message']
|
||||
assert json_response == 'Invited user not found for service id: {} and invited user id: {}'\
|
||||
.format(bad_service_id, sample_invited_user.id)
|
||||
|
||||
|
||||
def test_update_invited_user_for_invalid_data_returns_400(notify_api, sample_invited_user):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = {'status': 'garbage'}
|
||||
url = '/service/{0}/invite/{1}'.format(sample_invited_user.service_id, sample_invited_user.id)
|
||||
auth_header = create_authorization_header(
|
||||
path=url,
|
||||
method='POST',
|
||||
request_body=json.dumps(data)
|
||||
)
|
||||
response = client.post(url, data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert response.status_code == 400
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import uuid
|
||||
import app.celery.tasks
|
||||
from tests import create_authorization_header
|
||||
from tests.app.conftest import sample_notification, sample_job, sample_service
|
||||
from flask import json
|
||||
from app.models import Service
|
||||
from app.dao.templates_dao import dao_get_all_templates_for_service
|
||||
@@ -47,6 +48,205 @@ def test_get_notifications_empty_result(notify_api, sample_api_key):
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_get_all_notifications(notify_api, sample_notification):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
auth_header = create_authorization_header(
|
||||
service_id=sample_notification.service_id,
|
||||
path='/notifications',
|
||||
method='GET')
|
||||
|
||||
response = client.get(
|
||||
'/notifications',
|
||||
headers=[auth_header])
|
||||
|
||||
notifications = json.loads(response.get_data(as_text=True))
|
||||
assert notifications['notifications'][0]['status'] == 'sent'
|
||||
assert notifications['notifications'][0]['template'] == sample_notification.template.id
|
||||
assert notifications['notifications'][0]['to'] == '+44709123456'
|
||||
assert notifications['notifications'][0]['service'] == str(sample_notification.service_id)
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_get_all_notifications_newest_first(notify_api, notify_db, notify_db_session, sample_email_template):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
notification_1 = sample_notification(notify_db, notify_db_session, sample_email_template.service)
|
||||
notification_2 = sample_notification(notify_db, notify_db_session, sample_email_template.service)
|
||||
notification_3 = sample_notification(notify_db, notify_db_session, sample_email_template.service)
|
||||
|
||||
auth_header = create_authorization_header(
|
||||
service_id=sample_email_template.service_id,
|
||||
path='/notifications',
|
||||
method='GET')
|
||||
|
||||
response = client.get(
|
||||
'/notifications',
|
||||
headers=[auth_header])
|
||||
|
||||
notifications = json.loads(response.get_data(as_text=True))
|
||||
assert len(notifications['notifications']) == 3
|
||||
assert notifications['notifications'][0]['to'] == notification_3.to
|
||||
assert notifications['notifications'][1]['to'] == notification_2.to
|
||||
assert notifications['notifications'][2]['to'] == notification_1.to
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_get_all_notifications_for_service_in_order(notify_api, notify_db, notify_db_session):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
|
||||
service_1 = sample_service(notify_db, notify_db_session, service_name="1")
|
||||
service_2 = sample_service(notify_db, notify_db_session, service_name="2")
|
||||
|
||||
sample_notification(notify_db, notify_db_session, service=service_2)
|
||||
|
||||
notification_1 = sample_notification(notify_db, notify_db_session, service=service_1)
|
||||
notification_2 = sample_notification(notify_db, notify_db_session, service=service_1)
|
||||
notification_3 = sample_notification(notify_db, notify_db_session, service=service_1)
|
||||
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}/notifications'.format(service_1.id),
|
||||
method='GET')
|
||||
|
||||
response = client.get(
|
||||
path='/service/{}/notifications'.format(service_1.id),
|
||||
headers=[auth_header])
|
||||
|
||||
resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(resp['notifications']) == 3
|
||||
assert resp['notifications'][0]['to'] == notification_3.to
|
||||
assert resp['notifications'][1]['to'] == notification_2.to
|
||||
assert resp['notifications'][2]['to'] == notification_1.to
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_get_all_notifications_for_job_in_order(notify_api, notify_db, notify_db_session, sample_service):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
main_job = sample_job(notify_db, notify_db_session, service=sample_service)
|
||||
another_job = sample_job(notify_db, notify_db_session, service=sample_service)
|
||||
from time import sleep
|
||||
|
||||
notification_1 = sample_notification(notify_db, notify_db_session, job=main_job, to_field="1")
|
||||
notification_2 = sample_notification(notify_db, notify_db_session, job=main_job, to_field="2")
|
||||
notification_3 = sample_notification(notify_db, notify_db_session, job=main_job, to_field="3")
|
||||
sample_notification(notify_db, notify_db_session, job=another_job)
|
||||
|
||||
auth_header = create_authorization_header(
|
||||
path='/service/{}/job/{}/notifications'.format(sample_service.id, main_job.id),
|
||||
method='GET')
|
||||
|
||||
response = client.get(
|
||||
path='/service/{}/job/{}/notifications'.format(sample_service.id, main_job.id),
|
||||
headers=[auth_header])
|
||||
|
||||
resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(resp['notifications']) == 3
|
||||
assert resp['notifications'][0]['to'] == notification_3.to
|
||||
assert resp['notifications'][1]['to'] == notification_2.to
|
||||
assert resp['notifications'][2]['to'] == notification_1.to
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_should_not_get_notifications_by_service_with_client_credentials(notify_api, sample_api_key):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
auth_header = create_authorization_header(
|
||||
service_id=sample_api_key.service.id,
|
||||
path='/service/{}/notifications'.format(sample_api_key.service.id),
|
||||
method='GET')
|
||||
|
||||
response = client.get(
|
||||
'/service/{}/notifications'.format(sample_api_key.service.id),
|
||||
headers=[auth_header])
|
||||
|
||||
resp = json.loads(response.get_data(as_text=True))
|
||||
assert response.status_code == 403
|
||||
assert resp['result'] == 'error'
|
||||
assert resp['message'] == 'Forbidden, invalid authentication token provided'
|
||||
|
||||
|
||||
def test_should_not_get_notifications_by_job_and_service_with_client_credentials(notify_api, sample_job):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
auth_header = create_authorization_header(
|
||||
service_id=sample_job.service.id,
|
||||
path='/service/{}/job/{}/notifications'.format(sample_job.service.id, sample_job.id),
|
||||
method='GET')
|
||||
|
||||
response = client.get(
|
||||
'/service/{}/job/{}/notifications'.format(sample_job.service.id, sample_job.id),
|
||||
headers=[auth_header])
|
||||
|
||||
resp = json.loads(response.get_data(as_text=True))
|
||||
assert response.status_code == 403
|
||||
assert resp['result'] == 'error'
|
||||
assert resp['message'] == 'Forbidden, invalid authentication token provided'
|
||||
|
||||
|
||||
def test_should_reject_invalid_page_param(notify_api, sample_email_template):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
auth_header = create_authorization_header(
|
||||
service_id=sample_email_template.service_id,
|
||||
path='/notifications',
|
||||
method='GET')
|
||||
|
||||
response = client.get(
|
||||
'/notifications?page=invalid',
|
||||
headers=[auth_header])
|
||||
|
||||
notifications = json.loads(response.get_data(as_text=True))
|
||||
assert response.status_code == 400
|
||||
assert notifications['result'] == 'error'
|
||||
assert notifications['message'] == 'Invalid page'
|
||||
|
||||
|
||||
def test_should_return_pagination_links(notify_api, notify_db, notify_db_session, sample_email_template):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
notify_api.config['PAGE_SIZE'] = 1
|
||||
|
||||
sample_notification(notify_db, notify_db_session, sample_email_template.service)
|
||||
notification_2 = sample_notification(notify_db, notify_db_session, sample_email_template.service)
|
||||
sample_notification(notify_db, notify_db_session, sample_email_template.service)
|
||||
|
||||
auth_header = create_authorization_header(
|
||||
service_id=sample_email_template.service_id,
|
||||
path='/notifications',
|
||||
method='GET')
|
||||
|
||||
response = client.get(
|
||||
'/notifications?page=2',
|
||||
headers=[auth_header])
|
||||
|
||||
notifications = json.loads(response.get_data(as_text=True))
|
||||
assert len(notifications['notifications']) == 1
|
||||
assert notifications['links']['last'] == '/notifications?page=3'
|
||||
assert notifications['links']['prev'] == '/notifications?page=1'
|
||||
assert notifications['links']['next'] == '/notifications?page=3'
|
||||
assert notifications['notifications'][0]['to'] == notification_2.to
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_get_all_notifications_returns_empty_list(notify_api, sample_api_key):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
auth_header = create_authorization_header(
|
||||
service_id=sample_api_key.service.id,
|
||||
path='/notifications',
|
||||
method='GET')
|
||||
|
||||
response = client.get(
|
||||
'/notifications',
|
||||
headers=[auth_header])
|
||||
|
||||
notifications = json.loads(response.get_data(as_text=True))
|
||||
assert response.status_code == 200
|
||||
assert len(notifications['notifications']) == 0
|
||||
|
||||
|
||||
def test_create_sms_should_reject_if_missing_required_fields(notify_api, sample_api_key, mocker):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
|
||||
@@ -21,7 +21,9 @@ def test_get_user_list(notify_api, notify_db, notify_db_session, sample_service)
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert len(json_resp['data']) == 1
|
||||
sample_user = sample_service.users[0]
|
||||
expected_permissions = ['manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']
|
||||
expected_permissions = [
|
||||
'manage_service', 'send_messages', 'manage_api_keys', 'manage_templates',
|
||||
'manage_team', 'view_activity']
|
||||
fetched = json_resp['data'][0]
|
||||
|
||||
assert sample_user.id == fetched['id']
|
||||
@@ -47,7 +49,9 @@ def test_get_user(notify_api, notify_db, notify_db_session, sample_service):
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
|
||||
expected_permissions = ['manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']
|
||||
expected_permissions = [
|
||||
'manage_service', 'send_messages', 'manage_api_keys', 'manage_templates',
|
||||
'manage_team', 'view_activity']
|
||||
fetched = json_resp['data']
|
||||
|
||||
assert sample_user.id == fetched['id']
|
||||
@@ -180,7 +184,9 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_service):
|
||||
assert User.query.count() == 1
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['email_address'] == new_email
|
||||
expected_permissions = ['manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']
|
||||
expected_permissions = [
|
||||
'manage_service', 'send_messages', 'manage_api_keys', 'manage_templates',
|
||||
'manage_team', 'view_activity']
|
||||
fetched = json_resp['data']
|
||||
|
||||
assert sample_user.id == fetched['id']
|
||||
@@ -272,7 +278,9 @@ def test_get_user_by_email(notify_api, notify_db, notify_db_session, sample_serv
|
||||
assert resp.status_code == 200
|
||||
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
expected_permissions = ['manage_service', 'send_messages', 'manage_api_keys', 'manage_templates']
|
||||
expected_permissions = [
|
||||
'manage_service', 'send_messages', 'manage_api_keys', 'manage_templates',
|
||||
'manage_team', 'view_activity']
|
||||
fetched = json_resp['data']
|
||||
|
||||
assert sample_user.id == fetched['id']
|
||||
|
||||
Reference in New Issue
Block a user