mirror of
https://github.com/GSA/notifications-api.git
synced 2026-04-24 11:20:11 -04:00
more
This commit is contained in:
@@ -3,8 +3,10 @@ from unittest.mock import ANY, call
|
||||
|
||||
import pytest
|
||||
from freezegun import freeze_time
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from app import db
|
||||
from app.celery import nightly_tasks
|
||||
from app.celery.nightly_tasks import (
|
||||
_delete_notifications_older_than_retention_by_type,
|
||||
@@ -230,7 +232,7 @@ def test_save_daily_notification_processing_time(
|
||||
|
||||
save_daily_notification_processing_time(date_provided)
|
||||
|
||||
persisted_to_db = FactProcessingTime.query.all()
|
||||
persisted_to_db = db.session.execute(select(FactProcessingTime)).scalars().all()
|
||||
assert len(persisted_to_db) == 1
|
||||
assert persisted_to_db[0].local_date == date(2021, 1, 17)
|
||||
assert persisted_to_db[0].messages_total == 2
|
||||
@@ -269,7 +271,7 @@ def test_save_daily_notification_processing_time_when_in_est(
|
||||
|
||||
save_daily_notification_processing_time(date_provided)
|
||||
|
||||
persisted_to_db = FactProcessingTime.query.all()
|
||||
persisted_to_db = db.session.execute(select(FactProcessingTime)).scalars().all()
|
||||
assert len(persisted_to_db) == 1
|
||||
assert persisted_to_db[0].local_date == date(2021, 4, 17)
|
||||
assert persisted_to_db[0].messages_total == 2
|
||||
|
||||
@@ -464,7 +464,7 @@ def test_create_nightly_notification_status_for_service_and_day(notify_db_sessio
|
||||
create_notification(template=first_template)
|
||||
create_notification_history(template=second_template)
|
||||
|
||||
assert len(FactNotificationStatus.query.all()) == 0
|
||||
assert len(db.session.execute(select(FactNotificationStatus)).scalars().all()) == 0
|
||||
|
||||
create_nightly_notification_status_for_service_and_day(
|
||||
str(process_day),
|
||||
@@ -540,7 +540,7 @@ def test_create_nightly_notification_status_for_service_and_day_overwrites_old_d
|
||||
NotificationType.SMS,
|
||||
)
|
||||
|
||||
new_fact_data = FactNotificationStatus.query.all()
|
||||
new_fact_data = db.session.execute(select(FactNotificationStatus)).scalars().all()
|
||||
|
||||
assert len(new_fact_data) == 1
|
||||
assert new_fact_data[0].notification_count == 1
|
||||
|
||||
@@ -34,7 +34,9 @@ def test_save_api_key_should_create_new_api_key_and_history(sample_service):
|
||||
assert all_api_keys[0] == api_key
|
||||
assert api_key.version == 1
|
||||
|
||||
all_history = api_key.get_history_model().query.all()
|
||||
all_history = (
|
||||
db.session.execute(select(api_key.get_history_model())).scalars().all()
|
||||
)
|
||||
assert len(all_history) == 1
|
||||
assert all_history[0].id == api_key.id
|
||||
assert all_history[0].version == api_key.version
|
||||
@@ -51,7 +53,9 @@ def test_expire_api_key_should_update_the_api_key_and_create_history_record(
|
||||
assert all_api_keys[0].id == sample_api_key.id
|
||||
assert all_api_keys[0].service_id == sample_api_key.service_id
|
||||
|
||||
all_history = sample_api_key.get_history_model().query.all()
|
||||
all_history = (
|
||||
db.session.execute(select(sample_api_key.get_history_model())).scalars().all()
|
||||
)
|
||||
assert len(all_history) == 2
|
||||
assert all_history[0].id == sample_api_key.id
|
||||
assert all_history[1].id == sample_api_key.id
|
||||
@@ -123,7 +127,7 @@ def test_save_api_key_can_create_key_with_same_name_if_other_is_expired(sample_s
|
||||
}
|
||||
)
|
||||
save_model_api_key(api_key)
|
||||
keys = ApiKey.query.all()
|
||||
keys = db.session.execute(select(ApiKey)).scalars().all()
|
||||
assert len(keys) == 2
|
||||
|
||||
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
from datetime import datetime
|
||||
|
||||
from freezegun import freeze_time
|
||||
from sqlalchemy import select
|
||||
|
||||
from app import db
|
||||
from app.dao import fact_processing_time_dao
|
||||
from app.dao.fact_processing_time_dao import (
|
||||
get_processing_time_percentage_for_date_range,
|
||||
@@ -19,7 +21,7 @@ def test_insert_update_processing_time(notify_db_session):
|
||||
|
||||
fact_processing_time_dao.insert_update_processing_time(data)
|
||||
|
||||
result = FactProcessingTime.query.all()
|
||||
result = db.session.execute(select(FactProcessingTime)).scalars().all()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].local_date == datetime(2021, 2, 22).date()
|
||||
@@ -36,7 +38,7 @@ def test_insert_update_processing_time(notify_db_session):
|
||||
with freeze_time("2021-02-23 13:23:33"):
|
||||
fact_processing_time_dao.insert_update_processing_time(data)
|
||||
|
||||
result = FactProcessingTime.query.all()
|
||||
result = db.session.execute(select(FactProcessingTime)).scalars().all()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].local_date == datetime(2021, 2, 22).date()
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from app import encryption
|
||||
from app import db, encryption
|
||||
from app.dao.service_callback_api_dao import (
|
||||
get_service_callback_api,
|
||||
get_service_delivery_status_callback_api_for_service,
|
||||
@@ -25,7 +26,7 @@ def test_save_service_callback_api(sample_service):
|
||||
|
||||
save_service_callback_api(service_callback_api)
|
||||
|
||||
results = ServiceCallbackApi.query.all()
|
||||
results = db.session.execute(select(ServiceCallbackApi)).scalars().all()
|
||||
assert len(results) == 1
|
||||
callback_api = results[0]
|
||||
assert callback_api.id is not None
|
||||
@@ -114,7 +115,7 @@ def test_update_service_callback_api(sample_service):
|
||||
)
|
||||
|
||||
save_service_callback_api(service_callback_api)
|
||||
results = ServiceCallbackApi.query.all()
|
||||
results = db.session.execute(select(ServiceCallbackApi)).scalars().all()
|
||||
assert len(results) == 1
|
||||
saved_callback_api = results[0]
|
||||
|
||||
@@ -123,7 +124,7 @@ def test_update_service_callback_api(sample_service):
|
||||
updated_by_id=sample_service.users[0].id,
|
||||
url="https://some_service/changed_url",
|
||||
)
|
||||
updated_results = ServiceCallbackApi.query.all()
|
||||
updated_results = db.session.execute(select(ServiceCallbackApi)).scalars().all()
|
||||
assert len(updated_results) == 1
|
||||
updated = updated_results[0]
|
||||
assert updated.id is not None
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from app import encryption
|
||||
from app import db, encryption
|
||||
from app.dao.service_inbound_api_dao import (
|
||||
get_service_inbound_api,
|
||||
get_service_inbound_api_for_service,
|
||||
@@ -24,7 +25,7 @@ def test_save_service_inbound_api(sample_service):
|
||||
|
||||
save_service_inbound_api(service_inbound_api)
|
||||
|
||||
results = ServiceInboundApi.query.all()
|
||||
results = db.session.execute(select(ServiceInboundApi)).scalars().all()
|
||||
assert len(results) == 1
|
||||
inbound_api = results[0]
|
||||
assert inbound_api.id is not None
|
||||
@@ -68,7 +69,7 @@ def test_update_service_inbound_api(sample_service):
|
||||
)
|
||||
|
||||
save_service_inbound_api(service_inbound_api)
|
||||
results = ServiceInboundApi.query.all()
|
||||
results = db.session.execute(select(ServiceInboundApi)).scalars().all()
|
||||
assert len(results) == 1
|
||||
saved_inbound_api = results[0]
|
||||
|
||||
@@ -77,7 +78,7 @@ def test_update_service_inbound_api(sample_service):
|
||||
updated_by_id=sample_service.users[0].id,
|
||||
url="https://some_service/changed_url",
|
||||
)
|
||||
updated_results = ServiceInboundApi.query.all()
|
||||
updated_results = db.session.execute(select(ServiceInboundApi)).scalars().all()
|
||||
assert len(updated_results) == 1
|
||||
updated = updated_results[0]
|
||||
assert updated.id is not None
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import pytest
|
||||
from sqlalchemy import select
|
||||
|
||||
from app import db
|
||||
from app.enums import BrandType
|
||||
from app.models import EmailBranding
|
||||
from tests.app.db import create_email_branding
|
||||
@@ -198,7 +200,7 @@ def test_post_update_email_branding_updates_field(
|
||||
email_branding_id=email_branding_id,
|
||||
)
|
||||
|
||||
email_branding = EmailBranding.query.all()
|
||||
email_branding = db.session.execute(select(EmailBranding)).scalars().all()
|
||||
|
||||
assert len(email_branding) == 1
|
||||
assert str(email_branding[0].id) == email_branding_id
|
||||
@@ -231,7 +233,7 @@ def test_post_update_email_branding_updates_field_with_text(
|
||||
email_branding_id=email_branding_id,
|
||||
)
|
||||
|
||||
email_branding = EmailBranding.query.all()
|
||||
email_branding = db.session.execute(select(EmailBranding)).scalars().all()
|
||||
|
||||
assert len(email_branding) == 1
|
||||
assert str(email_branding[0].id) == email_branding_id
|
||||
|
||||
@@ -1188,7 +1188,7 @@ def test_should_allow_store_original_number_on_sms_notification(
|
||||
mocked.assert_called_once_with([notification_id], queue="send-sms-tasks")
|
||||
assert response.status_code == 201
|
||||
assert notification_id
|
||||
notifications = Notification.query.all()
|
||||
notifications = db.session.execute(select(Notification)).scalars().all()
|
||||
assert len(notifications) == 1
|
||||
assert "1" == notifications[0].to
|
||||
|
||||
@@ -1349,7 +1349,7 @@ def test_post_notification_should_set_reply_to_text(
|
||||
],
|
||||
)
|
||||
assert response.status_code == 201
|
||||
notifications = Notification.query.all()
|
||||
notifications = db.session.execute(select(Notification)).scalars().all()
|
||||
assert len(notifications) == 1
|
||||
assert notifications[0].reply_to_text == expected_reply_to
|
||||
|
||||
|
||||
Reference in New Issue
Block a user