Psutil + Flask + Pyecharts + Bootstrap 开发动态可视化系统监控
liuian 2024-12-11 15:43 619 浏览
文 | 某某白米饭
来源:Python 技术「ID: pythonall」
psutil 是一个跨平台库(http://pythonhosted.org/psutil)能够获取到系统运行的进程和系统利用率(包括CPU、内存、磁盘、网络等)信息。主要用来做系统监控,性能分析,进程管理。支持 Linux、Mac OS、Windows 系统。
本文以 psutil 模块获取系统信息开发一个监控 Mac OS 系统的平台。
准备工作
技术选择
监控的系统是 Mac OS 系统
监控系统模块选择 psutil 模块
Web 框架选择的是 Flask 框架
前端 UI 选择的是 Bootstrap UI
动态可视化图表选择 Pyecharts 模块
安装 psutil
pip3 install psutil
安装 Flask、pyecharts、Bootstrap
Flask 的教程是在公众号文章:Web 开发 Flask 介绍
Pyecharts 的教程在公众号文章:Python 图表利器 pyecharts,按照官网 (http://pyecharts.org/#/zh-cn/web_flask) 文档整合 Flask 框架,并使用定时全量更新图表。
Bootstrap 是一个 前端的 Web UI,官网地址是 (https://v4.bootcss.com)
获取系统信息
CPU信息
通过 psutil 获取 CPU 信息
>>> import psutil
# 获取当前 CPU 的利用率
>>> psutil.cpu_percent
53.8
# 获取当前 CPU 的用户/系统/空闲时间
>>> psutil.cpu_times
scputimes(user=197483.49, nice=0.0, system=114213.01, idle=1942295.68)
# 1/5/15 分钟之内的 CPU 负载
>>> psutil.getloadavg
(7.865234375, 5.1826171875, 4.37353515625)
# CPU 逻辑个数
>>> psutil.cpu_count
4
# CPU 物理个数
>>> psutil.cpu_count(logical=False)
2
在监控平台上每 2 秒请求 url 获取 CPU 负载,并动态显示图表
cpu_percent_dict = {}
def cpu:
# 当前时间
now = time.strftime('%H:%M:%S', time.localtime(time.time))
# CPU 负载
cpu_percent = psutil.cpu_percent
cpu_percent_dict[now] = cpu_percent
# 保持在图表中 10 个数据
if len(cpu_percent_dict.keys) == 11:
cpu_percent_dict.pop(list(cpu_percent_dict.keys)[0])
def cpu_line -> Line:
cpu
# 全量更新 pyecharts 图表
c = (
Line
.add_xaxis(list(cpu_percent_dict.keys))
.add_yaxis('', list(cpu_percent_dict.values), areastyle_opts=opts.AreaStyleOpts(opacity=0.5))
.set_global_opts(title_opts=opts.TitleOpts(title = now + "CPU负载",pos_left = "center"),
yaxis_opts=opts.AxisOpts(min_=0,max_=100,split_number=10,type_="value", name='%'))
)
return c
@app.route("/cpu")
def get_cpu_chart:
c = cpu_line
return c.dump_options_with_quotes
示例结果
内存
通过 psutil 获取内存和交换区信息
# 系统内存信息 总内存/立刻可用给进程使用的内存/内存负载/已使用内存/空闲内存/当前正在使用或者最近使用的内存/未使用的内存/永久在内存
>>> psutil.virtual_memory
svmem(total=8589934592, available=2610610176, percent=69.6, used=4251074560, free=387874816, active=2219110400, inactive=2069094400, wired=2031964160)
# 交换区内存 总内存/使用的内存/空闲的内存/负载/系统从磁盘交换进来的字节数(累计)/系统从磁盘中交换的字节数(累积)
>>> psutil.swap_memory
sswap(total=2147483648, used=834404352, free=1313079296, percent=38.9, sin=328911147008, sout=3249750016)
在监控平台上每 2 秒请求 url 获取内存负载,并动态显示图表
def memory:
memory = psutil.virtual_memory
swap = psutil.swap_memory
# 在 Mac OS 上 未使用内存 = 总内存 - (空闲内存 + 未使用内存)
return memory.total, memory.total - (memory.free + memory.inactive), memory.free + memory.inactive, swap.total, swap.used, swap.free, memory.percent
def memory_liquid -> Gauge:
mtotal, mused, mfree, stotal, sused, sfree, mpercent = memory
c = (
Gauge
.add("", [("", mpercent)])
.set_global_opts(title_opts=opts.TitleOpts(title="内存负载", pos_left = "center"))
)
return mtotal, mused, mfree, stotal, sused, sfree, c
@app.route("/memory")
def get_memory_chart:
mtotal, mused, mfree, stotal, sused, sfree, c = memory_liquid
return jsonify({'mtotal': mtotal, 'mused': mused, 'mfree': mfree, 'stotal': stotal, 'sused': sused, 'sfree': sfree, 'liquid': c.dump_options_with_quotes})
示例结果
磁盘
通过 psutil 获取磁盘大小、分区、使用率和磁盘IO
# 磁盘分区情况
>>> psutil.disk_partitions
[sdiskpart(device='/dev/disk1s5', mountpoint='/', fstype='apfs', opts='ro,local,rootfs,dovolfs,journaled,multilabel'), sdiskpart(device='/dev/disk1s1', mountpoint='/System/Volumes/Data', fstype='apfs', opts='rw,local,dovolfs,dontbrowse,journaled,multilabel'), sdiskpart(device='/dev/disk1s4', mountpoint='/private/var/vm', fstype='apfs', opts='rw,local,dovolfs,dontbrowse,journaled,multilabel'), sdiskpart(device='/dev/disk1s3', mountpoint='/Volumes/Recovery', fstype='apfs', opts='rw,local,dovolfs,dontbrowse,journaled,multilabel')]
# 磁盘的使用情况 磁盘总大小/已使用大小/空闲大小/负载
>>> psutil.disk_usage('/')
sdiskusage(total=250790436864, used=10872418304, free=39636717568, percent=21.5)
# 磁盘IO 读取次数/写入次数/读取数据/写入数据/磁盘读取所花费的时间/写入磁盘所花费的时间
>>> psutil.disk_io_counters
sdiskio(read_count=26404943, write_count=11097500, read_bytes=609467826688, write_bytes=464322912256, read_time=7030486, write_time=2681553)
在监控平台上每 2 秒请求 url 获取磁盘信息,并动态显示图表
disk_dict = {'disk_time':, 'write_bytes': , 'read_bytes': , 'pre_write_bytes': 0, 'pre_read_bytes': 0, 'len': -1}
def disk:
disk_usage = psutil.disk_usage('/')
disk_used = 0
# 磁盘已使用大小 = 每个分区的总和
partitions = psutil.disk_partitions
for partition in partitions:
partition_disk_usage = psutil.disk_usage(partition[1])
disk_used = partition_disk_usage.used + disk_used
now = time.strftime('%H:%M:%S', time.localtime(time.time))
count = psutil.disk_io_counters
read_bytes = count.read_bytes
write_bytes = count.write_bytes
# 第一次请求
if disk_dict['len'] == -1:
disk_dict['pre_write_bytes'] = write_bytes
disk_dict['pre_read_bytes'] = read_bytes
disk_dict['len'] = 0
return disk_usage.total, disk_used, disk_usage.free
# 当前速率=现在写入/读取的总字节-前一次请求写入/读取的总字节
disk_dict['write_bytes'].append((write_bytes - disk_dict['pre_write_bytes'])/1024)
disk_dict['read_bytes'].append((read_bytes - disk_dict['pre_read_bytes'])/ 1024)
disk_dict['disk_time'].append(now)
disk_dict['len'] = disk_dict['len'] + 1
# 把现在写入/读取的总字节放入前一个请求的变量中
disk_dict['pre_write_bytes'] = write_bytes
disk_dict['pre_read_bytes'] = read_bytes
# 保持在图表中 50 个数据
if disk_dict['len'] == 51:
disk_dict['write_bytes'].pop(0)
disk_dict['read_bytes'].pop(0)
disk_dict['disk_time'].pop(0)
disk_dict['len'] = disk_dict['len'] - 1
return disk_usage.total, disk_used, disk_usage.free
def disk_line -> Line:
total, used, free = disk
c = (
Line(init_opts=opts.InitOpts(width="1680px", height="800px"))
.add_xaxis(xaxis_data=disk_dict['disk_time'])
.add_yaxis(
series_name="写入数据",
y_axis=disk_dict['write_bytes'],
areastyle_opts=opts.AreaStyleOpts(opacity=0.5),
linestyle_opts=opts.LineStyleOpts,
label_opts=opts.LabelOpts(is_show=False),
)
.add_yaxis(
series_name="读取数据",
y_axis=disk_dict['read_bytes'],
yaxis_index=1,
areastyle_opts=opts.AreaStyleOpts(opacity=0.5),
linestyle_opts=opts.LineStyleOpts,
label_opts=opts.LabelOpts(is_show=False),
)
.extend_axis(
yaxis=opts.AxisOpts(
name_location="start",
type_="value",
is_inverse=True,
axistick_opts=opts.AxisTickOpts(is_show=True),
splitline_opts=opts.SplitLineOpts(is_show=True),
name='KB/2S'
)
)
.set_global_opts(
title_opts=opts.TitleOpts(
title="磁盘IO",
pos_left="center",
pos_top="top",
),
tooltip_opts=opts.TooltipOpts(trigger="axis", axis_pointer_type="cross"),
legend_opts=opts.LegendOpts(pos_left="left"),
xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
yaxis_opts=opts.AxisOpts( type_="value", name='KB/2S'),
)
.set_series_opts(
axisline_opts=opts.AxisLineOpts,
)
)
return total, used, free, c
@app.route("/disk")
def get_disk_chart:
total, used, free, c = disk_line
return jsonify({'total': total, 'used': used, 'free': free, 'line': c.dump_options_with_quotes})
示例结果
网卡
通过 psutil 获取网络接口和网络连接的信息
# 获取网络字节数和包的个数 发送的字节数/收到的字节数/发送的包数/收到的包数
>>> psutil.net_io_counters
snetio(bytes_sent=9257984, bytes_recv=231398400, packets_sent=93319, packets_recv=189501, errin=0, errout=0, dropin=0, dropout=0)
# 获取当前的网络连接 注意:net_connections 需要用管理员权限运行 Python 文件
>>> psutil.net_connections
[sconn(fd=6, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_STREAM: 1>, laddr=addr(ip='192.168.5.31', port=50541), raddr=addr(ip='17.248.159.145', port=443), status='ESTABLISHED', pid=1897),
sconn(fd=12, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_STREAM: 1>, laddr=addr(ip='192.168.5.31', port=50543), raddr=addr(ip='17.250.120.9', port=443), status='ESTABLISHED', pid=1897),
sconn(fd=6, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_DGRAM: 2>, laddr=addr(ip='0.0.0.0', port=0), raddr=(), status='NONE', pid=1790),
sconn(fd=10, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_DGRAM: 2>, laddr=addr(ip='0.0.0.0', port=0), raddr=(), status='NONE', pid=1790),
sconn(fd=11, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_DGRAM: 2>, laddr=addr(ip='0.0.0.0', port=0), raddr=(), status='NONE', pid=1790),
...
sconn(fd=30, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_DGRAM: 2>, laddr=addr(ip='0.0.0.0', port=137), raddr=(), status='NONE', pid=1),
sconn(fd=31, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_DGRAM: 2>, laddr=addr(ip='0.0.0.0', port=138), raddr=(), status='NONE', pid=1)]
# 获取网络接口信息
>>> psutil.net_if_addrs
{'lo0': [snicaddr(family=<AddressFamily.AF_INET: 2>, address='127.0.0.1', netmask='255.0.0.0', broadcast=None, ptp=None),
snicaddr(family=<AddressFamily.AF_INET6: 30>, address='::1', netmask='ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff', broadcast=None, ptp=None), snicaddr(family=<AddressFamily.AF_INET6: 30>, address='fe80::1%lo0', netmask='ffff:ffff:ffff:ffff::', broadcast=None, ptp=None)],
...,
'utun1': [snicaddr(family=<AddressFamily.AF_INET6: 30>, address='fe80::b519:e5df:2bd4:857e%utun1', netmask='ffff:ffff:ffff:ffff::', broadcast=None, ptp=None)]}
# 获取网络接口的状态
>>> psutil.net_if_stats
{'lo0': snicstats(isup=True, duplex=<NicDuplex.NIC_DUPLEX_UNKNOWN: 0>, speed=0, mtu=16384),
...
'utun1': snicstats(isup=True, duplex=<NicDuplex.NIC_DUPLEX_UNKNOWN: 0>, speed=0, mtu=2000)}
在监控平台上每 2 秒请求 url 获取网卡IO,并动态显示图表
net_io_dict = {'net_io_time':, 'net_io_sent': , 'net_io_recv': , 'pre_sent': 0, 'pre_recv': 0, 'len': -1}
def net_io:
now = time.strftime('%H:%M:%S', time.localtime(time.time))
# 获取网络信息
count = psutil.net_io_counters
g_sent = count.bytes_sent
g_recv = count.bytes_recv
# 第一次请求
if net_io_dict['len'] == -1:
net_io_dict['pre_sent'] = g_sent
net_io_dict['pre_recv'] = g_recv
net_io_dict['len'] = 0
return
# 当前网络发送/接收的字节速率 = 现在网络发送/接收的总字节 - 前一次请求网络发送/接收的总字节
net_io_dict['net_io_sent'].append(g_sent - net_io_dict['pre_sent'])
net_io_dict['net_io_recv'].append(g_recv - net_io_dict['pre_recv'])
net_io_dict['net_io_time'].append(now)
net_io_dict['len'] = net_io_dict['len'] + 1
net_io_dict['pre_sent'] = g_sent
net_io_dict['pre_recv'] = g_recv
# 保持在图表中 10 个数据
if net_io_dict['len'] == 11:
net_io_dict['net_io_sent'].pop(0)
net_io_dict['net_io_recv'].pop(0)
net_io_dict['net_io_time'].pop(0)
net_io_dict['len'] = net_io_dict['len'] - 1
def net_io_line -> Line:
net_io
c = (
Line
.add_xaxis(net_io_dict['net_io_time'])
.add_yaxis("发送字节数", net_io_dict['net_io_sent'], is_smooth=True)
.add_yaxis("接收字节数", net_io_dict['net_io_recv'], is_smooth=True)
.set_series_opts(
areastyle_opts=opts.AreaStyleOpts(opacity=0.5),
label_opts=opts.LabelOpts(is_show=False),
)
.set_global_opts(
title_opts=opts.TitleOpts(title="网卡IO/2秒"),
xaxis_opts=opts.AxisOpts(
axistick_opts=opts.AxisTickOpts(is_align_with_label=True),
is_scale=False,
boundary_gap=False,
),
))
return c
@app.route("/netio")
def get_net_io_chart:
c = net_io_line
return c.dump_options_with_quotes
示例结果
进程
通过 psutil 可以获取所有进程的信息
# 所有进程的 pid
>>> psutil.pids
[0, 1, 134, 135, 138, 139, 140, 141, 144, 145, 147, 152, ..., 30400, 97792]
# 单个进程
>>> p = psutil.Process(30400)
# 名称
>>> p.name
'pycharm'
# 使用内存负载
>>> p.memory_percent
12.838459014892578
# 启动时间
>>> p.create_time
1587029962.493182
# 路径
>>> p.exe
'/Applications/PyCharm.app/Contents/MacOS/pycharm'
# 状态
>>> p.status
'running'
# 用户名
>>> p.username
'imeng'
# 内存信息
>>> p.memory_info
pmem(rss=1093005312, vms=9914318848, pfaults=7813313, pageins=8448)
列出所有不需要权限的进程
def process:
result =
process_list =
pid = psutil.pids
for k, i in enumerate(pid):
try:
proc = psutil.Process(i)
ctime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(proc.create_time))
process_list.append((str(i), proc.name, proc.cpu_percent, proc.memory_percent, ctime))
except psutil.AccessDenied:
# 需要管理员权限
pass
except psutil.NoSuchProcess:
pass
except SystemError:
pass
# 按负载排序
process_list.sort(key=process_sort, reverse=True)
for i in process_list:
result.append({'PID': i[0], 'name': i[1], 'cpu': i[2], 'mem': "%.2f%%"%i[3], 'ctime': i[4]})
return jsonify({'list', result})
def process_sort(elem):
return elem[3]
@app.route("/process")
def get_process_tab:
c = process
return c
@app.route("/delprocess")
def del_process:
pid = request.args.get("pid")
os.kill(int(pid), signal.SIGKILL)
return jsonify({'status': 'OK'})
示例结果
总结
本文以 Psutil + Flask + Pyecharts + Bootstrap 开发一个简单的系统监控平台,可以算做是本公众号内容的一个学以致用。在 Psutil 还有许多方法文章没有列举感兴趣的小伙伴可以去尝试并使用。
PS:公号内回复 :Python,即可进入Python 新手学习交流群,一起100天计划!
老规矩,兄弟们还记得么,,如果感觉文章内容不错的话,记得分享朋友圈让更多的人知道!
【代码获取方式】
相关推荐
- 教你把多个视频合并成一个视频的方法
-
一.情况介绍当你有一个m3u8文件和一个目录,目录中有连续的视频片段,这些片段可以连成一段完整的视频。m3u8文件打开后像这样:m3u8文件,可以理解为播放列表,里面是播放视频片段的顺序。视频片段像这...
- 零代码编程:用kimichat合并一个文件夹下的多个文件
-
一个文件夹里面有很多个srt字幕文件,如何借助kimichat来自动批量合并呢?在kimichat对话框中输入提示词:你是一个Python编程专家,完成如下的编程任务:这个文件夹:D:\downloa...
- Java APT_java APT 生成代码
-
JavaAPT(AnnotationProcessingTool)是一种在Java编译阶段处理注解的工具。APT会在编译阶段扫描源代码中的注解,并根据这些注解生成代码、资源文件或其他输出,...
- Unit Runtime:一键运行 AI 生成的代码,或许将成为你的复制 + 粘贴神器
-
在我们构建了UnitMesh架构之后,以及对应的demo之后,便着手于实现UnitMesh架构。于是,我们就继续开始UnitRuntime,以用于直接运行AI生成的代码。PS:...
- 挣脱臃肿的枷锁:为什么说Vert.x是Java开发者手中的一柄利剑?
-
如果你是一名Java开发者,那么你的职业生涯几乎无法避开Spring。它如同一位德高望重的老国王,统治着企业级应用开发的大片疆土。SpringBoot的约定大于配置、SpringCloud的微服务...
- 五年后,谷歌还在全力以赴发展 Kotlin
-
作者|FredericLardinois译者|Sambodhi策划|Tina自2017年谷歌I/O全球开发者大会上,谷歌首次宣布将Kotlin(JetBrains开发的Ja...
- kotlin和java开发哪个好,优缺点对比
-
Kotlin和Java都是常见的编程语言,它们有各自的优缺点。Kotlin的优点:简洁:Kotlin程序相对于Java程序更简洁,可以减少代码量。安全:Kotlin在类型系统和空值安全...
- 移动端架构模式全景解析:从MVC到MVVM,如何选择最佳设计方案?
-
掌握不同架构模式的精髓,是构建可维护、可测试且高效移动应用的关键。在移动应用开发中,选择合适的软件架构模式对项目的可维护性、可测试性和团队协作效率至关重要。随着应用复杂度的增加,一个良好的架构能够帮助...
- 颜值非常高的XShell替代工具Termora,不一样的使用体验!
-
Termora是一款面向开发者和运维人员的跨平台SSH终端与文件管理工具,支持Windows、macOS及Linux系统,通过一体化界面简化远程服务器管理流程。其核心定位是解决多平台环境下远程连接、文...
- 预处理的底层原理和预处理编译运行异常的解决方案
-
若文章对您有帮助,欢迎关注程序员小迷。助您在编程路上越走越好![Mac-10.7.1LionIntel-based]Q:预处理到底干了什么事情?A:预处理,顾名思义,预先做的处理。源代码中...
- 为“架构”再建个模:如何用代码描述软件架构?
-
在架构治理平台ArchGuard中,为了实现对架构的治理,我们需要代码+模型描述所要处理的内容和数据。所以,在ArchGuard中,我们有了代码的模型、依赖的模型、变更的模型等,剩下的两个...
- 深度解析:Google Gemma 3n —— 移动优先的轻量多模态大模型
-
2025年6月,Google正式发布了Gemma3n,这是一款能够在2GB内存环境下运行的轻量级多模态大模型。它延续了Gemma家族的开源基因,同时在架构设计上大幅优化,目标是让...
- 比分网开发技术栈与功能详解_比分网有哪些
-
一、核心功能模块一个基本的比分网通常包含以下模块:首页/总览实时比分看板:滚动展示所有正在进行的比赛,包含比分、比赛时间、红黄牌等关键信息。热门赛事/焦点战:突出显示重要的、关注度高的比赛。赛事导航...
- 设计模式之-生成器_一键生成设计
-
一、【概念定义】——“分步构建复杂对象,隐藏创建细节”生成器模式(BuilderPattern):一种“分步构建型”创建型设计模式,它将一个复杂对象的构建与其表示分离,使得同样的构建过程可以创建...
- 构建第一个 Kotlin Android 应用_kotlin简介
-
第一步:安装AndroidStudio(推荐IDE)AndroidStudio是官方推荐的Android开发集成开发环境(IDE),内置对Kotlin的完整支持。1.下载And...
- 一周热门
-
-
【验证码逆向专栏】vaptcha 手势验证码逆向分析
-
Psutil + Flask + Pyecharts + Bootstrap 开发动态可视化系统监控
-
一个解决支持HTML/CSS/JS网页转PDF(高质量)的终极解决方案
-
再见Swagger UI 国人开源了一款超好用的 API 文档生成框架,真香
-
网页转成pdf文件的经验分享 网页转成pdf文件的经验分享怎么弄
-
C++ std::vector 简介
-
飞牛OS入门安装遇到问题,如何解决?
-
系统C盘清理:微信PC端文件清理,扩大C盘可用空间步骤
-
10款高性能NAS丨双十一必看,轻松搞定虚拟机、Docker、软路由
-
python使用fitz模块提取pdf中的图片
-
- 最近发表
- 标签列表
-
- python判断字典是否为空 (50)
- crontab每周一执行 (48)
- aes和des区别 (43)
- bash脚本和shell脚本的区别 (35)
- canvas库 (33)
- dataframe筛选满足条件的行 (35)
- gitlab日志 (33)
- lua xpcall (36)
- blob转json (33)
- python判断是否在列表中 (34)
- python html转pdf (36)
- 安装指定版本npm (37)
- idea搜索jar包内容 (33)
- css鼠标悬停出现隐藏的文字 (34)
- linux nacos启动命令 (33)
- gitlab 日志 (36)
- adb pull (37)
- python判断元素在不在列表里 (34)
- python 字典删除元素 (34)
- vscode切换git分支 (35)
- python bytes转16进制 (35)
- grep前后几行 (34)
- hashmap转list (35)
- c++ 字符串查找 (35)
- mysql刷新权限 (34)