socket io setup

This commit is contained in:
Beverly Nguyen
2025-04-09 16:56:42 -07:00
parent 0792ea123f
commit f8726ca6b7
12 changed files with 238 additions and 268 deletions

View File

@@ -50,8 +50,8 @@ too-complex:
poetry run radon cc ./app -a -nc
.PHONY: run-flask
run-flask: ## Run flask
poetry run newrelic-admin run-program flask run -p 6011 --host=0.0.0.0
run-flask:
poetry run newrelic-admin run-program python run.py
.PHONY: run-celery
run-celery: ## Run celery, TODO remove purge for staging/prod

View File

@@ -1,3 +1,3 @@
web: make run-flask
web: poetry run newrelic-admin run-program python run.py
worker: make run-celery
scheduler: make run-celery-beat

View File

@@ -13,11 +13,11 @@ from flask import current_app, g, has_request_context, jsonify, make_response, r
from flask.ctx import has_app_context
from flask_marshmallow import Marshmallow
from flask_migrate import Migrate
from flask_socketio import SocketIO
from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy
from sqlalchemy import event
from werkzeug.exceptions import HTTPException as WerkzeugHTTPException
from werkzeug.local import LocalProxy
from flask_socketio import SocketIO
from app import config
from app.clients import NotificationProviderClients
@@ -32,6 +32,13 @@ from notifications_utils.clients.encryption.encryption_client import Encryption
from notifications_utils.clients.redis.redis_client import RedisClient
from notifications_utils.clients.zendesk.zendesk_client import ZendeskClient
socketio = SocketIO(
cors_allowed_origins="*",
message_queue="redis://localhost:6379",
logger=True,
engineio_logger=True
)
class NotifyCelery(Celery):
def init_app(self, app):
@@ -101,7 +108,7 @@ notification_provider_clients = NotificationProviderClients()
api_user = LocalProxy(lambda: g.api_user)
authenticated_service = LocalProxy(lambda: g.authenticated_service)
socketio = SocketIO(cors_allowed_origins="*") # ← Global instance
def create_app(application):
from app.config import configs
@@ -114,6 +121,8 @@ def create_app(application):
socketio.init_app(application)
from app.socket_handlers import register_socket_handlers
register_socket_handlers(socketio)
request_helper.init_app(application)
db.init_app(application)
migrate.init_app(application, db=db)
@@ -171,7 +180,6 @@ def register_blueprint(application):
from app.inbound_number.rest import inbound_number_blueprint
from app.inbound_sms.rest import inbound_sms as inbound_sms_blueprint
from app.job.rest import job_blueprint
from app.socketio_server.test_socketio import test_bp
from app.notifications.notifications_ses_callback import ses_callback_blueprint
from app.notifications.receive_notifications import receive_notifications_blueprint
from app.notifications.rest import notifications as notifications_blueprint
@@ -193,9 +201,6 @@ def register_blueprint(application):
from app.user.rest import user_blueprint
from app.webauthn.rest import webauthn_blueprint
# this will need to be requires_admin_auth
application.register_blueprint(test_bp)
service_blueprint.before_request(requires_admin_auth)
application.register_blueprint(service_blueprint, url_prefix="/service")

View File

@@ -26,9 +26,11 @@ from werkzeug.datastructures import MultiDict
from app import create_uuid, db
from app.dao.dao_utils import autocommit
from app.dao.inbound_sms_dao import Pagination
from app.dao.jobs_dao import dao_get_job_by_id
from app.enums import KeyType, NotificationStatus, NotificationType
from app.models import FactNotificationStatus, Notification, NotificationHistory
from app.utils import (
emit_job_update_summary,
escape_special_characters,
get_midnight_in_utc,
midnight_n_days_ago,
@@ -895,6 +897,15 @@ def dao_update_delivery_receipts(receipts, delivered):
f"#loadtestperformance batch update query time: \
updated {len(receipts)} notification in {elapsed_time} ms"
)
current_app.logger.info("✅✅✅✅ Reached delivery receipt processing")
job_ids = db.session.execute(
select(Notification.job_id)
.where(Notification.message_id.in_(id_to_carrier.keys()))
).scalars().all()
for job_id in set(job_ids):
job = dao_get_job_by_id(job_id)
emit_job_update_summary(job)
def dao_close_out_delivery_receipts():

16
app/socket_handlers.py Normal file
View File

@@ -0,0 +1,16 @@
from flask import current_app, request
from flask_socketio import join_room, leave_room
def register_socket_handlers(socketio):
@socketio.on("join")
def on_join(data): # noqa: F401
room = data.get("room")
join_room(room)
current_app.logger.info(f"Socket {request.sid} joined room {room}")
@socketio.on("leave")
def on_leave(data): # noqa: F401
room = data.get("room")
leave_room(room)
current_app.logger.info(f"Socket {request.sid} left room {room}")

