Some checks failed
CICD Start / Sanity and Base Decision (push) Failing after 13m23s
## Summary

This PR upgrades backend typing tooling and finalizes quality cleanup for the SQLAlchemy async engine/session work on `PP-11_Add_SQLAlchemy_async_engine_session`.

## What Changed

- Upgraded pinned `pyright` version from `1.1.406` to `1.1.410`.
- Regenerated backend lock state to align with the new pyright pin.
- Removed deprecated `pythonPath` from pyright config to eliminate the config notice.
- Refined backend DB runtime state handling and cleanup paths.
- Updated database tests to avoid brittle singleton assumptions and keep typeguard-compatible async engine test doubles.

## Why

- Remove tooling noise and keep checks deterministic.
- Keep backend static/type/doc/test quality gates warning-free.
- Improve reliability and clarity of DB engine/session lifecycle tests.

## Validation

- `uv run ruff format --check .`
- `uv run ruff check .`
- `uv run pyright .`
- `uv run pydoclint --config=pyproject.toml src/`
- `uv run xdoctest --module backend`
- `uv run pytest`

Results:

- Pyright: `0 errors, 0 warnings`
- Pytest: `21 passed`

## Impact

- No API contract changes intended.
- No frontend changes.
- Backend behavior remains the same; this is tooling/test/runtime-state cleanup.

## Risk

- Low risk.
- Main touched areas are backend tooling config, lockfile, DB runtime state internals, and tests.

## Checklist

- [x] Backend lint clean
- [x] Backend type checks clean
- [x] Backend doc checks clean
- [x] Backend tests passing
- [x] Branch pushed and ready for review

Co-authored-by: copilotcoder <copilotcoder@darkhelm.org>
Reviewed-on: #67
184 lines
6.0 KiB
Python
184 lines
6.0 KiB
Python
"""Unit tests for database wiring helpers."""
|
|
|
|
from typing import Any, cast
|
|
from unittest.mock import AsyncMock
|
|
|
|
import pytest
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
|
|
|
|
import backend.database as database
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def reset_database_singletons(monkeypatch: pytest.MonkeyPatch) -> None:
    """Clear the database module's cached runtime state ahead of every test.

    Declared ``autouse`` so each test in this module runs after
    ``database.reset_runtime_state()`` without having to request the fixture.
    The ``monkeypatch`` argument is accepted but not used by the body.
    """
    database.reset_runtime_state()
