目 录CONTENT

文章目录
ACM

Python库 - PyMySQL

解小风
2025-06-26 / 0 评论 / 1 点赞 / 3 阅读 / 17009 字

简介

PyMySQL 用于使用 Python3 连接服务器的 MySQL 数据库(Python2 则使用 mysqldb)。

安装与配置

S1 安装 MySQL 软件

参见:安装 MySQL


S2 安装 PyMySQL 库

# 安装(临时使用清华源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple/ pymysql

# 查看版本
import pymysql
print(pymysql.__version__)

数据库基础操作通用模板

######### database_opn.py 文件内容#########
# -*- coding: utf-8 -*-
# @Time : 2025.06.26
# @Author : faramita

import re
import pymysql


# 全局配置
DATABASE_INFO = {
    "host" : "127.0.0.1", 
    "port" : 3306,
    "user" : "root",
    "password" : "123456"
    }
# host: 目标数据库服务的 ip,"localhost" 或 "127.0.0.1" 代表本地
# port: 目标数据库服务的 端口
# user: 目标数据库服务的用户名(即 MySQL 的用户名)
# password: 目标数据库服务的密码(即 MySQL 的密码)


class InitDatabase():
    """
    # 功能:初始化数据库
    """
    def __init__(self):
        print( "[INFO] 启动 InitDatabase 初始化 ..." )
        try:
            # 尝试连接 MySQL 数据库
            self.conn = pymysql.connect(
                host=DATABASE_INFO["host"], 
                port=DATABASE_INFO["port"], 
                user=DATABASE_INFO["user"], 
                password=DATABASE_INFO["password"], 
                charset='utf8mb4'
            )
            print( "[INFO] 已连接 MySQL 数据库!" )
        except Exception as err:
            print( f"[ERROR] 连接数据库失败: {err}" )
            raise


    def __del__(self):
        """
        # 功能:析构函数中关闭连接
        """
        self.connection_close()


    def _is_valid_name(self, name):
        """
        # 功能:校验数据库/表名是否合法
        # 输入:
        #    name: {str} 数据库名或表名
        """
        # 排除 None 或空字符串
        if not isinstance(name, str) or not name:
            return False
        # MySQL 数据库名最大长度为 64
        if len(name) > 64:
            return False
        return re.match(r'^[\w$\-#]+$', name) is not None


    def connection_close(self):
        """
        # 功能:显式关闭数据库连接
        """
        if hasattr(self, 'conn') and self.conn.open:
            self.conn.close()
            print( "[INFO] 已关闭数据库连接" )


    def database_create(self, db_name):
        """
        # 功能:创建数据库
        # 输入:
        #    db_name: {str} 数据库名
        # 输出:
        #    {bool} 是否成功
        """
        try:
            if not self._is_valid_name(db_name):
                raise ValueError("数据库名包含非法字符!")
            if not hasattr(self, 'conn') or not self.conn.open:
                raise ConnectionError("未连接数据库,请检查初始化是否成功!")

            # 获取操作游标
            with self.conn.cursor() as cursor:
                # 构建 SQL 语句:创建数据库
                sql_cmd = f"CREATE DATABASE IF NOT EXISTS `{db_name}`"
                # print( f"[INFO] 执行 SQL :{sql_cmd}" )
                # 执行 SQL 语句
                cursor.execute(sql_cmd)

            print( f"[INFO] 已创建数据库:{db_name}(若不存在)" )
            return True
        except Exception as err:
            print( f"[ERROR] 创建数据库 {db_name} 失败:{err}" )
            return False


    def database_delete(self, db_name):
        """
        # 功能:删除数据库
        # 输入:
        #    db_name: {str} 数据库名
        # 输出:
        #    {bool} 是否成功
        """
        try:
            if not self._is_valid_name(db_name):
                raise ValueError("数据库名包含非法字符!")
            if not hasattr(self, 'conn') or not self.conn.open:
                raise ConnectionError("未连接数据库,请检查初始化是否成功!")

            # 获取操作游标
            with self.conn.cursor() as cursor:
                # 构建 SQL 语句:删除数据库
                sql_cmd = f"DROP DATABASE IF EXISTS `{db_name}`"
                # print( f"[INFO] 执行 SQL :{sql_cmd}" )
                # 执行 SQL 语句
                cursor.execute(sql_cmd)

            print( f"[INFO] 已删除数据库:{db_name}(若存在)" )
            return True
        except Exception as err:
            print( f"[ERROR] 删除数据库 {db_name} 失败:{err}" )
            return False


