Celery tests

This commit is contained in:
Martyn Inglis
2016-02-09 13:31:45 +00:00
parent 1a7c521ebb
commit fb41acdac9
9 changed files with 84 additions and 5 deletions

0
app/celery/__init__.py Normal file
View File

21
app/celery/celery.py Normal file
View File

@@ -0,0 +1,21 @@
from celery import Celery
class NotifyCelery(Celery):
def init_app(self, app):
super().__init__(app.import_name, broker=app.config['BROKER_URL'])
self.conf.update(app.config)
TaskBase = self.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask

9
app/celery/tasks.py Normal file
View File

@@ -0,0 +1,9 @@
from app import celery
from app.dao.services_dao import get_model_services
@celery.task(name="refresh-services")
def refresh_services():
print(get_model_services())
for service in get_model_services():
celery.control.add_consumer(str(service.id))