fix template folder dao

This commit is contained in:
Kenneth Kehl
2024-10-14 09:21:06 -07:00
parent c8a758aff6
commit 68b9d8a484
3 changed files with 126 additions and 81 deletions

View File

@@ -239,7 +239,7 @@
"filename": "tests/app/dao/test_services_dao.py",
"hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8",
"is_verified": false,
"line_number": 265,
"line_number": 266,
"is_secret": false
}
],
@@ -384,5 +384,5 @@
}
]
},
"generated_at": "2024-09-27T16:42:53Z"
"generated_at": "2024-10-14T16:21:01Z"
}

View File

@@ -1,6 +1,7 @@
import uuid
import pytest
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from app import db
@@ -57,7 +58,8 @@ def test_get_organization_by_id_gets_correct_organization(notify_db_session):
def test_update_organization(notify_db_session):
create_organization()
organization = Organization.query.one()
stmt = select(Organization)
organization = db.session.execute(stmt).scalars().one()
user = create_user()
email_branding = create_email_branding()
@@ -78,7 +80,8 @@ def test_update_organization(notify_db_session):
dao_update_organization(organization.id, **data)
organization = Organization.query.one()
stmt = select(Organization)
organization = db.session.execute(stmt).scalars().one()
for attribute, value in data.items():
assert getattr(organization, attribute) == value
@@ -102,7 +105,8 @@ def test_update_organization_domains_lowercases(
):
create_organization()
organization = Organization.query.one()
stmt = select(Organization)
organization = db.session.execute(stmt).scalars().one()
# Seed some domains
dao_update_organization(organization.id, domains=["123", "456"])
@@ -121,7 +125,8 @@ def test_update_organization_domains_lowercases_integrity_error(
):
create_organization()
organization = Organization.query.one()
stmt = select(Organization)
organization = db.session.execute(stmt).scalars().one()
# Seed some domains
dao_update_organization(organization.id, domains=["123", "456"])
@@ -175,11 +180,11 @@ def test_update_organization_updates_the_service_org_type_if_org_type_is_provide
assert sample_organization.organization_type == OrganizationType.FEDERAL
assert sample_service.organization_type == OrganizationType.FEDERAL
stmt = select(Service.get_history_model()).filter_by(
id=sample_service.id, version=2
)
assert (
Service.get_history_model()
.query.filter_by(id=sample_service.id, version=2)
.one()
.organization_type
db.session.execute(stmt).scalars().one().organization_type
== OrganizationType.FEDERAL
)
@@ -229,11 +234,11 @@ def test_add_service_to_organization(sample_service, sample_organization):
assert sample_organization.services[0].id == sample_service.id
assert sample_service.organization_type == sample_organization.organization_type
stmt = select(Service.get_history_model()).filter_by(
id=sample_service.id, version=2
)
assert (
Service.get_history_model()
.query.filter_by(id=sample_service.id, version=2)
.one()
.organization_type
db.session.execute(stmt).scalars().one().organization_type
== sample_organization.organization_type
)
assert sample_service.organization_id == sample_organization.id

View File

