cross_doctor/app/user_db_func.py

135 lines
4.5 KiB
Python
Raw Normal View History

2025-10-10 14:38:22 +08:00
# -*- coding:utf-8 -*-
#import logging
import pymysql
import pymysql.cursors
from datetime import datetime
from flask import g
from app.db_func_base import *
import hashlib
class UserDbHelper(TableDbHelperBase):
def __init__(self, pool):
self.db_pool = pool
self.DB_Name = 'user'
2025-10-10 14:38:22 +08:00
def re_init(self, pool):
self.db_pool = pool
self.DB_Name = 'user'
2025-10-10 14:38:22 +08:00
def generate_md5(self, input_str):
md5 = hashlib.md5() # 创建一个md5对象
md5.update(input_str.encode('utf-8')) # 使用utf-8编码
return md5.hexdigest() # 返回十六进制的哈希值
def login(self, userno, password_md5):
sql_query = "select password,token, last_modify_pdw_time from `user` where userno='%s'" % (userno)
2025-10-10 14:38:22 +08:00
users = self.do_select(sql_query)
if len(users) != 1:
logging.error('query_ledger error! %s' % (sql_query))
return None, None
2025-10-10 14:38:22 +08:00
str_md5 = self.generate_md5(users[0]['password'])
if str_md5==password_md5:
return users[0]['token'], users[0]['last_modify_pdw_time']
2025-10-10 14:38:22 +08:00
else:
return '', ''
2025-10-10 14:38:22 +08:00
def authentication(self, token):
sql_query = "select author.resource ,author.author from `user`,`author` where user.role=author.role and user.token='%s'" % (token)
return self.do_select(sql_query)
def query_user(self, token):
sql_query = "select * from `user` where token='%s'" %(token)
users = self.do_select(sql_query)
if len(users) != 1:
logging.error('query_ledger error! %s' % (sql_query))
return None
return users[0]
def query_areaid_list(self, userno) -> [str]:
"""
查询用户关联的辖区ID列表
:param userno:
:return: [str]
"""
sql_query = "select area_id from `area_user` where userno='%s';" % (userno)
res = self.do_select(sql_query)
if len(res) < 1:
logging.error('query_ledger error! %s' % (sql_query))
2025-10-10 14:38:22 +08:00
return None
areaid_list = []
for item in res:
areaid_list.append(item['area_id'])
return areaid_list
def query_all_area_infos(self) -> dict:
""" 查询nodeid=>node_name """
sql_query = "select area_id,area_name,nodeid,city_name,center from tmnet.`city_bounds`;"
res = self.do_select(sql_query)
area_infos = dict()
for item in res:
area_infos[(item['area_id'])] = item
return area_infos
def insert_rerun_dates(self, datelist_str: str):
"""
插入一条新记录
:param datelist_str:
:return:
"""
tt = int(time.time())
day = timestamp2int(tt)
sql_query = "delete from rerun_dates where day=%d;" % day
self.do_execute(sql_query)
sql_query = "insert into rerun_dates(day, datelist) values(%d,'%s');" % (day, datelist_str)
ret = self.do_execute(sql_query)
if not ret:
logging.error(sql_query)
logging.error("insert error")
return ret
def clear_rerun_dates(self):
"""
插入一条新记录
:param datelist_str:
:return:
"""
tt = int(time.time())
day = timestamp2int(tt)
sql_query = "delete from rerun_dates where day=%d;" % day
ret = self.do_execute(sql_query)
if not ret:
logging.error(sql_query)
logging.error("delete error")
return ret
def query_rerun_dates(self, day: int) -> str:
sql_query = "select datelist from `rerun_dates` where day=%d;" % (day)
res = self.do_select(sql_query)
if len(res) < 1:
return None
else:
return res[0]['datelist']
2025-10-10 14:38:22 +08:00
def check_user_info(self, userid, password):
sql = "select * from user where userno = '%s'" % (userid)
2025-10-10 14:38:22 +08:00
res = self.do_select(sql)
if not res or len(res) == 0:
logging.error('查询用户当前组织信息失败,请检查当前用户信息是否存在异常! %s' % (sql))
return 1, None, None
2025-10-10 14:38:22 +08:00
else:
if self.generate_md5(res[0]['password']) != password:
logging.error('用户密码错误,请检查用户密码是否正确! %s, %s' % (res[0]['password'], password))
return 2, None, None
2025-10-10 14:38:22 +08:00
else:
return 0, res[0]['role'], res[0]['last_modify_pdw_time']
2025-10-10 14:38:22 +08:00
def modify_password(self, userid, new_password):
sql = "update user set password = '%s' where userno = '%s'" % (new_password, userid)
2025-10-10 14:38:22 +08:00
return self.do_execute(sql)