mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-03 18:01:08 -05:00
Refactored to handle single service permission
This commit is contained in:
@@ -10,29 +10,14 @@ def dao_fetch_service_permissions(service_id):
|
|||||||
ServicePermission.service_id == service_id).all()
|
ServicePermission.service_id == service_id).all()
|
||||||
|
|
||||||
|
|
||||||
def make_service_permissions_list(service_id, permissions):
|
|
||||||
arr = []
|
|
||||||
for permission in permissions:
|
|
||||||
if permission not in SERVICE_PERMISSION_TYPES:
|
|
||||||
raise ValueError("'{}' not of service permission type: {}".format(permission, SERVICE_PERMISSION_TYPES))
|
|
||||||
|
|
||||||
service_permission = ServicePermission(service_id=service_id, permission=permission)
|
|
||||||
arr.append(service_permission)
|
|
||||||
|
|
||||||
return arr
|
|
||||||
|
|
||||||
|
|
||||||
@transactional
|
@transactional
|
||||||
def dao_add_and_commit_service_permissions(service_id, permissions):
|
def dao_add_and_commit_service_permission(service_id, permission):
|
||||||
service_permissions = make_service_permissions_list(service_id, permissions)
|
if permission not in SERVICE_PERMISSION_TYPES:
|
||||||
|
raise ValueError("'{}' not of service permission type: {}".format(permission, SERVICE_PERMISSION_TYPES))
|
||||||
|
|
||||||
try:
|
service_permission = ServicePermission(service_id=service_id, permission=permission)
|
||||||
db.session.add_all(service_permissions)
|
|
||||||
db.session.commit()
|
db.session.add(service_permission)
|
||||||
except exc.IntegrityError as e:
|
|
||||||
if "duplicate key value violates unique constraint" in str(e.orig):
|
|
||||||
raise ValueError(e.orig)
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
def dao_remove_service_permission(service_id, permission=None):
|
def dao_remove_service_permission(service_id, permission=None):
|
||||||
|
|||||||
@@ -5,61 +5,47 @@ from app.dao.service_permissions_dao import (
|
|||||||
from app.models import (
|
from app.models import (
|
||||||
EMAIL_TYPE, SMS_TYPE, LETTER_TYPE, INTERNATIONAL_SMS_TYPE, INCOMING_SMS_TYPE, SERVICE_PERMISSION_TYPES)
|
EMAIL_TYPE, SMS_TYPE, LETTER_TYPE, INTERNATIONAL_SMS_TYPE, INCOMING_SMS_TYPE, SERVICE_PERMISSION_TYPES)
|
||||||
|
|
||||||
from tests.app.db import create_service_permissions, create_service
|
from tests.app.db import create_service_permission, create_service
|
||||||
|
|
||||||
|
|
||||||
def test_create_service_permissions(sample_service):
|
def test_create_service_permission(sample_service):
|
||||||
service_permission_types = [SMS_TYPE, INTERNATIONAL_SMS_TYPE]
|
service_permission_type = SMS_TYPE
|
||||||
|
|
||||||
service_permissions = create_service_permissions(
|
service_permission = create_service_permission(
|
||||||
service_id=sample_service.id, permissions=service_permission_types)
|
service_id=sample_service.id, permission=service_permission_type)
|
||||||
|
|
||||||
|
assert len(service_permission) == 1
|
||||||
|
assert all(sp.service_id == sample_service.id for sp in service_permission)
|
||||||
|
assert all(sp.permission in service_permission_type for sp in service_permission)
|
||||||
|
|
||||||
|
|
||||||
|
def test_fetch_service_permissions_gets_service_permissions(sample_service):
|
||||||
|
service_permission_types = [LETTER_TYPE, EMAIL_TYPE, SMS_TYPE]
|
||||||
|
for spt in service_permission_types:
|
||||||
|
create_service_permission(service_id=sample_service.id, permission=spt)
|
||||||
|
service_permissions = dao_fetch_service_permissions(sample_service.id)
|
||||||
|
|
||||||
assert len(service_permissions) == len(service_permission_types)
|
assert len(service_permissions) == len(service_permission_types)
|
||||||
assert all(sp.service_id == sample_service.id for sp in service_permissions)
|
assert all(sp.service_id == sample_service.id for sp in service_permissions)
|
||||||
assert all(sp.permission in service_permission_types for sp in service_permissions)
|
assert all(sp.permission in service_permission_types for sp in service_permissions)
|
||||||
|
|
||||||
|
|
||||||
def test_fetch_service_permissions_gets_service_permissions(sample_service):
|
|
||||||
service_permission_types = [LETTER_TYPE, EMAIL_TYPE, SMS_TYPE]
|
|
||||||
create_service_permissions(service_id=sample_service.id, permissions=service_permission_types)
|
|
||||||
service_permissions = dao_fetch_service_permissions(sample_service.id)
|
|
||||||
|
|
||||||
assert len(service_permission_types) == len(service_permission_types)
|
|
||||||
assert all(sp.service_id == sample_service.id for sp in service_permissions)
|
|
||||||
assert all(sp.permission in service_permission_types for sp in service_permissions)
|
|
||||||
|
|
||||||
|
|
||||||
def test_add_service_permissions_to_existing_permissions(sample_service):
|
|
||||||
service_permission_types_1 = [EMAIL_TYPE, INCOMING_SMS_TYPE]
|
|
||||||
service_permission_types_2 = [LETTER_TYPE, INTERNATIONAL_SMS_TYPE, SMS_TYPE]
|
|
||||||
|
|
||||||
create_service_permissions(
|
|
||||||
service_id=sample_service.id, permissions=service_permission_types_1)
|
|
||||||
create_service_permissions(
|
|
||||||
service_id=sample_service.id, permissions=service_permission_types_2)
|
|
||||||
|
|
||||||
permissions = dao_fetch_service_permissions(sample_service.id)
|
|
||||||
|
|
||||||
assert len(permissions) == len(service_permission_types_1 + service_permission_types_2)
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_invalid_service_permissions_raises_error(sample_service):
|
def test_create_invalid_service_permissions_raises_error(sample_service):
|
||||||
service_permission_types = ['invalid']
|
service_permission_type = 'invalid'
|
||||||
|
|
||||||
with pytest.raises(ValueError) as e:
|
with pytest.raises(ValueError) as e:
|
||||||
service_permissions = create_service_permissions(
|
create_service_permission(service_id=sample_service.id, permission=service_permission_type)
|
||||||
service_id=sample_service.id, permissions=service_permission_types)
|
|
||||||
|
|
||||||
assert "'invalid' not of service permission type: " + str(SERVICE_PERMISSION_TYPES) in str(e.value)
|
assert "'invalid' not of service permission type: {}".format(str(SERVICE_PERMISSION_TYPES)) in str(e.value)
|
||||||
|
|
||||||
|
|
||||||
def test_remove_service_permission(sample_service):
|
def test_remove_service_permission(sample_service):
|
||||||
service_permission_types = [EMAIL_TYPE, INCOMING_SMS_TYPE]
|
service_permission_types_to_create = [EMAIL_TYPE, INCOMING_SMS_TYPE]
|
||||||
service_permission_type_to_remove = EMAIL_TYPE
|
service_permission_type_to_remove = EMAIL_TYPE
|
||||||
service_permission_type_remaining = INCOMING_SMS_TYPE
|
service_permission_type_remaining = INCOMING_SMS_TYPE
|
||||||
|
|
||||||
service_permissions = create_service_permissions(
|
for spt in service_permission_types_to_create:
|
||||||
service_id=sample_service.id, permissions=service_permission_types)
|
create_service_permission(service_id=sample_service.id, permission=spt)
|
||||||
|
|
||||||
dao_remove_service_permission(sample_service.id, service_permission_type_to_remove)
|
dao_remove_service_permission(sample_service.id, service_permission_type_to_remove)
|
||||||
|
|
||||||
@@ -67,16 +53,3 @@ def test_remove_service_permission(sample_service):
|
|||||||
assert len(permissions) == 1
|
assert len(permissions) == 1
|
||||||
assert permissions[0].permission == service_permission_type_remaining
|
assert permissions[0].permission == service_permission_type_remaining
|
||||||
assert permissions[0].service_id == sample_service.id
|
assert permissions[0].service_id == sample_service.id
|
||||||
|
|
||||||
|
|
||||||
def test_adding_duplicate_service_id_permission_raises_value_error(sample_service):
|
|
||||||
service_permission_types = [EMAIL_TYPE, INCOMING_SMS_TYPE]
|
|
||||||
service_permission_types_with_duplicate_email_type = [LETTER_TYPE, EMAIL_TYPE]
|
|
||||||
|
|
||||||
with pytest.raises(ValueError) as e:
|
|
||||||
create_service_permissions(
|
|
||||||
service_id=sample_service.id, permissions=service_permission_types)
|
|
||||||
create_service_permissions(
|
|
||||||
service_id=sample_service.id, permissions=service_permission_types_with_duplicate_email_type)
|
|
||||||
|
|
||||||
assert "duplicate key value violates unique constraint \"service_permissions_pkey\"" in str(e.value)
|
|
||||||
|
|||||||
@@ -8,8 +8,7 @@ from app.dao.users_dao import save_model_user
|
|||||||
from app.dao.notifications_dao import dao_create_notification
|
from app.dao.notifications_dao import dao_create_notification
|
||||||
from app.dao.templates_dao import dao_create_template
|
from app.dao.templates_dao import dao_create_template
|
||||||
from app.dao.services_dao import dao_create_service
|
from app.dao.services_dao import dao_create_service
|
||||||
from app.dao.service_permissions_dao import (
|
from app.dao.service_permissions_dao import dao_add_and_commit_service_permission
|
||||||
dao_add_and_commit_service_permissions, make_service_permissions_list)
|
|
||||||
|
|
||||||
|
|
||||||
def create_user(mobile_number="+447700900986", email="notify@digital.cabinet-office.gov.uk"):
|
def create_user(mobile_number="+447700900986", email="notify@digital.cabinet-office.gov.uk"):
|
||||||
@@ -146,9 +145,9 @@ def create_job(template,
|
|||||||
return job
|
return job
|
||||||
|
|
||||||
|
|
||||||
def create_service_permissions(service_id, permissions=[EMAIL_TYPE, LETTER_TYPE]):
|
def create_service_permission(service_id, permission=EMAIL_TYPE):
|
||||||
dao_add_and_commit_service_permissions(
|
dao_add_and_commit_service_permission(
|
||||||
service_id if service_id else create_service().id, permissions)
|
service_id if service_id else create_service().id, permission)
|
||||||
|
|
||||||
service_permissions = ServicePermission.query.all()
|
service_permissions = ServicePermission.query.all()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user