在 FastAPI 中使用原生的 SQL 语句并通过 mysql.connector 进行 MySQL 数据库操作,是一种轻量级且高效的数据库交互方式。与使用 ORM(如 SQLAlchemy)相比,直接使用原生 SQL 语句可以提供更大的灵活性和控制力。本文将详细介绍如何在 FastAPI 中使用 mysql.connector 管理数据库连接,包括连接池的设置、在请求生命周期中打开和关闭连接、以及编写高效且安全的数据库操作代码。

目录

  1. 前提条件
  2. 安装必要的库
  3. 配置数据库连接
  4. 设置连接池
  5. 整合 FastAPI 的生命周期事件
  6. 创建数据库依赖
  7. 编写示例路由
  8. 完整示例代码
  9. 最佳实践与注意事项
  10. 总结

1. 前提条件

确保您已经具备以下内容:

  • Python 3.7 及以上版本
  • FastAPI 安装并基本了解其使用
  • MySQL 数据库已安装并运行
  • 基本的 SQL 语句知识

2. 安装必要的库

首先,您需要安装 fastapimysql-connector-python 库。建议使用虚拟环境来管理项目依赖。

1
pip install fastapi uvicorn mysql-connector-python python-dotenv
  • fastapi:用于构建 Web API。
  • uvicorn:ASGI 服务器,用于运行 FastAPI 应用。
  • mysql-connector-python:MySQL 官方的 Python 连接器,用于与 MySQL 数据库交互。
  • python-dotenv:用于加载环境变量,管理敏感信息(如数据库凭证)。

3. 配置数据库连接

使用环境变量管理敏感信息

为了安全和灵活性,建议使用环境变量来存储数据库连接信息。创建一个 .env 文件来存储这些信息:

1
2
3
4
5
6
7
DB_HOST=localhost
DB_PORT=3306
DB_USER=your_username
DB_PASSWORD=your_password
DB_NAME=your_database
DB_POOL_NAME=fastapi_pool
DB_POOL_SIZE=5

注意:确保 .env 文件不被提交到版本控制系统(如 Git),可以在 .gitignore 文件中添加 .env

4. 设置连接池

mysql.connector 提供了连接池的功能,可以有效地管理数据库连接,提高性能并避免频繁地创建和销毁连接。以下是如何设置连接池的步骤:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# database.py

import os
from mysql.connector import pooling
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", 3306))
DB_USER = os.getenv("DB_USER", "root")
DB_PASSWORD = os.getenv("DB_PASSWORD", "")
DB_NAME = os.getenv("DB_NAME", "test_db")
DB_POOL_NAME = os.getenv("DB_POOL_NAME", "fastapi_pool")
DB_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", 5))

# 创建连接池
connection_pool = pooling.MySQLConnectionPool(
    pool_name=DB_POOL_NAME,
    pool_size=DB_POOL_SIZE,
    pool_reset_session=True,
    host=DB_HOST,
    port=DB_PORT,
    user=DB_USER,
    password=DB_PASSWORD,
    database=DB_NAME,
    autocommit=True  # 根据需要设置是否自动提交
)

关键参数说明

  • pool_name:连接池的名称。
  • pool_size:连接池中可用连接的数量。
  • pool_reset_session:当连接从池中取出时,是否重置会话。
  • autocommit:设置是否自动提交事务,根据需求调整。

5. 整合 FastAPI 的生命周期事件

为了确保在应用启动时初始化连接池,并在应用关闭时清理资源,可以利用 FastAPI 的生命周期事件 startupshutdown

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# main.py

from fastapi import FastAPI
from database import connection_pool

app = FastAPI()

@app.on_event("startup")
def startup_event():
    # 在这里可以执行任何启动时需要的操作
    # 由于连接池已经在 database.py 中初始化,这里不需要额外操作
    print("Application startup: Connection pool is ready.")

@app.on_event("shutdown")
def shutdown_event():
    # 关闭连接池中的所有连接
    connection_pool.close()
    print("Application shutdown: Connection pool is closed.")

