百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Python文件操作常用库高级应用教程

liuian 2025-05-26 17:22 12 浏览

本文是在前面《Python文件操作常用库使用教程》的基础上,进一步学习Python文件操作库的高级应用。

一、高级文件系统监控

1.1 watchdog库 - 实时文件系统监控

安装与基本使用

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class FileChangeHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            print(f"文件被修改: {event.src_path}")

def monitor_directory(path):
    event_handler = FileChangeHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

# 监控当前目录
monitor_directory('.')

功能扩展

class CustomHandler(FileSystemEventHandler):
    def __init__(self, callback):
        self.callback = callback
    
    def on_any_event(self, event):
        event_type = event.event_type  # 'modified', 'created', 'deleted', 'moved'
        self.callback(event_type, event.src_path, 
                     getattr(event, 'dest_path', None))

# 使用示例
def log_change(event_type, src, dest):
    print(f"{event_type}: {src} {f'-> {dest}' if dest else ''}")

monitor_with_callback('.', log_change)

文件监控系统架构




二、高性能文件处理

2.1 内存映射文件(mmap)

大文件处理示例

import mmap
import re

def search_large_file(filename, pattern):
    """在大文件中搜索模式"""
    with open(filename, 'r+b') as f:
        # 内存映射文件
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            # 使用正则搜索
            for match in re.finditer(pattern.encode(), mm):
                yield match.start(), match.group().decode()

# 使用示例
for pos, text in search_large_file('large.log', r'ERROR\s+\d+'):
    print(f"在位置 {pos} 发现错误: {text}")

性能对比测试

import timeit

def test_performance():
    # 传统方式
    def traditional():
        with open('large.file') as f:
            return sum(1 for _ in f)
    
    # mmap方式
    def mmap_way():
        with open('large.file', 'r+b') as f:
            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                return sum(1 for _ in mm)
    
    print("传统方式:", timeit.timeit(traditional, number=10))
    print("mmap方式:", timeit.timeit(mmap_way, number=10))

2.2 并行文件处理

多进程文件处理

from multiprocessing import Pool
import os

def process_file_chunk(args):
    """处理文件块"""
    filename, start, end = args
    with open(filename, 'rb') as f:
        f.seek(start)
        chunk = f.read(end - start)
        return len(chunk.splitlines())

def parallel_file_processing(filename, workers=4):
    """并行处理大文件"""
    file_size = os.path.getsize(filename)
    chunk_size = file_size // workers
    ranges = [(filename, i*chunk_size, (i+1)*chunk_size) 
              for i in range(workers)]
    
    with Pool(workers) as pool:
        results = pool.map(process_file_chunk, ranges)
    
    return sum(results)

# 使用示例
total_lines = parallel_file_processing('huge_file.csv')
print(f"文件总行数: {total_lines}")

三、高级压缩与归档处理

3.1 增量压缩与加密

ZIP加密与增量添加

import zipfile
import os
from tqdm import tqdm  # 进度条库

def create_secure_zip(output_zip, files_to_zip, password):
    """创建加密ZIP文件"""
    with zipfile.ZipFile(
        output_zip, 
        'w', 
        compression=zipfile.ZIP_DEFLATED
    ) as zipf:
        zipf.setpassword(password.encode())
        
        for file in tqdm(files_to_zip, desc="压缩进度"):
            if os.path.isfile(file):
                zipf.write(file)
                
def add_to_existing_zip(zip_file, new_files, password=None):
    """向现有ZIP添加文件"""
    temp_zip = f"temp_{zip_file}"
    os.rename(zip_file, temp_zip)
    
    with zipfile.ZipFile(temp_zip, 'r') as zin:
        with zipfile.ZipFile(zip_file, 'w') as zout:
            if password:
                zout.setpassword(password.encode())
            
            # 复制现有条目
            for item in tqdm(zin.infolist(), desc="复制原有文件"):
                zout.writestr(item, zin.read(item))
            
            # 添加新文件
            for file in tqdm(new_files, desc="添加新文件"):
                if os.path.isfile(file):
                    zout.write(file)
    
    os.remove(temp_zip)

3.2 多卷压缩文件

分卷压缩实现

import zipfile
import math

