迁移用户登录接口

Signed-off-by: yinzijian <yinzijian@haomozhixing.onaliyun.com>
This commit is contained in:
yinzijian 2025-11-11 14:41:23 +08:00
parent 131b31013b
commit bab271bf4f
4 changed files with 295 additions and 48 deletions

View File

@ -83,6 +83,7 @@ def cross_problems_detail_api():
def update_cross_examine_record_state_api():
return update_cross_examine_record_state(request.json)
from app.user_views import *
if __name__ == '__main__':
pass

View File

@ -15,6 +15,9 @@ class UserDbHelper(TableDbHelperBase):
self.db_pool = pool
self.DB_Name = 'user'
def re_init(self, pool):
self.db_pool = pool
self.DB_Name = 'user'
def generate_md5(self, input_str):
md5 = hashlib.md5() # 创建一个md5对象
@ -22,18 +25,18 @@ class UserDbHelper(TableDbHelperBase):
return md5.hexdigest() # 返回十六进制的哈希值
def login(self, userno, password_md5):
sql_query = "select password,token from `user` where userno='%s'" % (userno)
sql_query = "select password,token, last_modify_pdw_time from `user` where userno='%s'" % (userno)
users = self.do_select(sql_query)
if len(users) != 1:
logging.error('query_ledger error! %s' % (sql_query))
return None
return None, None
str_md5 = self.generate_md5(users[0]['password'])
if str_md5==password_md5:
return users[0]['token']
return users[0]['token'], users[0]['last_modify_pdw_time']
else:
return ''
return '', ''
def authentication(self, token):
sql_query = "select author.resource ,author.author from `user`,`author` where user.role=author.role and user.token='%s'" % (token)
@ -48,60 +51,84 @@ class UserDbHelper(TableDbHelperBase):
return None
return users[0]
# def query_org_id(self, userid):
# sql1 = "select * from user where userid = '%s'" % (userid)
# res = self.do_select(sql1)
# if not res or len(res) == 0:
# orgid_list = []
# logging.error('查询用户当前组织信息失败,请检查当前用户信息是否存在异常! %s' % (sql1))
# elif res[0]['role'] != 'manager':
# orgid_list = []
# sql_query = "select orgid from `org_user` where userid='%s'" %(userid)
# orgs = self.do_select(sql_query)
# for org in orgs:
# orgid_list.append(org['orgid'])
# else:
# orgid_list = list(g_citycode_set)
#
# return orgid_list
def query_org(self, userid):
orgid_list = []
sql_query = "select orgid from `org_user` where userid='%s'" %(userid)
orgs = self.do_select(sql_query)
for org in orgs:
orgid_list.append(int(org['orgid']))
return orgid_list
def query_user_role(self, userid):
sql = "select * from user where userid = '%s'" % (userid)
res = self.do_select(sql)
if not res or len(res) == 0:
logging.error('查询用户当前组织信息失败,请检查当前用户信息是否存在异常! %s' % (sql))
def query_areaid_list(self, userno) -> [str]:
"""
查询用户关联的辖区ID列表
:param userno:
:return: [str]
"""
sql_query = "select area_id from `area_user` where userno='%s';" % (userno)
res = self.do_select(sql_query)
if len(res) < 1:
logging.error('query_ledger error! %s' % (sql_query))
return None
return res[0]
areaid_list = []
for item in res:
areaid_list.append(item['area_id'])
return areaid_list
def query_all_area_infos(self) -> dict:
""" 查询nodeid=>node_name """
sql_query = "select area_id,area_name,nodeid,city_name,center from tmnet.`city_bounds`;"
res = self.do_select(sql_query)
area_infos = dict()
for item in res:
area_infos[(item['area_id'])] = item
return area_infos
def insert_rerun_dates(self, datelist_str: str):
"""
插入一条新记录
:param datelist_str:
:return:
"""
tt = int(time.time())
day = timestamp2int(tt)
sql_query = "delete from rerun_dates where day=%d;" % day
self.do_execute(sql_query)
sql_query = "insert into rerun_dates(day, datelist) values(%d,'%s');" % (day, datelist_str)
ret = self.do_execute(sql_query)
if not ret:
logging.error(sql_query)
logging.error("insert error")
return ret
def clear_rerun_dates(self):
"""
插入一条新记录
:param datelist_str:
:return:
"""
tt = int(time.time())
day = timestamp2int(tt)
sql_query = "delete from rerun_dates where day=%d;" % day
ret = self.do_execute(sql_query)
if not ret:
logging.error(sql_query)
logging.error("delete error")
return ret
def query_rerun_dates(self, day: int) -> str:
sql_query = "select datelist from `rerun_dates` where day=%d;" % (day)
res = self.do_select(sql_query)
if len(res) < 1:
return None
else:
return res[0]['datelist']
def check_user_info(self, userid, password):
sql = "select * from user where userid = '%s' and status < 1" % (userid)
sql = "select * from user where userno = '%s'" % (userid)
res = self.do_select(sql)
if not res or len(res) == 0:
logging.error('查询用户当前组织信息失败,请检查当前用户信息是否存在异常! %s' % (sql))
return 1, None, None, None
return 1, None, None
else:
if self.generate_md5(res[0]['password']) != password:
logging.error('用户密码错误,请检查用户密码是否正确! %s, %s' % (res[0]['password'], password))
return 2, None, None, None
return 2, None, None
else:
return 0, res[0]['role'], res[0]['last_modify_pwd_time'], res[0]['user_name']
return 0, res[0]['role'], res[0]['last_modify_pdw_time']
def modify_password(self, userid, new_password):
sql = "update user set password = '%s' where userid = '%s'" % (new_password, userid)
sql = "update user set password = '%s' where userno = '%s'" % (new_password, userid)
return self.do_execute(sql)
def query_user_areas(self, userid):
sql = "select * from area_user where userno = '%s'" % (userid)
res = self.do_select(sql)
if not res or len(res) == 0:
logging.error('查询用户当前组织信息失败,请检查当前用户信息是否存在异常! %s' % (sql))
return None
return res