@@ -6,6 +6,7 @@ from unittest.mock import Mock
import pytest
import sqlalchemy
from freezegun import freeze_time
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound
@@ -91,7 +92,7 @@ from tests.app.db import (
def test_create_service(notify_db_session):
user = create_user()
assert Service.query.count() == 0
assert service_query_count() == 0
service = Service(
name="service_name",
email_from="email_from",
@@ -101,7 +102,7 @@ def test_create_service(notify_db_session):
created_by=user,
)
dao_create_service(service, user)
assert Service.query.count() == 1
assert service_query_count() == 1
service_db = Service.query.one()
assert service_db.name == "service_name"
assert service_db.id == service.id
@@ -120,7 +121,7 @@ def test_create_service_with_organization(notify_db_session):
organization_type=OrganizationType.STATE,
domains=["local-authority.gov.uk"],
)
assert Service.query.count() == 0
assert service_query_count() == 0
service = Service(
name="service_name",
email_from="email_from",
@@ -130,9 +131,9 @@ def test_create_service_with_organization(notify_db_session):
created_by=user,
)
dao_create_service(service, user)
assert Service.query.count() == 1
assert service_query_count() == 1
service_db = Service.query.one()
organization = Organization.query.get(organization.id)
organization = db.session.get(Organization, organization.id)
assert service_db.name == "service_name"
assert service_db.id == service.id
assert service_db.email_from == "email_from"
@@ -151,7 +152,7 @@ def test_fetch_service_by_id_with_api_keys(notify_db_session):
organization_type=OrganizationType.STATE,
domains=["local-authority.gov.uk"],
)
assert Service.query.count() == 0
assert service_query_count() == 0
service = Service(
name="service_name",
email_from="email_from",
@@ -161,9 +162,9 @@ def test_fetch_service_by_id_with_api_keys(notify_db_session):
created_by=user,
)
dao_create_service(service, user)
assert Service.query.count() == 1
assert service_query_count() == 1
service_db = Service.query.one()
organization = Organization.query.get(organization.id)
organization = db.session.get(Organization, organization.id)
assert service_db.name == "service_name"
assert service_db.id == service.id
assert service_db.email_from == "email_from"
@@ -183,7 +184,7 @@ def test_fetch_service_by_id_with_api_keys(notify_db_session):
def test_cannot_create_two_services_with_same_name(notify_db_session):
user = create_user()
assert Service.query.count() == 0
assert service_query_count() == 0
service1 = Service(
name="service_name",
email_from="email_from1",
@@ -209,7 +210,7 @@ def test_cannot_create_two_services_with_same_name(notify_db_session):
def test_cannot_create_two_services_with_same_email_from(notify_db_session):
user = create_user()
assert Service.query.count() == 0
assert service_query_count() == 0
service1 = Service(
name="service_name1",
email_from="email_from",
@@ -235,7 +236,7 @@ def test_cannot_create_two_services_with_same_email_from(notify_db_session):
def test_cannot_create_service_with_no_user(notify_db_session):
user = create_user()
assert Service.query.count() == 0
assert service_query_count() == 0
service = Service(
name="service_name",
email_from="email_from",
@@ -258,7 +259,7 @@ def test_should_add_user_to_service(notify_db_session):
created_by=user,
)
dao_create_service(service, user)
assert user in Service.query.first().users
assert user in service_query_first().users
new_user = User(
name="Test User",
email_address="new_user@digital.fake.gov",
@@ -267,7 +268,7 @@ def test_should_add_user_to_service(notify_db_session):
)
save_model_user(new_user, validated_email_access=True)
dao_add_user_to_service(service, new_user)
assert new_user in Service.query.first().users
assert new_user in service_query_first().users
def test_dao_add_user_to_service_sets_folder_permissions(sample_user, sample_service):
@@ -314,7 +315,8 @@ def test_dao_add_user_to_service_raises_error_if_adding_folder_permissions_for_a
other_service_folder = create_template_folder(other_service)
folder_permissions = [str(other_service_folder.id)]
assert ServiceUser.query.count() == 2
stmt = select(ServiceUser)
assert db.session.execute(stmt).scalar() == 2
with pytest.raises(IntegrityError) as e:
dao_add_user_to_service(
@@ -326,7 +328,8 @@ def test_dao_add_user_to_service_raises_error_if_adding_folder_permissions_for_a
'insert or update on table "user_folder_permissions" violates foreign key constraint'
in str(e.value)
)
assert ServiceUser.query.count() == 2
stmt = select(ServiceUser)
assert db.session.execute(stmt).scalar() == 2
def test_should_remove_user_from_service(notify_db_session):
@@ -347,9 +350,9 @@ def test_should_remove_user_from_service(notify_db_session):
)
save_model_user(new_user, validated_email_access=True)
dao_add_user_to_service(service, new_user)
assert new_user in Service.query.first().users
assert new_user in service_query_first().users
dao_remove_user_from_service(service, new_user)
assert new_user not in Service.query.first().users
assert new_user not in service_query_first().users
def test_should_remove_user_from_service_exception(notify_db_session):
@@ -668,8 +671,8 @@ def test_removing_all_permission_returns_service_with_no_permissions(notify_db_s
def test_create_service_creates_a_history_record_with_current_data(notify_db_session):
user = create_user()
assert Service.query.count() == 0
assert Service.get_history_model().query.count() == 0
assert service_query_count() == 0
assert service_history_query_count() == 0
service = Service(
name="service_name",
email_from="email_from",
@@ -678,10 +681,10 @@ def test_create_service_creates_a_history_record_with_current_data(notify_db_ses
created_by=user,
)
dao_create_service(service, user)
assert Service.query.count() == 1
assert Service.get_history_model().query.count() == 1
assert service_query_count() == 1
assert service_history_query_count() == 1
service_from_db = Service.query.first()
service_from_db = service_query_first()
service_history = Service.get_history_model().query.first()
assert service_from_db.id == service_history.id
@@ -692,10 +695,25 @@ def test_create_service_creates_a_history_record_with_current_data(notify_db_ses
assert service_from_db.created_by.id == service_history.created_by_id
def service_query_count():
stmt = select(func.count()).select_from(Service)
return db.session.execute(stmt).scalar() or 0
def service_query_first():
stmt = select(Service)
return db.session.execute(stmt).scalars().first()
def service_history_query_count():
stmt = select(func.count()).select_from(Service.get_history_model())
return db.session.execute(stmt).scalar() or 0
def test_update_service_creates_a_history_record_with_current_data(notify_db_session):
user = create_user()
assert Service.query.count() == 0
assert Service.get_history_model().query.count() == 0
assert service_query_count() == 0
assert service_history_query_count() == 0
service = Service(
name="service_name",
email_from="email_from",
@@ -705,17 +723,17 @@ def test_update_service_creates_a_history_record_with_current_data(notify_db_ses
)
dao_create_service(service, user)
assert Service.query.count() == 1
assert Service.query.first().version == 1
assert Service.get_history_model().query.count() == 1
assert service_query_count() == 1
assert service_query_first().version == 1
assert service_history_query_count() == 1
service.name = "updated_service_name"
dao_update_service(service)
assert Service.query.count() == 1
assert Service.get_history_model().query.count() == 2
assert service_query_count() == 1
assert service_history_query_count() == 2
service_from_db = Service.query.first()
service_from_db = service_query_first()
assert service_from_db.version == 2
@@ -736,8 +754,8 @@ def test_update_service_permission_creates_a_history_record_with_current_data(
notify_db_session,
):
user = create_user()
assert Service.query.count() == 0
assert Service.get_history_model().query.count() == 0
assert service_query_count() == 0
assert service_history_query_count() == 0
service = Service(
name="service_name",
email_from="email_from",
@@ -755,17 +773,17 @@ def test_update_service_permission_creates_a_history_record_with_current_data(
],
)
assert Service.query.count() == 1
assert service_query_count() == 1
service.permissions.append(
ServicePermission(service_id=service.id, permission=ServicePermissionType.EMAIL)
)
dao_update_service(service)
assert Service.query.count() == 1
assert Service.get_history_model().query.count() == 2
assert service_query_count() == 1
assert service_history_query_count() == 2
service_from_db = Service.query.first()
service_from_db = service_query_first()
assert service_from_db.version == 2
@@ -784,10 +802,10 @@ def test_update_service_permission_creates_a_history_record_with_current_data(
service.permissions.remove(permission)
dao_update_service(service)
assert Service.query.count() == 1
assert Service.get_history_model().query.count() == 3
assert service_query_count() == 1
assert service_history_query_count() == 3
service_from_db = Service.query.first()
service_from_db = service_query_first()
assert service_from_db.version == 3
_assert_service_permissions(
service.permissions,
@@ -810,8 +828,8 @@ def test_update_service_permission_creates_a_history_record_with_current_data(
def test_create_service_and_history_is_transactional(notify_db_session):
user = create_user()
assert Service.query.count() == 0
assert Service.get_history_model().query.count() == 0
assert service_query_count() == 0
assert service_history_query_count() == 0
service = Service(
name=None,
email_from="email_from",
@@ -828,8 +846,8 @@ def test_create_service_and_history_is_transactional(notify_db_session):
in str(seeei)
)
assert Service.query.count() == 0
assert Service.get_history_model().query.count() == 0
assert service_query_count() == 0
assert service_history_query_count() == 0
def test_delete_service_and_associated_objects(notify_db_session):
@@ -846,7 +864,8 @@ def test_delete_service_and_associated_objects(notify_db_session):
create_invited_user(service=service)
user.organizations = [organization]
assert ServicePermission.query.count() == len(
stmt = select(ServicePermission)
assert db.session.execute(stmt).scalar() == len(
(
ServicePermissionType.SMS,
ServicePermissionType.EMAIL,
@@ -855,21 +874,41 @@ def test_delete_service_and_associated_objects(notify_db_session):
)
delete_service_and_all_associated_db_objects(service)
assert VerifyCode.query.count() == 0
assert ApiKey.query.count() == 0
assert ApiKey.get_history_model().query.count() == 0
assert Template.query.count() == 0
assert TemplateHistory.query.count() == 0
assert Job.query.count() == 0
assert Notification.query.count() == 0
assert Permission.query.count() == 0
assert User.query.count() == 0
assert InvitedUser.query.count() == 0
assert Service.query.count() == 0
assert Service.get_history_model().query.count() == 0
assert ServicePermission.query.count() == 0
stmt = select(VerifyCode)
assert db.session.execute(stmt).scalar() is None
stmt = select(ApiKey)
assert db.session.execute(stmt).scalar() is None
stmt = select(ApiKey.get_history_model())
assert db.session.execute(stmt).scalar() is None
stmt = select(Template)
assert db.session.execute(stmt).scalar() is None
stmt = select(TemplateHistory)
assert db.session.execute(stmt).scalar() is None
stmt = select(Job)
assert db.session.execute(stmt).scalar() is None
stmt = select(Notification)
assert db.session.execute(stmt).scalar() is None
stmt = select(Permission)
assert db.session.execute(stmt).scalar() is None
stmt = select(User)
assert db.session.execute(stmt).scalar() is None
stmt = select(InvitedUser)
assert db.session.execute(stmt).scalar() is None
stmt = select(ServicePermission)
assert db.session.execute(stmt).scalar() is None
assert service_query_count() == 0
assert service_history_query_count() == 0
# the organization hasn't been deleted
assert Organization.query.count() == 1
stmt = select(Organization)
assert db.session.execute(stmt).scalar() == 1
def test_add_existing_user_to_another_service_doesnot_change_old_permissions(
@@ -956,9 +995,10 @@ def test_fetch_stats_filters_on_service(notify_db_session):
def test_fetch_stats_ignores_historical_notification_data(sample_template):
create_notification_history(template=sample_template)
assert Notification.query.count() == 0
assert NotificationHistory.query.count() == 1
stmt = select(Notification)
assert db.session.execute(stmt).scalar() is None
stmt = select(NotificationHistory)
assert db.session.execute(stmt).scalar() == 1
stats = dao_fetch_todays_stats_for_service(sample_template.service_id)
assert len(stats) == 0
@@ -1316,7 +1356,7 @@ def test_dao_fetch_todays_stats_for_all_services_can_exclude_from_test_key(
def test_dao_suspend_service_with_no_api_keys(notify_db_session):
service = create_service()
dao_suspend_service(service.id)
service = Service.query.get(service.id)
service = db.session.get(Service, service.id)
assert not service.active
assert service.name == service.name
assert service.api_keys == []
@@ -1329,11 +1369,11 @@ def test_dao_suspend_service_marks_service_as_inactive_and_expires_api_keys(
service = create_service()
api_key = create_api_key(service=service)
dao_suspend_service(service.id)
service = Service.query.get(service.id)
service = db.session.get(Service, service.id)
assert not service.active
assert service.name == service.name
api_key = ApiKey.query.get(api_key.id)
api_key = db.session.get(ApiKey, api_key.id)
assert api_key.expiry_date == datetime(2001, 1, 1, 23, 59, 00)
@@ -1344,13 +1384,13 @@ def test_dao_resume_service_marks_service_as_active_and_api_keys_are_still_revok
service = create_service()
api_key = create_api_key(service=service)
dao_suspend_service(service.id)
service = Service.query.get(service.id)
service = db.session.get(Service, service.id)
assert not service.active
dao_resume_service(service.id)
assert Service.query.get(service.id).active
assert db.session.get(Service, service.id).active
api_key = ApiKey.query.get(api_key.id)
api_key = db.session.get(ApiKey, api_key.id)
assert api_key.expiry_date == datetime(2001, 1, 1, 23, 59, 00)