Added tests to increase code coverage

This commit is contained in:
alexjanousekGSA
2025-02-27 16:09:49 -05:00
parent e9e69777de
commit ca15646b26
7 changed files with 186 additions and 32 deletions

View File

@@ -209,7 +209,7 @@
"filename": "tests/app/aws/test_s3.py",
"hashed_secret": "67a74306b06d0c01624fe0d0249a570f4d093747",
"is_verified": false,
"line_number": 40,
"line_number": 42,
"is_secret": false
}
],
@@ -384,5 +384,5 @@
}
]
},
"generated_at": "2025-02-10T16:57:15Z"
"generated_at": "2025-02-27T21:09:45Z"
}

View File

@@ -152,4 +152,4 @@ clean:
.PHONY: test-single
test-single: export NEW_RELIC_ENVIRONMENT=test
test-single: ## Run a single test file
poetry run pytest $(TEST_FILE)
poetry run pytest -s $(TEST_FILE)

View File

@@ -534,7 +534,9 @@ def dao_fetch_stats_for_service_from_hours(service_id, start_date, end_date):
# Update to group by HOUR instead of DAY
total_substmt = (
select(
func.date_trunc("hour", NotificationAllTimeView.created_at).label("hour"), # UPDATED
func.date_trunc("hour", NotificationAllTimeView.created_at).label(
"hour"
), # UPDATED
Job.notification_count.label("notification_count"),
)
.join(Job, NotificationAllTimeView.job_id == Job.id)
@@ -556,11 +558,14 @@ def dao_fetch_stats_for_service_from_hours(service_id, start_date, end_date):
total_stmt = select(
total_substmt.c.hour, # UPDATED
func.sum(total_substmt.c.notification_count).label("total_notifications"),
).group_by(total_substmt.c.hour) # UPDATED
).group_by(
total_substmt.c.hour
) # UPDATED
# Ensure we're using hourly timestamps in the response
total_notifications = {
row.hour: row.total_notifications for row in db.session.execute(total_stmt).all()
row.hour: row.total_notifications
for row in db.session.execute(total_stmt).all()
}
# Update the second query to also use "hour"
@@ -568,7 +573,9 @@ def dao_fetch_stats_for_service_from_hours(service_id, start_date, end_date):
select(
NotificationAllTimeView.notification_type,
NotificationAllTimeView.status,
func.date_trunc("hour", NotificationAllTimeView.created_at).label("hour"), # UPDATED
func.date_trunc("hour", NotificationAllTimeView.created_at).label(
"hour"
), # UPDATED
func.count(NotificationAllTimeView.id).label("count"),
)
.where(
@@ -895,7 +902,9 @@ def get_specific_days_stats(
return stats
def get_specific_hours_stats(data, start_date, hours=None, end_date=None, total_notifications=None):
def get_specific_hours_stats(
data, start_date, hours=None, end_date=None, total_notifications=None
):
if hours is not None and end_date is not None:
raise ValueError("Only set hours OR set end_date, not both.")
elif hours is not None:
@@ -919,10 +928,10 @@ def get_specific_hours_stats(data, start_date, hours=None, end_date=None, total_
# Format statistics, returning only hours with results
stats = {
hour.strftime("%Y-%m-%dT%H:00:00Z"): statistics.format_statistics(
rows,
total_notifications.get(hour, 0) if total_notifications else None
rows, total_notifications.get(hour, 0) if total_notifications else None
)
for hour, rows in grouped_data.items() if rows
for hour, rows in grouped_data.items()
if rows
}
return stats

View File

@@ -22,8 +22,10 @@ from app.aws.s3 import (
get_s3_object,
get_s3_resource,
list_s3_objects,
purge_bucket,
read_s3_file,
remove_csv_object,
remove_job_from_s3,
remove_s3_object,
)
from app.clients import AWS_CLIENT_CONFIG
@@ -563,3 +565,72 @@ def test_get_s3_object_client_error(mocker):
mock_logger.exception.assert_called_once_with(
f"Can't retrieve S3 Object from {file_location}"
)
def test_purge_bucket(mocker):
mock_s3_resource = MagicMock()
mock_bucket = MagicMock()
mock_s3_resource.Bucket.return_value = mock_bucket
mocker.patch('app.aws.s3.get_s3_resource', return_value=mock_s3_resource)
purge_bucket('my-bucket', 'access-key', 'secret-key', 'region')
# Assert that the bucket's objects.all().delete() method was called
mock_bucket.objects.all.return_value.delete.assert_called_once()
def test_remove_job_from_s3(mocker):
mock_get_job_location = mocker.patch("app.aws.s3.get_job_location")
mock_remove_s3_object = mocker.patch("app.aws.s3.remove_s3_object")
mock_get_job_location.return_value = (
"test-bucket",
"test.csv",
"fake-stuff",
)
remove_job_from_s3("service-id-123", "job-id-456")
mock_get_job_location.assert_called_once_with("service-id-123", "job-id-456")
mock_remove_s3_object.assert_called_once_with(
"test-bucket",
"test.csv",
"fake-stuff",
)
def test_get_s3_files_handles_exception(mocker):
mock_current_app = mocker.patch("app.aws.s3.current_app")
mock_current_app.config = {
"CSV_UPLOAD_BUCKET": {"bucket": "test-bucket"},
"job_cache": {},
}
mock_list_s3_objects = mocker.patch("app.aws.s3.list_s3_objects")
mock_list_s3_objects.return_value = ["file1.csv", "file2.csv"]
mock_get_s3_resource = mocker.patch("app.aws.s3.get_s3_resource")
# Make the first call succeed, second call should fail.
mock_read_s3_file = mocker.patch(
"app.aws.s3.read_s3_file",
side_effect=[None, Exception("exception here")]
)
mock_thread_pool_executor = mocker.patch("app.aws.s3.ThreadPoolExecutor")
mock_executor = mock_thread_pool_executor.return_value.__enter__.return_value
def mock_map(func, iterable):
for item in iterable:
func(item)
mock_executor.map.side_effect = mock_map
get_s3_files()
calls = [
mocker.call("test-bucket", "file1.csv", mock_get_s3_resource.return_value),
mocker.call("test-bucket", "file2.csv", mock_get_s3_resource.return_value),
]
mock_read_s3_file.assert_has_calls(calls, any_order=True)
mock_current_app.logger.exception.assert_called_with("Connection pool issue")

View File

@@ -2134,3 +2134,55 @@ def test_sanitize_successful_notification_by_id():
"sent_at": ANY,
},
)
def test_dao_get_notifications_by_recipient_or_reference_covers_sms_search_by_reference(notify_db_session):
"""
This test:
1. Creates a service and an SMS template.
2. Creates a notification with a specific client_reference and status=FAILED.
3. Calls dao_get_notifications_by_recipient_or_reference with notification_type=SMS,
statuses=[FAILED], and a search term = client_reference.
4. Confirms the function returns exactly one notification matching that reference.
"""
service = create_service(service_name="Test Service")
template = create_template(service=service, template_type=NotificationType.SMS)
# Instead of matching phone logic, we'll match on client_reference
data = {
"id": uuid.uuid4(),
"to": "1",
"normalised_to": "1", # phone is irrelevant here
"service_id": service.id,
"service": service,
"template_id": template.id,
"template_version": template.version,
"status": NotificationStatus.FAILED,
"created_at": utc_now(),
"billable_units": 1,
"notification_type": template.template_type,
"key_type": KeyType.NORMAL,
"client_reference": "some-ref", # <--- We'll search for this
}
notification = Notification(**data)
dao_create_notification(notification)
# We'll search by this reference instead of a phone number
search_term = "some-ref"
results_page = dao_get_notifications_by_recipient_or_reference(
service_id=service.id,
search_term=search_term,
notification_type=NotificationType.SMS,
statuses=[NotificationStatus.FAILED],
page=1,
page_size=50,
)
# Now we should find exactly one match
assert len(results_page.items) == 1, "Should find exactly one matching notification"
found = results_page.items[0]
assert found.id == notification.id
assert found.status == NotificationStatus.FAILED
assert found.client_reference == "some-ref"

View File

@@ -7,7 +7,9 @@ from app.dao.services_dao import get_specific_hours_stats
from app.enums import StatisticsType
from app.models import TemplateType
NotificationRow = namedtuple("NotificationRow", ["notification_type", "status", "timestamp", "count"])
NotificationRow = namedtuple(
"NotificationRow", ["notification_type", "status", "timestamp", "count"]
)
def generate_expected_hourly_output(requested_sms_hours):
@@ -38,27 +40,31 @@ def create_mock_notification(notification_type, status, timestamp, count=1):
notification_type=notification_type,
status=status,
timestamp=timestamp.replace(minute=0, second=0, microsecond=0),
count=count
count=count,
)
test_cases = [
(
[create_mock_notification(
TemplateType.SMS,
StatisticsType.REQUESTED,
datetime(2025, 2, 18, 14, 15, 0),
)],
[
create_mock_notification(
TemplateType.SMS,
StatisticsType.REQUESTED,
datetime(2025, 2, 18, 14, 15, 0),
)
],
datetime(2025, 2, 18, 12, 0),
6,
generate_expected_hourly_output(["2025-02-18T14:00:00Z"]),
),
(
[create_mock_notification(
TemplateType.SMS,
StatisticsType.REQUESTED,
datetime(2025, 2, 18, 17, 59, 59),
)],
[
create_mock_notification(
TemplateType.SMS,
StatisticsType.REQUESTED,
datetime(2025, 2, 18, 17, 59, 59),
)
],
datetime(2025, 2, 18, 15, 0),
3,
generate_expected_hourly_output(["2025-02-18T17:00:00Z"]),
@@ -66,21 +72,29 @@ test_cases = [
([], datetime(2025, 2, 18, 10, 0), 4, {}),
(
[
create_mock_notification(TemplateType.SMS, StatisticsType.REQUESTED, datetime(2025, 2, 18, 9, 30, 0)),
create_mock_notification(TemplateType.SMS, StatisticsType.REQUESTED, datetime(2025, 2, 18, 11, 45, 0)),
create_mock_notification(
TemplateType.SMS,
StatisticsType.REQUESTED,
datetime(2025, 2, 18, 9, 30, 0),
),
create_mock_notification(
TemplateType.SMS,
StatisticsType.REQUESTED,
datetime(2025, 2, 18, 11, 45, 0),
),
],
datetime(2025, 2, 18, 8, 0),
5,
generate_expected_hourly_output(["2025-02-18T09:00:00Z", "2025-02-18T11:00:00Z"]),
generate_expected_hourly_output(
["2025-02-18T09:00:00Z", "2025-02-18T11:00:00Z"]
),
),
]
@pytest.mark.parametrize("mocked_notifications, start_date, hours, expected_output", test_cases)
@pytest.mark.parametrize(
"mocked_notifications, start_date, hours, expected_output", test_cases
)
def test_get_specific_hours(mocked_notifications, start_date, hours, expected_output):
results = get_specific_hours_stats(
mocked_notifications,
start_date,
hours=hours
)
results = get_specific_hours_stats(mocked_notifications, start_date, hours=hours)
assert results == expected_output, f"Expected {expected_output}, but got {results}"

View File

@@ -15,6 +15,7 @@ from app.commands import (
dump_sms_senders,
dump_user_info,
fix_billable_units,
generate_salt,
insert_inbound_numbers_from_file,
populate_annual_billing_with_defaults,
populate_annual_billing_with_the_previous_years_allowance,
@@ -661,3 +662,10 @@ def test_dump_user_info(notify_api, mocker):
mock_get_user_by_email.assert_called_once_with("john@example.com")
mock_open_file.assert_called_once_with("user_download.json", "wb")
def test_generate_salt(notify_api):
runner = notify_api.test_cli_runner()
result = runner.invoke(generate_salt)
assert result.exit_code == 0
assert len(result.output.strip()) == 32