百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Python3 pymysql 纯原生多条件查询 SQL 拼接方案

liuian 2025-10-14 01:03 8 浏览

一个不使用构建器类,纯原生编写的动态 SQL 拼接方案。
这种方式更加直接,适合简单场景或需要完全控制 SQL 生成过程的情况。

核心实现思路

纯原生实现的关键是:

  • 使用列表收集 SQL 片段和参数
  • 根据条件动态添加片段到列表中
  • 最后将列表拼接成完整 SQL 语句
  • 始终使用参数化查询防止 SQL 注入

完整实现方案

1. 基础查询函数

import pymysql
from typing import Dict, List, Union, Optional, Tuple

def build_dynamic_query(table: str, filters: Dict, columns: Union[str, List[str]] = "*") -> Tuple[str, List]:
    """
    构建动态查询SQL语句
    
    Args:
        table: 查询的表名
        filters: 查询条件字典
        columns: 要查询的字段列表
        
    Returns:
        完整的SQL语句和参数列表
    """
    # 处理查询字段
    if isinstance(columns, list):
        columns_str = ", ".join(columns)
    else:
        columns_str = columns
    
    # 初始化SQL片段和参数
    sql_fragments = [f"SELECT {columns_str} FROM {table}"]
    params = []
    
    # 处理WHERE条件
    where_clauses = []
    
    # 处理等于查询
    if 'eq' in filters:
        for column, value in filters['eq'].items():
            if value is not None:
                where_clauses.append(f"{column} = %s")
                params.append(value)
    
    # 处理不等于查询
    if 'ne' in filters:
        for column, value in filters['ne'].items():
            if value is not None:
                where_clauses.append(f"{column} != %s")
                params.append(value)
    
    # 处理大于查询
    if 'gt' in filters:
        for column, value in filters['gt'].items():
            if value is not None:
                where_clauses.append(f"{column} > %s")
                params.append(value)
    
    # 处理大于等于查询
    if 'ge' in filters:
        for column, value in filters['ge'].items():
            if value is not None:
                where_clauses.append(f"{column} >= %s")
                params.append(value)
    
    # 处理小于查询
    if 'lt' in filters:
        for column, value in filters['lt'].items():
            if value is not None:
                where_clauses.append(f"{column} < %s")
                params.append(value)
    
    # 处理小于等于查询
    if 'le' in filters:
        for column, value in filters['le'].items():
            if value is not None:
                where_clauses.append(f"{column} <= %s")
                params.append(value)
    
    # 处理模糊查询
    if 'like' in filters:
        for column, value_info in filters['like'].items():
            if isinstance(value_info, dict):
                value = value_info.get('value')
                mode = value_info.get('mode', 'both')
            else:
                value = value_info
                mode = 'both'
            
            if value:
                if mode == 'both':
                    pattern = f"%{value}%"
                elif mode == 'start':
                    pattern = f"{value}%"
                elif mode == 'end':
                    pattern = f"%{value}"
                else:
                    continue
                
                where_clauses.append(f"{column} LIKE %s")
                params.append(pattern)
    
    # 处理包含查询
    if 'in' in filters:
        for column, values in filters['in'].items():
            if values and isinstance(values, list) and len(values) > 0:
                placeholders = ", ".join(["%s"] * len(values))
                where_clauses.append(f"{column} IN ({placeholders})")
                params.extend(values)
    
    # 处理区间查询
    if 'between' in filters:
        for column, range_info in filters['between'].items():
            min_val = range_info.get('min')
            max_val = range_info.get('max')
            if min_val is not None and max_val is not None:
                where_clauses.append(f"{column} BETWEEN %s AND %s")
                params.extend([min_val, max_val])
    
    # 处理NULL查询
    if 'null' in filters:
        for column, is_null in filters['null'].items():
            if is_null:
                where_clauses.append(f"{column} IS NULL")
            else:
                where_clauses.append(f"{column} IS NOT NULL")
    
    # 处理OR条件组
    if 'or' in filters:
        for or_group in filters['or']:
            or_clauses = []
            for condition in or_group:
                column = condition.get('column')
                operator = condition.get('operator')
                value = condition.get('value')
                
                if column and operator and value is not None:
                    or_clauses.append(f"{column} {operator} %s")
                    params.append(value)
            
            if or_clauses:
                where_clauses.append(f"({' OR '.join(or_clauses)})")
    
    # 添加WHERE子句
    if where_clauses:
        sql_fragments.append("WHERE " + " AND ".join(where_clauses))
    
    # 处理排序
    if 'order_by' in filters:
        order_clauses = []
        for column, direction in filters['order_by'].items():
            direction = direction.upper() if direction else 'ASC'
            if direction not in ['ASC', 'DESC']:
                direction = 'ASC'
            order_clauses.append(f"{column} {direction}")
        
        if order_clauses:
            sql_fragments.append("ORDER BY " + ", ".join(order_clauses))
    
    # 处理分页
    if 'limit' in filters:
        limit = filters['limit']
        offset = filters.get('offset', 0)
        sql_fragments.append("LIMIT %s OFFSET %s")
        params.extend([limit, offset])
    
    # 拼接完整SQL
    sql = " ".join(sql_fragments)
    return sql, params