注意mysql.connector.pooling.MySQLConnectionPool 不提供直接关闭连接池的方法。connection_pool.close() 实际上会关闭所有连接。如果您的库版本不支持 close(), 您可能需要手动管理连接关闭。

6. 创建数据库依赖

为了在每个请求中安全地获取和释放数据库连接,可以使用 FastAPI 的依赖注入系统。定义一个依赖项,用于获取连接并确保在请求结束时释放连接。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# dependencies.py

from fastapi import Depends, HTTPException
from mysql.connector import Error
from database import connection_pool

def get_db_connection():
    try:
        # 从连接池中获取一个连接
        connection = connection_pool.get_connection()
        if connection.is_connected():
            yield connection
        else:
            raise HTTPException(status_code=500, detail="Failed to connect to the database.")
    except Error as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        if 'connection' in locals() and connection.is_connected():
            connection.close()

解释

  • 使用 yield 来创建一个生成器,确保在请求结束时执行 finally 块,关闭连接。
  • 捕获任何连接错误,并返回适当的 HTTP 异常。

7. 编写示例路由

以下是一个示例路由,展示如何使用原生 SQL 语句进行数据库操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# routes.py

from fastapi import APIRouter, Depends
from dependencies import get_db_connection
from mysql.connector import cursor

router = APIRouter()

@router.get("/items/")
def read_items(db: cursor = Depends(get_db_connection)):
    query = "SELECT id, name, description FROM items"
    try:
        cursor = db.cursor(dictionary=True)
        cursor.execute(query)
        results = cursor.fetchall()
        return {"items": results}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        cursor.close()

@router.post("/items/")
def create_item(name: str, description: str, db: cursor = Depends(get_db_connection)):
    query = "INSERT INTO items (name, description) VALUES (%s, %s)"
    values = (name, description)
    try:
        cursor = db.cursor()
        cursor.execute(query, values)
        db.commit()
        return {"message": "Item created successfully.", "item_id": cursor.lastrowid}
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        cursor.close()

关键点

  • 使用参数化查询:通过 %s 占位符和 values 元组,防止 SQL 注入。
  • 字典游标:使用 dictionary=True 获取字典格式的结果,便于 JSON 序列化。
  • 事务管理:在插入或更新操作后调用 db.commit(),在异常情况下调用 db.rollback() 以回滚事务。
  • 资源释放:使用 finally 块确保游标被关闭。

8. 完整示例代码

以下是一个完整的示例项目结构和代码,展示如何在 FastAPI 中使用 mysql.connector 进行原生 SQL 操作。

项目结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
fastapi_mysql_project/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── database.py
│   ├── dependencies.py
│   └── routes.py
├── .env
├── requirements.txt
└── README.md

8.1 app/database.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# app/database.py

import os
from mysql.connector import pooling
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", 3306))
DB_USER = os.getenv("DB_USER", "root")
DB_PASSWORD = os.getenv("DB_PASSWORD", "")
DB_NAME = os.getenv("DB_NAME", "test_db")
DB_POOL_NAME = os.getenv("DB_POOL_NAME", "fastapi_pool")
DB_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", 5))

# 创建连接池
connection_pool = pooling.MySQLConnectionPool(
    pool_name=DB_POOL_NAME,
    pool_size=DB_POOL_SIZE,
    pool_reset_session=True,
    host=DB_HOST,
    port=DB_PORT,
    user=DB_USER,
    password=DB_PASSWORD,
    database=DB_NAME,
    autocommit=True  # 根据需要设置是否自动提交
)

8.2 app/dependencies.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# app/dependencies.py

from fastapi import Depends, HTTPException
from mysql.connector import Error, connection
from app.database import connection_pool

def get_db_connection():
    try:
        # 从连接池中获取一个连接
        conn: connection.MySQLConnection = connection_pool.get_connection()
        if conn.is_connected():
            yield conn
        else:
            raise HTTPException(status_code=500, detail="Failed to connect to the database.")
    except Error as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        if 'conn' in locals() and conn.is_connected():
            conn.close()

