from celery import Celery
from config import CELERY_BACKEND_URL, CELERY_BROKER_URL, CELERY_DEFAULT_QUEUE
from kombu import Exchange, Queue
from celery_once import QueueOnce
# Application instance; broker and result-backend URLs are taken from config.
celery_app = Celery(
    broker=CELERY_BROKER_URL,
    backend=CELERY_BACKEND_URL,
)

# Settings read by celery_once (the QueueOnce base class imported above).
# The Redis lock backend is pointed at the same URL as the broker.
# NOTE(review): this presumes CELERY_BROKER_URL is a Redis URL -- confirm.
celery_app.conf.ONCE = {
    'backend': 'celery_once.backends.Redis',
    'settings': {
        'url': CELERY_BROKER_URL,
        # 60 * 60 -- one hour if the value is in seconds (see celery_once docs).
        'default_timeout': 60 * 60,
    },
}

# One direct exchange per queue; each exchange shares its queue's name.
default_exchange = Exchange(CELERY_DEFAULT_QUEUE, type='direct')
compute_exchange = Exchange('compute', type='direct')
eventlet_exchange = Exchange('eventlet', type='direct')
realtime_exchange = Exchange('realtime', type='direct')

# Queue declarations; every queue is bound with a routing key equal to its
# own name.
# NOTE(review): CELERY_QUEUES is defined here but is not passed to
# celery_app.conf.update() below. Confirm whether it is applied elsewhere;
# if not, these declarations have no effect on the app configuration.
CELERY_QUEUES = (
    Queue(CELERY_DEFAULT_QUEUE, default_exchange, routing_key=CELERY_DEFAULT_QUEUE),
    Queue('compute', compute_exchange, routing_key='compute'),
    Queue('eventlet', eventlet_exchange, routing_key='eventlet'),
    Queue('realtime', realtime_exchange, routing_key='realtime'),
)

celery_app.conf.update(
    # Default queue, exchange and routing key all use the same configured name.
    CELERY_DEFAULT_QUEUE=CELERY_DEFAULT_QUEUE,
    CELERY_DEFAULT_EXCHANGE=CELERY_DEFAULT_QUEUE,
    # Previously misspelled as CELERY_DEFAULT_ROUTTING_KEY, which is not a
    # setting name Celery recognises, so the value was never applied.
    CELERY_DEFAULT_ROUTING_KEY=CELERY_DEFAULT_QUEUE,
    # JSON only, for task messages, accepted content and results.
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ENABLE_UTC=True,
    CELERY_TASK_PROTOCOL=1,
    CELERYD_FORCE_EXECV=True,
    CELERYD_MAX_TASKS_PER_CHILD=500,
    CELERY_TASK_RESULT_EXPIRES=60 * 60,
)
class MyTask(QueueOnce):
    """Task base class derived from celery_once's ``QueueOnce``.

    The only customisation is a ``before_start`` override that does nothing.
    """

    def before_start(self, task_id, args, kwargs):
        """Hook invoked before the task starts; intentionally a no-op.

        :param task_id: id of the task about to run (presumably the Celery
            task id -- confirm against the Celery ``Task.before_start`` docs).
        :param args: positional arguments of the task.
        :param kwargs: keyword arguments of the task.
        :return: None.
        """