import sqlite3 from typing import Optional, List, Tuple, Any import os class SQLiteDB: def __init__(self, ): current_directory = os.path.dirname(os.path.abspath(__file__)) root_path = os.path.abspath(os.path.dirname(current_directory) + os.path.sep + ".") project_name = root_path.split(os.path.sep)[-1] project_root_path = os.path.abspath(os.path.dirname(__file__)).split(project_name)[0] + project_name+'/sqlite/movies.db' self.db_path = project_root_path self.conn: Optional[sqlite3.Connection] = None self.cursor: Optional[sqlite3.Cursor] = None self._init_db() def _init_db(self) -> None: """初始化数据库并创建表(基于网页4、网页5、网页7的表结构设计)[4,5,7](@ref)""" create_table_sql = """ CREATE TABLE IF NOT EXISTS movie ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, path TEXT NOT NULL, is_download INTEGER DEFAULT 0 ) """ self.connect() if self.cursor: self.cursor.execute(create_table_sql) self.conn.commit() def connect(self) -> None: """建立数据库连接(基于网页2、网页5的连接方式)[2,5](@ref)""" try: self.conn = sqlite3.connect(self.db_path) self.cursor = self.conn.cursor() # 修正拼写错误 self.conn.execute("PRAGMA foreign_keys = ON") # 启用外键约束 except sqlite3.Error as e: raise ConnectionError(f"数据库连接失败: {e}") def close(self) -> None: """安全关闭连接(参考网页6的资源管理)[6](@ref)""" if self.cursor: self.cursor.close() if self.conn: self.conn.close() def execute( self, sql: str, params: Optional[Tuple[Any, ...]] = None, commit: bool = True ) -> int: """ 执行单条SQL语句(集成网页1、网页7的参数化查询)[1,7](@ref) :param sql: 含占位符的SQL语句 :param params: 参数元组(防SQL注入) :param commit: 是否自动提交事务 :return: 影响的行数 """ try: if params: self.cursor.execute(sql, params) else: self.cursor.execute(sql) if commit and self.conn: self.conn.commit() return self.cursor.rowcount except sqlite3.Error as e: if self.conn: self.conn.rollback() raise RuntimeError(f"SQL执行失败: {e}") # ---- 高级操作方法 ---- def insert_movie(self, name: str, path: str) -> int: """插入电影记录(演示特定业务方法)""" return self.execute( "INSERT INTO movie (name, path) VALUES (?, ?)", (name, path) ) def mark_downloaded(self, movie_id: int) -> int: """标记电影为已下载(业务逻辑示例)[6](@ref)""" return self.execute( "UPDATE movie SET is_download = 1 WHERE id = ?", (movie_id,) ) def get_undownloaded(self) -> List[Tuple]: """查询未下载的电影(返回完整记录元组)[4,6](@ref)""" self.execute("SELECT * FROM movie WHERE is_download = 0", commit=False) return self.cursor.fetchall() # ---- 上下文管理器支持 ---- def __enter__(self): self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() # ==================== 使用示例 ==================== if __name__ == "__main__": # 示例1:自动建表并插入数据 with SQLiteDB() as db: # 批量插入演示(集成网页5的executemany方法)[5](@ref) movies = [("泰坦尼克号", "/movies/titanic"), ("阿凡达", "/movies/avatar")] db.insert_movie("泰坦尼克号","/movies/titanic") # 查询未下载记录(基于网页6的查询模式)[6](@ref) print("待下载电影:", db.get_undownloaded()) # # # 示例2:更新操作 # with SQLiteDB() as db: # db.mark_downloaded(1) # print("更新后的记录:", db.get_undownloaded()) # # # 示例3:事务回滚演示 # try: # with SQLiteDB() as db: # db.execute("BEGIN TRANSACTION", commit=False) # db.insert_movie("黑客帝国", "/movies/matrix") # raise RuntimeError("模拟业务异常") # 触发回滚 # db.execute("COMMIT", commit=True) # except Exception as e: # print(f"事务回滚成功: {e}")