在 FastAPI 中使用原生的 SQL 语句并通过 mysql.connector
进行 MySQL 数据库操作,是一种轻量级且高效的数据库交互方式。与使用 ORM(如 SQLAlchemy)相比,直接使用原生 SQL 语句可以提供更大的灵活性和控制力。本文将详细介绍如何在 FastAPI 中使用 mysql.connector
管理数据库连接,包括连接池的设置、在请求生命周期中打开和关闭连接、以及编写高效且安全的数据库操作代码。
- 前提条件
- 安装必要的库
- 配置数据库连接
- 设置连接池
- 整合 FastAPI 的生命周期事件
- 创建数据库依赖
- 编写示例路由
- 完整示例代码
- 最佳实践与注意事项
- 总结
1. 前提条件#
确保您已经具备以下内容:
- Python 3.7 及以上版本
- FastAPI 安装并基本了解其使用
- MySQL 数据库已安装并运行
- 基本的 SQL 语句知识
2. 安装必要的库#
首先,您需要安装 fastapi
和 mysql-connector-python
库。建议使用虚拟环境来管理项目依赖。
1
| pip install fastapi uvicorn mysql-connector-python python-dotenv
|
- fastapi:用于构建 Web API。
- uvicorn:ASGI 服务器,用于运行 FastAPI 应用。
- mysql-connector-python:MySQL 官方的 Python 连接器,用于与 MySQL 数据库交互。
- python-dotenv:用于加载环境变量,管理敏感信息(如数据库凭证)。
3. 配置数据库连接#
使用环境变量管理敏感信息#
为了安全和灵活性,建议使用环境变量来存储数据库连接信息。创建一个 .env
文件来存储这些信息:
1
2
3
4
5
6
7
| DB_HOST=localhost
DB_PORT=3306
DB_USER=your_username
DB_PASSWORD=your_password
DB_NAME=your_database
DB_POOL_NAME=fastapi_pool
DB_POOL_SIZE=5
|
注意:确保 .env
文件不被提交到版本控制系统(如 Git),可以在 .gitignore
文件中添加 .env
。
4. 设置连接池#
mysql.connector
提供了连接池的功能,可以有效地管理数据库连接,提高性能并避免频繁地创建和销毁连接。以下是如何设置连接池的步骤:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| # database.py
import os
from mysql.connector import pooling
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", 3306))
DB_USER = os.getenv("DB_USER", "root")
DB_PASSWORD = os.getenv("DB_PASSWORD", "")
DB_NAME = os.getenv("DB_NAME", "test_db")
DB_POOL_NAME = os.getenv("DB_POOL_NAME", "fastapi_pool")
DB_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", 5))
# 创建连接池
connection_pool = pooling.MySQLConnectionPool(
pool_name=DB_POOL_NAME,
pool_size=DB_POOL_SIZE,
pool_reset_session=True,
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWORD,
database=DB_NAME,
autocommit=True # 根据需要设置是否自动提交
)
|
关键参数说明:
- pool_name:连接池的名称。
- pool_size:连接池中可用连接的数量。
- pool_reset_session:当连接从池中取出时,是否重置会话。
- autocommit:设置是否自动提交事务,根据需求调整。
5. 整合 FastAPI 的生命周期事件#
为了确保在应用启动时初始化连接池,并在应用关闭时清理资源,可以利用 FastAPI 的生命周期事件 startup
和 shutdown
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| # main.py
from fastapi import FastAPI
from database import connection_pool
app = FastAPI()
@app.on_event("startup")
def startup_event():
# 在这里可以执行任何启动时需要的操作
# 由于连接池已经在 database.py 中初始化,这里不需要额外操作
print("Application startup: Connection pool is ready.")
@app.on_event("shutdown")
def shutdown_event():
# 关闭连接池中的所有连接
connection_pool.close()
print("Application shutdown: Connection pool is closed.")
|
注意:mysql.connector.pooling.MySQLConnectionPool
不提供直接关闭连接池的方法。connection_pool.close()
实际上会关闭所有连接。如果您的库版本不支持 close()
, 您可能需要手动管理连接关闭。
6. 创建数据库依赖#
为了在每个请求中安全地获取和释放数据库连接,可以使用 FastAPI 的依赖注入系统。定义一个依赖项,用于获取连接并确保在请求结束时释放连接。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| # dependencies.py
from fastapi import Depends, HTTPException
from mysql.connector import Error
from database import connection_pool
def get_db_connection():
try:
# 从连接池中获取一个连接
connection = connection_pool.get_connection()
if connection.is_connected():
yield connection
else:
raise HTTPException(status_code=500, detail="Failed to connect to the database.")
except Error as e:
raise HTTPException(status_code=500, detail=str(e))
finally:
if 'connection' in locals() and connection.is_connected():
connection.close()
|
解释:
- 使用
yield
来创建一个生成器,确保在请求结束时执行 finally
块,关闭连接。 - 捕获任何连接错误,并返回适当的 HTTP 异常。
7. 编写示例路由#
以下是一个示例路由,展示如何使用原生 SQL 语句进行数据库操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| # routes.py
from fastapi import APIRouter, Depends
from dependencies import get_db_connection
from mysql.connector import cursor
router = APIRouter()
@router.get("/items/")
def read_items(db: cursor = Depends(get_db_connection)):
query = "SELECT id, name, description FROM items"
try:
cursor = db.cursor(dictionary=True)
cursor.execute(query)
results = cursor.fetchall()
return {"items": results}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
finally:
cursor.close()
@router.post("/items/")
def create_item(name: str, description: str, db: cursor = Depends(get_db_connection)):
query = "INSERT INTO items (name, description) VALUES (%s, %s)"
values = (name, description)
try:
cursor = db.cursor()
cursor.execute(query, values)
db.commit()
return {"message": "Item created successfully.", "item_id": cursor.lastrowid}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
finally:
cursor.close()
|
关键点:
- 使用参数化查询:通过
%s
占位符和 values
元组,防止 SQL 注入。 - 字典游标:使用
dictionary=True
获取字典格式的结果,便于 JSON 序列化。 - 事务管理:在插入或更新操作后调用
db.commit()
,在异常情况下调用 db.rollback()
以回滚事务。 - 资源释放:使用
finally
块确保游标被关闭。
8. 完整示例代码#
以下是一个完整的示例项目结构和代码,展示如何在 FastAPI 中使用 mysql.connector
进行原生 SQL 操作。
项目结构#
1
2
3
4
5
6
7
8
9
10
11
12
| fastapi_mysql_project/
│
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── database.py
│ ├── dependencies.py
│ └── routes.py
│
├── .env
├── requirements.txt
└── README.md
|
8.1 app/database.py
#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| # app/database.py
import os
from mysql.connector import pooling
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", 3306))
DB_USER = os.getenv("DB_USER", "root")
DB_PASSWORD = os.getenv("DB_PASSWORD", "")
DB_NAME = os.getenv("DB_NAME", "test_db")
DB_POOL_NAME = os.getenv("DB_POOL_NAME", "fastapi_pool")
DB_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", 5))
# 创建连接池
connection_pool = pooling.MySQLConnectionPool(
pool_name=DB_POOL_NAME,
pool_size=DB_POOL_SIZE,
pool_reset_session=True,
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWORD,
database=DB_NAME,
autocommit=True # 根据需要设置是否自动提交
)
|
8.2 app/dependencies.py
#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| # app/dependencies.py
from fastapi import Depends, HTTPException
from mysql.connector import Error, connection
from app.database import connection_pool
def get_db_connection():
try:
# 从连接池中获取一个连接
conn: connection.MySQLConnection = connection_pool.get_connection()
if conn.is_connected():
yield conn
else:
raise HTTPException(status_code=500, detail="Failed to connect to the database.")
except Error as e:
raise HTTPException(status_code=500, detail=str(e))
finally:
if 'conn' in locals() and conn.is_connected():
conn.close()
|
8.3 app/routes.py
#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| # app/routes.py
from fastapi import APIRouter, Depends, HTTPException
from mysql.connector import cursor
from app.dependencies import get_db_connection
router = APIRouter()
@router.get("/items/")
def read_items(db: connection.MySQLConnection = Depends(get_db_connection)):
query = "SELECT id, name, description FROM items"
try:
cursor = db.cursor(dictionary=True)
cursor.execute(query)
results = cursor.fetchall()
return {"items": results}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
finally:
cursor.close()
@router.post("/items/")
def create_item(name: str, description: str, db: connection.MySQLConnection = Depends(get_db_connection)):
query = "INSERT INTO items (name, description) VALUES (%s, %s)"
values = (name, description)
try:
cursor = db.cursor()
cursor.execute(query, values)
db.commit()
return {"message": "Item created successfully.", "item_id": cursor.lastrowid}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
finally:
cursor.close()
|
8.4 app/main.py
#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| # app/main.py
from fastapi import FastAPI
from app.routes import router
app = FastAPI()
app.include_router(router)
@app.on_event("startup")
def startup_event():
print("Application startup: Connection pool is ready.")
@app.on_event("shutdown")
def shutdown_event():
from app.database import connection_pool
connection_pool.close()
print("Application shutdown: Connection pool is closed.")
|
8.5 requirements.txt
#
1
2
3
4
| fastapi
uvicorn
mysql-connector-python
python-dotenv
|
8.6 .env
#
1
2
3
4
5
6
7
| DB_HOST=localhost
DB_PORT=3306
DB_USER=your_username
DB_PASSWORD=your_password
DB_NAME=your_database
DB_POOL_NAME=fastapi_pool
DB_POOL_SIZE=5
|
9. 最佳实践与注意事项#
9.1 使用连接池#
- 连接池大小:根据应用的预期负载和数据库服务器的能力设置适当的连接池大小。过大的连接池可能导致数据库过载,过小则可能导致连接等待时间过长。
- 自动提交:根据应用需求设置
autocommit
。对于需要事务管理的操作,可以关闭自动提交,并手动控制提交和回滚。
9.2 依赖注入#
- 依赖的作用域:确保数据库连接的依赖项的作用域是
per-request
,即每个请求获取一个连接并在请求结束时释放。 - 错误处理:在依赖项中捕获连接错误,并返回适当的 HTTP 异常,避免将敏感的数据库错误暴露给客户端。
9.3 安全性#
- 参数化查询:始终使用参数化查询来防止 SQL 注入攻击。
- 最小权限原则:数据库用户应具有最低必要的权限,避免使用具有广泛权限的用户进行数据库操作。
9.4 性能优化#
- 索引:确保数据库表中经常查询的字段建立了索引,以提高查询性能。
- 批量操作:对于需要处理大量数据的操作,考虑使用批量插入或更新来减少数据库交互次数。
9.5 日志记录#
- 错误日志:记录数据库操作中的错误,帮助调试和监控应用。
- 查询日志:对于性能调优,可以记录和分析慢查询。
9.6 异步编程#
mysql.connector
是一个同步库。如果您的应用需要高并发和异步操作,考虑使用异步的 MySQL 连接器,如 aiomysql
或 asyncmy
,以充分利用 FastAPI 的异步特性。
10. 总结#
在 FastAPI 中使用原生 SQL 语句和 mysql.connector
进行数据库操作,是一种高效且灵活的方式。通过合理地管理数据库连接池、整合 FastAPI 的生命周期事件、使用依赖注入系统获取和释放连接,可以确保应用的稳定性和性能。遵循最佳实践,如使用参数化查询、设置适当的连接池大小、确保安全性和性能优化,能够进一步提升应用质量。
通过本文提供的详细示例和指导,您应该能够在 FastAPI 项目中成功地集成和管理 MySQL 数据库连接,编写高效、安全的数据库操作代码。如果您有更多问题或需要进一步的帮助,请随时提问!