#!/usr/bin/python
# -*- coding:utf-8 -*-
"""Thin convenience wrapper around PyMySQL: one short-lived connection per call."""
import platform

import pandas as pd
import pymysql

from utils.LoadConfig import loadconfig
from utils.Log import Log

log = Log()


class MySQLError(Exception):
    """Raised when a write statement (save/update/delete) fails."""

    def __init__(self, message):
        # Pass the message to Exception so str(exc) and tracebacks show it;
        # the ``message`` attribute is kept for existing callers.
        super().__init__(message)
        self.message = message


class MySqlUtil:
    """MySQL helper that opens a connection for every call and closes it after.

    The SQL text handed to ``get_one``/``get_all``/``save``/``update``/
    ``delete`` is executed verbatim; use ``insert_parameter`` or
    ``insert_parameters`` when values must be bound by the driver.
    """

    # Connection and cursor of the call currently in flight (None when idle).
    db = None
    cursor = None

    @staticmethod
    def get_section(db_name):
        """Return the config section name for the current platform.

        Windows and Darwin (macOS) map to ``<db_name>_test``; every other
        platform maps to ``<db_name>_pro``. The chosen section is logged.
        """
        if platform.system() in ("Windows", "Darwin"):
            section = db_name + '_test'
        else:
            section = db_name + '_pro'
        log.info("MYSQL 环境为 {}".format(section))
        return section

    def __init__(self, db_name):
        """Load the connection settings; no connection is opened here.

        Args:
            db_name: logical database name, used to derive the section name
                that is logged.
        """
        # NOTE(review): the derived section is only logged; the settings are
        # always read from the hard-coded 'yz_mysql' entry. Confirm whether
        # loadconfig(section) was intended before changing this.
        self.get_section(db_name)

        conf = loadconfig('yz_mysql')
        self.host = conf.get('host')
        self.port = int(conf.get('port'))
        self.userName = conf.get('userName')
        self.password = conf.get('password')
        self.dbName = conf.get("dbName")

        # Log the effective settings with the password masked so credentials
        # never end up in log files.
        mysql_config = {"host": self.host,
                        "port": self.port,
                        "user": self.userName,
                        "passwd": "******",
                        "db": self.dbName}
        log.info(mysql_config)

    def get_con(self):
        """Open a new connection and cursor, stored on ``self.db``/``self.cursor``."""
        self.db = pymysql.Connect(
            host=self.host,
            port=self.port,
            user=self.userName,
            passwd=self.password,
            db=self.dbName
        )
        self.cursor = self.db.cursor()

    def close(self):
        """Close the current cursor and connection, if any.

        Safe to call when nothing is open or when called twice: the
        references are cleared first, and the connection is closed even if
        closing the cursor raises.
        """
        cursor, db = self.cursor, self.db
        self.cursor = None
        self.db = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if db is not None:
                db.close()

    def _close_quietly(self):
        """Close the connection, logging (not raising) any error from close()."""
        try:
            self.close()
        except Exception as e:
            log.error("关闭连接失败!" + str(e))

    def _rollback_quietly(self):
        """Roll back the open transaction if there is a connection; never raises."""
        if self.db is None:
            # get_con() itself failed, so there is nothing to roll back.
            return
        try:
            self.db.rollback()
        except Exception as e:
            log.error("回滚失败!" + str(e))

    def get_one(self, sql):
        """Execute ``sql`` and return the result of ``cursor.fetchone()``.

        Errors are logged, not raised; ``None`` is returned on failure.
        """
        res = None
        try:
            self.get_con()
            self.cursor.execute(sql)
            res = self.cursor.fetchone()
            log.info("查询:" + sql)
        except Exception as e:
            log.error("查询失败!" + str(e))
        finally:
            # Release the connection on the failure path as well.
            self._close_quietly()
        return res

    def get_all(self, sql):
        """Execute ``sql`` and return the result of ``cursor.fetchall()``.

        Errors are logged, not raised; ``None`` is returned on failure.
        """
        res = None
        try:
            self.get_con()
            self.cursor.execute(sql)
            res = self.cursor.fetchall()
        except Exception as e:
            log.error("查询失败!" + str(e))
        finally:
            self._close_quietly()
        return res

    def get_data_frame_sql(self, sql, columns):
        """Execute ``sql`` and return all rows as a ``pandas.DataFrame``.

        Args:
            sql: query text, executed verbatim.
            columns: column labels passed to ``pd.DataFrame``.

        Raises:
            Any exception from connecting, executing or building the frame
            propagates to the caller; the connection is closed either way.
        """
        try:
            self.get_con()
            self.cursor.execute(sql)
            res = self.cursor.fetchall()
            return pd.DataFrame(res, columns=columns)
        finally:
            self._close_quietly()

    def insert_parameter(self, sql, parameter):
        """Execute one parameterized statement and commit.

        Returns:
            The value returned by ``cursor.execute`` on success; ``0`` when
            the statement failed and was rolled back (the error is logged).
        """
        count = 0
        try:
            self.get_con()
            count = self.cursor.execute(sql, parameter)
            self.db.commit()
        except Exception as e:
            log.error("操作失败!" + str(e))
            self._rollback_quietly()
            # Nothing was persisted, so do not report affected rows.
            count = 0
        finally:
            self._close_quietly()
        return count

    def insert_parameters(self, sql, parameters):
        """Execute a parameterized statement for many rows and commit.

        Returns:
            The value returned by ``cursor.executemany`` on success; ``0``
            when the batch failed and was rolled back (the error is logged).
        """
        count = 0
        try:
            self.get_con()
            count = self.cursor.executemany(sql, parameters)
            self.db.commit()
        except Exception as e:
            log.error("操作失败!" + str(e))
            self._rollback_quietly()
            count = 0
        finally:
            self._close_quietly()
        return count

    def __insert(self, sql):
        """Execute a write statement verbatim, commit, and return its count.

        Raises:
            MySQLError: if connecting, executing or committing fails. The
                original exception is attached as ``__cause__``.
        """
        try:
            self.get_con()
            log.info('执行sql:\r\n' + sql)
            count = self.cursor.execute(sql)
            self.db.commit()
            log.info('sql执行完成')
        except Exception as e:
            self._rollback_quietly()
            # Chain the cause so the real driver error is not lost.
            raise MySQLError("mysql操作异常") from e
        finally:
            self._close_quietly()
        return count

    def save(self, sql):
        """Run an INSERT-style statement; see ``__insert`` for the contract."""
        return self.__insert(sql)

    def update(self, sql):
        """Run an UPDATE-style statement; see ``__insert`` for the contract."""
        return self.__insert(sql)

    def delete(self, sql):
        """Run a DELETE-style statement; see ``__insert`` for the contract."""
        return self.__insert(sql)


if __name__ == '__main__':
    print('')