2. 数据库连接和执行函数

def create_db_connection(host: str, user: str, password: str, database: str, port: int = 3306):
    """创建数据库连接"""
    try:
        connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            port=port,
            cursorclass=pymysql.cursors.DictCursor,
            charset='utf8mb4'
        )
        return connection
    except Exception as e:
        print(f"数据库连接失败: {e}")
        raise

def execute_query(connection, sql: str, params: List = None) -> List[Dict]:
    """执行查询并返回结果"""
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql, params or [])
            return cursor.fetchall()
    except Exception as e:
        print(f"查询执行失败: {e}")
        print(f"SQL: {sql}")
        print(f"Params: {params}")
        raise
    finally:
        connection.close()

def execute_update(connection, sql: str, params: List = None) -> int:
    """执行更新操作"""
    try:
        with connection.cursor() as cursor:
            result = cursor.execute(sql, params or [])
            connection.commit()
            return result
    except Exception as e:
        connection.rollback()
        print(f"更新执行失败: {e}")
        print(f"SQL: {sql}")
        print(f"Params: {params}")
        raise
    finally:
        connection.close()

3. 查询执行封装函数

def dynamic_search(table: str, filters: Dict, db_config: Dict,columns: Union[str, List[str]] = "*") -> List[Dict]:
    """
    动态查询函数
    
    Args:
        table: 表名
        filters: 查询条件
        columns: 查询字段
        db_config: 数据库配置
        
    Returns:
        查询结果列表
    """
    # 构建SQL和参数
    sql, params = build_dynamic_query(table, filters, columns)
    
    # 创建数据库连接
    connection = create_db_connection(**db_config)
    
    # 执行查询
    return execute_query(connection, sql, params)

使用示例

示例 1: 基础查询

# 数据库配置
db_config = {
    'host': 'localhost',
    'user': 'your_username',
    'password': 'your_password',
    'database': 'your_database'
}

# 示例1: 简单查询
filters1 = {
    'eq': {'status': 'active'},
    'ge': {'price': 1000},
    'le': {'price': 5000},
    'like': {'name': '手机'}
}

# 执行查询
results1 = dynamic_search('products', filters1, db_config=db_config)
print(f"查询结果1: {len(results1)} 条记录")

# 生成的SQL: 
# SELECT * FROM products WHERE status = %s AND price >= %s AND price <= %s AND name LIKE %s
# 参数: ['active', 1000, 5000, '%手机%']

示例 2: 复杂查询(包含 OR 条件和分页)

# 示例2: 复杂查询
filters2 = {
    'in': {'category_id': [1, 3, 5]},
    'between': {'created_at': {'min': '2023-01-01', 'max': '2023-12-31'}},
    'or': [
        [
            {'column': 'name', 'operator': 'LIKE', 'value': '%苹果%'},
            {'column': 'brand', 'operator': '=', 'value': 'Apple'}
        ]
    ],
    'order_by': {'price': 'ASC', 'created_at': 'DESC'},
    'limit': 20,
    'offset': 0
}

results2 = dynamic_search('products', filters2, ['id', 'name', 'price', 'brand'], db_config=db_config)
print(f"查询结果2: {len(results2)} 条记录")

# 生成的SQL:
# SELECT id, name, price, brand FROM products 
# WHERE category_id IN (%s, %s, %s) 
# AND created_at BETWEEN %s AND %s 
# AND (name LIKE %s OR brand = %s) 
# ORDER BY price ASC, created_at DESC 
# LIMIT %s OFFSET %s
# 参数: [1, 3, 5, '2023-01-01', '2023-12-31', '%苹果%', 'Apple', 20, 0]

示例 3: 动态条件组合