View File

@@ -1,12 +0,0 @@
from flask import Blueprint
from app import socketio
test_bp = Blueprint('test', __name__)
@test_bp.route('/test-emit', methods=["GET"])
def test_emit():
socketio.emit('job_update', {
'job_id': 'abc123',
'status': 'Test message from API'
})
return "Event emitted!"

View File

@@ -131,3 +131,18 @@ def utc_now():
def debug_not_production(msg):
if os.getenv("NOTIFY_ENVIRONMENT") not in ["production"]:
current_app.logger.info(msg)
def emit_job_update_summary(job):
from app import socketio
current_app.logger.info(f"Emitting summary for job {job.id}")
socketio.emit(
"job_updated",
{
"job_id": str(job.id),
"job_status": job.job_status,
"notification_count": job.notification_count,
},
room=f"job-{job.id}"
)

120
package-lock.json generated Normal file
View File

@@ -0,0 +1,120 @@
{
"name": "notifications-api",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"dependencies": {
"socket.io-client": "^4.8.1"
}
},
"node_modules/@socket.io/component-emitter": {
"version": "3.1.2",
"resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.2.tgz",
"integrity": "sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==",
"license": "MIT"
},
"node_modules/debug": {
"version": "4.3.7",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz",
"integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==",
"license": "MIT",
"dependencies": {
"ms": "^2.1.3"
},
"engines": {
"node": ">=6.0"
},
"peerDependenciesMeta": {
"supports-color": {
"optional": true
}
}
},
"node_modules/engine.io-client": {
"version": "6.6.3",
"resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.6.3.tgz",
"integrity": "sha512-T0iLjnyNWahNyv/lcjS2y4oE358tVS/SYQNxYXGAJ9/GLgH4VCvOQ/mhTjqU88mLZCQgiG8RIegFHYCdVC+j5w==",
"license": "MIT",
"dependencies": {
"@socket.io/component-emitter": "~3.1.0",
"debug": "~4.3.1",
"engine.io-parser": "~5.2.1",
"ws": "~8.17.1",
"xmlhttprequest-ssl": "~2.1.1"
}
},
"node_modules/engine.io-parser": {
"version": "5.2.3",
"resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.3.tgz",
"integrity": "sha512-HqD3yTBfnBxIrbnM1DoD6Pcq8NECnh8d4As1Qgh0z5Gg3jRRIqijury0CL3ghu/edArpUYiYqQiDUQBIs4np3Q==",
"license": "MIT",
"engines": {
"node": ">=10.0.0"
}
},
"node_modules/ms": {
"version": "2.1.3",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz",
"integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==",
"license": "MIT"
},
"node_modules/socket.io-client": {
"version": "4.8.1",
"resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.8.1.tgz",
"integrity": "sha512-hJVXfu3E28NmzGk8o1sHhN3om52tRvwYeidbj7xKy2eIIse5IoKX3USlS6Tqt3BHAtflLIkCQBkzVrEEfWUyYQ==",
"license": "MIT",
"dependencies": {
"@socket.io/component-emitter": "~3.1.0",
"debug": "~4.3.2",
"engine.io-client": "~6.6.1",
"socket.io-parser": "~4.2.4"
},
"engines": {
"node": ">=10.0.0"
}
},
"node_modules/socket.io-parser": {
"version": "4.2.4",
"resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz",
"integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==",
"license": "MIT",
"dependencies": {
"@socket.io/component-emitter": "~3.1.0",
"debug": "~4.3.1"
},
"engines": {
"node": ">=10.0.0"
}
},
"node_modules/ws": {
"version": "8.17.1",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz",
"integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==",
"license": "MIT",
"engines": {
"node": ">=10.0.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": ">=5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {
"optional": true
},
"utf-8-validate": {
"optional": true
}
}
},
"node_modules/xmlhttprequest-ssl": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.1.2.tgz",
"integrity": "sha512-TEU+nJVUUnA4CYJFLvK5X9AOeH4KvDvhIfm0vV1GaQRtchnG0hgK5p8hw/xjv8cunWYCsiPCSDzObPyhEwq3KQ==",
"engines": {
"node": ">=0.4.0"
}
}
}
}

5
package.json Normal file
View File

@@ -0,0 +1,5 @@
{
"dependencies": {
"socket.io-client": "^4.8.1"
}
}

287
poetry.lock generated

File diff suppressed because it is too large Load Diff

17
run.py Normal file
View File

@@ -0,0 +1,17 @@
# flake8: noqa: E402
import eventlet
eventlet.monkey_patch()
from flask import Flask
from app import create_app, socketio
def build_application():
application = Flask("app")
create_app(application)
return application
if __name__ == "__main__":
application = build_application()
socketio.run(application, host="0.0.0.0", port=6011, debug=True, use_reloader=True)