8.3 app/routes.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# app/routes.py

from fastapi import APIRouter, Depends, HTTPException
from mysql.connector import cursor
from app.dependencies import get_db_connection

router = APIRouter()

@router.get("/items/")
def read_items(db: connection.MySQLConnection = Depends(get_db_connection)):
    query = "SELECT id, name, description FROM items"
    try:
        cursor = db.cursor(dictionary=True)
        cursor.execute(query)
        results = cursor.fetchall()
        return {"items": results}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        cursor.close()

@router.post("/items/")
def create_item(name: str, description: str, db: connection.MySQLConnection = Depends(get_db_connection)):
    query = "INSERT INTO items (name, description) VALUES (%s, %s)"
    values = (name, description)
    try:
        cursor = db.cursor()
        cursor.execute(query, values)
        db.commit()
        return {"message": "Item created successfully.", "item_id": cursor.lastrowid}
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        cursor.close()

8.4 app/main.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# app/main.py

from fastapi import FastAPI
from app.routes import router

app = FastAPI()

app.include_router(router)

@app.on_event("startup")
def startup_event():
    print("Application startup: Connection pool is ready.")

@app.on_event("shutdown")
def shutdown_event():
    from app.database import connection_pool
    connection_pool.close()
    print("Application shutdown: Connection pool is closed.")

8.5 requirements.txt

1
2
3
4
fastapi
uvicorn
mysql-connector-python
python-dotenv

8.6 .env

1
2
3
4
5
6
7
DB_HOST=localhost
DB_PORT=3306
DB_USER=your_username
DB_PASSWORD=your_password
DB_NAME=your_database
DB_POOL_NAME=fastapi_pool
DB_POOL_SIZE=5

9. 最佳实践与注意事项

9.1 使用连接池

  • 连接池大小:根据应用的预期负载和数据库服务器的能力设置适当的连接池大小。过大的连接池可能导致数据库过载,过小则可能导致连接等待时间过长。
  • 自动提交:根据应用需求设置 autocommit。对于需要事务管理的操作,可以关闭自动提交,并手动控制提交和回滚。

9.2 依赖注入

  • 依赖的作用域:确保数据库连接的依赖项的作用域是 per-request,即每个请求获取一个连接并在请求结束时释放。
  • 错误处理:在依赖项中捕获连接错误,并返回适当的 HTTP 异常,避免将敏感的数据库错误暴露给客户端。

9.3 安全性

  • 参数化查询:始终使用参数化查询来防止 SQL 注入攻击。
  • 最小权限原则:数据库用户应具有最低必要的权限,避免使用具有广泛权限的用户进行数据库操作。

9.4 性能优化

  • 索引:确保数据库表中经常查询的字段建立了索引,以提高查询性能。
  • 批量操作:对于需要处理大量数据的操作,考虑使用批量插入或更新来减少数据库交互次数。

9.5 日志记录

  • 错误日志:记录数据库操作中的错误,帮助调试和监控应用。
  • 查询日志:对于性能调优,可以记录和分析慢查询。

9.6 异步编程

mysql.connector 是一个同步库。如果您的应用需要高并发和异步操作,考虑使用异步的 MySQL 连接器,如 aiomysqlasyncmy,以充分利用 FastAPI 的异步特性。

10. 总结

在 FastAPI 中使用原生 SQL 语句和 mysql.connector 进行数据库操作,是一种高效且灵活的方式。通过合理地管理数据库连接池、整合 FastAPI 的生命周期事件、使用依赖注入系统获取和释放连接,可以确保应用的稳定性和性能。遵循最佳实践,如使用参数化查询、设置适当的连接池大小、确保安全性和性能优化,能够进一步提升应用质量。

通过本文提供的详细示例和指导,您应该能够在 FastAPI 项目中成功地集成和管理 MySQL 数据库连接,编写高效、安全的数据库操作代码。如果您有更多问题或需要进一步的帮助,请随时提问!