# -*- coding:utf-8 -*-
import hashlib
import logging
import time
from datetime import datetime
from typing import List, Optional

import pymysql
import pymysql.cursors
from flask import g
from pymysql.converters import escape_string as _escape_sql_string

# NOTE: kept last so that any names it exports (e.g. a project-configured
# `logging`, `time`, `timestamp2int`, `TableDbHelperBase`) still take
# precedence over the explicit imports above, exactly as before.
from app.db_func_base import *


class UserDbHelper(TableDbHelperBase):
    """Query helper for the `user` table and the tables joined to it.

    All SQL is executed through ``do_select`` / ``do_execute``, which are
    inherited from ``TableDbHelperBase`` (defined outside this file).
    Rows returned by ``do_select`` are accessed by column name.

    NOTE(review): the `user.password` column is read back and hashed with
    MD5 at comparison time, i.e. it appears to hold the plain-text
    password. That is a storage-level weakness this helper cannot fix on
    its own; consider migrating to a salted password hash.
    """

    def __init__(self, pool):
        """Bind the helper to a connection pool.

        :param pool: connection pool object used by the base class.
        """
        self.db_pool = pool
        self.DB_Name = 'user'

    def re_init(self, pool):
        """Re-bind the helper to a (new) connection pool.

        :param pool: connection pool object used by the base class.
        """
        self.db_pool = pool
        self.DB_Name = 'user'

    @staticmethod
    def _sql_str(value) -> str:
        """Escape *value* for use inside a single-quoted SQL string literal.

        The value is stringified the same way ``'%s' % value`` would do it
        and then escaped, so quotes/backslashes supplied by a caller can no
        longer terminate the literal and inject SQL.
        """
        return _escape_sql_string(str(value))

    def generate_md5(self, input_str):
        """Return the hexadecimal MD5 digest of *input_str* (UTF-8 encoded).

        :param input_str: text to hash.
        :return: 32-character lowercase hex digest.
        """
        md5 = hashlib.md5()
        md5.update(input_str.encode('utf-8'))
        return md5.hexdigest()

    def login(self, userno, password_md5):
        """Check a user's credentials.

        :param userno: user number identifying the account.
        :param password_md5: MD5 hex digest of the password supplied by
            the client.
        :return: ``(token, last_modify_pdw_time)`` on success;
            ``('', '')`` when the password does not match;
            ``(None, None)`` when the user is missing or not unique.
        """
        sql_query = (
            "select password,token, last_modify_pdw_time from `user` "
            "where userno='%s'" % self._sql_str(userno)
        )
        users = self.do_select(sql_query)
        # Exactly one row is required; a None result is treated like "no rows".
        if not users or len(users) != 1:
            logging.error('login error! %s' % (sql_query))
            return None, None

        user = users[0]
        if self.generate_md5(user['password']) == password_md5:
            return user['token'], user['last_modify_pdw_time']
        return '', ''

    def authentication(self, token):
        """Fetch the resource permissions granted to the owner of *token*.

        :param token: session token of the user.
        :return: whatever ``do_select`` returns for the join of `user` and
            `author` on ``role`` (rows with ``resource`` and ``author``).
        """
        sql_query = (
            "select author.resource ,author.author from `user`,`author` "
            "where user.role=author.role and user.token='%s'"
            % self._sql_str(token)
        )
        return self.do_select(sql_query)

    def query_user(self, token):
        """Look up the single user row that owns *token*.

        :param token: session token of the user.
        :return: the user row, or ``None`` when no row or more than one
            row matches.
        """
        sql_query = "select * from `user` where token='%s'" % self._sql_str(token)
        users = self.do_select(sql_query)
        if not users or len(users) != 1:
            logging.error('query_user error! %s' % (sql_query))
            return None
        return users[0]

    def query_areaid_list(self, userno) -> Optional[List[str]]:
        """Query the list of area IDs associated with a user.

        Users whose role is ``'manager'`` get every area ID known to
        ``query_all_area_infos``; everyone else gets the areas listed for
        them in `area_user`.

        :param userno: user number identifying the account.
        :return: list of area IDs; ``[]`` when the user does not exist;
            ``None`` when a non-manager user has no `area_user` rows
            (kept for backward compatibility with existing callers).
        """
        user_info_sql = "select role from `user` where userno='%s'" % self._sql_str(userno)
        user_info = self.do_select(user_info_sql)
        if not user_info:
            return []

        if user_info[0]['role'] == 'manager':
            area_infos = self.query_all_area_infos()
            return list(area_infos.keys())

        sql_query = "select area_id from `area_user` where userno='%s';" % self._sql_str(userno)
        res = self.do_select(sql_query)
        if not res:
            logging.error('query_areaid_list error! %s' % (sql_query))
            return None
        return [item['area_id'] for item in res]

    def query_all_area_infos(self) -> dict:
        """Load every row of ``tmnet.city_bounds`` keyed by ``area_id``.

        :return: dict mapping ``area_id`` to its full row (``area_id``,
            ``area_name``, ``nodeid``, ``city_name``, ``center``); empty
            when the query yields nothing.
        """
        sql_query = "select area_id,area_name,nodeid,city_name,center from tmnet.`city_bounds`;"
        res = self.do_select(sql_query)
        # `res or ()` keeps a None result from raising during iteration.
        return {item['area_id']: item for item in (res or ())}

    def insert_rerun_dates(self, datelist_str: str):
        """Replace today's `rerun_dates` record with *datelist_str*.

        Any existing row for the current day is deleted first, then a new
        row is inserted.

        :param datelist_str: date list to store, already serialized as text.
        :return: result of the insert as reported by ``do_execute``.
        """
        # timestamp2int comes from app.db_func_base; presumably it maps a
        # Unix timestamp to an integer day key - confirm against that module.
        day = timestamp2int(int(time.time()))

        sql_query = "delete from rerun_dates where day=%d;" % day
        self.do_execute(sql_query)

        sql_query = "insert into rerun_dates(day, datelist) values(%d,'%s');" % (
            day,
            self._sql_str(datelist_str),
        )
        ret = self.do_execute(sql_query)
        if not ret:
            logging.error(sql_query)
            logging.error("insert error")
        return ret

    def clear_rerun_dates(self):
        """Delete today's `rerun_dates` record, if any.

        :return: result of the delete as reported by ``do_execute``.
        """
        day = timestamp2int(int(time.time()))
        sql_query = "delete from rerun_dates where day=%d;" % day
        ret = self.do_execute(sql_query)
        if not ret:
            logging.error(sql_query)
            logging.error("delete error")
        return ret

    def query_rerun_dates(self, day: int) -> Optional[str]:
        """Fetch the stored date list for *day*.

        :param day: integer day key, as written by ``insert_rerun_dates``.
        :return: the ``datelist`` value of the first matching row, or
            ``None`` when there is no row for that day.
        """
        sql_query = "select datelist from `rerun_dates` where day=%d;" % (day)
        res = self.do_select(sql_query)
        if not res:
            return None
        return res[0]['datelist']

    def check_user_info(self, userid, password):
        """Validate a user number / password-hash pair.

        :param userid: user number identifying the account.
        :param password: MD5 hex digest of the password supplied by the
            client.
        :return: ``(status, role, last_modify_pdw_time)`` where status is
            ``0`` on success, ``1`` when the user is not found and ``2``
            when the password does not match; the last two items are
            ``None`` unless status is ``0``.
        """
        sql = "select * from user where userno = '%s'" % self._sql_str(userid)
        res = self.do_select(sql)
        if not res:
            logging.error('查询用户当前组织信息失败,请检查当前用户信息是否存在异常! %s' % (sql))
            return 1, None, None

        user = res[0]
        if self.generate_md5(user['password']) != password:
            # Never write the stored password or the supplied hash to the
            # log; the user number is enough to trace the failure.
            logging.error('用户密码错误,请检查用户密码是否正确! userno=%s' % (userid))
            return 2, None, None
        return 0, user['role'], user['last_modify_pdw_time']

    def modify_password(self, userid, new_password):
        """Overwrite the stored password of a user.

        :param userid: user number identifying the account.
        :param new_password: new value for the `password` column (stored
            as given).
        :return: result of the update as reported by ``do_execute``.
        """
        sql = "update user set password = '%s' where userno = '%s'" % (
            self._sql_str(new_password),
            self._sql_str(userid),
        )
        return self.do_execute(sql)