cross_doctor/app/user_db_func.py

249 lines
9.0 KiB
Python
Raw Normal View History

2025-10-10 14:38:22 +08:00
# -*- coding:utf-8 -*-
#import logging
import pymysql
import pymysql.cursors
from datetime import datetime
from flask import g
from app.db_func_base import *
import hashlib
class UserDbHelper(TableDbHelperBase):
def __init__(self, pool):
self.db_pool = pool
self.DB_Name = 'user'
2025-10-10 14:38:22 +08:00
def re_init(self, pool):
self.db_pool = pool
self.DB_Name = 'user'
2025-10-10 14:38:22 +08:00
def generate_md5(self, input_str):
md5 = hashlib.md5() # 创建一个md5对象
md5.update(input_str.encode('utf-8')) # 使用utf-8编码
return md5.hexdigest() # 返回十六进制的哈希值
def login(self, userno, password_md5):
2025-11-15 15:03:22 +08:00
sql_query = "select password,token, last_modify_pwd_time, role from `user` where userno='%s'" % (userno)
2025-10-10 14:38:22 +08:00
users = self.do_select(sql_query)
if len(users) != 1:
logging.error('query_ledger error! %s' % (sql_query))
return None, None
2025-10-10 14:38:22 +08:00
str_md5 = self.generate_md5(users[0]['password'])
if str_md5==password_md5:
2025-11-15 15:03:22 +08:00
return users[0]['token'], users[0]['last_modify_pwd_time'], users[0]['role']
2025-10-10 14:38:22 +08:00
else:
2025-11-15 15:03:22 +08:00
return '', '', ''
2025-10-10 14:38:22 +08:00
def authentication(self, token):
sql_query = "select author.resource ,author.author from `user`,`author` where user.role=author.role and user.token='%s'" % (token)
return self.do_select(sql_query)
def query_user(self, token):
sql_query = "select * from `user` where token='%s'" %(token)
users = self.do_select(sql_query)
if len(users) != 1:
logging.error('query_ledger error! %s' % (sql_query))
return None
return users[0]
def query_areaid_list(self, userno) -> [str]:
"""
查询用户关联的辖区ID列表
:param userno:
:return: [str]
"""
2025-11-14 11:16:12 +08:00
user_info_sql = "select role from `user` where userno='%s'" % (userno)
user_info = self.do_select(user_info_sql)
if not user_info:
return []
role = user_info[0]['role']
if role == 'manager':
area_infos = self.query_all_area_infos()
return list(area_infos.keys())
sql_query = "select area_id from `area_user` where userno='%s';" % (userno)
res = self.do_select(sql_query)
if len(res) < 1:
logging.error('query_ledger error! %s' % (sql_query))
2025-10-10 14:38:22 +08:00
return None
areaid_list = []
for item in res:
areaid_list.append(item['area_id'])
return areaid_list
def query_all_area_infos(self) -> dict:
""" 查询nodeid=>node_name """
sql_query = "select area_id,area_name,nodeid,city_name,center from tmnet.`city_bounds`;"
res = self.do_select(sql_query)
area_infos = dict()
for item in res:
area_infos[(item['area_id'])] = item
return area_infos
def insert_rerun_dates(self, datelist_str: str):
"""
插入一条新记录
:param datelist_str:
:return:
"""
tt = int(time.time())
day = timestamp2int(tt)
sql_query = "delete from rerun_dates where day=%d;" % day
self.do_execute(sql_query)
sql_query = "insert into rerun_dates(day, datelist) values(%d,'%s');" % (day, datelist_str)
ret = self.do_execute(sql_query)
if not ret:
logging.error(sql_query)
logging.error("insert error")
return ret
def clear_rerun_dates(self):
"""
插入一条新记录
:param datelist_str:
:return:
"""
tt = int(time.time())
day = timestamp2int(tt)
sql_query = "delete from rerun_dates where day=%d;" % day
ret = self.do_execute(sql_query)
if not ret:
logging.error(sql_query)
logging.error("delete error")
return ret
def query_rerun_dates(self, day: int) -> str:
sql_query = "select datelist from `rerun_dates` where day=%d;" % (day)
res = self.do_select(sql_query)
if len(res) < 1:
return None
else:
return res[0]['datelist']
2025-10-10 14:38:22 +08:00
def check_user_info(self, userid, password):
sql = "select * from user where userno = '%s'" % (userid)
2025-10-10 14:38:22 +08:00
res = self.do_select(sql)
if not res or len(res) == 0:
logging.error('查询用户当前组织信息失败,请检查当前用户信息是否存在异常! %s' % (sql))
return 1, None, None
2025-10-10 14:38:22 +08:00
else:
if self.generate_md5(res[0]['password']) != password:
logging.error('用户密码错误,请检查用户密码是否正确! %s, %s' % (res[0]['password'], password))
return 2, None, None
2025-10-10 14:38:22 +08:00
else:
return 0, res[0]['role'], res[0]['last_modify_pwd_time']
2025-10-10 14:38:22 +08:00
def modify_password(self, userid, new_password):
sql = "update user set password = '%s' where userno = '%s'" % (new_password, userid)
2025-11-15 15:03:22 +08:00
return self.do_execute(sql)
def query_users(self):
sql = """
select t1.*, t2.area_ids, t2.nodeids from
(select userno, user_name, role, password, last_modify_pwd_time from user where state != 1) t1
left join
(select userno, GROUP_CONCAT(area_id SEPARATOR ',') as area_ids, GROUP_CONCAT(nodeid SEPARATOR ',') as nodeids from area_user group by userno) t2
on t1.userno = t2.userno
"""
return self.do_select(sql)
def query_user_info_sql(self, userid):
sql = "select * from user where userno = '%s'" % (userid)
res = self.do_select(sql)
if res:
return res[0]
return None
def update_user_info(self, modify_sql, userid, area_id_list, new_userid, all_area_info_dict):
conn, cursor = self.connect()
conn.begin()
sql1 = "update user %s where userno = '%s'" % (modify_sql, userid)
sql2 = "delete from area_user where userno = '%s'" % (userid)
values = []
if not new_userid:
for area_id in area_id_list:
values.append((userid, area_id, all_area_info_dict[int(area_id)]['nodeid']))
else:
for area_id in area_id_list:
values.append((new_userid, area_id, all_area_info_dict[int(area_id)]['nodeid']))
sql3 = "insert into area_user(userno, area_id, nodeid) values(%s, %s, %s)"
try:
if modify_sql != '':
ret = cursor.execute(sql1)
else:
ret = 1
cursor.execute(sql2)
if len(area_id_list) > 0:
ret2 = cursor.executemany(sql3, values)
else:
ret2 = 0
if ret != 1 or ret2 != len(area_id_list):
conn.rollback()
return False, ret2
else:
conn.commit()
return True, ret2
except Exception as e:
logging.error(e)
conn.rollback()
return False, 0
def insert_login_log(self, userid, op_type, ip, ip_city, op_user=None):
sql = """
insert into login_log (userid, op_type, ip, ip_city) values ('%s', '%s', '%s', '%s')
""" % (userid, op_type, ip, ip_city)
if op_user:
sql = """
insert into login_log (userid, op_type, ip, ip_city,op_user) values ('%s', '%s', '%s', '%s', '%s')
""" % (userid, op_type, ip, ip_city, op_user)
return self.do_execute(sql)
def del_user_sql(self, userid):
sql = """
update user set state = 1 where userno = '%s'
""" % (userid)
return self.do_execute(sql)
2026-01-13 15:44:17 +08:00
def insert_user(self, userid, password, user_name, area_id_list, all_area_info_dict, role):
2025-11-15 15:03:22 +08:00
values = []
for area_id in area_id_list:
values.append((userid, area_id, all_area_info_dict[int(area_id)]['nodeid']))
tmp_token = 'iuqwefhjdbcsajhdshcgaiudncjadhajn_' + userid
sql1 = """
2026-01-13 15:44:17 +08:00
insert into user(userno, user_name, password, token, department, role) values('%s', '%s', '%s', '%s', '信号调优团队', '%s')
""" % (userid, user_name, password, tmp_token, role)
2025-11-15 15:03:22 +08:00
sql2 = "insert into area_user(userno, area_id, nodeid) values(%s, %s, %s)"
conn, cursor = self.connect()
conn.begin()
try:
ret = self.do_execute(sql1)
if len(area_id_list) > 0:
ret2 = cursor.executemany(sql2, values)
else:
ret2 = 0
if ret != 1 or ret2 != len(area_id_list):
conn.rollback()
return False
else:
conn.commit()
return True
except Exception as e:
logging.error(e)
conn.rollback()
return False
2026-01-13 15:44:17 +08:00
def query_role_page(self, role):
sql = """
select * from role_page where platform = 'cross_doctor' and role = '%s'
""" % (role)
row_list = self.do_select(sql)
res = []
if row_list:
page_permission = row_list[0]['page_permission']
for item in page_permission.split(','):
res.append(item)
return res