百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Python文件操作常用库高级应用教程

liuian 2025-05-26 17:22 29 浏览

本文是在前面《Python文件操作常用库使用教程》的基础上,进一步学习Python文件操作库的高级应用。

一、高级文件系统监控

1.1 watchdog库 - 实时文件系统监控

安装与基本使用

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class FileChangeHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            print(f"文件被修改: {event.src_path}")

def monitor_directory(path):
    event_handler = FileChangeHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

# 监控当前目录
monitor_directory('.')

功能扩展

class CustomHandler(FileSystemEventHandler):
    def __init__(self, callback):
        self.callback = callback
    
    def on_any_event(self, event):
        event_type = event.event_type  # 'modified', 'created', 'deleted', 'moved'
        self.callback(event_type, event.src_path, 
                     getattr(event, 'dest_path', None))

# 使用示例
def log_change(event_type, src, dest):
    print(f"{event_type}: {src} {f'-> {dest}' if dest else ''}")

monitor_with_callback('.', log_change)

文件监控系统架构




二、高性能文件处理

2.1 内存映射文件(mmap)

大文件处理示例

import mmap
import re

def search_large_file(filename, pattern):
    """在大文件中搜索模式"""
    with open(filename, 'r+b') as f:
        # 内存映射文件
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            # 使用正则搜索
            for match in re.finditer(pattern.encode(), mm):
                yield match.start(), match.group().decode()

# 使用示例
for pos, text in search_large_file('large.log', r'ERROR\s+\d+'):
    print(f"在位置 {pos} 发现错误: {text}")

性能对比测试

import timeit

def test_performance():
    # 传统方式
    def traditional():
        with open('large.file') as f:
            return sum(1 for _ in f)
    
    # mmap方式
    def mmap_way():
        with open('large.file', 'r+b') as f:
            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                return sum(1 for _ in mm)
    
    print("传统方式:", timeit.timeit(traditional, number=10))
    print("mmap方式:", timeit.timeit(mmap_way, number=10))

2.2 并行文件处理

多进程文件处理

from multiprocessing import Pool
import os

def process_file_chunk(args):
    """处理文件块"""
    filename, start, end = args
    with open(filename, 'rb') as f:
        f.seek(start)
        chunk = f.read(end - start)
        return len(chunk.splitlines())

def parallel_file_processing(filename, workers=4):
    """并行处理大文件"""
    file_size = os.path.getsize(filename)
    chunk_size = file_size // workers
    ranges = [(filename, i*chunk_size, (i+1)*chunk_size) 
              for i in range(workers)]
    
    with Pool(workers) as pool:
        results = pool.map(process_file_chunk, ranges)
    
    return sum(results)

# 使用示例
total_lines = parallel_file_processing('huge_file.csv')
print(f"文件总行数: {total_lines}")

三、高级压缩与归档处理

3.1 增量压缩与加密

ZIP加密与增量添加

import zipfile
import os
from tqdm import tqdm  # 进度条库

def create_secure_zip(output_zip, files_to_zip, password):
    """创建加密ZIP文件"""
    with zipfile.ZipFile(
        output_zip, 
        'w', 
        compression=zipfile.ZIP_DEFLATED
    ) as zipf:
        zipf.setpassword(password.encode())
        
        for file in tqdm(files_to_zip, desc="压缩进度"):
            if os.path.isfile(file):
                zipf.write(file)
                
def add_to_existing_zip(zip_file, new_files, password=None):
    """向现有ZIP添加文件"""
    temp_zip = f"temp_{zip_file}"
    os.rename(zip_file, temp_zip)
    
    with zipfile.ZipFile(temp_zip, 'r') as zin:
        with zipfile.ZipFile(zip_file, 'w') as zout:
            if password:
                zout.setpassword(password.encode())
            
            # 复制现有条目
            for item in tqdm(zin.infolist(), desc="复制原有文件"):
                zout.writestr(item, zin.read(item))
            
            # 添加新文件
            for file in tqdm(new_files, desc="添加新文件"):
                if os.path.isfile(file):
                    zout.write(file)
    
    os.remove(temp_zip)

3.2 多卷压缩文件

分卷压缩实现

import zipfile
import math