def split_zip(input_files, output_prefix, max_size_mb):
    """创建分卷ZIP文件"""
    max_size = max_size_mb * 1024 * 1024
    part_num = 1
    current_size = 0
    current_zip = None
    
    for file in input_files:
        file_size = os.path.getsize(file)
        
        # 如果当前ZIP不存在或需要新分卷
        if current_zip is None or current_size + file_size > max_size:
            if current_zip is not None:
                current_zip.close()
            
            zip_name = f"{output_prefix}.zip.{part_num:03d}"
            current_zip = zipfile.ZipFile(
                zip_name, 'w', zipfile.ZIP_DEFLATED)
            part_num += 1
            current_size = 0
        
        current_zip.write(file)
        current_size += file_size
    
    if current_zip is not None:
        current_zip.close()
    
    return part_num - 1  # 返回分卷数量

四、云存储集成

4.1 boto3 - AWS S3集成

S3文件操作示例

import boto3
from botocore.exceptions import ClientError

class S3Manager:
    def __init__(self, bucket_name):
        self.s3 = boto3.client('s3')
        self.bucket = bucket_name
    
    def upload_file(self, local_path, s3_key):
        """上传文件到S3"""
        try:
            self.s3.upload_file(
                local_path, 
                self.bucket, 
                s3_key,
                ExtraArgs={
                    'ACL': 'bucket-owner-full-control',
                    'StorageClass': 'INTELLIGENT_TIERING'
                }
            )
            return True
        except ClientError as e:
            print(f"S3上传错误: {e}")
            return False
    
    def download_file(self, s3_key, local_path):
        """从S3下载文件"""
        try:
            self.s3.download_file(self.bucket, s3_key, local_path)
            return True
        except ClientError as e:
            print(f"S3下载错误: {e}")
            return False
    
    def generate_presigned_url(self, s3_key, expires=3600):
        """生成预签名URL"""
        try:
            return self.s3.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket, 'Key': s3_key},
                ExpiresIn=expires
            )
        except ClientError as e:
            print(f"生成URL错误: {e}")
            return None

4.2 阿里云OSS集成

OSS文件操作示例

import oss2
from oss2.exceptions import OssError

class OSSManager:
    def __init__(self, access_key, secret_key, endpoint, bucket_name):
        auth = oss2.Auth(access_key, secret_key)
        self.bucket = oss2.Bucket(auth, endpoint, bucket_name)
    
    def upload_with_progress(self, local_path, oss_key):
        """带进度显示的上传"""
        def progress_callback(consumed_bytes, total_bytes):
            if total_bytes:
                percent = 100 * (consumed_bytes / total_bytes)
                print(f"\r上传进度: {percent:.2f}%", end='')
        
        try:
            result = self.bucket.put_object_from_file(
                oss_key, 
                local_path,
                progress_callback=progress_callback
            )
            print("\n上传完成")
            return result.status == 200
        except OssError as e:
            print(f"\nOSS上传错误: {e}")
            return False
    
    def download_large_file(self, oss_key, local_path, part_size=10*1024*1024):
        """分片下载大文件"""
        try:
            oss2.resumable_download(
                self.bucket, 
                oss_key, 
                local_path,
                part_size=part_size,
                num_threads=4
            )
            return True
        except OssError as e:
            print(f"OSS下载错误: {e}")
            return False

五、文件内容高级处理

5.1 二进制文件解析

结构化二进制文件解析

import struct
from collections import namedtuple

def parse_binary_file(filename):
    """解析自定义二进制格式文件"""
    # 定义文件头结构
    Header = namedtuple('Header', 'magic version num_records')
    header_format = '<4sII'  # 4字节魔数,2个无符号整数
    
    # 定义记录结构
    Record = namedtuple('Record', 'id timestamp value')
    record_format = '<Qdf'  # 无符号长整型,双精度浮点,单精度浮点
    
    with open(filename, 'rb') as f:
        # 读取文件头
        header_data = f.read(struct.calcsize(header_format))
        header = Header._make(struct.unpack(header_format, header_data))
        
        # 验证魔数
        if header.magic != b'BIN1':
            raise ValueError("无效的文件格式")
        
        # 读取记录
        records = []
        for _ in range(header.num_records):
            record_data = f.read(struct.calcsize(record_format))
            records.append(Record._make(struct.unpack(record_format, record_data)))
    
    return header, records

