How to dynamically add / remove periodic tasks to Celery (celerybeat)
No, I'm sorry, this is not possible with the regular celerybeat.
But it's easily extensible to do what you want, e.g. the django-celery
scheduler is just a subclass reading and writing the schedule to the database
(with some optimizations on top).
Also you can use the django-celery scheduler even for non-Django projects.
Something like this:
Install django + django-celery:
$ pip install -U django django-celery
Add the following settings to your celeryconfig:
DATABASES = {
    'default': {
        'NAME': 'celerybeat.db',
        'ENGINE': 'django.db.backends.sqlite3',
    },
}
INSTALLED_APPS = ('djcelery', )
Create the database tables:
$ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig
Start celerybeat with the database scheduler:
$ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \
-S djcelery.schedulers.DatabaseScheduler
There is also the djcelerymon command, which non-Django projects can use to start celerycam and a Django Admin webserver in the same process. You can also use it to edit your periodic tasks in a nice web interface:
$ djcelerymon
(Note: for some reason djcelerymon can't be stopped using Ctrl+C; you
have to use Ctrl+Z + kill %1.)
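The idea behind a database-backed scheduler can be illustrated without Celery at all. The sketch below is only a conceptual stand-in, not the djcelery implementation: the table layout and the helper names (open_store, load_schedule) are invented for illustration. It shows a schedule that is rebuilt from a table, so that editing a row changes what the scheduler sees on its next read.

```python
import sqlite3


def open_store():
    """Create an in-memory table holding one row per periodic task (hypothetical layout)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE periodic_task ("
        "name TEXT PRIMARY KEY, "
        "task TEXT NOT NULL, "
        "every_seconds REAL NOT NULL, "
        "enabled INTEGER NOT NULL DEFAULT 1)"
    )
    return conn


def load_schedule(conn):
    """Build a schedule dict from the enabled rows, as a scheduler would on each reload."""
    rows = conn.execute(
        "SELECT name, task, every_seconds FROM periodic_task WHERE enabled = 1"
    )
    return {name: {"task": task, "schedule": every} for name, task, every in rows}


conn = open_store()
conn.execute(
    "INSERT INTO periodic_task (name, task, every_seconds) VALUES (?, ?, ?)",
    ("hourly-report", "tasks.report", 3600.0),
)
schedule_before = load_schedule(conn)

# Disabling the row removes the entry from the next schedule that is loaded.
conn.execute("UPDATE periodic_task SET enabled = 0 WHERE name = ?", ("hourly-report",))
schedule_after = load_schedule(conn)
```

Because the schedule is derived from the table on every load, adding, editing, or disabling a row is all that is needed to change it; no process restart is involved in this toy version.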
Add, modify, remove celery.schedules at run time
When you set the following in your app settings:
CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler'
the celery beat process checks the Django PeriodicTask model to see which tasks should be executed.
You can add, modify, or remove those tasks through the Django model:
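import json

from djcelery.models import PeriodicTask, CrontabSchedule

# minute=0 with the other fields left at their defaults means "at minute 0 of every hour"
every_hours_crontab = CrontabSchedule(minute=0)
every_hours_crontab.save()
periodic_task = PeriodicTask(
    name='Call my task every hour',
    task='myproject.tasks.mytask',
    crontab=every_hours_crontab,
    # args and kwargs are stored as JSON strings, so use json.dumps (not json.dump);
    # arg1 and arg2 stand for your own values
    args=json.dumps([arg1, arg2]),
    kwargs=json.dumps({'foo': 'bar'})
)
periodic_task.save()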
You can also test various PeriodicTask configurations using the Django admin panel:
http://localhost:8000/admin/djcelery/crontabschedule/add/
http://localhost:8000/admin/djcelery/periodictask/
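The args and kwargs fields of PeriodicTask hold JSON-encoded text, so the values passed to the model need to be strings. A minimal standard-library sketch of that encoding step follows; the argument values are made up for illustration.

```python
import json

# json.dumps returns a string, which is what a text field expects.
# (json.dump, without the "s", writes to a file object instead and needs a second argument.)
args = json.dumps(["report.csv", 3])
kwargs = json.dumps({"foo": "bar"})

# Decoding the stored text gives back the original list and dict.
decoded_args = json.loads(args)
decoded_kwargs = json.loads(kwargs)
```

Keeping the arguments JSON-serializable (strings, numbers, lists, dicts) avoids surprises when the stored text is decoded again.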
dynamically add periodic tasks celery
This works for Celery 4.0.1+ and Python 2.7, and Redis
from celery import Celery
import os, logging
logger = logging.getLogger(__name__)
current_module = __import__(__name__)
CELERY_CONFIG = {
    'CELERY_BROKER_URL':
        'redis://{}/0'.format(os.environ.get('REDIS_URL', 'localhost:6379')),
    'CELERY_TASK_SERIALIZER': 'json',
}
celery = Celery(__name__, broker=CELERY_CONFIG['CELERY_BROKER_URL'])
celery.conf.update(CELERY_CONFIG)
I define a job in the following way:
job = {
    'task': 'my_function',                  # Name of a predefined function
    'schedule': {'minute': 0, 'hour': 0},   # crontab schedule
    'args': [2, 3],
    'kwargs': {}
}
I then define a decorator like this:
def add_to_module(f):
    # Functions expose their name as __name__, not name
    setattr(current_module, 'tasks_{}__'.format(f.__name__), f)
    return f
My task is
@add_to_module
def my_function(x, y, **kwargs):
    return x + y
Then add a function which adds the task on the fly
from celery.schedules import crontab

def add_task(job):
    logger.info("Adding periodic job: %s", job)
    # Reject anything that is not a dict with a 'task' key
    if not (isinstance(job, dict) and 'task' in job):
        logger.error("Job %s is ill-formed", job)
        return False
    celery.add_periodic_task(
        crontab(**job.get('schedule', {'minute': 0, 'hour': 0})),
        # .s() requires the looked-up function to be a Celery task
        get_from_module(job['task']).s(
            enterprise_id,  # not defined in this snippet; supplied by the surrounding application
            *job.get('args', []),
            **job.get('kwargs', {})
        ),
        name=job.get('name'),
        expires=job.get('expires')
    )
    return True

def get_from_module(f):
    return getattr(current_module, 'tasks_{}__'.format(f))
After this, you can link the add_task function to a URL and use it to create tasks out of functions in your current module.
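If job definitions arrive over a URL, it may be worth checking them before they reach the scheduler. The following is a minimal sketch of such a check using only the standard library. TASK_REGISTRY, register, and validate_job are hypothetical names, and the explicit dictionary is a swapped-in alternative to setting attributes on the module. The field names in CRONTAB_FIELDS are the keyword arguments accepted by celery.schedules.crontab.

```python
# Keyword arguments accepted by celery.schedules.crontab
CRONTAB_FIELDS = {"minute", "hour", "day_of_week", "day_of_month", "month_of_year"}

# Hypothetical registry mapping task names to callables
TASK_REGISTRY = {}


def register(func):
    """Decorator that records a function under its own name."""
    TASK_REGISTRY[func.__name__] = func
    return func


@register
def my_function(x, y, **kwargs):
    return x + y


def validate_job(job):
    """Return a list of problems found in a job definition; an empty list means it looks usable."""
    if not isinstance(job, dict):
        return ["job must be a dict"]
    problems = []
    task = job.get("task")
    if not isinstance(task, str) or task not in TASK_REGISTRY:
        problems.append("unknown task: {!r}".format(task))
    schedule = job.get("schedule", {})
    if not isinstance(schedule, dict):
        problems.append("schedule must be a dict")
    else:
        unknown = sorted(set(schedule) - CRONTAB_FIELDS)
        if unknown:
            problems.append("unknown schedule fields: {}".format(unknown))
    return problems


good = validate_job({"task": "my_function", "schedule": {"minute": 0, "hour": 0}})
bad = validate_job({"task": "missing", "schedule": {"second": 5}})
```

Returning a list of problems rather than a bare False makes it easier to report back to whoever submitted the job what needs fixing.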
Setting up periodic tasks in Celery (celerybeat) dynamically using add_periodic_task
EDIT: (13/01/2018)
The latest release, 4.1.0, has addressed this subject in ticket #3958, which has been merged.
Actually, you can't define periodic tasks at the view level, because the beat schedule setting is loaded first and cannot be rescheduled at runtime:
The add_periodic_task() function will add the entry to the beat_schedule setting behind the scenes, and the same setting can also be used to set up periodic tasks manually:
app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.my_task',
        'schedule': 10.0,
        'args': (66,)
    },
}
This means that if you want to use add_periodic_task(), it should be wrapped in an on_after_configure handler at the Celery app level, and any modification at runtime will not take effect:
app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10, my_task.s(66))
As mentioned in the docs, the regular celerybeat simply keeps track of task execution:
The default scheduler is the celery.beat.PersistentScheduler, that simply keeps track of the last run times in a local shelve database file.
In order to be able to dynamically manage periodic tasks and reschedule celerybeat at runtime:
There’s also the django-celery-beat extension that stores the schedule in the Django database, and presents a convenient admin interface to manage periodic tasks at runtime.
The tasks are persisted in the Django database, and the schedule can be updated through the task model at the database level. Whenever you update a periodic task, a counter in this tasks table is incremented, which tells the celery beat service to reload the schedule from the database.
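The change-counter mechanism can be sketched in plain Python. This is a conceptual illustration only, not the django-celery-beat code: ScheduleStore and ReloadingScheduler are hypothetical classes. Every write bumps a counter, and the scheduler reloads its copy of the schedule only when the counter differs from the value it last saw.

```python
class ScheduleStore:
    """Stand-in for the tasks table: every write increments a change counter."""

    def __init__(self):
        self._entries = {}
        self.version = 0

    def save(self, name, entry):
        self._entries[name] = entry
        self.version += 1

    def delete(self, name):
        self._entries.pop(name, None)
        self.version += 1

    def snapshot(self):
        return dict(self._entries)


class ReloadingScheduler:
    """Stand-in for the beat service: reloads only when the counter has moved."""

    def __init__(self, store):
        self.store = store
        self.seen_version = None
        self.schedule = {}
        self.reloads = 0

    def tick(self):
        if self.store.version != self.seen_version:
            self.schedule = self.store.snapshot()
            self.seen_version = self.store.version
            self.reloads += 1
        return self.schedule


store = ScheduleStore()
scheduler = ReloadingScheduler(store)

scheduler.tick()                      # first tick always loads
scheduler.tick()                      # nothing changed, so no reload
reloads_when_idle = scheduler.reloads

store.save("every-10s", {"task": "tasks.my_task", "every": 10, "args": [66]})
schedule_after_save = scheduler.tick()
reloads_after_save = scheduler.reloads
```

Comparing a single counter is cheap, so the scheduler can check it on every tick and pay the cost of a full reload only after something was actually edited.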
A possible solution for you could be as follows:
import json

from django_celery_beat.models import PeriodicTask, IntervalSchedule
schedule = IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66]))
views.py
def update_task_view(request, id):
    task = PeriodicTask.objects.get(name="task name")  # if we suppose names are unique
    task.args = json.dumps([id])
    task.save()