Psutil + Flask + Pyecharts + Bootstrap 开发动态可视化系统监控
liuian 2024-12-11 15:43 536 浏览
文 | 某某白米饭
来源:Python 技术「ID: pythonall」
psutil 是一个跨平台库(http://pythonhosted.org/psutil)能够获取到系统运行的进程和系统利用率(包括CPU、内存、磁盘、网络等)信息。主要用来做系统监控,性能分析,进程管理。支持 Linux、Mac OS、Windows 系统。
本文以 psutil 模块获取系统信息开发一个监控 Mac OS 系统的平台。
准备工作
技术选择
监控的系统是 Mac OS 系统
监控系统模块选择 psutil 模块
Web 框架选择的是 Flask 框架
前端 UI 选择的是 Bootstrap UI
动态可视化图表选择 Pyecharts 模块
安装 psutil
pip3 install psutil
安装 Flask、pyecharts、Bootstrap
Flask 的教程是在公众号文章:Web 开发 Flask 介绍
Pyecharts 的教程在公众号文章:Python 图表利器 pyecharts,按照官网 (http://pyecharts.org/#/zh-cn/web_flask) 文档整合 Flask 框架,并使用定时全量更新图表。
Bootstrap 是一个 前端的 Web UI,官网地址是 (https://v4.bootcss.com)
获取系统信息
CPU信息
通过 psutil 获取 CPU 信息
>>> import psutil
# 获取当前 CPU 的利用率
>>> psutil.cpu_percent
53.8
# 获取当前 CPU 的用户/系统/空闲时间
>>> psutil.cpu_times
scputimes(user=197483.49, nice=0.0, system=114213.01, idle=1942295.68)
# 1/5/15 分钟之内的 CPU 负载
>>> psutil.getloadavg
(7.865234375, 5.1826171875, 4.37353515625)
# CPU 逻辑个数
>>> psutil.cpu_count
4
# CPU 物理个数
>>> psutil.cpu_count(logical=False)
2
在监控平台上每 2 秒请求 url 获取 CPU 负载,并动态显示图表
cpu_percent_dict = {}
def cpu:
# 当前时间
now = time.strftime('%H:%M:%S', time.localtime(time.time))
# CPU 负载
cpu_percent = psutil.cpu_percent
cpu_percent_dict[now] = cpu_percent
# 保持在图表中 10 个数据
if len(cpu_percent_dict.keys) == 11:
cpu_percent_dict.pop(list(cpu_percent_dict.keys)[0])
def cpu_line -> Line:
cpu
# 全量更新 pyecharts 图表
c = (
Line
.add_xaxis(list(cpu_percent_dict.keys))
.add_yaxis('', list(cpu_percent_dict.values), areastyle_opts=opts.AreaStyleOpts(opacity=0.5))
.set_global_opts(title_opts=opts.TitleOpts(title = now + "CPU负载",pos_left = "center"),
yaxis_opts=opts.AxisOpts(min_=0,max_=100,split_number=10,type_="value", name='%'))
)
return c
@app.route("/cpu")
def get_cpu_chart:
c = cpu_line
return c.dump_options_with_quotes
示例结果
内存
通过 psutil 获取内存和交换区信息
# 系统内存信息 总内存/立刻可用给进程使用的内存/内存负载/已使用内存/空闲内存/当前正在使用或者最近使用的内存/未使用的内存/永久在内存
>>> psutil.virtual_memory
svmem(total=8589934592, available=2610610176, percent=69.6, used=4251074560, free=387874816, active=2219110400, inactive=2069094400, wired=2031964160)
# 交换区内存 总内存/使用的内存/空闲的内存/负载/系统从磁盘交换进来的字节数(累计)/系统从磁盘中交换的字节数(累积)
>>> psutil.swap_memory
sswap(total=2147483648, used=834404352, free=1313079296, percent=38.9, sin=328911147008, sout=3249750016)
在监控平台上每 2 秒请求 url 获取内存负载,并动态显示图表
def memory:
memory = psutil.virtual_memory
swap = psutil.swap_memory
# 在 Mac OS 上 未使用内存 = 总内存 - (空闲内存 + 未使用内存)
return memory.total, memory.total - (memory.free + memory.inactive), memory.free + memory.inactive, swap.total, swap.used, swap.free, memory.percent
def memory_liquid -> Gauge:
mtotal, mused, mfree, stotal, sused, sfree, mpercent = memory
c = (
Gauge
.add("", [("", mpercent)])
.set_global_opts(title_opts=opts.TitleOpts(title="内存负载", pos_left = "center"))
)
return mtotal, mused, mfree, stotal, sused, sfree, c
@app.route("/memory")
def get_memory_chart:
mtotal, mused, mfree, stotal, sused, sfree, c = memory_liquid
return jsonify({'mtotal': mtotal, 'mused': mused, 'mfree': mfree, 'stotal': stotal, 'sused': sused, 'sfree': sfree, 'liquid': c.dump_options_with_quotes})
示例结果
磁盘
通过 psutil 获取磁盘大小、分区、使用率和磁盘IO
# 磁盘分区情况
>>> psutil.disk_partitions
[sdiskpart(device='/dev/disk1s5', mountpoint='/', fstype='apfs', opts='ro,local,rootfs,dovolfs,journaled,multilabel'), sdiskpart(device='/dev/disk1s1', mountpoint='/System/Volumes/Data', fstype='apfs', opts='rw,local,dovolfs,dontbrowse,journaled,multilabel'), sdiskpart(device='/dev/disk1s4', mountpoint='/private/var/vm', fstype='apfs', opts='rw,local,dovolfs,dontbrowse,journaled,multilabel'), sdiskpart(device='/dev/disk1s3', mountpoint='/Volumes/Recovery', fstype='apfs', opts='rw,local,dovolfs,dontbrowse,journaled,multilabel')]
# 磁盘的使用情况 磁盘总大小/已使用大小/空闲大小/负载
>>> psutil.disk_usage('/')
sdiskusage(total=250790436864, used=10872418304, free=39636717568, percent=21.5)
# 磁盘IO 读取次数/写入次数/读取数据/写入数据/磁盘读取所花费的时间/写入磁盘所花费的时间
>>> psutil.disk_io_counters
sdiskio(read_count=26404943, write_count=11097500, read_bytes=609467826688, write_bytes=464322912256, read_time=7030486, write_time=2681553)
在监控平台上每 2 秒请求 url 获取磁盘信息,并动态显示图表
disk_dict = {'disk_time':, 'write_bytes': , 'read_bytes': , 'pre_write_bytes': 0, 'pre_read_bytes': 0, 'len': -1}
def disk:
disk_usage = psutil.disk_usage('/')
disk_used = 0
# 磁盘已使用大小 = 每个分区的总和
partitions = psutil.disk_partitions
for partition in partitions:
partition_disk_usage = psutil.disk_usage(partition[1])
disk_used = partition_disk_usage.used + disk_used
now = time.strftime('%H:%M:%S', time.localtime(time.time))
count = psutil.disk_io_counters
read_bytes = count.read_bytes
write_bytes = count.write_bytes
# 第一次请求
if disk_dict['len'] == -1:
disk_dict['pre_write_bytes'] = write_bytes
disk_dict['pre_read_bytes'] = read_bytes
disk_dict['len'] = 0
return disk_usage.total, disk_used, disk_usage.free
# 当前速率=现在写入/读取的总字节-前一次请求写入/读取的总字节
disk_dict['write_bytes'].append((write_bytes - disk_dict['pre_write_bytes'])/1024)
disk_dict['read_bytes'].append((read_bytes - disk_dict['pre_read_bytes'])/ 1024)
disk_dict['disk_time'].append(now)
disk_dict['len'] = disk_dict['len'] + 1
# 把现在写入/读取的总字节放入前一个请求的变量中
disk_dict['pre_write_bytes'] = write_bytes
disk_dict['pre_read_bytes'] = read_bytes
# 保持在图表中 50 个数据
if disk_dict['len'] == 51:
disk_dict['write_bytes'].pop(0)
disk_dict['read_bytes'].pop(0)
disk_dict['disk_time'].pop(0)
disk_dict['len'] = disk_dict['len'] - 1
return disk_usage.total, disk_used, disk_usage.free
def disk_line -> Line:
total, used, free = disk
c = (
Line(init_opts=opts.InitOpts(width="1680px", height="800px"))
.add_xaxis(xaxis_data=disk_dict['disk_time'])
.add_yaxis(
series_name="写入数据",
y_axis=disk_dict['write_bytes'],
areastyle_opts=opts.AreaStyleOpts(opacity=0.5),
linestyle_opts=opts.LineStyleOpts,
label_opts=opts.LabelOpts(is_show=False),
)
.add_yaxis(
series_name="读取数据",
y_axis=disk_dict['read_bytes'],
yaxis_index=1,
areastyle_opts=opts.AreaStyleOpts(opacity=0.5),
linestyle_opts=opts.LineStyleOpts,
label_opts=opts.LabelOpts(is_show=False),
)
.extend_axis(
yaxis=opts.AxisOpts(
name_location="start",
type_="value",
is_inverse=True,
axistick_opts=opts.AxisTickOpts(is_show=True),
splitline_opts=opts.SplitLineOpts(is_show=True),
name='KB/2S'
)
)
.set_global_opts(
title_opts=opts.TitleOpts(
title="磁盘IO",
pos_left="center",
pos_top="top",
),
tooltip_opts=opts.TooltipOpts(trigger="axis", axis_pointer_type="cross"),
legend_opts=opts.LegendOpts(pos_left="left"),
xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
yaxis_opts=opts.AxisOpts( type_="value", name='KB/2S'),
)
.set_series_opts(
axisline_opts=opts.AxisLineOpts,
)
)
return total, used, free, c
@app.route("/disk")
def get_disk_chart:
total, used, free, c = disk_line
return jsonify({'total': total, 'used': used, 'free': free, 'line': c.dump_options_with_quotes})
示例结果
网卡
通过 psutil 获取网络接口和网络连接的信息
# 获取网络字节数和包的个数 发送的字节数/收到的字节数/发送的包数/收到的包数
>>> psutil.net_io_counters
snetio(bytes_sent=9257984, bytes_recv=231398400, packets_sent=93319, packets_recv=189501, errin=0, errout=0, dropin=0, dropout=0)
# 获取当前的网络连接 注意:net_connections 需要用管理员权限运行 Python 文件
>>> psutil.net_connections
[sconn(fd=6, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_STREAM: 1>, laddr=addr(ip='192.168.5.31', port=50541), raddr=addr(ip='17.248.159.145', port=443), status='ESTABLISHED', pid=1897),
sconn(fd=12, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_STREAM: 1>, laddr=addr(ip='192.168.5.31', port=50543), raddr=addr(ip='17.250.120.9', port=443), status='ESTABLISHED', pid=1897),
sconn(fd=6, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_DGRAM: 2>, laddr=addr(ip='0.0.0.0', port=0), raddr=(), status='NONE', pid=1790),
sconn(fd=10, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_DGRAM: 2>, laddr=addr(ip='0.0.0.0', port=0), raddr=(), status='NONE', pid=1790),
sconn(fd=11, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_DGRAM: 2>, laddr=addr(ip='0.0.0.0', port=0), raddr=(), status='NONE', pid=1790),
...
sconn(fd=30, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_DGRAM: 2>, laddr=addr(ip='0.0.0.0', port=137), raddr=(), status='NONE', pid=1),
sconn(fd=31, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_DGRAM: 2>, laddr=addr(ip='0.0.0.0', port=138), raddr=(), status='NONE', pid=1)]
# 获取网络接口信息
>>> psutil.net_if_addrs
{'lo0': [snicaddr(family=<AddressFamily.AF_INET: 2>, address='127.0.0.1', netmask='255.0.0.0', broadcast=None, ptp=None),
snicaddr(family=<AddressFamily.AF_INET6: 30>, address='::1', netmask='ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff', broadcast=None, ptp=None), snicaddr(family=<AddressFamily.AF_INET6: 30>, address='fe80::1%lo0', netmask='ffff:ffff:ffff:ffff::', broadcast=None, ptp=None)],
...,
'utun1': [snicaddr(family=<AddressFamily.AF_INET6: 30>, address='fe80::b519:e5df:2bd4:857e%utun1', netmask='ffff:ffff:ffff:ffff::', broadcast=None, ptp=None)]}
# 获取网络接口的状态
>>> psutil.net_if_stats
{'lo0': snicstats(isup=True, duplex=<NicDuplex.NIC_DUPLEX_UNKNOWN: 0>, speed=0, mtu=16384),
...
'utun1': snicstats(isup=True, duplex=<NicDuplex.NIC_DUPLEX_UNKNOWN: 0>, speed=0, mtu=2000)}
在监控平台上每 2 秒请求 url 获取网卡IO,并动态显示图表
net_io_dict = {'net_io_time':, 'net_io_sent': , 'net_io_recv': , 'pre_sent': 0, 'pre_recv': 0, 'len': -1}
def net_io:
now = time.strftime('%H:%M:%S', time.localtime(time.time))
# 获取网络信息
count = psutil.net_io_counters
g_sent = count.bytes_sent
g_recv = count.bytes_recv
# 第一次请求
if net_io_dict['len'] == -1:
net_io_dict['pre_sent'] = g_sent
net_io_dict['pre_recv'] = g_recv
net_io_dict['len'] = 0
return
# 当前网络发送/接收的字节速率 = 现在网络发送/接收的总字节 - 前一次请求网络发送/接收的总字节
net_io_dict['net_io_sent'].append(g_sent - net_io_dict['pre_sent'])
net_io_dict['net_io_recv'].append(g_recv - net_io_dict['pre_recv'])
net_io_dict['net_io_time'].append(now)
net_io_dict['len'] = net_io_dict['len'] + 1
net_io_dict['pre_sent'] = g_sent
net_io_dict['pre_recv'] = g_recv
# 保持在图表中 10 个数据
if net_io_dict['len'] == 11:
net_io_dict['net_io_sent'].pop(0)
net_io_dict['net_io_recv'].pop(0)
net_io_dict['net_io_time'].pop(0)
net_io_dict['len'] = net_io_dict['len'] - 1
def net_io_line -> Line:
net_io
c = (
Line
.add_xaxis(net_io_dict['net_io_time'])
.add_yaxis("发送字节数", net_io_dict['net_io_sent'], is_smooth=True)
.add_yaxis("接收字节数", net_io_dict['net_io_recv'], is_smooth=True)
.set_series_opts(
areastyle_opts=opts.AreaStyleOpts(opacity=0.5),
label_opts=opts.LabelOpts(is_show=False),
)
.set_global_opts(
title_opts=opts.TitleOpts(title="网卡IO/2秒"),
xaxis_opts=opts.AxisOpts(
axistick_opts=opts.AxisTickOpts(is_align_with_label=True),
is_scale=False,
boundary_gap=False,
),
))
return c
@app.route("/netio")
def get_net_io_chart:
c = net_io_line
return c.dump_options_with_quotes
示例结果
进程
通过 psutil 可以获取所有进程的信息
# 所有进程的 pid
>>> psutil.pids
[0, 1, 134, 135, 138, 139, 140, 141, 144, 145, 147, 152, ..., 30400, 97792]
# 单个进程
>>> p = psutil.Process(30400)
# 名称
>>> p.name
'pycharm'
# 使用内存负载
>>> p.memory_percent
12.838459014892578
# 启动时间
>>> p.create_time
1587029962.493182
# 路径
>>> p.exe
'/Applications/PyCharm.app/Contents/MacOS/pycharm'
# 状态
>>> p.status
'running'
# 用户名
>>> p.username
'imeng'
# 内存信息
>>> p.memory_info
pmem(rss=1093005312, vms=9914318848, pfaults=7813313, pageins=8448)
列出所有不需要权限的进程
def process:
result =
process_list =
pid = psutil.pids
for k, i in enumerate(pid):
try:
proc = psutil.Process(i)
ctime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(proc.create_time))
process_list.append((str(i), proc.name, proc.cpu_percent, proc.memory_percent, ctime))
except psutil.AccessDenied:
# 需要管理员权限
pass
except psutil.NoSuchProcess:
pass
except SystemError:
pass
# 按负载排序
process_list.sort(key=process_sort, reverse=True)
for i in process_list:
result.append({'PID': i[0], 'name': i[1], 'cpu': i[2], 'mem': "%.2f%%"%i[3], 'ctime': i[4]})
return jsonify({'list', result})
def process_sort(elem):
return elem[3]
@app.route("/process")
def get_process_tab:
c = process
return c
@app.route("/delprocess")
def del_process:
pid = request.args.get("pid")
os.kill(int(pid), signal.SIGKILL)
return jsonify({'status': 'OK'})
示例结果
总结
本文以 Psutil + Flask + Pyecharts + Bootstrap 开发一个简单的系统监控平台,可以算做是本公众号内容的一个学以致用。在 Psutil 还有许多方法文章没有列举感兴趣的小伙伴可以去尝试并使用。
PS:公号内回复 :Python,即可进入Python 新手学习交流群,一起100天计划!
老规矩,兄弟们还记得么,,如果感觉文章内容不错的话,记得分享朋友圈让更多的人知道!
【代码获取方式】
相关推荐
- 面试怕被问Hashmap,多看看这个文章
-
o数据结构otable数组长度永远为2的幂次方o那么为什么要把数组长度设计为2的幂次方呢?o扩容o链表树化o红黑树拆分o查找o插入o删除o遍历oequasl和hashcode总结HashMap是面试中...
- 非常简洁地重试Retry组件,使用起来杠杠的
-
前言小伙伴是不是经常遇到接口调用异常,超时的场景?尤其网络抖动导致timeout超时的场景,我们一般产品就会叫我们要重试几次。很多小伙伴的实现方式是写个循环调用for(inti=1;i<=3;...
- Kafka消息可靠传输之幂等、事务机制
-
一般而言,消息中间件的消息传输保障有3个层级,分别如下。atmostonce:至多一次。消息可能会丢失,但绝对不会重复传输。atleastonce:最少一次。消息绝不会丢失,但可能会重复传输。...
- Seata源码—9.Seata XA模式的事务处理
-
大纲1.SeataXA分布式事务案例及AT与XA的区别2.SeataXA分布式事务案例的各模块运行流程3.Seata使用SpringBoot自动装配简化复杂配置4.全局事务注解扫描组件的自动装配...
- Disruptor—3.核心源码实现分析一
-
大纲1.Disruptor的生产者源码分析2.Disruptor的消费者源码分析3.Disruptor的WaitStrategy等待策略分析4.Disruptor的高性能原因5.Disruptor高性...
- Spring Boot 进阶-详解SpringBoot中条件注解使用
-
作为使用SpringBoot框架的开发者来讲,如果你连如下的这些注解你都没有听说过,没有用过,那我劝你还是放弃吧?在SpringBoot中我们最常见到的注解应该是条件注解了吧!也就是@Condit...
- 如何自定义编解码器(如何自定义编解码器的程序)
-
1.前言上一节我们一节了解了什么是编码解码、序列化和反序列化了,并且留有一道思考题,本节内容主要是深入解析该思考题。思考题:能否把我们的编码和解码封装成独立的Handler呢?那么应该如何去封装...
- Disruptor—3.核心源码实现分析二
-
大纲1.Disruptor的生产者源码分析2.Disruptor的消费者源码分析3.Disruptor的WaitStrategy等待策略分析4.Disruptor的高性能原因5.Disruptor高性...
- 线程的状态有哪些?它是如何工作的?
-
线程的状态有哪些?它是如何工作的?线程(Thread)是并发编程的基础,也是程序执行的最小单元,它依托进程而存在。一个进程中可以包含多个线程,多线程可以共享一块内存空间和一组系统资源,因此线程之间的切...
- 有图解有案例,我终于把Condition的原理讲透彻了
-
平时加解锁都是直接使用Synchronized关键字来实现的,简单好用,为啥还要引用ReentrantLock呢?为了解决小伙伴的疑问,我们来对两者做个简单的比较吧:相同点两者都是“可重入锁”,即当前...
- 白话DUBBO原理,通俗易记,再也不怕面试时讲不清楚了
-
现在的各种面试免不了要问些中间件,尤其是互联网公司,更注重获选人对中间件的掌握情况。在中间件中,有一大类是关于RPC框架的,Dubbo即是阿里出品的一款很著名的RPC中间件,很多互联网公司都在用,面试...
- Java 最细的集合类总结(java常用的集合类有哪些)
-
数据结构作为每一个开发者不可回避的问题,而Java对于不同的数据结构提供了非常成熟的实现,这一个又一个实现既是面试中的难点,也是工作中必不可少的工具,在此,笔者经历漫长的剖析,将其抽丝剥茧的呈现出...
- 详解Java异常(Exception)处理及常见异常
-
很多事件并非总是按照人们自己设计意愿顺利发展的,经常出现这样那样的异常情况。例如:你计划周末郊游,计划从家里出发→到达目的→游泳→烧烤→回家。但天有不测风云,当你准备烧烤时候突然天降大雨,只能终止郊...
- 为什么阿里强制要求不要在foreach循环里进行元素remove和add操作
-
在阅读《阿里巴巴Java开发手册》时,发现有一条关于在foreach循环里进行元素的remove/add操作的规约,具体内容如下:错误演示我们首先在IDEA中编写一个在foreach循...
- SpringBoot条件化配置(@Conditional)全面解析与实战指南
-
一、条件化配置基础概念1.1什么是条件化配置条件化配置是Spring框架提供的一种基于特定条件来决定是否注册Bean或加载配置的机制。在SpringBoot中,这一机制通过@Conditional...
- 一周热门
-
-
Python实现人事自动打卡,再也不会被批评
-
Psutil + Flask + Pyecharts + Bootstrap 开发动态可视化系统监控
-
【验证码逆向专栏】vaptcha 手势验证码逆向分析
-
一个解决支持HTML/CSS/JS网页转PDF(高质量)的终极解决方案
-
再见Swagger UI 国人开源了一款超好用的 API 文档生成框架,真香
-
网页转成pdf文件的经验分享 网页转成pdf文件的经验分享怎么弄
-
C++ std::vector 简介
-
python使用fitz模块提取pdf中的图片
-
《人人译客》如何规划你的移动电商网站(2)
-
Jupyterhub安装教程 jupyter怎么安装包
-
- 最近发表
- 标签列表
-
- python判断字典是否为空 (50)
- crontab每周一执行 (48)
- aes和des区别 (43)
- bash脚本和shell脚本的区别 (35)
- canvas库 (33)
- dataframe筛选满足条件的行 (35)
- gitlab日志 (33)
- lua xpcall (36)
- blob转json (33)
- python判断是否在列表中 (34)
- python html转pdf (36)
- 安装指定版本npm (37)
- idea搜索jar包内容 (33)
- css鼠标悬停出现隐藏的文字 (34)
- linux nacos启动命令 (33)
- gitlab 日志 (36)
- adb pull (37)
- table.render (33)
- uniapp textarea (33)
- python判断元素在不在列表里 (34)
- python 字典删除元素 (34)
- vscode切换git分支 (35)
- python bytes转16进制 (35)
- grep前后几行 (34)
- hashmap转list (35)