m3u8_download/utils/MySqlUtil.py
2025-03-16 19:03:36 +08:00

161 lines
4.2 KiB
Python

#!/usr/bin/python
# -*- coding:utf-8 -*-
import pymysql
from utils.Log import Log
import platform
import pandas as pd
from utils.LoadConfig import loadconfig
log = Log()
class MySQLError(Exception):
def __init__(self, message):
self.message = message
class MySqlUtil:
"""mysql util"""
db = None
cursor = None
def get_section(db_name):
"""根据系统环境变量获取section"""
platform_ = platform.system()
if platform_ == "Windows" or platform_ == "Darwin":
section = db_name + '_test'
else:
section = db_name + '_pro'
log.info("MYSQL 环境为 {}".format(section))
return section
def __init__(self, db_name):
platform_ = platform.system()
if platform_ == "Windows" or platform_ == "Darwin":
section = db_name + '_test'
else:
section = db_name + '_pro'
log.info("MYSQL 环境为 {}".format(section))
# 打开数据库连接
conf = loadconfig('yz_mysql')
self.host = conf.get('host')
self.port = int(conf.get('port'))
self.userName = conf.get('userName')
self.password = conf.get('password')
self.dbName = conf.get("dbName")
mysql_config = {"host": self.host,
"port": self.port,
"user": self.userName,
"passwd": self.password,
"db": self.dbName}
log.info(mysql_config)
# 链接数据库
def get_con(self):
""" 获取conn """
self.db = pymysql.Connect(
host=self.host,
port=self.port,
user=self.userName,
passwd=self.password,
db=self.dbName
)
self.cursor = self.db.cursor()
# 关闭链接
def close(self):
self.cursor.close()
self.db.close()
# 主键查询数据
def get_one(self, sql):
res = None
try:
self.get_con()
self.cursor.execute(sql)
res = self.cursor.fetchone()
self.close()
log.info("查询:" + sql)
except Exception as e:
log.error("查询失败!" + str(e))
return res
# 查询列表数据
def get_all(self, sql):
res = None
try:
self.get_con()
self.cursor.execute(sql)
res = self.cursor.fetchall()
self.close()
except Exception as e:
log.error("查询失败!" + str(e))
return res
def get_data_frame_sql(self, sql, columns):
self.get_con()
self.cursor.execute(sql)
res = self.cursor.fetchall()
df = pd.DataFrame(res, columns=columns)
return df
# 插入数据
def insert_parameter(self, sql, parameter):
count = 0
try:
self.get_con()
count = self.cursor.execute(sql, parameter)
self.db.commit()
self.close()
except Exception as e:
log.error("操作失败!" + str(e))
self.db.rollback()
return count
# 插入数据
def insert_parameters(self, sql, parameters):
count = 0
try:
self.get_con()
count = self.cursor.executemany(sql, parameters)
self.db.commit()
self.close()
except Exception as e:
log.error("操作失败!" + str(e))
self.db.rollback()
return count
# 插入数据
def __insert(self, sql):
count = 0
try:
self.get_con()
log.info('执行sql:\r\n' + sql)
count = self.cursor.execute(sql)
self.db.commit()
self.close()
log.info('sql执行完成')
except Exception as e:
# log.error("操作失败!" + str(e))
self.db.rollback()
raise MySQLError("mysql操作异常")
return count
# 保存数据
def save(self, sql):
return self.__insert(sql)
# 更新数据
def update(self, sql,):
return self.__insert(sql)
# 删除数据
def delete(self, sql):
return self.__insert(sql)
if __name__ == '__main__':
print('')