initial test socket io setup

This commit is contained in:
Beverly Nguyen
2025-03-27 09:51:22 -07:00
parent 555b10d793
commit 16c68e06b9
3 changed files with 20 additions and 1 deletions

View File

@@ -17,6 +17,7 @@ from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy
from sqlalchemy import event
from werkzeug.exceptions import HTTPException as WerkzeugHTTPException
from werkzeug.local import LocalProxy
from flask_socketio import SocketIO
from app import config
from app.clients import NotificationProviderClients
@@ -100,7 +101,7 @@ notification_provider_clients = NotificationProviderClients()
api_user = LocalProxy(lambda: g.api_user)
authenticated_service = LocalProxy(lambda: g.authenticated_service)
socketio = SocketIO(cors_allowed_origins="*") # ← Global instance
def create_app(application):
from app.config import configs
@@ -111,6 +112,8 @@ def create_app(application):
application.config["NOTIFY_APP_NAME"] = application.name
init_app(application)
socketio.init_app(application)
request_helper.init_app(application)
db.init_app(application)
migrate.init_app(application, db=db)
@@ -168,6 +171,7 @@ def register_blueprint(application):
from app.inbound_number.rest import inbound_number_blueprint
from app.inbound_sms.rest import inbound_sms as inbound_sms_blueprint
from app.job.rest import job_blueprint
from app.socketio_server.test_socketio import test_bp
from app.notifications.notifications_ses_callback import ses_callback_blueprint
from app.notifications.receive_notifications import receive_notifications_blueprint
from app.notifications.rest import notifications as notifications_blueprint
@@ -189,6 +193,9 @@ def register_blueprint(application):
from app.user.rest import user_blueprint
from app.webauthn.rest import webauthn_blueprint
# this will need to be requires_admin_auth
application.register_blueprint(test_bp)
service_blueprint.before_request(requires_admin_auth)
application.register_blueprint(service_blueprint, url_prefix="/service")

View File

View File

@@ -0,0 +1,12 @@
from flask import Blueprint
from app import socketio
test_bp = Blueprint('test', __name__)
@test_bp.route('/test-emit', methods=["GET"])
def test_emit():
socketio.emit('job_update', {
'job_id': 'abc123',
'status': 'Test message from API'
})
return "Event emitted!"