def split_zip(input_files, output_prefix, max_size_mb):
    """创建分卷ZIP文件"""
    max_size = max_size_mb * 1024 * 1024
    part_num = 1
    current_size = 0
    current_zip = None
    
    for file in input_files:
        file_size = os.path.getsize(file)
        
        # 如果当前ZIP不存在或需要新分卷
        if current_zip is None or current_size + file_size > max_size:
            if current_zip is not None:
                current_zip.close()
            
            zip_name = f"{output_prefix}.zip.{part_num:03d}"
            current_zip = zipfile.ZipFile(
                zip_name, 'w', zipfile.ZIP_DEFLATED)
            part_num += 1
            current_size = 0
        
        current_zip.write(file)
        current_size += file_size
    
    if current_zip is not None:
        current_zip.close()
    
    return part_num - 1  # 返回分卷数量

四、云存储集成

4.1 boto3 - AWS S3集成

S3文件操作示例

import boto3
from botocore.exceptions import ClientError

class S3Manager:
    def __init__(self, bucket_name):
        self.s3 = boto3.client('s3')
        self.bucket = bucket_name
    
    def upload_file(self, local_path, s3_key):
        """上传文件到S3"""
        try:
            self.s3.upload_file(
                local_path, 
                self.bucket, 
                s3_key,
                ExtraArgs={
                    'ACL': 'bucket-owner-full-control',
                    'StorageClass': 'INTELLIGENT_TIERING'
                }
            )
            return True
        except ClientError as e:
            print(f"S3上传错误: {e}")
            return False
    
    def download_file(self, s3_key, local_path):
        """从S3下载文件"""
        try:
            self.s3.download_file(self.bucket, s3_key, local_path)
            return True
        except ClientError as e:
            print(f"S3下载错误: {e}")
            return False
    
    def generate_presigned_url(self, s3_key, expires=3600):
        """生成预签名URL"""
        try:
            return self.s3.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket, 'Key': s3_key},
                ExpiresIn=expires
            )
        except ClientError as e:
            print(f"生成URL错误: {e}")
            return None

4.2 阿里云OSS集成

OSS文件操作示例

import oss2
from oss2.exceptions import OssError

class OSSManager:
    def __init__(self, access_key, secret_key, endpoint, bucket_name):
        auth = oss2.Auth(access_key, secret_key)
        self.bucket = oss2.Bucket(auth, endpoint, bucket_name)
    
    def upload_with_progress(self, local_path, oss_key):
        """带进度显示的上传"""
        def progress_callback(consumed_bytes, total_bytes):
            if total_bytes:
                percent = 100 * (consumed_bytes / total_bytes)
                print(f"\r上传进度: {percent:.2f}%", end='')
        
        try:
            result = self.bucket.put_object_from_file(
                oss_key, 
                local_path,
                progress_callback=progress_callback
            )
            print("\n上传完成")
            return result.status == 200
        except OssError as e:
            print(f"\nOSS上传错误: {e}")
            return False
    
    def download_large_file(self, oss_key, local_path, part_size=10*1024*1024):
        """分片下载大文件"""
        try:
            oss2.resumable_download(
                self.bucket, 
                oss_key, 
                local_path,
                part_size=part_size,
                num_threads=4
            )
            return True
        except OssError as e:
            print(f"OSS下载错误: {e}")
            return False

五、文件内容高级处理

5.1 二进制文件解析

结构化二进制文件解析

import struct
from collections import namedtuple

def parse_binary_file(filename):
    """解析自定义二进制格式文件"""
    # 定义文件头结构
    Header = namedtuple('Header', 'magic version num_records')
    header_format = '<4sII'  # 4字节魔数,2个无符号整数
    
    # 定义记录结构
    Record = namedtuple('Record', 'id timestamp value')
    record_format = '<Qdf'  # 无符号长整型,双精度浮点,单精度浮点
    
    with open(filename, 'rb') as f:
        # 读取文件头
        header_data = f.read(struct.calcsize(header_format))
        header = Header._make(struct.unpack(header_format, header_data))
        
        # 验证魔数
        if header.magic != b'BIN1':
            raise ValueError("无效的文件格式")
        
        # 读取记录
        records = []
        for _ in range(header.num_records):
            record_data = f.read(struct.calcsize(record_format))
            records.append(Record._make(struct.unpack(record_format, record_data)))
    
    return header, records