|
|
|
|
|
|
def test_get_database_url_requires_configuration(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With DATABASE_URL absent from the environment, the URL helper raises."""
    # raising=False: the variable may already be missing, which is fine here.
    monkeypatch.delenv("DATABASE_URL", raising=False)
    expected_error = database.DatabaseConfigurationError

    with pytest.raises(expected_error):
        database.get_database_url()
|
|
|
|
|
|
def test_get_database_url_rewrites_postgresql_scheme(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A plain ``postgresql://`` URL comes back with the ``+psycopg`` driver."""
    configured_url = "postgresql://user:pass@db:5432/app"
    monkeypatch.setenv("DATABASE_URL", configured_url)

    resolved_url = database.get_database_url()

    assert resolved_url == "postgresql+psycopg://user:pass@db:5432/app"
|
|
|
|
|
|
def test_get_database_url_keeps_existing_scheme(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A URL that already names ``postgresql+psycopg`` is returned untouched."""
    configured_url = "postgresql+psycopg://user:pass@db:5432/app"
    monkeypatch.setenv("DATABASE_URL", configured_url)

    resolved_url = database.get_database_url()

    assert resolved_url == "postgresql+psycopg://user:pass@db:5432/app"
|
|
|
|
|
|
def test_get_engine_is_singleton(monkeypatch: pytest.MonkeyPatch) -> None:
    """Two ``get_engine`` calls return one engine built by a single factory call."""
    stub_engine = cast("AsyncEngine", AsyncMock(spec=AsyncEngine))
    # Each factory invocation is recorded as (url, pool_pre_ping).
    factory_calls: list[tuple[str, bool]] = []

    def fake_create_async_engine(url: str, *, pool_pre_ping: bool) -> AsyncEngine:
        factory_calls.append((url, pool_pre_ping))
        return stub_engine

    monkeypatch.setenv("DATABASE_URL", "postgresql://user:pass@db:5432/app")
    monkeypatch.setattr(database, "create_async_engine", fake_create_async_engine)

    initial, repeated = database.get_engine(), database.get_engine()

    assert initial is stub_engine
    assert repeated is stub_engine
    # One recorded call only, made with the rewritten URL and pre-ping enabled.
    assert factory_calls == [("postgresql+psycopg://user:pass@db:5432/app", True)]
|
|
|
|
|
|
def test_get_sessionmaker_is_singleton(monkeypatch: pytest.MonkeyPatch) -> None:
    """Repeated ``get_sessionmaker`` calls hand back the same configured factory."""
    stub_engine = cast("AsyncEngine", AsyncMock(spec=AsyncEngine))

    def provide_stub_engine() -> AsyncEngine:
        return stub_engine

    monkeypatch.setattr(database, "get_engine", provide_stub_engine)

    factory = database.get_sessionmaker()
    factory_again = database.get_sessionmaker()

    assert factory is factory_again

    # The factory's stored keyword options must bind the stub engine and
    # disable expire-on-commit.
    options = factory.kw
    assert options["bind"] is stub_engine
    assert options["expire_on_commit"] is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_session_raises_database_error_when_db_not_configured(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A configuration error from the sessionmaker surfaces through ``get_session``."""

    def failing_sessionmaker() -> Any:
        raise database.DatabaseConfigurationError("missing")

    monkeypatch.setattr(database, "get_sessionmaker", failing_sessionmaker)
    expected_error = database.DatabaseConfigurationError

    session_stream = database.get_session()
    # The error is expected when the generator is first advanced.
    with pytest.raises(expected_error):
        await anext(session_stream)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_session_yields_scoped_session(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``get_session`` yields exactly one session and exits its context manager."""
    stub_session = cast("AsyncSession", AsyncMock(spec=AsyncSession))

    class RecordingContext:
        """Async context manager that hands out the stub and records its exit."""

        def __init__(self) -> None:
            self.exited = False

        async def __aenter__(self) -> AsyncSession:
            return stub_session

        async def __aexit__(self, *exc_info: object) -> None:
            self.exited = True

    scope = RecordingContext()

    class FixedFactory:
        """Callable stand-in for a session factory; always returns ``scope``."""

        def __call__(self) -> RecordingContext:
            return scope

    # Calling the class builds a fresh factory instance on each lookup.
    monkeypatch.setattr(database, "get_sessionmaker", FixedFactory)

    session_stream = database.get_session()
    received = await anext(session_stream)
    assert received is stub_session

    # A second advance must exhaust the generator.
    with pytest.raises(StopAsyncIteration):
        await anext(session_stream)

    assert scope.exited is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_probe_database_success() -> None:
    """``probe_database`` reports True when the session's execute call succeeds."""
    stub_session = cast("AsyncSession", AsyncMock(spec=AsyncSession))
    stub_session.execute = AsyncMock(return_value=1)

    outcome = await database.probe_database(stub_session)

    assert outcome is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_probe_database_failure() -> None:
    """``probe_database`` reports False when execute raises ``SQLAlchemyError``."""
    stub_session = cast("AsyncSession", AsyncMock(spec=AsyncSession))
    failure = SQLAlchemyError("db unavailable")
    stub_session.execute = AsyncMock(side_effect=failure)

    outcome = await database.probe_database(stub_session)

    assert outcome is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_dispose_engine_resets_global_state(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Dispose helper should call engine dispose and reset cached globals.

    The fake factory records every engine it builds, so that list is the
    single source of truth for both how many engines were created and which
    objects they were.
    """
    created_engines: list[AsyncEngine] = []

    def fake_create_async_engine(url: str, *, pool_pre_ping: bool) -> AsyncEngine:  # noqa: ARG001
        # A fresh mock per call lets the two engines be told apart by identity.
        engine = cast("AsyncEngine", AsyncMock(spec=AsyncEngine))
        engine.dispose = AsyncMock()
        created_engines.append(engine)
        return engine

    monkeypatch.setenv("DATABASE_URL", "postgresql://user:pass@db:5432/app")
    monkeypatch.setattr(database, "create_async_engine", fake_create_async_engine)

    first_engine = database.get_engine()
    assert len(created_engines) == 1
    assert created_engines[0] is first_engine

    await database.dispose_engine()

    cast("AsyncMock", first_engine.dispose).assert_awaited_once()

    # After disposal the next lookup must build a new engine, not reuse the old one.
    second_engine = database.get_engine()
    assert len(created_engines) == 2
    assert created_engines[1] is second_engine
    assert first_engine is not second_engine
    cast("AsyncMock", second_engine.dispose).assert_not_awaited()
|