class DatabaseOperation():
    """
    # 功能:数据库操作相关函数
    """
    def __init__(self, db_name):
        print( "[INFO] 启动 DatabaseOperation 初始化 ..." )

        if not self._is_valid_name(db_name):
            raise ValueError("数据库名包含非法字符!")

        try:
            # 尝试连接 MySQL 数据库
            # 查询结果使用字典格式 pymysql.cursors.DictCursor
            self.conn = pymysql.connect(
                host=DATABASE_INFO["host"], 
                port=DATABASE_INFO["port"], 
                user=DATABASE_INFO["user"], 
                password=DATABASE_INFO["password"], 
                database=db_name, 
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor
            )
            print( f"[INFO] 已连接数据库:{db_name}" )
        except Exception as err:
            print( f"[ERROR] 连接数据库 {db_name} 失败: {err}" )
            raise


    def __del__(self):
        """
        # 功能:析构函数中关闭连接
        """
        self.connection_close()


    def _is_valid_name(self, name):
        """
        # 功能:校验数据库/表名是否合法
        # 输入:
        #    name: {str} 数据库名或表名
        """
        # 排除 None 或空字符串
        if not isinstance(name, str) or not name:
            return False
        # MySQL 数据库名最大长度为 64
        if len(name) > 64:
            return False
        return re.match(r'^[\w$\-#]+$', name) is not None


    def connection_close(self):
        """
        # 功能:显式关闭数据库连接
        """
        if hasattr(self, 'conn') and self.conn.open:
            self.conn.close()
            print( "[INFO] 已关闭数据库连接" )


    def table_create(self, table_name, table_info):
        """
        # 功能:创建数据表
        # 输入:
        #    table_name: {str} 数据表名
        #    table_info: {str} SQL 语句中表结构信息,如:
        #       table_info = 
                    ID int NOT NULL PRIMARY KEY AUTO_INCREMENT COMMENT '自增主键',
                    Name varchar(255) COMMENT '姓名',
                    Age int COMMENT '年龄',
                    Score REAL COMMENT '分数',
                    UpdateTime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据插入与更新时间'
        # 输出:
        #    {bool} 是否成功
        """
        try:
            if not self._is_valid_name(table_name):
                raise ValueError("数据表名包含非法字符!")
            if not hasattr(self, 'conn') or not self.conn.open:
                raise ConnectionError("未连接数据库!")
            if not isinstance(table_info, str) or not table_info.strip():
                raise ValueError("表结构信息 (table_info) 不能为空")

            # 获取操作游标
            with self.conn.cursor() as cursor:
                # 构建 SQL 语句:删除原有数据表
                sql_cmd = f"DROP TABLE IF EXISTS `{table_name}`"
                # print( f"[INFO] 执行 SQL :{sql_cmd}" )
                # 执行 SQL 语句
                cursor.execute(sql_cmd)

                # 构建 SQL 语句:建表
                sql_cmd = f"""CREATE TABLE `{table_name}` ({table_info}) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;"""
                # print( f"[INFO] 执行 SQL :{sql_cmd}" )
                # 执行 SQL 语句
                cursor.execute(sql_cmd)

            print( f"[INFO] 已新建数据表: {table_name}" )
            return True
        except Exception as err:
            print( f"[ERROR] 新建数据表 {table_name} 失败:{err} " )
            return False


    def table_delete(self, table_name):
        """
        # 功能:删除数据表
        # 输入:
        #    table_name: {str} 数据表名
        # 输出:
        #    {bool} 是否成功
        """
        try:
            if not self._is_valid_name(table_name):
                raise ValueError("数据表名包含非法字符!")
            if not hasattr(self, 'conn') or not self.conn.open:
                raise ConnectionError("未连接数据库!")

            # 获取操作游标
            with self.conn.cursor() as cursor:
                # 构建 SQL 语句:删除原有数据表
                sql_cmd = f"DROP TABLE IF EXISTS `{table_name}`"
                # print( f"[INFO] 执行 SQL :{sql_cmd}" )
                # 执行 SQL 语句
                cursor.execute(sql_cmd)

            print( f"[INFO] 已删除数据表: {table_name}" )
            return True
        except Exception as err:
            print( f"[ERROR] 删除数据表 {table_name} 失败:{err} " )
            return False


    def table_data_read(self, table_name):
        """
        # 功能:读取数据表所有数据
        # 输入:
        #    table_name: {str} 数据表名
        # 输出:
        #    data_all: {list} 数据表所有数据
        """
        try:
            if not self._is_valid_name(table_name):
                raise ValueError("数据表名包含非法字符!")
            if not hasattr(self, 'conn') or not self.conn.open:
                raise ConnectionError("未连接数据库!")

            # 获取操作游标,并使用 DictCursor 保证结果格式统一
            with self.conn.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
                # 构建 SQL 语句:读取数据表所有数据
                sql_cmd = f"SELECT * FROM `{table_name}`"
                # print( f"[INFO] 执行 SQL :{sql_cmd}" )
                # 执行 SQL 语句
                cursor.execute(sql_cmd)
                # 获取所有返回结果行
                value_rows = cursor.fetchall()

            data_all = list(value_rows)
            print( f"[INFO] 已读取数据表: {table_name}" )
        except Exception as err:
            data_all = ['ERROR']
            print( f"[ERROR] 读取数据表 {table_name} 失败:{err} " )
        finally:
            return data_all
            # 由于在连接数据库时设置了 cursorclass=pymysql.cursors.DictCursor
            # 因此 cursor.fetchall() 返回的是一个由字典组成的列表
            # 因此 data_all 形如:
            #   [
            #       {'id': 1, 'name': 'Alice', 'age': 20},
            #       {'id': 2, 'name': 'Bob', 'age': 22}
            #   ]


    def table_data_search(self, table_name, search_info):
        """
        # 功能:查询表数据
        # 输入:
        #    table_name: {str} 数据表名
        #    search_info: {str} SQL 语句中具体搜索内容,如:
        #       search_info = "Name='Alice' AND Score > 85"
        # 输出:
        #    data_searched: {list} 查询数据
        """
        try:
            if not self._is_valid_name(table_name):
                raise ValueError("数据表名包含非法字符!")
            if not hasattr(self, 'conn') or not self.conn.open:
                raise ConnectionError("未连接数据库!")
            if not isinstance(search_info, str) or not search_info.strip():
                raise ValueError("具体搜索内容 (search_info) 不能为空")

            # 获取操作游标,并使用 DictCursor 保证结果格式统一
            with self.conn.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
                # 构建 SQL 语句:查询
                sql_cmd = f"SELECT * FROM `{table_name}` WHERE {search_info}"
                # print( f"[INFO] 执行 SQL :{sql_cmd}" )
                # 执行 SQL 语句
                cursor.execute(sql_cmd)
                # 获取所有返回结果行
                value_rows = cursor.fetchall()
                # 只获取一行结果
                # value_rows = cursor.fetchone()

            data_searched = list(value_rows)
            print( f"[INFO] 已查询数据: {search_info}" )
        except Exception as err:
            data_searched = ['ERROR']
            print( f"[ERROR] 查询数据 {search_info} 失败: {err}" )
        finally:
            return data_searched
            # 由于在连接数据库时设置了 cursorclass=pymysql.cursors.DictCursor
            # 因此 cursor.fetchall() 返回的是一个由字典组成的列表
            # 因此 data_searched 形如:
            #   [
            #       {'id': 1, 'name': 'Alice', 'age': 20},
            #       {'id': 2, 'name': 'Bob', 'age': 22}
            #   ]


    def table_data_insert(self, table_name, insert_data):
        """
        # 功能:插入表数据
        # 输入:
        #    table_name: {str} 数据表名
        #    insert_data: {dict} 插入的数据,键为字段名,值为对应的数据,如:
        #       insert_data = {"Name": "Alice", "Score": 95}
        # 输出:
        #    {bool} 是否成功
        """
        try:
            if not self._is_valid_name(table_name):
                raise ValueError("数据表名包含非法字符!")
            if not hasattr(self, 'conn') or not self.conn.open:
                raise ConnectionError("未连接数据库!")
            if not isinstance(insert_data, dict) or not insert_data:
                raise ValueError("插入数据 (insert_data) 必须为非空字典")

            # 构建字段名、占位符、值
            columns = ', '.join([f'`{key}`' for key in insert_data.keys()])
            placeholders = ', '.join(['%s'] * len(insert_data))
            values = list(insert_data.values())

            # 获取操作游标
            with self.conn.cursor() as cursor:
                # 构建 SQL 语句:插入
                sql_cmd = f"INSERT INTO `{table_name}` ({columns}) VALUES ({placeholders})"
                # print( f"[INFO] 执行 SQL :{sql_cmd} - {values}" )
                # 执行 SQL 语句
                cursor.execute(sql_cmd, values)
                # 提交到数据库执行
                self.conn.commit()

            print( f"[INFO] 已插入数据到 {table_name}: {insert_data}" )
            return True
        except Exception as err:
            # 如果发生错误则回滚并打印错误信息
            self.conn.rollback()
            print( f"[ERROR] 插入数据到 {table_name} 失败: {err}" )
            return False


    def table_data_update(self, table_name, update_data, update_where):
        """
        # 功能:更新表数据
        # 输入:
        #    table_name: {str} 数据表名
        #    update_data: {dict} 更新的数据,键为字段名,值为对应的新数据,如:
        #       update_data = {"Name": "Bob", "Score": 90}
        #    update_where: {str} 更新条件,如:
        #       update_where = "StudentID=1"
        # 输出:
        #    {bool} 是否成功
        """
        try:
            if not self._is_valid_name(table_name):
                raise ValueError("数据表名包含非法字符!")
            if not hasattr(self, 'conn') or not self.conn.open:
                raise ConnectionError("未连接数据库!")
            if not isinstance(update_data, dict) or not update_data:
                raise ValueError("更新数据 (update_data) 必须为非空字典")
            if not isinstance(update_where, str) or not update_where.strip():
                raise ValueError("更新条件 (update_where) 必须为非空字符串")

            # 构建 SET 部分(参数化值)
            set_clauses = [f"`{key}` = %s" for key in update_data.keys()]
            set_part = ', '.join(set_clauses)
            # 收集所有要更新的值
            values = list(update_data.values())

            # 获取操作游标
            with self.conn.cursor() as cursor:
                # 构建 SQL 语句:更新
                sql_cmd = f"UPDATE `{table_name}` SET {set_part} WHERE {update_where};"
                # print( f"[INFO] 执行 SQL :{sql_cmd} - {values}" )
                # 执行 SQL 语句
                cursor.execute(sql_cmd, values)
                # 提交到数据库执行
                self.conn.commit()

            print(f"[INFO] 已更新数据在 {table_name}: {update_data} WHERE {update_where}")
            return True
        except Exception as err:
            # 如果发生错误则回滚并打印错误信息
            self.conn.rollback()
            print(f"[ERROR] 更新数据在 {table_name} 失败: {err}")
            return False


    def table_data_delete(self, table_name, delete_where):
        """
        # 功能:删除表数据
        # 输入:
        #    table_name: {str} 数据表名
        #    delete_where: {str} 删除条件,如:
        #       delete_where = "StudentID=1"
        # 输出:
        #    {bool} 是否成功
        """
        try:
            if not self._is_valid_name(table_name):
                raise ValueError("数据表名包含非法字符!")
            if not hasattr(self, 'conn') or not self.conn.open:
                raise ConnectionError("未连接数据库!")
            if not isinstance(delete_where, str) or not delete_where.strip():
                raise ValueError("删除条件 (delete_where) 必须为非空字符串")

            # 获取操作游标
            with self.conn.cursor() as cursor:
                # 构建 SQL 语句:删除
                sql_cmd = f"DELETE FROM `{table_name}` WHERE {delete_where};"
                # print( f"[INFO] 执行 SQL :{sql_cmd}" )
                # 执行 SQL 语句
                cursor.execute(sql_cmd)
                # 提交到数据库执行
                self.conn.commit()

            print(f"[INFO] 已删除数据在 {table_name} WHERE {delete_where}")
            return True
        except Exception as err:
            # 如果发生错误则回滚并打印错误信息
            self.conn.rollback()
            print(f"[ERROR] 删除数据在 {table_name} 失败: {err}")
            return False

1
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区