64
app/user_views.py Normal file
View File

@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
# @Author: Owl
# @Date: 2025/11/10 18:12
# @Description:
from flask import Flask, request
from app.cross_eva_views import app
from app.user_worker import query_host_by_nodeid, do_login, do_authentication, do_get_user_info, set_rerun_dates, \
clear_rerun_dates, get_rerun_dates, do_modify_password
@app.route('/api/route', methods=['GET'])
def query_route():
return query_host_by_nodeid(dict(request.args))
#输入userno
#输入password(MD5)
#返回token
@app.route('/api/login', methods=['GET'])
def login():
return do_login(dict(request.args))
#输入token
#返回有操作权限的资源列表
@app.route('/api/authentication', methods=['GET'])
def authentication():
token = request.headers.get('token')
if not token:
token = None
return do_authentication(dict(request.args), token)
#userno
#返回用户基础信息
@app.route('/api/get_user_info', methods=['GET'])
def get_user_info():
token = request.headers.get('token')
if not token:
token = None
return do_get_user_info(dict(request.args), token)
@app.route('/rerun')
def api_list():
return app.send_static_file('rerun.html')
# return 'Hello, World!'
@app.route('/rerun/set', methods=['GET'])
def set_rerun():
return set_rerun_dates(dict(request.args))
@app.route('/rerun/del', methods=['GET'])
def del_rerun():
return clear_rerun_dates(dict(request.args))
@app.route('/rerun/get', methods=['GET'])
def get_rerun():
return get_rerun_dates(dict(request.args))
@app.route('/api/modify_password', methods=['POST'])
def modify_password():
return do_modify_password(request.get_json())

155
app/user_worker.py Normal file
View File

