mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-05 10:42:41 -05:00
Add service permissions DAO and refactor user service permission mock
This commit is contained in:
41
app/dao/service_permissions_dao.py
Normal file
41
app/dao/service_permissions_dao.py
Normal file
@@ -0,0 +1,41 @@
|
||||
from sqlalchemy import exc
|
||||
|
||||
from app import db
|
||||
from app.dao.dao_utils import transactional
|
||||
from app.models import Service, ServicePermission, SERVICE_PERMISSION_TYPES
|
||||
|
||||
|
||||
def dao_fetch_service_permissions(service_id):
|
||||
return ServicePermission.query.filter(
|
||||
ServicePermission.service_id == service_id).all()
|
||||
|
||||
|
||||
def make_service_permissions_list(service_id, permissions):
|
||||
arr = []
|
||||
for permission in permissions:
|
||||
if permission not in SERVICE_PERMISSION_TYPES:
|
||||
raise ValueError("'{}' not of service permission type: {}".format(permission, SERVICE_PERMISSION_TYPES))
|
||||
|
||||
service_permission = ServicePermission(service_id=service_id, permission=permission)
|
||||
arr.append(service_permission)
|
||||
|
||||
return arr
|
||||
|
||||
|
||||
@transactional
|
||||
def dao_add_and_commit_service_permissions(service_id, permissions):
|
||||
service_permissions = make_service_permissions_list(service_id, permissions)
|
||||
|
||||
try:
|
||||
db.session.add_all(service_permissions)
|
||||
db.session.commit()
|
||||
except exc.IntegrityError as e:
|
||||
if "duplicate key value violates unique constraint" in str(e.orig):
|
||||
raise ValueError(e.orig)
|
||||
raise
|
||||
|
||||
|
||||
def dao_remove_service_permission(service_id, permission=None):
|
||||
return ServicePermission.query.filter(
|
||||
ServicePermission.service_id == service_id,
|
||||
ServicePermission.permission == permission if permission else None).delete()
|
||||
@@ -1 +1,7 @@
|
||||
Generic single-database configuration.
|
||||
Generic single-database configuration.
|
||||
|
||||
python application.py db migration to generate migration script.
|
||||
|
||||
python application.py db upgrade to upgrade db with script.
|
||||
|
||||
python application.py db downgrade to rollback db changes.
|
||||
|
||||
@@ -15,10 +15,10 @@ import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
service_permission_types = op.create_table('service_permission_types',
|
||||
sa.Column('name', sa.String(length=255), nullable=False),
|
||||
sa.PrimaryKeyConstraint('name'))
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
service_permission_types=op.create_table('service_permission_types',
|
||||
sa.Column('name', sa.String(length=255), nullable=False),
|
||||
sa.PrimaryKeyConstraint('name'))
|
||||
|
||||
op.bulk_insert(service_permission_types,
|
||||
[
|
||||
|
||||
@@ -666,11 +666,11 @@ def sample_permission(notify_db,
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def sample_service_permission(notify_db,
|
||||
notify_db_session,
|
||||
service=None,
|
||||
user=None,
|
||||
permission="manage_settings"):
|
||||
def sample_user_service_permission(notify_db,
|
||||
notify_db_session,
|
||||
service=None,
|
||||
user=None,
|
||||
permission="manage_settings"):
|
||||
if user is None:
|
||||
user = create_user()
|
||||
if service is None:
|
||||
|
||||
82
tests/app/dao/test_service_permissions_dao.py
Normal file
82
tests/app/dao/test_service_permissions_dao.py
Normal file
@@ -0,0 +1,82 @@
|
||||
import pytest
|
||||
|
||||
from app.dao.service_permissions_dao import (
|
||||
dao_fetch_service_permissions, dao_remove_service_permission)
|
||||
from app.models import (
|
||||
EMAIL_TYPE, SMS_TYPE, LETTER_TYPE, INTERNATIONAL_SMS_TYPE, INCOMING_SMS_TYPE, SERVICE_PERMISSION_TYPES)
|
||||
|
||||
from tests.app.db import create_service_permissions, create_service
|
||||
|
||||
|
||||
def test_create_service_permissions(sample_service):
|
||||
service_permission_types = [SMS_TYPE, INTERNATIONAL_SMS_TYPE]
|
||||
|
||||
service_permissions = create_service_permissions(
|
||||
service_id=sample_service.id, permissions=service_permission_types)
|
||||
|
||||
assert len(service_permissions) == len(service_permission_types)
|
||||
assert all(sp.service_id == sample_service.id for sp in service_permissions)
|
||||
assert all(sp.permission in service_permission_types for sp in service_permissions)
|
||||
|
||||
|
||||
def test_fetch_service_permissions_gets_service_permissions(sample_service):
|
||||
service_permission_types = [LETTER_TYPE, EMAIL_TYPE, SMS_TYPE]
|
||||
create_service_permissions(service_id=sample_service.id, permissions=service_permission_types)
|
||||
service_permissions = dao_fetch_service_permissions(sample_service.id)
|
||||
|
||||
assert len(service_permission_types) == len(service_permission_types)
|
||||
assert all(sp.service_id == sample_service.id for sp in service_permissions)
|
||||
assert all(sp.permission in service_permission_types for sp in service_permissions)
|
||||
|
||||
|
||||
def test_add_service_permissions_to_existing_permissions(sample_service):
|
||||
service_permission_types_1 = [EMAIL_TYPE, INCOMING_SMS_TYPE]
|
||||
service_permission_types_2 = [LETTER_TYPE, INTERNATIONAL_SMS_TYPE, SMS_TYPE]
|
||||
|
||||
create_service_permissions(
|
||||
service_id=sample_service.id, permissions=service_permission_types_1)
|
||||
create_service_permissions(
|
||||
service_id=sample_service.id, permissions=service_permission_types_2)
|
||||
|
||||
permissions = dao_fetch_service_permissions(sample_service.id)
|
||||
|
||||
assert len(permissions) == len(service_permission_types_1 + service_permission_types_2)
|
||||
|
||||
|
||||
def test_create_invalid_service_permissions_raises_error(sample_service):
|
||||
service_permission_types = ['invalid']
|
||||
|
||||
with pytest.raises(ValueError) as e:
|
||||
service_permissions = create_service_permissions(
|
||||
service_id=sample_service.id, permissions=service_permission_types)
|
||||
|
||||
assert "'invalid' not of service permission type: " + str(SERVICE_PERMISSION_TYPES) in str(e.value)
|
||||
|
||||
|
||||
def test_remove_service_permission(sample_service):
|
||||
service_permission_types = [EMAIL_TYPE, INCOMING_SMS_TYPE]
|
||||
service_permission_type_to_remove = EMAIL_TYPE
|
||||
service_permission_type_remaining = INCOMING_SMS_TYPE
|
||||
|
||||
service_permissions = create_service_permissions(
|
||||
service_id=sample_service.id, permissions=service_permission_types)
|
||||
|
||||
dao_remove_service_permission(sample_service.id, service_permission_type_to_remove)
|
||||
|
||||
permissions = dao_fetch_service_permissions(sample_service.id)
|
||||
assert len(permissions) == 1
|
||||
assert permissions[0].permission == service_permission_type_remaining
|
||||
assert permissions[0].service_id == sample_service.id
|
||||
|
||||
|
||||
def test_adding_duplicate_service_id_permission_raises_value_error(sample_service):
|
||||
service_permission_types = [EMAIL_TYPE, INCOMING_SMS_TYPE]
|
||||
service_permission_types_with_duplicate_email_type = [LETTER_TYPE, EMAIL_TYPE]
|
||||
|
||||
with pytest.raises(ValueError) as e:
|
||||
create_service_permissions(
|
||||
service_id=sample_service.id, permissions=service_permission_types)
|
||||
create_service_permissions(
|
||||
service_id=sample_service.id, permissions=service_permission_types_with_duplicate_email_type)
|
||||
|
||||
assert "duplicate key value violates unique constraint \"service_permissions_pkey\"" in str(e.value)
|
||||
@@ -2,11 +2,14 @@ from datetime import datetime
|
||||
import uuid
|
||||
|
||||
from app.dao.jobs_dao import dao_create_job
|
||||
from app.models import Service, User, Template, Notification, SMS_TYPE, KEY_TYPE_NORMAL, Job
|
||||
from app.models import (Service, User, Template, Notification, EMAIL_TYPE, LETTER_TYPE,
|
||||
SMS_TYPE, KEY_TYPE_NORMAL, Job, ServicePermission)
|
||||
from app.dao.users_dao import save_model_user
|
||||
from app.dao.notifications_dao import dao_create_notification
|
||||
from app.dao.templates_dao import dao_create_template
|
||||
from app.dao.services_dao import dao_create_service
|
||||
from app.dao.service_permissions_dao import (
|
||||
dao_add_and_commit_service_permissions, make_service_permissions_list)
|
||||
|
||||
|
||||
def create_user(mobile_number="+447700900986", email="notify@digital.cabinet-office.gov.uk"):
|
||||
@@ -141,3 +144,12 @@ def create_job(template,
|
||||
job = Job(**data)
|
||||
dao_create_job(job)
|
||||
return job
|
||||
|
||||
|
||||
def create_service_permissions(service_id, permissions=[EMAIL_TYPE, LETTER_TYPE]):
|
||||
dao_add_and_commit_service_permissions(
|
||||
service_id if service_id else create_service().id, permissions)
|
||||
|
||||
service_permissions = ServicePermission.query.all()
|
||||
|
||||
return service_permissions
|
||||
|
||||
@@ -15,7 +15,7 @@ from tests import create_authorization_header
|
||||
from tests.app.db import create_template
|
||||
from tests.app.conftest import (
|
||||
sample_service as create_service,
|
||||
sample_service_permission as create_service_permission,
|
||||
sample_user_service_permission as create_user_service_permission,
|
||||
sample_notification as create_sample_notification,
|
||||
sample_notification_history as create_notification_history,
|
||||
sample_notification_with_job
|
||||
@@ -941,12 +941,12 @@ def test_add_unknown_user_to_service_returns404(notify_api, notify_db, notify_db
|
||||
assert result['message'] == expected_message
|
||||
|
||||
|
||||
def test_remove_user_from_service(notify_api, notify_db, notify_db_session, sample_service_permission):
|
||||
def test_remove_user_from_service(notify_api, notify_db, notify_db_session, sample_user_service_permission):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
second_user = create_user(email="new@digital.cabinet-office.gov.uk")
|
||||
# Simulates successfully adding a user to the service
|
||||
second_permission = create_service_permission(
|
||||
second_permission = create_user_service_permission(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
user=second_user)
|
||||
@@ -961,13 +961,13 @@ def test_remove_user_from_service(notify_api, notify_db, notify_db_session, samp
|
||||
assert resp.status_code == 204
|
||||
|
||||
|
||||
def test_remove_user_from_service(notify_api, notify_db, notify_db_session, sample_service_permission):
|
||||
def test_remove_user_from_service(notify_api, notify_db, notify_db_session, sample_user_service_permission):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
second_user = create_user(email="new@digital.cabinet-office.gov.uk")
|
||||
endpoint = url_for(
|
||||
'service.remove_user_from_service',
|
||||
service_id=str(sample_service_permission.service.id),
|
||||
service_id=str(sample_user_service_permission.service.id),
|
||||
user_id=str(second_user.id))
|
||||
auth_header = create_authorization_header()
|
||||
resp = client.delete(
|
||||
@@ -979,13 +979,13 @@ def test_remove_user_from_service(notify_api, notify_db, notify_db_session, samp
|
||||
def test_cannot_remove_only_user_from_service(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_service_permission):
|
||||
sample_user_service_permission):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
endpoint = url_for(
|
||||
'service.remove_user_from_service',
|
||||
service_id=str(sample_service_permission.service.id),
|
||||
user_id=str(sample_service_permission.user.id))
|
||||
service_id=str(sample_user_service_permission.service.id),
|
||||
user_id=str(sample_user_service_permission.user.id))
|
||||
auth_header = create_authorization_header()
|
||||
resp = client.delete(
|
||||
endpoint,
|
||||
|
||||
@@ -290,13 +290,13 @@ def test_get_user_by_email_bad_url_returns_404(client, sample_user):
|
||||
assert json_resp['message'] == 'Invalid request. Email query string param required'
|
||||
|
||||
|
||||
def test_get_user_with_permissions(client, sample_service_permission):
|
||||
def test_get_user_with_permissions(client, sample_user_service_permission):
|
||||
header = create_authorization_header()
|
||||
response = client.get(url_for('user.get_user', user_id=str(sample_service_permission.user.id)),
|
||||
response = client.get(url_for('user.get_user', user_id=str(sample_user_service_permission.user.id)),
|
||||
headers=[header])
|
||||
assert response.status_code == 200
|
||||
permissions = json.loads(response.get_data(as_text=True))['data']['permissions']
|
||||
assert sample_service_permission.permission in permissions[str(sample_service_permission.service.id)]
|
||||
assert sample_user_service_permission.permission in permissions[str(sample_user_service_permission.service.id)]
|
||||
|
||||
|
||||
def test_set_user_permissions(client, sample_user, sample_service):
|
||||
|
||||
@@ -77,7 +77,8 @@ def notify_db_session(notify_db):
|
||||
"provider_details_history",
|
||||
"template_process_type",
|
||||
"dvla_organisation",
|
||||
"notification_status_types"]:
|
||||
"notification_status_types",
|
||||
"service_permission_types"]:
|
||||
notify_db.engine.execute(tbl.delete())
|
||||
notify_db.session.commit()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user