125 lines
4.3 KiB
Python
125 lines
4.3 KiB
Python
import sqlite3
|
||
from typing import Optional, List, Tuple, Any
|
||
|
||
|
||
class SQLiteDB:
|
||
def __init__(self, db_path: str = "./sqlite/movies.db"):
|
||
self.db_path = db_path
|
||
self.conn: Optional[sqlite3.Connection] = None
|
||
self.cursor: Optional[sqlite3.Cursor] = None
|
||
self._init_db()
|
||
|
||
def _init_db(self) -> None:
|
||
"""初始化数据库并创建表(基于网页4、网页5、网页7的表结构设计)[4,5,7](@ref)"""
|
||
create_table_sql = """
|
||
CREATE TABLE IF NOT EXISTS movie (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
name TEXT NOT NULL,
|
||
path TEXT NOT NULL,
|
||
is_download INTEGER DEFAULT 0
|
||
)
|
||
"""
|
||
self.connect()
|
||
if self.cursor:
|
||
self.cursor.execute(create_table_sql)
|
||
self.conn.commit()
|
||
|
||
def connect(self) -> None:
|
||
"""建立数据库连接(基于网页2、网页5的连接方式)[2,5](@ref)"""
|
||
try:
|
||
self.conn = sqlite3.connect(self.db_path)
|
||
self.cursor = self.ursor.cursor()
|
||
self.conn.execute("PRAGMA foreign_keys = ON") # 启用外键约束
|
||
except sqlite3.Error as e:
|
||
raise ConnectionError(f"数据库连接失败: {e}")
|
||
|
||
def close(self) -> None:
|
||
"""安全关闭连接(参考网页6的资源管理)[6](@ref)"""
|
||
if self.cursor:
|
||
self.cursor.close()
|
||
if self.conn:
|
||
self.conn.close()
|
||
|
||
def execute(
|
||
self,
|
||
sql: str,
|
||
params: Optional[Tuple[Any, ...]] = None,
|
||
commit: bool = True
|
||
) -> int:
|
||
"""
|
||
执行单条SQL语句(集成网页1、网页7的参数化查询)[1,7](@ref)
|
||
:param sql: 含占位符的SQL语句
|
||
:param params: 参数元组(防SQL注入)
|
||
:param commit: 是否自动提交事务
|
||
:return: 影响的行数
|
||
"""
|
||
try:
|
||
if params:
|
||
self.cursor.execute(sql, params)
|
||
else:
|
||
self.cursor.execute(sql)
|
||
if commit and self.conn:
|
||
self.conn.commit()
|
||
return self.cursor.rowcount
|
||
except sqlite3.Error as e:
|
||
if self.conn:
|
||
self.conn.rollback()
|
||
raise RuntimeError(f"SQL执行失败: {e}")
|
||
|
||
# ---- 高级操作方法 ----
|
||
def insert_movie(self, name: str, path: str) -> int:
|
||
"""插入电影记录(演示特定业务方法)"""
|
||
return self.execute(
|
||
"INSERT INTO movie (name, path) VALUES (?, ?)",
|
||
(name, path)
|
||
)
|
||
|
||
def mark_downloaded(self, movie_id: int) -> int:
|
||
"""标记电影为已下载(业务逻辑示例)[6](@ref)"""
|
||
return self.execute(
|
||
"UPDATE movie SET is_download = 1 WHERE id = ?",
|
||
(movie_id,)
|
||
)
|
||
|
||
def get_undownloaded(self) -> List[Tuple]:
|
||
"""查询未下载的电影(返回完整记录元组)[4,6](@ref)"""
|
||
self.execute("SELECT * FROM movie WHERE is_download = 0", commit=False)
|
||
return self.cursor.fetchall()
|
||
|
||
# ---- 上下文管理器支持 ----
|
||
def __enter__(self):
|
||
self.connect()
|
||
return self
|
||
|
||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||
self.close()
|
||
|
||
|
||
# ==================== 使用示例 ====================
|
||
if __name__ == "__main__":
|
||
# 示例1:自动建表并插入数据
|
||
with SQLiteDB() as db:
|
||
# 批量插入演示(集成网页5的executemany方法)[5](@ref)
|
||
movies = [("泰坦尼克号", "/movies/titanic"), ("阿凡达", "/movies/avatar")]
|
||
db.executemany(
|
||
"INSERT INTO movie (name, path) VALUES (?, ?)",
|
||
movies
|
||
)
|
||
|
||
# 查询未下载记录(基于网页6的查询模式)[6](@ref)
|
||
print("待下载电影:", db.get_undownloaded())
|
||
#
|
||
# # 示例2:更新操作
|
||
# with SQLiteDB() as db:
|
||
# db.mark_downloaded(1)
|
||
# print("更新后的记录:", db.get_undownloaded())
|
||
#
|
||
# # 示例3:事务回滚演示
|
||
# try:
|
||
# with SQLiteDB() as db:
|
||
# db.execute("BEGIN TRANSACTION", commit=False)
|
||
# db.insert_movie("黑客帝国", "/movies/matrix")
|
||
# raise RuntimeError("模拟业务异常") # 触发回滚
|
||
# db.execute("COMMIT", commit=True)
|
||
# except Exception as e:
|
||
# print(f"事务回滚成功: {e}") |