How to Dynamically Add/Remove Periodic Tasks to Celery (Celerybeat)

No, I'm sorry, this is not possible with the regular celerybeat.

But it's easily extensible to do what you want: the django-celery scheduler,
for example, is just a subclass that reads and writes the schedule to the
database (with some optimizations on top).

Also you can use the django-celery scheduler even for non-Django projects.

Something like this:

  • Install django + django-celery:

    $ pip install -U django django-celery

  • Add the following settings to your celeryconfig:

    DATABASES = {
        'default': {
            'NAME': 'celerybeat.db',
            'ENGINE': 'django.db.backends.sqlite3',
        },
    }
    INSTALLED_APPS = ('djcelery', )

  • Create the database tables:

    $ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig
  • Start celerybeat with the database scheduler:

    $ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \
        -S djcelery.schedulers.DatabaseScheduler
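
For reference, the settings above can live in a single celeryconfig.py. Here is a minimal sketch; the broker URL is an assumption, so point it at your own broker:

```python
# celeryconfig.py -- minimal standalone config for the database scheduler.
# The broker URL below is an assumption; point it at your own broker.
BROKER_URL = 'amqp://guest:guest@localhost:5672//'

# Django settings needed so djcelery can store the schedule in SQLite
DATABASES = {
    'default': {
        'NAME': 'celerybeat.db',
        'ENGINE': 'django.db.backends.sqlite3',
    },
}
INSTALLED_APPS = ('djcelery', )

# Make the database scheduler the default, so -S can be omitted
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
```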

There's also the djcelerymon command, which non-Django projects can use to
start celerycam and a Django Admin web server in the same process; you can use
it to edit your periodic tasks in a nice web interface:

   $ djcelerymon

(Note: for some reason djcelerymon can't be stopped using Ctrl+C; you
have to use Ctrl+Z followed by kill %1.)

Add, modify, remove celery.schedules at run time

When you set in your app settings:

CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler'

the celery beat process checks the django PeriodicTask model to see which tasks should be executed.

You can add / modify / remove those tasks through the Django model:

import json

from djcelery.models import PeriodicTask, CrontabSchedule

every_hours_crontab = CrontabSchedule(minute=0)
every_hours_crontab.save()

periodic_task = PeriodicTask(
    name='Call my task every hour',
    task='myproject.tasks.mytask',
    crontab=every_hours_crontab,
    args=json.dumps([arg1, arg2]),    # arg1/arg2: your task's arguments
    kwargs=json.dumps({'foo': 'bar'})
)
periodic_task.save()
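
Modifying or removing an existing entry works the same way through the model; a sketch, assuming the task created above (the DatabaseScheduler picks the changes up automatically):

```python
from djcelery.models import PeriodicTask

task = PeriodicTask.objects.get(name='Call my task every hour')

# Modify: disable the task without deleting it
task.enabled = False
task.save()

# Remove the entry entirely
task.delete()
```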

You can also test various configurations of PeriodicTask using the django admin panel:

http://localhost:8000/admin/djcelery/crontabschedule/add/
http://localhost:8000/admin/djcelery/periodictask/

dynamically add periodic tasks celery

This works with Celery 4.0.1+, Python 2.7, and Redis.

from celery import Celery
import os
import logging

logger = logging.getLogger(__name__)
current_module = __import__(__name__)

CELERY_CONFIG = {
    'CELERY_BROKER_URL':
        'redis://{}/0'.format(os.environ.get('REDIS_URL', 'localhost:6379')),
    'CELERY_TASK_SERIALIZER': 'json',
}

celery = Celery(__name__, broker=CELERY_CONFIG['CELERY_BROKER_URL'])
celery.conf.update(CELERY_CONFIG)

I define a job in the following way:

job = {
    'task': 'my_function',                # name of a predefined function
    'schedule': {'minute': 0, 'hour': 0}, # crontab schedule
    'args': [2, 3],
    'kwargs': {}
}

I then define a decorator like this:

def add_to_module(f):
    setattr(current_module, 'tasks_{}__'.format(f.__name__), f)
    return f

My task is

@add_to_module
def my_function(x, y, **kwargs):
    return x + y

Then add a function which adds the task on the fly

from celery.schedules import crontab

def add_task(job):
    logger.info("Adding periodic job: %s", job)
    if not (isinstance(job, dict) and 'task' in job):
        logger.error("Job %s is ill-formed", job)
        return False
    celery.add_periodic_task(
        crontab(**job.get('schedule', {'minute': 0, 'hour': 0})),
        get_from_module(job['task']).s(
            *job.get('args', []),
            **job.get('kwargs', {})
        ),
        name=job.get('name'),
        expires=job.get('expires')
    )
    return True

def get_from_module(f):
    return getattr(current_module, 'tasks_{}__'.format(f))

After this, you can link the add_task function to a URL and have it create tasks out of functions in your current module.
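
For instance, here is a minimal sketch of wiring add_task to a URL. The framework (Flask) and the /tasks route are my assumptions, not part of the original answer:

```python
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/tasks', methods=['POST'])
def create_task():
    job = request.get_json()   # expects the job dict shown above
    ok = add_task(job)         # registers the periodic task with beat
    return jsonify({'created': ok}), (201 if ok else 400)
```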

Setting up periodic tasks in Celery (celerybeat) dynamically using add_periodic_task

EDIT: (13/01/2018)

The latest release, 4.1.0, has addressed the subject in ticket #3958, which has been merged.


Actually you can't define periodic tasks at the view level, because the beat schedule setting is loaded first and cannot be rescheduled at runtime:

The add_periodic_task() function will add the entry to the beat_schedule setting behind the scenes, and the same setting can also be used to set up periodic tasks manually:

app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.my_task',
        'schedule': 30.0,
        'args': (66,)
    },
}

which means that if you want to use add_periodic_task(), it should be wrapped within an on_after_configure handler at the celery app level, and any modification at runtime will not take effect:

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10, my_task.s(66))
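
The same hook also works with a crontab schedule instead of a plain interval; a sketch, where the stand-in my_task and the midnight schedule are assumptions:

```python
from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.task
def my_task(x):
    # Stand-in task so the example is self-contained
    return x

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Schedule my_task(66) every day at midnight
    sender.add_periodic_task(
        crontab(hour=0, minute=0),
        my_task.s(66),
        name='my_task daily at midnight',
    )
```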

As mentioned in the docs, the regular celerybeat simply keeps track of task execution:

The default scheduler is the celery.beat.PersistentScheduler, that simply keeps track of the last run times in a local shelve database file.

In order to be able to dynamically manage periodic tasks and reschedule celerybeat at runtime:

There’s also the django-celery-beat extension that stores the schedule in the Django database, and presents a convenient admin interface to manage periodic tasks at runtime.

The tasks are persisted in the Django database, and the scheduler can be updated through the task models at the db level. Whenever you update a periodic task, a counter in the tasks table is incremented, which tells the celery beat service to reload the schedule from the database.

A possible solution for you could be as follows:

import json

from django_celery_beat.models import PeriodicTask, IntervalSchedule

schedule = IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
task = PeriodicTask.objects.create(
    interval=schedule,
    name='any name',
    task='tasks.my_task',
    args=json.dumps([66]),
)

views.py

import json

from django_celery_beat.models import PeriodicTask

def update_task_view(request, id):
    task = PeriodicTask.objects.get(name="task name")  # if we suppose names are unique
    task.args = json.dumps([id])
    task.save()
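
Disabling or deleting at runtime goes through the same model; a sketch (the view names are assumptions):

```python
from django_celery_beat.models import PeriodicTask

def disable_task_view(request, name):
    task = PeriodicTask.objects.get(name=name)
    task.enabled = False  # beat stops scheduling it, but the row survives
    task.save()           # bumps the change counter so beat reloads

def delete_task_view(request, name):
    PeriodicTask.objects.get(name=name).delete()
```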

