This commit is contained in:
Kenneth Kehl
2024-10-31 11:32:27 -07:00
parent 6b6bc2b4e7
commit bc7180185b
9 changed files with 74 additions and 28 deletions

View File

@@ -42,12 +42,17 @@ def test_move_notifications_does_nothing_if_notification_history_row_already_exi
1,
)
assert Notification.query.count() == 0
assert _get_notification_count() == 0
history = NotificationHistory.query.all()
assert len(history) == 1
assert history[0].status == NotificationStatus.DELIVERED
def _get_notification_count():
stmt = select(func.count()).select_from(Notification)
return db.session.execute(stmt).scalar() or 0
def test_move_notifications_only_moves_notifications_older_than_provided_timestamp(
sample_template,
):
@@ -172,8 +177,10 @@ def test_move_notifications_just_deletes_test_key_notifications(sample_template)
assert result == 2
assert Notification.query.count() == 0
assert NotificationHistory.query.count() == 2
assert _get_notification_count() == 0
stmt = select(func.count()).select_from(NotificationHistory)
count = db.session.execute(stmt).scalar() or 0
assert count == 2
stmt = (
select(func.count())
.select_from(NotificationHistory)

View File

@@ -1,9 +1,14 @@
from sqlalchemy import func, select
from app import db
from app.dao.events_dao import dao_create_event
from app.models import Event
def test_create_event(notify_db_session):
assert Event.query.count() == 0
stmt = select(func.count()).select_from(Event)
count = db.session.execute(stmt).scalar() or 0
assert count == 0
data = {
"event_type": "sucessful_login",
"data": {"something": "random", "in_fact": "could be anything"},
@@ -12,6 +17,8 @@ def test_create_event(notify_db_session):
event = Event(**data)
dao_create_event(event)
assert Event.query.count() == 1
stmt = select(func.count()).select_from(Event)
count = db.session.execute(stmt).scalar() or 0
assert count == 1
event_from_db = Event.query.first()
assert event == event_from_db

View File

@@ -1,6 +1,8 @@
import pytest
from flask import current_app
from sqlalchemy import func, select
from app import db
from app.dao.services_dao import dao_add_user_to_service
from app.enums import NotificationType, TemplateType
from app.models import Notification
@@ -23,7 +25,9 @@ def test_send_notification_to_service_users_persists_notifications_correctly(
notification = Notification.query.one()
assert Notification.query.count() == 1
stmt = select(func.count()).select_from(Notification)
count = db.session.execute(stmt).scalar() or 0
assert count == 1
assert notification.to == "1"
assert str(notification.service_id) == current_app.config["NOTIFY_SERVICE_ID"]
assert notification.template.id == template.id
@@ -89,4 +93,6 @@ def test_send_notification_to_service_users_sends_to_active_users_only(
send_notification_to_service_users(service_id=service.id, template_id=template.id)
assert Notification.query.count() == 2
stmt = select(func.count()).select_from(Notification)
count = db.session.execute(stmt).scalar() or 0
assert count == 2

View File

@@ -1,7 +1,9 @@
import uuid
import pytest
from sqlalchemy import func, select
from app import db
from app.dao.service_user_dao import dao_get_service_user
from app.models import TemplateFolder
from tests.app.db import (
@@ -286,7 +288,9 @@ def test_delete_template_folder_fails_if_folder_has_subfolders(
assert resp == {"result": "error", "message": "Folder is not empty"}
assert TemplateFolder.query.count() == 2
stmt = select(func.count()).select_from(TemplateFolder)
count = db.session.execute(stmt).scalar() or 0
assert count == 2
def test_delete_template_folder_fails_if_folder_contains_templates(
@@ -304,7 +308,9 @@ def test_delete_template_folder_fails_if_folder_contains_templates(
assert resp == {"result": "error", "message": "Folder is not empty"}
assert TemplateFolder.query.count() == 1
stmt = select(func.count()).select_from(TemplateFolder)
count = db.session.execute(stmt).scalar() or 0
assert count == 1
@pytest.mark.parametrize(

View File

@@ -414,17 +414,20 @@ def test_create_service_command(notify_db_session, notify_api):
user = User.query.first()
service_count = Service.query.count()
stmt = select(func.count()).select_from(Service)
service_count = db.session.execute(stmt).scalar() or 0
# run the command
result = notify_api.test_cli_runner().invoke(
notify_api.test_cli_runner().invoke(
create_new_service,
["-e", "somebody@fake.gov", "-n", "Fake Service", "-c", user.id],
)
print(result)
# there should be one more service
assert Service.query.count() == service_count + 1
stmt = select(func.count()).select_from(Service)
count = db.session.execute(stmt).scalar() or 0
assert count == service_count + 1
# that service should be the one we added
stmt = select(Service).where(Service.name == "Fake Service")

View File

@@ -6,7 +6,9 @@ from unittest import mock
import pytest
from flask import current_app
from freezegun import freeze_time
from sqlalchemy import func, select
from app import db
from app.dao.service_user_dao import dao_get_service_user, dao_update_service_user
from app.enums import AuthType, KeyType, NotificationType, PermissionType
from app.models import Notification, Permission, User
@@ -153,12 +155,17 @@ def test_post_user_missing_attribute_email(admin_request, notify_db_session):
}
json_resp = admin_request.post("user.create_user", _data=data, _expected_status=400)
assert User.query.count() == 0
assert _get_user_count() == 0
assert {"email_address": ["Missing data for required field."]} == json_resp[
"message"
]
def _get_user_count():
stmt = select(func.count()).select_from(User)
return db.session.execute(stmt).scalar() or 0
def test_create_user_missing_attribute_password(admin_request, notify_db_session):
"""
Tests POST endpoint '/' missing attribute password.
@@ -174,7 +181,7 @@ def test_create_user_missing_attribute_password(admin_request, notify_db_session
"permissions": {},
}
json_resp = admin_request.post("user.create_user", _data=data, _expected_status=400)
assert User.query.count() == 0
assert _get_user_count() == 0
assert {"password": ["Missing data for required field."]} == json_resp["message"]
@@ -512,8 +519,13 @@ def test_set_user_permissions_remove_old(admin_request, sample_user, sample_serv
_expected_status=204,
)
query = Permission.query.filter_by(user=sample_user)
assert query.count() == 1
query = (
select(func.count())
.select_from(Permission)
.where(Permission.user == sample_user)
)
count = db.session.execute(query).scalar() or 0
assert count == 1
assert query.first().permission == PermissionType.MANAGE_SETTINGS