5.2 实时日志文件分析

实时日志分析器

import re
from collections import defaultdict
import time

class LogAnalyzer:
    def __init__(self, log_file, patterns):
        self.log_file = log_file
        self.patterns = patterns
        self.position = 0
        self.stats = defaultdict(int)
    
    def analyze(self):
        """分析日志文件"""
        with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as f:
            # 跳到上次读取位置
            f.seek(self.position)
            
            for line in f:
                for name, pattern in self.patterns.items():
                    if re.search(pattern, line):
                        self.stats[name] += 1
            
            # 更新读取位置
            self.position = f.tell()
        
        return dict(self.stats)
    
    def continuous_monitor(self, interval=5):
        """持续监控日志文件"""
        try:
            while True:
                stats = self.analyze()
                if stats:
                    print(f"\n[{time.ctime()}] 统计结果:")
                    for name, count in stats.items():
                        print(f"{name}: {count}")
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\n监控停止")

# 使用示例
patterns = {
    'ERROR': r'ERROR',
    'WARNING': r'WARNING',
    'HTTP_500': r'HTTP/1.\d"\s+500'
}
analyzer = LogAnalyzer('app.log', patterns)
analyzer.continuous_monitor()

六、安全文件操作

6.1 安全文件写入模式

原子写入模式

import os
import tempfile

def atomic_write(filename, data, mode='w', encoding='utf-8'):
    """原子性写入文件"""
    # 创建临时文件
    temp_dir = os.path.dirname(os.path.abspath(filename))
    with tempfile.NamedTemporaryFile(
        mode=mode,
        dir=temp_dir,
        prefix='.tmp_',
        delete=False,
        encoding=encoding
    ) as tmp:
        tmp.write(data)
        tmp.flush()
        os.fsync(tmp.fileno())
        tempname = tmp.name
    
    # 原子替换
    try:
        os.replace(tempname, filename)
    except:
        os.unlink(tempname)
        raise

6.2 文件权限管理

安全权限设置

import os
import stat

def set_secure_permissions(filename):
    """设置安全文件权限"""
    # 获取当前权限
    current_mode = os.stat(filename).st_mode
    
    # 移除组和其他用户的写权限
    new_mode = current_mode & ~stat.S_IWGRP & ~stat.S_IWOTH
    
    # 如果是敏感文件,移除所有组和其他用户的权限
    if filename.endswith(('.key', '.pem', '.env')):
        new_mode = new_mode & ~stat.S_IRGRP & ~stat.S_IROTH
    
    os.chmod(filename, new_mode)

def create_secure_file(filename, content):
    """创建安全文件"""
    # 使用原子写入
    atomic_write(filename, content)
    
    # 设置安全权限
    set_secure_permissions(filename)
    
    # 验证权限
    mode = os.stat(filename).st_mode
    if mode & (stat.S_IWGRP | stat.S_IWOTH):
        raise RuntimeError("文件权限设置失败")

七、总结

表2:文件操作性能优化技术

场景

优化技术

适用条件

大文件读取

内存映射(mmap)

需要随机访问的大文件

批量小文件

并行处理(multiprocessing)

大量独立小文件

顺序处理

缓冲读写(大缓冲区)

顺序读写场景

网络存储

断点续传/分片上传

不稳定网络环境

实时处理

增量读取(记录位置)

日志监控等场景

压缩文件

流式处理(不解压)

大型压缩文件

建议

  1. 避免频繁的小IO操作,尽量批量处理
  2. 使用with语句确保资源释放
  3. 对大文件考虑内存映射技术
  4. 大量文件处理使用并行技术
  5. 网络存储操作添加重试机制
  6. 关键操作添加原子性保证

通过本教程的高级技巧,我们可以在实际项目中实现更高效、更安全的文件操作,满足企业级应用的需求。


持续更新Python编程学习日志与技巧,敬请关注!



#编程# #python# #在头条记录我的2025#

相关推荐

结构力学!EI会议图表规范秘籍(ei会议排版)

推荐会议:国际结构与材料工程进展大会(ISME2026)会议编号:EI#73521截稿时间:2026年3月10日召开时间/地点:2026年8月15-17日·德国柏林论文集上线:会后4...