def search_products(filters: Dict) -> List[Dict]:
    """产品搜索函数"""
    # 基础查询条件
    query_filters = {}
    
    # 处理名称搜索
    if 'keyword' in filters and filters['keyword']:
        query_filters['like'] = {
            'name': {'value': filters['keyword'], 'mode': 'both'},
            'description': {'value': filters['keyword'], 'mode': 'both'}
        }
    
    # 处理价格筛选
    price_filters = {}
    if 'min_price' in filters and filters['min_price']:
        price_filters['ge'] = {'price': filters['min_price']}
    if 'max_price' in filters and filters['max_price']:
        price_filters['le'] = {'price': filters['max_price']}
    if price_filters:
        query_filters.update(price_filters)
    
    # 处理分类筛选
    if 'category' in filters and filters['category']:
        if isinstance(filters['category'], list):
            query_filters['in'] = {'category_id': filters['category']}
        else:
            query_filters['eq'] = {'category_id': filters['category']}
    
    # 处理状态筛选
    if 'status' in filters and filters['status']:
        query_filters['eq'] = {'status': filters['status']}
    
    # 处理排序
    if 'sort' in filters and filters['sort']:
        sort_info = filters['sort'].split(':')
        sort_column = sort_info[0]
        sort_dir = sort_info[1].upper() if len(sort_info) > 1 else 'ASC'
        query_filters['order_by'] = {sort_column: sort_dir}
    
    # 处理分页
    if 'page' in filters and 'page_size' in filters:
        page = filters['page']
        page_size = filters['page_size']
        offset = (page - 1) * page_size
        query_filters['limit'] = page_size
        query_filters['offset'] = offset
    
    return dynamic_search('products', query_filters, db_config=db_config)

# 使用动态搜索函数
search_params = {
    'keyword': '笔记本电脑',
    'min_price': 3000,
    'max_price': 8000,
    'category': [2, 4],
    'sort': 'price:asc',
    'page': 1,
    'page_size': 15
}

products = search_products(search_params)
print(f"搜索结果: {len(products)} 条记录")

支持的查询条件类型

条件类型

关键字

示例

说明

等于

eq

{'eq': {'status': 'active'}}

字段等于指定值

不等于

ne

{'ne': {'status': 'deleted'}}

字段不等于指定值

大于

gt

{'gt': {'price': 1000}}

字段大于指定值

大于等于

ge

{'ge': {'price': 1000}}

字段大于等于指定值

小于

lt

{'lt': {'price': 5000}}

字段小于指定值

小于等于

le

{'le': {'price': 5000}}

字段小于等于指定值

模糊查询

like

{'like': {'name': '手机'}}

支持前缀、后缀、全匹配

包含查询

in

{'in': {'category_id': [1,2,3]}}

字段值在指定列表中

区间查询

between

{'between': {'price': {'min': 1000, 'max': 5000}}}

字段值在指定区间内

NULL 查询

null

{'null': {'description': True}}

字段值为 NULL 或非 NULL

OR 条件组

or

{'or': [[{'column': 'name', 'operator': 'LIKE', 'value': '%苹果%'}]]}

OR 连接的条件组

排序

order_by

{'order_by': {'price': 'ASC'}}

结果排序

分页

limit + offset

{'limit': 20, 'offset': 0}

结果分页

安全性考虑

  1. 防止 SQL 注入:所有参数都使用%s占位符参数值通过 pymysql 的参数化查询传递永远不直接拼接用户输入到 SQL 字符串中
  2. 输入验证:建议在使用前验证输入参数的类型和格式可以添加字段白名单,限制允许查询的字段
  3. SQL 注入防护示例:
# 安全的方式
filters = {'eq': {'username': user_input}}

# 不安全的方式 (永远不要这样做!)
# sql = f"SELECT * FROM users WHERE username = '{user_input}'"

性能优化建议

  1. 索引优化:为常用查询条件的字段建立索引特别是用于 WHERE、ORDER BY 的字段
  2. 查询优化:只查询需要的字段,避免使用SELECT *合理使用 LIMIT 限制返回结果数量
  3. 连接管理:考虑使用连接池管理数据库连接避免频繁创建和关闭连接

总结

这个纯原生的动态 SQL 拼接方案具有以下特点:

  1. 简洁直接: 不使用复杂的类结构,代码易于理解和维护
  2. 安全可靠: 严格使用参数化查询,有效防止 SQL 注入
  3. 功能完整: 支持各种常见的查询条件类型
  4. 灵活扩展: 容易添加新的查询条件类型
  5. 易于集成: 可以方便地集成到现有的项目中

这种方式适合对 SQL 生成过程需要完全控制的场景,或者项目规模较小、不需要复杂抽象的情况。在实际使用中,你可以根据具体需求对这个基础方案进行调整和扩展。

相关推荐

