一篇文章搞定Celery消息队列配置定时任务
liuian 2024-12-01 00:59 64 浏览
介绍
celery 定时器是一个调度器(scheduler);它会定时地开启(kicks off)任务,然后由集群中可用的工人(worker)来执行。
定时任务记录(entries)默认 从 beat_schedule 设置中获取,但自定义存储也可以使用,如把记录存储到SQL数据库中。
要确保同一时间一份时间表上只有一个调度器在运行,否则会因为重复发送任务而结束。使用集中途径意味着定时任务不用必须同步,并且服务无需用锁操控。
celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中) 充当broker来进行消息的接收,并且也支持多个broker和worker来实现高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
版本和要求
Celery version 4.0 runs on
Python ?2.7, 3.4, 3.5?
PyPy ?5.4, 5.5?
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
If you’re running an older version of Python, you need to be running an older version of Celery:
Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.环境准备
安装rabbitMQ或Redis
安装celery
pip3 install celery快速上手
s1.py
s1.pyimport time
from celery import Celery
app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')
@app.task
def xxxxxx(x, y):
time.sleep(10)
return x + ys2.py
from s1 import func
# func,并传入两个参数
result = xxxxxx.delay(4, 4)
print(result.id)s3.py
from celery.result import AsyncResult
from s1 import app
async = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')# 执行 s1.py 创建worker(终端执行命令):
celery worker -A s1 -l info
# PS:Windows系统上执行命令时出错解决方法
pip3 install eventlet
# 后期运行修改为:
celery worker -A s1 -l info -P eventlet
# 执行 s2.py ,创建一个任务并获取任务ID:
python3 s2.py
# 执行 s3.py ,检查任务状态并获取结果:
python3 s3.py多任务结构
pro_cel
├── celery_tasks# celery相关文件夹
│ ├── celery.py # celery连接和配置相关文件
│ └── tasks.py # 所有任务函数
├── check_result.py # 检查结果
└── send_task.py # 触发任务pro_cel/celery_tasks/celery
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
celery = Celery('func',
broker='redis://192.168.111.111:6379',
backend='redis://192.168.111.111:6379',
include=['celery_tasks.tasks'])
# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = Falsepro_cel/celery_tasks/tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from .celery import celery
@celery.task
def func(*args, **kwargs):
time.sleep(5)
return "任务结果"
@celery.task
def hhhhhh(*args, **kwargs):
time.sleep(5)
return "任务结果"pro_cel/check_result.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery.result import AsyncResult
from celery_tasks.celery import celery
async = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')pro_cel/send_task.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import celery_tasks.tasks
# 立即告知celery去执行func任务,并传入两个参数
result = celery_tasks.tasks.func.delay(4, 4)
print(result.id)更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html
定时任务
设定时间让celery执行一个任务
import datetime
from celery_tasks.tasks import func
"""
from datetime import datetime
v1 = datetime(2020, 4, 11, 3, 0, 0)
print(v1)
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
"""
ctime = datetime.datetime.now()
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
s10 = datetime.timedelta(seconds=10)
ctime_x = utc_ctime + s10
# 使用apply_async并设定时间
result = func.apply_async(args=[1, 3], eta=ctime_x)
print(result.id)类似于contab的定时任务
"""
celery beat -A proj
celery worker -A proj -l info
"""
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks', broker='amqp://147.918.134.86:5672', backend='amqp://147.918.134.86:5672', include=['proj.s1', ])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
app.conf.beat_schedule = {
# 'add-every-10-seconds': {
# 'task': 'proj.s1.add1',
# 'schedule': 10.0,
# 'args': (16, 16)
# },
'add-every-12-seconds': {
'task': 'proj.s1.add1',
'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
'args': (16, 16)
},
}注:如果想要定时执行类似于crontab的任务,需要定制 Scheduler来完成。
Flask中应用Celery
pro_flask_celery/
├── app.py
├── celery_tasks
├── celery.py
└── tasks.pyapp.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask
from celery.result import AsyncResult
from celery_tasks import tasks
from celery_tasks.celery import celery
app = Flask(__name__)
TASK_ID = None
@app.route('/')
def index():
global TASK_ID
result = tasks.func.delay()
TASK_ID = result.id
return "任务已经提交"
@app.route('/result')
def result():
global TASK_ID
result = AsyncResult(id=TASK_ID, app=celery)
if result.ready():
return result.get()
return "xxxx"
if __name__ == '__main__':
app.run()celery_tasks/celery.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
from celery.schedules import crontab
celery = Celery('func',
broker='redis://192.168.110.148:6379',
backend='redis://192.168.110.148:6379',
include=['celery_tasks.tasks'])
# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = Falsecelery_task/tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from .celery import celery
@celery.task
def hello(*args, **kwargs):
print('执行hello')
return "hello"
@celery.task
def func(*args, **kwargs):
print('执行func')
return "func"
@celery.task
def hhhhhh(*args, **kwargs):
time.sleep(5)
return "任务结果"记录
为了定时调用任务,你必须添加记录到打点列表中:
from celery import Celery
from celery.schedules import crontab
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# 每10秒调用 test('hello') .
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# 每30秒调用 test('world')
sender.add_periodic_task(30.0, test.s('world'), expires=10)
# 每周一上午7:30执行
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)用on_after_configure处理器进行这些设置意味着当使用test.s()时我们不会在模块层面运行app 。
add_periodic_task() 函数在幕后会添加记录到beat_schedule设定,同样的设定可以用来手动设置定时任务:
例子: 每30秒运行 tasks.add .
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
app.conf.timezone = 'UTC'一般会使用配置文件进行配置,如下
celeryconfig.py:
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}程序里使用
app.config_from_object('celeryconfig')
注意
如果你的参数元组里只有一个项目,只用一个逗号就可以了,不要圆括号。时间表使用时间差意味着每30秒间隔会发送任务(第一个任务在celery定时器开启后30秒发送,然后上每次距一次运行后30秒发送一次)
可使用的属性
task:要执行的任务名字
schedule:执行的频率[可以是整数秒数,时间差,或者一个周期( crontab)。你也可以自 定义你的时间表类型,通过扩展schedule接口]
args:位置参数 (list 或 tuple)
kwargs:键值参数 (dict)
options:执行选项 (dict)[这可以是任何被apply_async()支持的参数与—-exchange, routing_key, expires,等]
relative:如果 relative 是 true ,时间表“由时钟时间”安排,意味着 频率近似到最近的秒,分钟,小时或天,这取决于时间差中的时间间隔[默认relative是false,频率不会近似,会相对于celery的启动时间]
Crontab 表达式语法
开启调度
开启celery定时服务
celery -A proj beat可以把定时器嵌入到工人(worker)中,通过启用workers -B选项,如果你永远不会运行超过一个工人节点这就会很方便。但这不太常见,不推荐在生产环境这样使用
celery -A proj worker -B定时器需要在本地数据库文件(默认名为 celerybeat-schedule )存储任务上次运行时间,所以它需要在当前目录中写权限。或者你也可以给这个文件指定一个位置
celery -A proj beat -s /home/celery/var/run/celerybeat-schedule#Python##每天学python##Python入门推荐#
相关推荐
- 戴尔笔记本电脑一开机就蓝屏
-
笔记本蓝屏可能是电脑硬盘故障,可以更换一个硬盘尝试。也可能是更新了驱动与修复漏洞补丁,可以进入安全模式将更新的驱动删除。有可能是内存条故障,可以把内存条取下来,用橡皮擦轻轻擦拭金手指,然后用毛刷将内存...
- 优酷路由宝怎么设置(优酷路由宝怎么设置网络)
-
无线连接如果准备用手机、笔记本电脑来设置优酷路由宝,需要先把WAN口,连接宽带网线(宽带猫、光猫);然后手机/笔记本电脑搜索连接到优酷路由宝的WiFi。优酷路由宝的默认WiFi名称是:Youku_开...
- 一键装机软件大全(一键装机下载)
-
1一键装机工具是一种自动化安装计算机操作系统以及常用软件的工具。2使用一键装机工具,需要先准备好需要安装的操作系统镜像和需要安装的软件列表,然后将它们放在一键装机工具所指定的位置。接下来,打开一键...
- home键是什么意思苹果手机(home键是苹果手机哪个键)
-
就是手机屏幕正下方的那个圆形的按钮,就是苹果手机的home键,home键的作用比较大,可以用来设置指纹解锁,单机home键可以返回主屏幕界面,双击home键可以弹出后台应用程序可以进行清楚,还可以通过...
- tplink说明书图片(tp-link路由器说明书步骤图)
-
第一步连接路由器WIFI在手机获取IP地址里找到路由器网关地址,第二步在浏览器地址栏输入路由器网关地址,之后会跳转到路由器管理员登录界面,输入账号密码就可以进入路由后台管理路由,如果提示路由器密码错误...
- 如何不安装flash玩4399(现在4399不提供flash如何玩游戏)
-
没有flash是玩不了的,需要开启flash才可以。1、首先打开浏览器,进入4399的游戏页面。2、进入游戏页面后,点击【已被屏蔽】文字。3、然后右上角会出现窗口,点击【管理】按钮。4、进入管理页面后...
- chrome download apk(chromedownloadapk in english)
-
手机下载安装的第三方应用出现问题,无法正常使用,建议按照以下方法操作:1.关闭重新启动该应用。2.建议将此软件卸载重新安装尝试。3.更换其他版本尝试。4.更新下手机系统版本后安装尝试5.备份手机数据(...
-
- qq空间官网手机登录网页版(qq空间官网登陆入口)
-
z.qq.com可以通过以下方式登录手机QQ空间:1、使用手机登录手机腾讯网3g.qq.com,点击“空间”,根据提示QQ号码和QQ密码就可以登录;2、通过手机直接输入手机QQ空间网址z.qq.com,根据提示操作即可登录;3、下载手机Q...
-
2025-12-22 13:55 liuian
- windows11我的电脑在哪里打开
-
1/6通过“开始”进入“设置”-“时间和语言”。2/6在“时间和语言”界面选择“区域”3/6这里我们将区域更改位“新加披”,退出。4/6打开微软自带的市场,搜索“你的手机”获取并下载。5/6安装完成后...
- win10怎么取消开机自启动(win10如何关闭开机自动启动)
-
要关闭Windows10的开机自动启动程序,你可以按下Win+R键,输入"msconfig"并按回车键打开系统配置工具。在"启动"选项卡中,你可以看到所有开机自动...
- 手机cpu排名2025(手机cpu排名榜)
-
一、2022手机CPU性能综合排名前八名手机CPU:1、型号:苹果A16---综合分数:暂无2、型号:骁龙8gen1---综合分数:42333、联发科天玑9000---综合分数:38724、...
- 论坛系统(论坛系统数据流图)
-
BBS是电子布告栏系统的简称,一种网站系统,也是目前流行网络论坛的前身。它允许用户使用终端程序通过调制解调器拨接或者因特网来进行连接,BBS站台提供布告栏、分类讨论区、新闻阅读、软件下载与上传、游戏、...
- hp1020plus打印机无法打印(惠普1020plus打印机突然不能打印了)
-
删除惠普打印机驱动和软件:1.如果你的打印机已通过USB连接到电脑,断开USB连接;2.打开控制面板—程序和功能(卸载或更改应用程序);3.在软件列表中找到惠普打印机,将其卸载;4.重启电脑...
- wifi密码破解器电脑版(wifi密码破解工具电脑版)
-
肯定不是万能钥匙这种“破解”wifi的东西。不是一两次见到把万能钥匙当做破解wifi用的人了,但实际上那玩意就是个分享wifi的软件。你连上一个wifi,密码就会被分享到云端(可以不分享),别...
- 手机临时文件夹在哪个位置(手机临时文件夹在哪个位置找)
-
1.手机文件临时文件是指在手机使用过程中产生的临时文件。2.手机应用程序在运行时需要产生一些临时文件,如缓存文件、日志文件、临时下载文件等,这些文件可以提高应用程序的运行效率和用户体验。但是,这些...
- 一周热门
- 最近发表
- 标签列表
-
- python判断字典是否为空 (50)
- crontab每周一执行 (48)
- aes和des区别 (43)
- bash脚本和shell脚本的区别 (35)
- canvas库 (33)
- dataframe筛选满足条件的行 (35)
- gitlab日志 (33)
- lua xpcall (36)
- blob转json (33)
- python判断是否在列表中 (34)
- python html转pdf (36)
- 安装指定版本npm (37)
- idea搜索jar包内容 (33)
- css鼠标悬停出现隐藏的文字 (34)
- linux nacos启动命令 (33)
- gitlab 日志 (36)
- adb pull (37)
- python判断元素在不在列表里 (34)
- python 字典删除元素 (34)
- vscode切换git分支 (35)
- python bytes转16进制 (35)
- grep前后几行 (34)
- hashmap转list (35)
- c++ 字符串查找 (35)
- mysql刷新权限 (34)