如何在simulink中获取足端轨迹?(simulink怎么设置触发角)

哈喽大家好,我是咕噜美乐蒂。很高兴又和大家见面啦。在机器人控制的应用中,足端轨迹是一个非常重要的参数,可以用来评估机器人的运动性能和精度。在Simulink中获取足端轨迹需要考虑到模型的复杂性、仿...

JCMsuite:旋转对称发射器(旋转式发射)

示例取自Gregersen等人[1]。几何形状为非理想微柱结构:单光子柱发射器(旋转对称)多层膜是在布局文件layout.jcm中由外部形状为梯形的特殊原始多层创建的(见下文)。参数扫描Matlab(...

动态离散周期变换技术突破:无ECG参考的生理信号精准解析

来源:电子产品世界摘要本文介绍了新型滑动离散周期变换(DPT)算法,可设计用于处理生理信号,尤其是脉搏血氧仪采集的光电容积脉搏波(PPG)信号。该算法采用正弦基函数进行周期域分析,可解决随机噪声和非平...

电气EI源刊避坑指南速存(电气工程开源期刊)

期刊推荐:《IEEETransactionsonPowerSystems》刊号:ISSN0885-8950影响因子:8.5(最新JCR数据)分区:中科院1区|JCRQ1版面费:约2200美...

Matlab基础入门手册(第五章:脚本/函数)

第五章脚本和函数1.44循环和条件语句1.循环语句和条件语句的用法2.说明循环语句:for,while条件语句:if,switch3.实例演示%1_44forx=1:5%简单for程序实例...

利用GPT4-V及Langchain实现多模态RAG

多模态RAG将是2024年AI应用架构发展的一个重要趋势,在前面的一篇文章里提到llama-index在这方面的尝试《利用GPT4-V及llama-index构建多模态RAG应用》,本文[1]中将以另...

WPF基础之UI布局(wpf ui界面设计)

知识点:WPF中的布局控件主要有以下几种:StackPanel:栈面板,可以将元素排列成一行或者一列。其特点是:每个元素各占一行或者一列。WrapPanel:环绕面板,将各个控件从左至右按照行或列的顺...

27.WPF 形状(wps 形状)

摘要  在WPF用户界面中,绘制2D图形内容的最简单方法是使用形状(shape)——专门用于表示简单的直线、椭圆、矩形以及多变形的一些类。从技术角度看,形状就是所谓的绘图图元(primitive)。可...

WPF与WinForm的本质区别(wpf和winui)

在Windows应用程序开发中,WinForm和WPF是两种主要的技术框架。它们各自有不同的设计理念、渲染机制和开发模式。本文将详细探讨WPF与WinForm的本质区别,并通过示例进行说明。渲染机制W...

.NET跨平台绘图基础库--SkiaSharp

SkiaSharp是一个跨平台的2D图形API,用于.NET平台,基于Google的Skia图形库。它提供了全面的2DAPI,可以在移动、服务器和桌面模型上渲染图像。SkiaS...

django python数据中心、客户、机柜、设备资源管理平台源码分享

先转发后关注,私信“资源”即可免费获取源码下载链接!本项目一个开源的倾向于数据中心运营商而开发的,拥有数据中心、客户、机柜、设备、跳线、物品、测试、文档等一些列模块的资源管理平台,解决各类资源集中管理...

在树莓派上:安装Ubuntu Server 20.04

什么是树莓派树莓派是英国树莓派基金会(https://www.raspberrypi.org)开发的卡片式电脑,采用高通的BCM2711ARM64处理器,可用于机器人、物联网、边缘计算、通用计算等多...

手把手教你搭建深度学习环境Pytorch版-Ubuntu

引言很多搞人工智能的小伙伴,刚开始学习,往往摸不着头脑怎么跑代码。跑代码的前提是要有个环境。本篇结合自己的亲身经历,带你搭建环境。相关知识Ubuntu是Linux系统的一种显卡驱动和cuda是两个不同...

干货,Python竟然可以用Kivy编写和打包安卓APP

请大家多多点赞,关注和分享在上一篇文章中,我们介绍了在Python中使用BeeWare框架编写图形程序并将其打包为安卓的apk文件程序。爆强!直接把Python编写的图形程序打包为安卓A...