cross_doctor/app/user_worker.py

405 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import configparser
import json
from pypinyin import lazy_pinyin
from app.common_worker import check_param
from app.global_source import db_user, g_config
from app.user_db_func import *
def query_host_by_nodeid(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(1, 'nodeid is missing'))
# host = g_node2host.get_host(nodeid)
# if not host:
# return json.dumps(make_common_res(2, 'no host for this nodeid'))
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
# res['host'] = ScoNodeConfig.host2str(host)
return json.dumps(res)
def do_login(params):
userno = params.get('userno')
password = params.get('password')
ip = check_param(params, 'ip')
city_name = check_param(params, 'city_name')
token, last_modify_pdw_time, role = db_user.login(userno, password)
area_infos = db_user.query_all_area_infos()
if token is not None and len(token) > 0:
res = make_res(0, 'ok', '登录成功。')
res['token'] = token
areaid_list = db_user.query_areaid_list(userno)
if not areaid_list:
if role != 'manager':
return json.dumps(make_common_res(3, '无可用城市信息,请联系管理员添加城市权限'))
else:
areaid_list = list(area_infos.keys())
areaid_list = [x for x in set(areaid_list)]
area_infos = db_user.query_all_area_infos()
area_info_list = []
for area_id in areaid_list:
area_id = int(area_id)
area_name = area_infos[area_id]['area_name']
center = area_infos[area_id]['center']
nodeid = area_infos[area_id]['nodeid']
city_name = area_infos[area_id]['city_name']
area_info_list.append({
'nodeid': str(nodeid),
'city_name': city_name,
'area_id': str(area_id),
'area_name': area_name,
'center': center
})
time_diff = datetime.now() - last_modify_pdw_time
if time_diff.days > 30:
return json.dumps(make_common_res(5, '密码已过期,请修改密码后重试'))
res['node_list'] = area_info_list
res['usable_date'] = abs(time_diff.days - 30)
ret = db_user.insert_login_log(userno, 0, ip, city_name)
if ret != 1:
return json.dumps(make_common_res(6, '登录失败,请检查是否已经登录。'))
else:
res = make_res(-1, '登录失败,请检查用户名或者密码是否正确。', 'error')
res['token'] = ''
return json.dumps(res)
def do_authentication(params, token):
#token = params.get('token')
if token is None:
res = make_common_res(-1, '鉴权失败,请检查是否已经登录。')
return json.dumps(res)
authority = db_user.authentication(token)
author_map = {}
for author in authority:
author_map[author['resource']] = author['author']
res = make_common_res(0, 'ok')
res['authority'] = author_map
res['desc'] = ''
return json.dumps(res)
#token
def do_get_user_info(params, token):
#token = params.get('token')
if token is None:
res = make_common_res(-1, '鉴权失败,请检查是否已经登录。')
return json.dumps(res)
all_area_info = db_user.query_all_area_infos()
user = db_user.query_user(token)
if user is not None:
super_user = 0
if user['userno'] in g_config['executives']:
super_user = 1
role_page = db_user.query_role_page(user['role'])
user_t = {'userno': user['userno'], 'user_name': user['user_name'], 'role': user['role'],
'department': user['department'], 's': super_user, 'role_page': role_page}
if user['role'] == 'manager':
all_areas = db_user.query_all_area_infos()
user_area_info = list(all_areas.values())
else:
user_area_ids = db_user.query_areaid_list(user['userno'])
user_area_info = []
for area_id in user_area_ids:
user_area_info.append(all_area_info[int(area_id)])
res = make_common_res(0, 'ok')
res['data'] = user_area_info
res['token'] = user_t
res['desc'] = ''
else:
res = make_common_res(-1, '查询用户失败请检查token是否正确。')
return json.dumps(res)
def set_rerun_dates(params):
date_list = params.get('dates')
if not date_list:
res = make_common_res(-1, 'dates参数错误')
return json.dumps(res)
ret = db_user.insert_rerun_dates(date_list)
if not ret:
res = make_common_res(1, '入库失败')
else:
res = make_common_res(0, 'ok')
return json.dumps(res)
def clear_rerun_dates(params):
ret = db_user.clear_rerun_dates()
if not ret:
res = make_common_res(1, '清理失败')
else:
res = make_common_res(0, 'ok')
return json.dumps(res)
def get_rerun_dates(params):
day = params.get('day')
if not day:
day = int(get_today_str())
else:
day = int(day)
dates = db_user.query_rerun_dates(day)
date_list = []
if dates:
date_list = dates.split(',')
res = make_common_res(0, 'ok')
res['dates'] = date_list
return json.dumps(res)
def do_modify_password(params):
userid = params.get('userid')
if not userid:
return json.dumps(make_common_res(1, '用户信息缺失,请刷新后重试'))
password = params.get('password')
if not password:
return json.dumps(make_common_res(2, '密码信息缺失,请刷新后重试'))
new_password = params.get('new_password')
if not new_password:
return json.dumps(make_common_res(3, '新密码信息缺失,请刷新后重试'))
if len(new_password) < 6:
return json.dumps(make_common_res(4, '新密码长度不能小于6位'))
ip = check_param(params, 'ip')
city_name = check_param(params, 'city_name')
check_res, role, last_modify_pwd_time = db_user.check_user_info(userid, password)
if check_res == 0:
ret = db_user.modify_password(userid, new_password)
if ret == 1:
insert_log_ret = db_user.insert_login_log(userid, 1, ip, city_name)
if insert_log_ret != 1:
return json.dumps(make_common_res(5, '登录失败,请检查是否已经登录。'))
return json.dumps(make_common_res(0, 'ok'))
else:
return json.dumps(make_common_res(4, '修改密码失败,请稍后重试'))
elif check_res == 1:
return json.dumps(make_common_res(3, '用户名不存在,请检查后重试'))
else:
return json.dumps(make_common_res(4, '用户名或密码错误,请检查后重试'))
def query_user_list(params):
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(1, '用户id信息异常请检查后重试'))
if userid not in g_config['executives']:
return json.dumps(make_common_res(2, '用户无权限'))
keyword = check_param(params, 'keyword')
if not keyword:
keyword = ''
role = check_param(params, 'role')
if not role:
role = 'all'
start_date = check_param(params, 'start_date')
end_date = check_param(params, 'end_date')
if not start_date:
start_date = ''
if not end_date:
end_date = ''
page = check_param(params, 'page')
if not page:
page = 1
page_size = check_param(params, 'page_size')
if not page_size:
page_size = 10
start_index = (int(page) - 1) * int(page_size)
end_index = start_index + int(page_size)
role_dict = {
'headquarters_engineer': '总部工程师',
'project_manager': '项目经理',
'business_expert': '业务专家',
'project_engineer': '项目工程师',
'manager': '超级管理员',
}
user_list = db_user.query_users()
all_area_info = db_user.query_all_area_infos()
res_list = user_list.copy()
if keyword != '':
if any(char.isdigit() for char in keyword):
# 说明是手机号的某个子集
res_list = [item for item in user_list if keyword in item['userno']]
else:
res_list = find_user_info(keyword, user_list)
if role != 'all':
res_list = [item for item in res_list if item['role'] == role]
if start_date != '' and end_date != '':
res_list = [item for item in res_list if start_date <= item['last_modify_pwd_time'].strftime("%Y%m%d") <= end_date]
for item in res_list:
item['last_modify_pwd_time'] = item['last_modify_pwd_time'].strftime("%Y年%m月%d%H:%M:%S")
if item['role'] != 'manager':
area_id_list = item['area_ids'].split(',') if item['area_ids'] else []
nodeid_list = item['nodeids'].split(',') if item['nodeids'] else []
area_name = '' if len(area_id_list) < 1 else ','.join([all_area_info[int(o)]['area_name'] for o in area_id_list])
city_names = '' if len(nodeid_list) < 1 else ','.join([all_area_info[int(o)]['city_name'] for o in area_id_list])
item['city_names'] = city_names
item['area_names'] = area_name
else:
item['city_names'] = '全部'
item['area_names'] = '全部'
res = make_common_res(0, 'ok')
res['data'] = {
'user_list': res_list[start_index:end_index],
'area_list': list(all_area_info.values()),
'total': len(res_list)
}
return json.dumps(res)
def modify_user(params):
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(1, '用户id信息异常请检查后重试'))
if userid not in g_config['executives']:
return json.dumps(make_common_res(2, '用户无权限'))
modify_userid = check_param(params, 'modify_userid')
if not modify_userid:
return json.dumps(make_common_res(3, '缺少修改用户id信息请检查后重试'))
ip = check_param(params, 'ip')
city_name = check_param(params, 'city_name')
user_name = check_param(params, 'user_name')
user_info = db_user.query_user_info_sql(modify_userid)
if not user_info:
return json.dumps(make_common_res(5, '该用户不存在'))
if user_info['role'] == 'manager':
return json.dumps(make_common_res(4, '所选用户为超级管理员,暂不支持修改超级管理员信息'))
if not user_name:
user_name = ''
new_userid = check_param(params, 'new_userid')
if not new_userid:
new_userid = ''
password = check_param(params, 'password')
if not password:
password = ''
area_id_list = check_param(params, 'area_id_list')
if not area_id_list or len(area_id_list) < 1:
area_id_list = []
for area_id in area_id_list:
if not str(area_id).lstrip('-').isdigit():
return json.dumps(make_common_res(6, '区域id信息异常请检查后重试'))
role = check_param(params, 'role')
if not role or role not in ['headquarters_engineer', 'project_manager', 'business_expert', 'project_engineer']:
role = ''
modify_sql = ''
if user_name != '' and user_name != user_info['user_name']:
modify_sql += "set user_name='%s'" % user_name
if new_userid != '' and new_userid != user_info['userno']:
if modify_sql == '':
modify_sql += "set userno='%s'" % new_userid
else:
modify_sql += ",userno='%s'" % new_userid
if password != '' and password != user_info['password']:
if modify_sql == '':
modify_sql += "set password='%s'" % password
else:
modify_sql += ",password='%s'" % password
if role != '' and role != user_info['role']:
if modify_sql == '':
modify_sql += "set role='%s'" % role
else:
modify_sql += ",role='%s'" % role
all_area_info = db_user.query_all_area_infos()
res, ret = db_user.update_user_info(modify_sql, modify_userid, area_id_list, new_userid, all_area_info)
if res and ret == len(area_id_list):
insert_log_ret = db_user.insert_login_log(modify_userid, 2, ip, city_name, userid)
if insert_log_ret == 1:
return json.dumps(make_common_res(0, 'ok'))
else:
return json.dumps(make_common_res(5, '用户信息修改日志记录失败,请检查后重试'))
return json.dumps(make_common_res(5, '修改失败,请检查后重试'))
def del_user(params):
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(1, '用户id信息异常请检查后重试'))
if userid not in g_config['executives']:
return json.dumps(make_common_res(2, '用户无权限'))
modify_userid = check_param(params, 'modify_userid')
if not modify_userid:
return json.dumps(make_common_res(3, '缺少修改用户id信息请检查后重试'))
ip = check_param(params, 'ip')
city_name = check_param(params, 'city_name')
user_info = db_user.query_user_info_sql(modify_userid)
if user_info['role'] == 'manager':
return json.dumps(make_common_res(4, '所选用户为超级管理员,暂不支持删除超级管理员'))
ret = db_user.del_user_sql(modify_userid)
if ret == 1:
insert_log_ret = db_user.insert_login_log(modify_userid, 3, ip, city_name, userid)
if insert_log_ret == 1:
return json.dumps(make_common_res(0, 'ok'))
else:
return json.dumps(make_common_res(5, '用户信息修改日志记录失败,请检查后重试'))
else:
return json.dumps(make_common_res(5, '删除失败,请检查后重试'))
def create_user(params):
userid = check_param(params, 'userid')
if not userid:
return json.dumps(make_common_res(1, '用户id信息异常请检查后重试'))
if userid not in g_config['executives']:
return json.dumps(make_common_res(2, '用户无权限'))
modify_userid = check_param(params, 'modify_userid')
if not modify_userid:
return json.dumps(make_common_res(3, '缺少修改用户id信息请检查后重试'))
user_name = check_param(params, 'user_name')
if not user_name:
return json.dumps(make_common_res(4, '缺少用户名信息,请检查后重试'))
password = check_param(params, 'password')
if not password:
return json.dumps(make_common_res(5, '缺少密码信息,请检查后重试'))
if len(password) < 6:
return json.dumps(make_common_res(6, '密码长度不能小于6位请检查后重试'))
role = check_param(params, 'role')
if not role:
return json.dumps(make_common_res(7, '缺少角色信息,请检查后重试'))
if role not in ('headquarters_engineer', 'project_manager', 'business_expert', 'project_engineer'):
return json.dumps(make_common_res(8, '角色信息异常,请检查后重试'))
area_id_list = check_param(params, 'area_id_list')
if not area_id_list or len(area_id_list) < 1:
area_id_list = []
for area_id in area_id_list:
if not str(area_id).lstrip('-').isdigit():
return json.dumps(make_common_res(6, '区域id信息异常请检查后重试'))
ip = check_param(params, 'ip')
city_name = check_param(params, 'city_name')
all_area_info = db_user.query_all_area_infos()
ret = db_user.insert_user(modify_userid, password, user_name, area_id_list, all_area_info, role)
if ret:
insert_log_ret = db_user.insert_login_log(modify_userid, 4, ip, city_name, userid)
if insert_log_ret == 1:
return json.dumps(make_common_res(0, 'ok'))
else:
return json.dumps(make_common_res(7, '用户信息修改日志记录失败,请检查后重试'))
return json.dumps(make_common_res(7, '创建失败,请检查后重试'))
def find_user_info(key, value_list):
result = []
# 遍历名字列表
for value in value_list:
# 检查是否完全匹配
if key in value['user_name']:
result.append(value)
# 检查是否是首字母缩写匹配
elif len(key) >= 2 and len(value['user_name']) >= 2:
# 将名字转换为拼音并提取首字母
first_pinyin = lazy_pinyin(value['user_name'][0])[0]
second_pinyin = lazy_pinyin(value['user_name'][1])[0]
if key[0].lower() == first_pinyin[0].lower() and key[1].lower() == second_pinyin[0].lower():
result.append(value)
# # 检查是否包含关键字
# elif key in value['name']:
# result.append(value)
return result