@ -0,0 +1,155 @@
import configparser
import json
from app.common_worker import check_param
from app.global_source import db_user
from app.user_db_func import *
def query_host_by_nodeid(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(1, 'nodeid is missing'))
host = g_node2host.get_host(nodeid)
if not host:
return json.dumps(make_common_res(2, 'no host for this nodeid'))
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['host'] = ScoNodeConfig.host2str(host)
return json.dumps(res)
def do_login(params):
userno = params.get('userno')
password = params.get('password')
token, last_modify_pdw_time = db_user.login(userno, password)
if token is not None and len(token) > 0:
res = make_res(0, 'ok', '登录成功。')
res['token'] = token
areaid_list = db_user.query_areaid_list(userno)
areaid_list = [x for x in set(areaid_list)]
area_infos = db_user.query_all_area_infos()
area_info_list = []
for area_id in areaid_list:
area_id = int(area_id)
area_name = area_infos[area_id]['area_name']
center = area_infos[area_id]['center']
nodeid = area_infos[area_id]['nodeid']
city_name = area_infos[area_id]['city_name']
area_info_list.append({
'nodeid': str(nodeid),
'city_name': city_name,
'area_id': str(area_id),
'area_name': area_name,
'center': center
})
time_diff = datetime.now() - last_modify_pdw_time
if time_diff.days > 30:
return json.dumps(make_common_res(5, '密码已过期,请修改密码后重试'))
res['node_list'] = area_info_list
res['usable_date'] = abs(time_diff.days - 30)
else:
res = make_res(-1, '登录失败,请检查用户名或者密码是否正确。', 'error')
res['token'] = ''
return json.dumps(res)
def do_authentication(params, token):
#token = params.get('token')
if token is None:
res = make_common_res(-1, '鉴权失败,请检查是否已经登录。')
return json.dumps(res)
authority = db_user.authentication(token)
author_map = {}
for author in authority:
author_map[author['resource']] = author['author']
res = make_common_res(0, 'ok')
res['authority'] = author_map
res['desc'] = ''
return json.dumps(res)
#token
def do_get_user_info(params, token):
#token = params.get('token')
if token is None:
res = make_common_res(-1, '鉴权失败,请检查是否已经登录。')
return json.dumps(res)
user = db_user.query_user(token)
if user is not None:
user_t = {'userno': user['userno'], 'user_name': user['user_name'], 'role': user['role'],
'department': user['department']}
res = make_common_res(0, 'ok')
res['token'] = user_t
res['desc'] = ''
else:
res = make_common_res(-1, '查询用户失败请检查token是否正确。')
return json.dumps(res)
def set_rerun_dates(params):
date_list = params.get('dates')
if not date_list:
res = make_common_res(-1, 'dates参数错误')
return json.dumps(res)
ret = db_user.insert_rerun_dates(date_list)
if not ret:
res = make_common_res(1, '入库失败')
else:
res = make_common_res(0, 'ok')
return json.dumps(res)
def clear_rerun_dates(params):
ret = db_user.clear_rerun_dates()
if not ret:
res = make_common_res(1, '清理失败')
else:
res = make_common_res(0, 'ok')
return json.dumps(res)
def get_rerun_dates(params):
day = params.get('day')
if not day:
day = int(get_today_str())
else:
day = int(day)
dates = db_user.query_rerun_dates(day)
date_list = []
if dates:
date_list = dates.split(',')
res = make_common_res(0, 'ok')
res['dates'] = date_list
return json.dumps(res)
def do_modify_password(params):
userid = params.get('userid')
if not userid:
return json.dumps(make_common_res(1, '用户信息缺失,请刷新后重试'))
password = params.get('password')
if not password:
return json.dumps(make_common_res(2, '密码信息缺失,请刷新后重试'))
new_password = params.get('new_password')
if not new_password:
return json.dumps(make_common_res(3, '新密码信息缺失,请刷新后重试'))
check_res, role, last_modify_pwd_time = db_user.check_user_info(userid, password)
if check_res == 0:
ret = db_user.modify_password(userid, new_password)
if ret == 1:
return json.dumps(make_common_res(0, 'ok'))
else:
return json.dumps(make_common_res(4, '修改密码失败,请稍后重试'))
elif check_res == 1:
return json.dumps(make_common_res(3, '用户名不存在,请检查后重试'))
else:
return json.dumps(make_common_res(4, '用户名或密码错误,请检查后重试'))