Spring Boot + Vue.js 实现前后端分离(附源码)

作者:梁小生0101链接:juejin.im/post/5c622fb5e51d457f9f2c2381SpringBoot+Vue.js前后端涉及基本概念介绍,搭建记录,本文会列举出用到环...

C#一步一步实现自己的插件框架(四),从此告别代码紧耦合

初学者写程序一般就是拖控件,双击,然后写上执行的代码,这样在窗口中就有很多事件代码,如果要实现各按钮的状态,那得在很多地方修改代码,极为复杂.通过参考CSHARPDEVELOP的代码就说明和网上各位...

基于UI组件的Vue可视化布局、快速生成.vue代码

一、项目简介基于UI组件的Vue可视化布局、快速生成.vue代码二、实现功能通用(文本、链接、换行、div、图片)支持elementUI支持iViewUI(button、icon、radio、sel...

【开源资讯】ViewUI 4.2.0(原 iView)发布,企业级 UI 组件库

简介iView作者Aresn于2019年创办了北京视图更新科技有限公司,开始自由、全职地维护iView及其相关的软件。ViewUI即为原先的iView,从2019年10月起...

Python GUI 编程入门教程 第25章:记账本应用升级—类别统计与图表

25.1项目目标在第24章的月份筛选功能基础上,新增:类别输入:记录时选择支出/收入类别,例如:餐饮、交通、购物、工资、理财等类别统计:计算选定月份的各类别总额类别图表:生成饼图,展示各类别所占...

Python GUI 编程入门教程 第8章:文件处理、数据库操作与网络通信

8.1文件操作:处理本地文件与文件对话框在Tkinter应用中,文件操作是常见的需求。Tkinter提供了简单的文件对话框来帮助用户选择文件,并能通过Python内建的文件处理模块来读取和写入文件。...

手把手教你用Python做个可视化的“剪刀石头布”小游戏

/1前言/最近在学习PyQt5可视化界面,这是一个内容非常丰富的gui库,相对于tkinter库,功能更加强大,界面更加美观,操作也不难。于是我开始小试牛刀,用PyQt5做个可视化的“剪刀石头布”...

掌握基础技能快速用Python设计界面

我们在设计软件界面的时候,应该掌握一定的基础知识,不能我们看起来非常费解也很累。到后面设计界面的时候,很多基础知识不可能如你开始学的时候讲的那样仔细。熟练掌握Python的基本语法,如变量、数据类型...

Python GUI 编程入门教程 第22章:综合实战项目——记账本应用

22.1项目目标我们要开发一个带数据库的记账本,主要功能:添加收支记录(日期、类别、金额、备注)显示所有记录(表格形式)支持删除记录自动保存到SQLite数据库统计总收支22.2项目结构budge...

Python GUI 编程入门教程 第10章:高级布局与界面美化

10.1高级布局管理:使用grid和placeTkinter提供了三种常用的布局管理方式:pack、grid和place。在本章中,我们重点介绍grid和place,这两种布局方式相较于pack更加...

别再手动复制粘贴了!Python一招搞定取PDF内容,效率提升10倍!

别再手动复制粘贴了!Python一招搞定取PDF内容,效率提升10倍!还在为PDF内容提取头疼?100页的文档要折腾一下午?今天教你用Python几行代码搞定,10秒钟解决战斗,办公室小白也能轻松学会...

DearPyGui:GUI 性能秒杀 PyQt,揭秘 GPU 加速的 DearPyGui

什么是DearPyGui?嘿,最近我发现了一个超有意思的PythonGUI框架——DearPyGui。名字有点拗口,但它可不是随便起的。它基于C++和GPU渲染,性能吊打传统的Tki...

Python GUI 编程入门教程 第7章:事件绑定、动画效果与外部交互

7.1事件绑定:响应用户操作在Tkinter中,事件绑定允许你为控件添加响应函数,以处理用户的输入事件,如鼠标点击、键盘输入等。事件可以是各种形式的交互,如点击按钮、键盘按键等。7.1.1绑定鼠标...

Python GUI 编程入门教程 第21章:综合实战项目——记事本应用

21.1项目目标我们要实现一个简易版的记事本,具备以下功能:新建、打开、保存文件复制、粘贴、剪切、全选设置字体大小查找文字显示应用信息界面大致效果如下:+----------------------...

Python GUI 编程入门教程 第14章:构建复杂图形界面

14.1界面布局管理在Tkinter中,界面控件的排列是通过布局管理器来实现的。Tkinter提供了三种布局管理器:pack、grid和place,每种布局管理器都有其独特的用途和优势。14.1.1...