5.2 实时日志文件分析

实时日志分析器

import re
from collections import defaultdict
import time

class LogAnalyzer:
    def __init__(self, log_file, patterns):
        self.log_file = log_file
        self.patterns = patterns
        self.position = 0
        self.stats = defaultdict(int)
    
    def analyze(self):
        """分析日志文件"""
        with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as f:
            # 跳到上次读取位置
            f.seek(self.position)
            
            for line in f:
                for name, pattern in self.patterns.items():
                    if re.search(pattern, line):
                        self.stats[name] += 1
            
            # 更新读取位置
            self.position = f.tell()
        
        return dict(self.stats)
    
    def continuous_monitor(self, interval=5):
        """持续监控日志文件"""
        try:
            while True:
                stats = self.analyze()
                if stats:
                    print(f"\n[{time.ctime()}] 统计结果:")
                    for name, count in stats.items():
                        print(f"{name}: {count}")
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\n监控停止")

# 使用示例
patterns = {
    'ERROR': r'ERROR',
    'WARNING': r'WARNING',
    'HTTP_500': r'HTTP/1.\d"\s+500'
}
analyzer = LogAnalyzer('app.log', patterns)
analyzer.continuous_monitor()

六、安全文件操作

6.1 安全文件写入模式

原子写入模式

import os
import tempfile

def atomic_write(filename, data, mode='w', encoding='utf-8'):
    """原子性写入文件"""
    # 创建临时文件
    temp_dir = os.path.dirname(os.path.abspath(filename))
    with tempfile.NamedTemporaryFile(
        mode=mode,
        dir=temp_dir,
        prefix='.tmp_',
        delete=False,
        encoding=encoding
    ) as tmp:
        tmp.write(data)
        tmp.flush()
        os.fsync(tmp.fileno())
        tempname = tmp.name
    
    # 原子替换
    try:
        os.replace(tempname, filename)
    except:
        os.unlink(tempname)
        raise

6.2 文件权限管理

安全权限设置

import os
import stat

def set_secure_permissions(filename):
    """设置安全文件权限"""
    # 获取当前权限
    current_mode = os.stat(filename).st_mode
    
    # 移除组和其他用户的写权限
    new_mode = current_mode & ~stat.S_IWGRP & ~stat.S_IWOTH
    
    # 如果是敏感文件,移除所有组和其他用户的权限
    if filename.endswith(('.key', '.pem', '.env')):
        new_mode = new_mode & ~stat.S_IRGRP & ~stat.S_IROTH
    
    os.chmod(filename, new_mode)

def create_secure_file(filename, content):
    """创建安全文件"""
    # 使用原子写入
    atomic_write(filename, content)
    
    # 设置安全权限
    set_secure_permissions(filename)
    
    # 验证权限
    mode = os.stat(filename).st_mode
    if mode & (stat.S_IWGRP | stat.S_IWOTH):
        raise RuntimeError("文件权限设置失败")

七、总结

表2:文件操作性能优化技术

场景

优化技术

适用条件

大文件读取

内存映射(mmap)

需要随机访问的大文件

批量小文件

并行处理(multiprocessing)

大量独立小文件

顺序处理

缓冲读写(大缓冲区)

顺序读写场景

网络存储

断点续传/分片上传

不稳定网络环境

实时处理

增量读取(记录位置)

日志监控等场景

压缩文件

流式处理(不解压)

大型压缩文件

建议

  1. 避免频繁的小IO操作,尽量批量处理
  2. 使用with语句确保资源释放
  3. 对大文件考虑内存映射技术
  4. 大量文件处理使用并行技术
  5. 网络存储操作添加重试机制
  6. 关键操作添加原子性保证

通过本教程的高级技巧,我们可以在实际项目中实现更高效、更安全的文件操作,满足企业级应用的需求。


持续更新Python编程学习日志与技巧,敬请关注!



#编程# #python# #在头条记录我的2025#

相关推荐

教你把多个视频合并成一个视频的方法

