Created new endpoint for polling (#1996)

* Created new endpoint for polling

* Fixed sorting
This commit is contained in:
Alex Janousek
2025-09-26 14:28:54 -04:00
committed by GitHub
parent 37f31f8ae1
commit a854588b66
2 changed files with 126 additions and 1 deletions

View File

@@ -1,9 +1,10 @@
import json
from zoneinfo import ZoneInfo
import dateutil
from flask import Blueprint, current_app, jsonify, request
from app import db
from app import db, redis_store
from app.aws.s3 import (
extract_personalisation,
extract_phones,
@@ -62,6 +63,73 @@ def get_job_by_service_and_job_id(service_id, job_id):
return jsonify(data=data)
@job_blueprint.route("/<job_id>/status", methods=["GET"])
def get_job_status(service_id, job_id):
    """Lightweight endpoint for polling job status with aggressive caching.

    Returns only the counters a polling client needs — sent/failed/pending
    counts, the total, the job status, and a processing_finished flag —
    instead of the full job payload.  Responses are cached in Redis:
    10 seconds for active jobs, 5 minutes for finished jobs (whose counts
    can no longer change).
    """
    current_app.logger.info(
        "Getting job status for service_id=%s, job_id=%s", service_id, job_id
    )
    check_suspicious_id(service_id, job_id)

    # Serve from cache when possible so frequent polling stays cheap.
    cache_key = f"job_status:{service_id}:{job_id}"
    cached_status = redis_store.get(cache_key)
    if cached_status:
        current_app.logger.info("Returning cached status for job %s", job_id)
        return jsonify(json.loads(cached_status))

    job = dao_get_job_by_service_id_and_job_id(service_id, job_id)
    statistics = dao_get_notification_outcomes_for_job(service_id, job_id)

    # Buckets for classifying per-status counts.
    # BUG FIX: the original mapping named a 'delivered' category but never
    # listed the 'delivered' status string itself, so delivered notifications
    # fell into the catch-all branch; 'delivered' now counts as sent.  It also
    # tested the identical list twice ('delivered' and 'sent' both -> ['sent']).
    sent_statuses = frozenset({"sent", "delivered"})
    failed_statuses = frozenset(
        {
            "failed",
            "permanent-failure",
            "temporary-failure",
            "technical-failure",
            "virus-scan-failed",
        }
    )
    pending_statuses = frozenset({"created", "sending", "pending", "pending-virus-check"})

    sent_count = 0
    failed_count = 0
    pending_count = 0
    for stat in statistics:
        # DAO rows may be plain (count, status) tuples or row objects with
        # .count/.status attributes; support both shapes.
        count = stat[0] if isinstance(stat, tuple) else stat.count
        status = stat[1] if isinstance(stat, tuple) else stat.status
        if status in sent_statuses:
            sent_count += count
        elif status in failed_statuses:
            failed_count += count
        elif status in pending_statuses:
            pending_count += count
        elif job.processing_finished:
            # Unknown status on a finished job: count as sent so totals add up.
            sent_count += count
        else:
            pending_count += count

    response_data = {
        "sent_count": sent_count,
        "failed_count": failed_count,
        "pending_count": pending_count,
        "total_count": job.notification_count,
        "job_status": job.job_status,
        "processing_finished": job.processing_finished is not None,
    }

    if job.processing_finished:
        # Finished jobs can be cached for 5 minutes since they won't change.
        cache_ttl = 300
    else:
        # Active jobs cached for 10 seconds.
        cache_ttl = 10
    redis_store.set(cache_key, json.dumps(response_data), ex=cache_ttl)
    current_app.logger.info("Cached status for job %s with TTL=%ss", job_id, cache_ttl)

    return jsonify(response_data)
@job_blueprint.route("/<job_id>/cancel", methods=["POST"])
def cancel_job(service_id, job_id):
check_suspicious_id(service_id, job_id)

View File

@@ -1213,3 +1213,60 @@ def test_get_scheduled_job_stats(admin_request):
"count": 1,
"soonest_scheduled_for": "2017-07-17T11:00:00+00:00",
}
def test_get_job_status_returns_lightweight_response(admin_request, sample_job):
    """The status endpoint exposes only the minimal polling fields, with correct counts."""
    # 2 sent, 1 failed, 2 pending (PENDING + CREATED both count as pending).
    seeded_statuses = (
        NotificationStatus.SENT,
        NotificationStatus.SENT,
        NotificationStatus.FAILED,
        NotificationStatus.PENDING,
        NotificationStatus.CREATED,
    )
    for notification_status in seeded_statuses:
        create_notification(job=sample_job, status=notification_status)

    resp_json = admin_request.get(
        "job.get_job_status",
        service_id=sample_job.service.id,
        job_id=str(sample_job.id),
    )

    expected_fields = {
        "sent_count",
        "failed_count",
        "pending_count",
        "total_count",
        "job_status",
        "processing_finished",
    }
    assert set(resp_json.keys()) == expected_fields

    # Verify counts are correct
    assert resp_json["sent_count"] == 2
    assert resp_json["failed_count"] == 1
    assert resp_json["pending_count"] == 2
    assert resp_json["total_count"] == sample_job.notification_count
    assert resp_json["job_status"] == sample_job.job_status
    assert resp_json["processing_finished"] == (sample_job.processing_finished is not None)
def test_get_job_status_caches_response(admin_request, sample_job, mocker):
    """A cache miss performs exactly one Redis get and one Redis set."""
    create_notification(job=sample_job, status=NotificationStatus.SENT)

    redis_get = mocker.patch("app.job.rest.redis_store.get", return_value=None)
    redis_set = mocker.patch("app.job.rest.redis_store.set")

    # First request should look up the cache, miss, then store the result.
    admin_request.get(
        "job.get_job_status",
        service_id=sample_job.service.id,
        job_id=str(sample_job.id),
    )

    expected_key = f"job_status:{sample_job.service.id}:{sample_job.id}"
    redis_get.assert_called_once_with(expected_key)
    redis_set.assert_called_once()