一篇文章搞定Celery消息队列配置定时任务
liuian 2024-12-01 00:59 77 浏览
介绍
celery 定时器是一个调度器(scheduler);它会定时地开启(kicks off)任务,然后由集群中可用的工人(worker)来执行。
定时任务记录(entries)默认 从 beat_schedule 设置中获取,但自定义存储也可以使用,如把记录存储到SQL数据库中。
要确保同一时间一份时间表上只有一个调度器在运行,否则会因为重复发送任务而结束。使用集中途径意味着定时任务不用必须同步,并且服务无需用锁操控。
celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中) 充当broker来进行消息的接收,并且也支持多个broker和worker来实现高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
版本和要求
Celery version 4.0 runs on
Python ?2.7, 3.4, 3.5?
PyPy ?5.4, 5.5?
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
If you’re running an older version of Python, you need to be running an older version of Celery:
Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.环境准备
安装rabbitMQ或Redis
安装celery
pip3 install celery快速上手
s1.py
s1.pyimport time
from celery import Celery
app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')
@app.task
def xxxxxx(x, y):
time.sleep(10)
return x + ys2.py
from s1 import func
# func,并传入两个参数
result = xxxxxx.delay(4, 4)
print(result.id)s3.py
from celery.result import AsyncResult
from s1 import app
async = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')# 执行 s1.py 创建worker(终端执行命令):
celery worker -A s1 -l info
# PS:Windows系统上执行命令时出错解决方法
pip3 install eventlet
# 后期运行修改为:
celery worker -A s1 -l info -P eventlet
# 执行 s2.py ,创建一个任务并获取任务ID:
python3 s2.py
# 执行 s3.py ,检查任务状态并获取结果:
python3 s3.py多任务结构
pro_cel
├── celery_tasks# celery相关文件夹
│ ├── celery.py # celery连接和配置相关文件
│ └── tasks.py # 所有任务函数
├── check_result.py # 检查结果
└── send_task.py # 触发任务pro_cel/celery_tasks/celery
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
celery = Celery('func',
broker='redis://192.168.111.111:6379',
backend='redis://192.168.111.111:6379',
include=['celery_tasks.tasks'])
# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = Falsepro_cel/celery_tasks/tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from .celery import celery
@celery.task
def func(*args, **kwargs):
time.sleep(5)
return "任务结果"
@celery.task
def hhhhhh(*args, **kwargs):
time.sleep(5)
return "任务结果"pro_cel/check_result.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery.result import AsyncResult
from celery_tasks.celery import celery
async = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')pro_cel/send_task.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import celery_tasks.tasks
# 立即告知celery去执行func任务,并传入两个参数
result = celery_tasks.tasks.func.delay(4, 4)
print(result.id)更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html
定时任务
设定时间让celery执行一个任务
import datetime
from celery_tasks.tasks import func
"""
from datetime import datetime
v1 = datetime(2020, 4, 11, 3, 0, 0)
print(v1)
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
"""
ctime = datetime.datetime.now()
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
s10 = datetime.timedelta(seconds=10)
ctime_x = utc_ctime + s10
# 使用apply_async并设定时间
result = func.apply_async(args=[1, 3], eta=ctime_x)
print(result.id)类似于contab的定时任务
"""
celery beat -A proj
celery worker -A proj -l info
"""
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks', broker='amqp://147.918.134.86:5672', backend='amqp://147.918.134.86:5672', include=['proj.s1', ])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
app.conf.beat_schedule = {
# 'add-every-10-seconds': {
# 'task': 'proj.s1.add1',
# 'schedule': 10.0,
# 'args': (16, 16)
# },
'add-every-12-seconds': {
'task': 'proj.s1.add1',
'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
'args': (16, 16)
},
}注:如果想要定时执行类似于crontab的任务,需要定制 Scheduler来完成。
Flask中应用Celery
pro_flask_celery/
├── app.py
├── celery_tasks
├── celery.py
└── tasks.pyapp.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask
from celery.result import AsyncResult
from celery_tasks import tasks
from celery_tasks.celery import celery
app = Flask(__name__)
TASK_ID = None
@app.route('/')
def index():
global TASK_ID
result = tasks.func.delay()
TASK_ID = result.id
return "任务已经提交"
@app.route('/result')
def result():
global TASK_ID
result = AsyncResult(id=TASK_ID, app=celery)
if result.ready():
return result.get()
return "xxxx"
if __name__ == '__main__':
app.run()celery_tasks/celery.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
from celery.schedules import crontab
celery = Celery('func',
broker='redis://192.168.110.148:6379',
backend='redis://192.168.110.148:6379',
include=['celery_tasks.tasks'])
# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = Falsecelery_task/tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from .celery import celery
@celery.task
def hello(*args, **kwargs):
print('执行hello')
return "hello"
@celery.task
def func(*args, **kwargs):
print('执行func')
return "func"
@celery.task
def hhhhhh(*args, **kwargs):
time.sleep(5)
return "任务结果"记录
为了定时调用任务,你必须添加记录到打点列表中:
from celery import Celery
from celery.schedules import crontab
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# 每10秒调用 test('hello') .
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# 每30秒调用 test('world')
sender.add_periodic_task(30.0, test.s('world'), expires=10)
# 每周一上午7:30执行
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)用on_after_configure处理器进行这些设置意味着当使用test.s()时我们不会在模块层面运行app 。
add_periodic_task() 函数在幕后会添加记录到beat_schedule设定,同样的设定可以用来手动设置定时任务:
例子: 每30秒运行 tasks.add .
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
app.conf.timezone = 'UTC'一般会使用配置文件进行配置,如下
celeryconfig.py:
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}程序里使用
app.config_from_object('celeryconfig')
注意
如果你的参数元组里只有一个项目,只用一个逗号就可以了,不要圆括号。时间表使用时间差意味着每30秒间隔会发送任务(第一个任务在celery定时器开启后30秒发送,然后上每次距一次运行后30秒发送一次)
可使用的属性
task:要执行的任务名字
schedule:执行的频率[可以是整数秒数,时间差,或者一个周期( crontab)。你也可以自 定义你的时间表类型,通过扩展schedule接口]
args:位置参数 (list 或 tuple)
kwargs:键值参数 (dict)
options:执行选项 (dict)[这可以是任何被apply_async()支持的参数与—-exchange, routing_key, expires,等]
relative:如果 relative 是 true ,时间表“由时钟时间”安排,意味着 频率近似到最近的秒,分钟,小时或天,这取决于时间差中的时间间隔[默认relative是false,频率不会近似,会相对于celery的启动时间]
Crontab 表达式语法
开启调度
开启celery定时服务
celery -A proj beat可以把定时器嵌入到工人(worker)中,通过启用workers -B选项,如果你永远不会运行超过一个工人节点这就会很方便。但这不太常见,不推荐在生产环境这样使用
celery -A proj worker -B定时器需要在本地数据库文件(默认名为 celerybeat-schedule )存储任务上次运行时间,所以它需要在当前目录中写权限。或者你也可以给这个文件指定一个位置
celery -A proj beat -s /home/celery/var/run/celerybeat-schedule#Python##每天学python##Python入门推荐#
相关推荐
-
- 驱动网卡(怎么从新驱动网卡)
-
网卡一般是指为电脑主机提供有线无线网络功能的适配器。而网卡驱动指的就是电脑连接识别这些网卡型号的桥梁。网卡只有打上了网卡驱动才能正常使用。并不是说所有的网卡一插到电脑上面就能进行数据传输了,他都需要里面芯片组的驱动文件才能支持他进行数据传输...
-
2026-01-30 00:37 liuian
- win10更新助手装系统(微软win10更新助手)
-
1、点击首页“系统升级”的按钮,给出弹框,告诉用户需要上传IMEI码才能使用升级服务。同时给出同意和取消按钮。华为手机助手2、点击同意,则进入到“系统升级”功能华为手机助手华为手机助手3、在检测界面,...
- windows11专业版密钥最新(windows11专业版激活码永久)
-
Windows11专业版的正版密钥,我们是对windows的激活所必备的工具。该密钥我们可以通过微软商城或者通过计算机的硬件供应商去购买获得。获得了windows11专业版的正版密钥后,我...
-
- 手机删过的软件恢复(手机删除过的软件怎么恢复)
-
操作步骤:1、首先,我们需要先打开手机。然后在许多图标中找到带有[文件管理]文本的图标,然后单击“文件管理”进入页面。2、进入页面后,我们将在顶部看到一行文本:手机,最新信息,文档,视频,图片,音乐,收藏,最后是我们正在寻找的[更多],单击...
-
2026-01-29 23:55 liuian
- 一键ghost手动备份系统步骤(一键ghost 备份)
-
步骤1、首先把装有一键GHOST装系统的U盘插在电脑上,然后打开电脑马上按F2或DEL键入BIOS界面,然后就选择BOOT打USDHDD模式选择好,然后按F10键保存,电脑就会马上重启。 步骤...
- 怎么创建局域网(怎么创建局域网打游戏)
-
1、购买路由器一台。进入路由器把dhcp功能打开 2、购买一台交换机。从路由器lan端口拉出一条网线查到交换机的任意一个端口上。 3、两台以上电脑。从交换机任意端口拉出网线插到电脑上(电脑设置...
- 精灵驱动器官方下载(精灵驱动手机版下载)
-
是的。驱动精灵是一款集驱动管理和硬件检测于一体的、专业级的驱动管理和维护工具。驱动精灵为用户提供驱动备份、恢复、安装、删除、在线更新等实用功能。1、全新驱动精灵2012引擎,大幅提升硬件和驱动辨识能力...
- 一键还原系统步骤(一键还原系统有哪些)
-
1、首先需要下载安装一下Windows一键还原程序,在安装程序窗口中,点击“下一步”,弹出“用户许可协议”窗口,选择“我同意该许可协议的条款”,并点击“下一步”。 2、在弹出的“准备安装”窗口中,可...
- 电脑加速器哪个好(电脑加速器哪款好)
-
我认为pp加速器最好用,飞速土豆太懒,急速酷六根本不工作。pp加速器什么网页都加速,太任劳任怨了!以上是个人观点,具体性能请自己试。ps:我家电脑性能很好。迅游加速盒子是可以加速电脑的。因为有过之...
- 任何u盘都可以做启动盘吗(u盘必须做成启动盘才能装系统吗)
-
是的,需要注意,U盘的大小要在4G以上,最好是8G以上,因为启动盘里面需要装系统,内存小的话,不能用来安装系统。内存卡或者U盘或者移动硬盘都可以用来做启动盘安装系统。普通的U盘就可以,不过最好U盘...
- u盘怎么恢复文件(u盘文件恢复的方法)
-
开360安全卫士,点击上面的“功能大全”。点击文件恢复然后点击“数据”下的“文件恢复”功能。选择驱动接着选择需要恢复的驱动,选择接入的U盘。点击开始扫描选好就点击中间的“开始扫描”,开始扫描U盘数据。...
- 系统虚拟内存太低怎么办(系统虚拟内存占用过高什么原因)
-
1.检查系统虚拟内存使用情况,如果发现有大量的空闲内存,可以尝试释放一些不必要的进程,以释放内存空间。2.如果系统虚拟内存使用率较高,可以尝试增加系统虚拟内存的大小,以便更多的应用程序可以使用更多...
-
- 剪贴板权限设置方法(剪贴板访问权限)
-
1、首先打开iphone手机,触碰并按住单词或图像直到显示选择选项。2、其次,然后选取“拷贝”或“剪贴板”。3、勾选需要的“权限”,最后选择开启,即可完成苹果剪贴板权限设置。仅参考1.打开苹果手机设置按钮,点击【通用】。2.点击【键盘】,再...
-
2026-01-29 21:37 liuian
- 平板系统重装大师(平板重装win系统)
-
如果你的平板开不了机,但可以连接上电脑,那就能好办,楼主下载安装个平板刷机王到你的个人电脑上,然后连接你的平板,平板刷机王会自动识别你的平板,平板刷机王上有你平板的我刷机包,楼主点击下载一个,下载完成...
- 联想官网售后服务网点(联想官网售后服务热线)
-
联想3c服务中心是联想旗下的官方售后,是基于互联网O2O模式开发的全新服务平台。可以为终端用户提供多品牌手机、电脑以及其他3C类产品的维修、保养和保险服务。根据客户需求层次,联想服务针对个人及家庭客户...
- 一周热门
- 最近发表
- 标签列表
-
- python判断字典是否为空 (50)
- crontab每周一执行 (48)
- aes和des区别 (43)
- bash脚本和shell脚本的区别 (35)
- canvas库 (33)
- dataframe筛选满足条件的行 (35)
- gitlab日志 (33)
- lua xpcall (36)
- blob转json (33)
- python判断是否在列表中 (34)
- python html转pdf (36)
- 安装指定版本npm (37)
- idea搜索jar包内容 (33)
- css鼠标悬停出现隐藏的文字 (34)
- linux nacos启动命令 (33)
- gitlab 日志 (36)
- adb pull (37)
- python判断元素在不在列表里 (34)
- python 字典删除元素 (34)
- vscode切换git分支 (35)
- python bytes转16进制 (35)
- grep前后几行 (34)
- hashmap转list (35)
- c++ 字符串查找 (35)
- mysql刷新权限 (34)