一.情况介绍当你有一个m3u8文件和一个目录,目录中有连续的视频片段,这些片段可以连成一段完整的视频。m3u8文件打开后像这样:m3u8文件,可以理解为播放列表,里面是播放视频片段的顺序。视频片段像这...

零代码编程:用kimichat合并一个文件夹下的多个文件

一个文件夹里面有很多个srt字幕文件,如何借助kimichat来自动批量合并呢?在kimichat对话框中输入提示词:你是一个Python编程专家,完成如下的编程任务:这个文件夹:D:\downloa...

Java APT_java APT 生成代码

JavaAPT(AnnotationProcessingTool)是一种在Java编译阶段处理注解的工具。APT会在编译阶段扫描源代码中的注解,并根据这些注解生成代码、资源文件或其他输出,...

Unit Runtime:一键运行 AI 生成的代码,或许将成为你的复制 + 粘贴神器

在我们构建了UnitMesh架构之后,以及对应的demo之后,便着手于实现UnitMesh架构。于是,我们就继续开始UnitRuntime,以用于直接运行AI生成的代码。PS:...

挣脱臃肿的枷锁:为什么说Vert.x是Java开发者手中的一柄利剑?

如果你是一名Java开发者,那么你的职业生涯几乎无法避开Spring。它如同一位德高望重的老国王,统治着企业级应用开发的大片疆土。SpringBoot的约定大于配置、SpringCloud的微服务...

五年后,谷歌还在全力以赴发展 Kotlin

作者|FredericLardinois译者|Sambodhi策划|Tina自2017年谷歌I/O全球开发者大会上,谷歌首次宣布将Kotlin(JetBrains开发的Ja...

kotlin和java开发哪个好,优缺点对比

Kotlin和Java都是常见的编程语言,它们有各自的优缺点。Kotlin的优点:简洁:Kotlin程序相对于Java程序更简洁,可以减少代码量。安全:Kotlin在类型系统和空值安全...

移动端架构模式全景解析:从MVC到MVVM,如何选择最佳设计方案?

掌握不同架构模式的精髓,是构建可维护、可测试且高效移动应用的关键。在移动应用开发中,选择合适的软件架构模式对项目的可维护性、可测试性和团队协作效率至关重要。随着应用复杂度的增加,一个良好的架构能够帮助...

颜值非常高的XShell替代工具Termora,不一样的使用体验!

Termora是一款面向开发者和运维人员的跨平台SSH终端与文件管理工具,支持Windows、macOS及Linux系统,通过一体化界面简化远程服务器管理流程。其核心定位是解决多平台环境下远程连接、文...

预处理的底层原理和预处理编译运行异常的解决方案

若文章对您有帮助,欢迎关注程序员小迷。助您在编程路上越走越好![Mac-10.7.1LionIntel-based]Q:预处理到底干了什么事情?A:预处理,顾名思义,预先做的处理。源代码中...

为“架构”再建个模:如何用代码描述软件架构?

在架构治理平台ArchGuard中,为了实现对架构的治理,我们需要代码+模型描述所要处理的内容和数据。所以,在ArchGuard中,我们有了代码的模型、依赖的模型、变更的模型等,剩下的两个...

深度解析:Google Gemma 3n —— 移动优先的轻量多模态大模型

2025年6月,Google正式发布了Gemma3n,这是一款能够在2GB内存环境下运行的轻量级多模态大模型。它延续了Gemma家族的开源基因,同时在架构设计上大幅优化,目标是让...

比分网开发技术栈与功能详解_比分网有哪些

一、核心功能模块一个基本的比分网通常包含以下模块:首页/总览实时比分看板:滚动展示所有正在进行的比赛,包含比分、比赛时间、红黄牌等关键信息。热门赛事/焦点战:突出显示重要的、关注度高的比赛。赛事导航...

设计模式之-生成器_一键生成设计

一、【概念定义】——“分步构建复杂对象,隐藏创建细节”生成器模式(BuilderPattern):一种“分步构建型”创建型设计模式,它将一个复杂对象的构建与其表示分离,使得同样的构建过程可以创建...

构建第一个 Kotlin Android 应用_kotlin简介

第一步:安装AndroidStudio(推荐IDE)AndroidStudio是官方推荐的Android开发集成开发环境(IDE),内置对Kotlin的完整支持。1.下载And...