cross_doctor/app/user_worker.py

156 lines
5.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import configparser
import json
from app.common_worker import check_param
from app.global_source import db_user
from app.user_db_func import *
def query_host_by_nodeid(params):
nodeid = check_param(params, 'nodeid')
if not nodeid:
return json.dumps(make_common_res(1, 'nodeid is missing'))
host = g_node2host.get_host(nodeid)
if not host:
return json.dumps(make_common_res(2, 'no host for this nodeid'))
res = make_common_res(0, 'ok')
res['nodeid'] = nodeid
res['host'] = ScoNodeConfig.host2str(host)
return json.dumps(res)
def do_login(params):
userno = params.get('userno')
password = params.get('password')
token, last_modify_pdw_time = db_user.login(userno, password)
if token is not None and len(token) > 0:
res = make_res(0, 'ok', '登录成功。')
res['token'] = token
areaid_list = db_user.query_areaid_list(userno)
areaid_list = [x for x in set(areaid_list)]
area_infos = db_user.query_all_area_infos()
area_info_list = []
for area_id in areaid_list:
area_id = int(area_id)
area_name = area_infos[area_id]['area_name']
center = area_infos[area_id]['center']
nodeid = area_infos[area_id]['nodeid']
city_name = area_infos[area_id]['city_name']
area_info_list.append({
'nodeid': str(nodeid),
'city_name': city_name,
'area_id': str(area_id),
'area_name': area_name,
'center': center
})
time_diff = datetime.now() - last_modify_pdw_time
if time_diff.days > 30:
return json.dumps(make_common_res(5, '密码已过期,请修改密码后重试'))
res['node_list'] = area_info_list
res['usable_date'] = abs(time_diff.days - 30)
else:
res = make_res(-1, '登录失败,请检查用户名或者密码是否正确。', 'error')
res['token'] = ''
return json.dumps(res)
def do_authentication(params, token):
#token = params.get('token')
if token is None:
res = make_common_res(-1, '鉴权失败,请检查是否已经登录。')
return json.dumps(res)
authority = db_user.authentication(token)
author_map = {}
for author in authority:
author_map[author['resource']] = author['author']
res = make_common_res(0, 'ok')
res['authority'] = author_map
res['desc'] = ''
return json.dumps(res)
#token
def do_get_user_info(params, token):
#token = params.get('token')
if token is None:
res = make_common_res(-1, '鉴权失败,请检查是否已经登录。')
return json.dumps(res)
user = db_user.query_user(token)
if user is not None:
user_t = {'userno': user['userno'], 'user_name': user['user_name'], 'role': user['role'],
'department': user['department']}
res = make_common_res(0, 'ok')
res['token'] = user_t
res['desc'] = ''
else:
res = make_common_res(-1, '查询用户失败请检查token是否正确。')
return json.dumps(res)
def set_rerun_dates(params):
date_list = params.get('dates')
if not date_list:
res = make_common_res(-1, 'dates参数错误')
return json.dumps(res)
ret = db_user.insert_rerun_dates(date_list)
if not ret:
res = make_common_res(1, '入库失败')
else:
res = make_common_res(0, 'ok')
return json.dumps(res)
def clear_rerun_dates(params):
ret = db_user.clear_rerun_dates()
if not ret:
res = make_common_res(1, '清理失败')
else:
res = make_common_res(0, 'ok')
return json.dumps(res)
def get_rerun_dates(params):
day = params.get('day')
if not day:
day = int(get_today_str())
else:
day = int(day)
dates = db_user.query_rerun_dates(day)
date_list = []
if dates:
date_list = dates.split(',')
res = make_common_res(0, 'ok')
res['dates'] = date_list
return json.dumps(res)
def do_modify_password(params):
userid = params.get('userid')
if not userid:
return json.dumps(make_common_res(1, '用户信息缺失,请刷新后重试'))
password = params.get('password')
if not password:
return json.dumps(make_common_res(2, '密码信息缺失,请刷新后重试'))
new_password = params.get('new_password')
if not new_password:
return json.dumps(make_common_res(3, '新密码信息缺失,请刷新后重试'))
check_res, role, last_modify_pwd_time = db_user.check_user_info(userid, password)
if check_res == 0:
ret = db_user.modify_password(userid, new_password)
if ret == 1:
return json.dumps(make_common_res(0, 'ok'))
else:
return json.dumps(make_common_res(4, '修改密码失败,请稍后重试'))
elif check_res == 1:
return json.dumps(make_common_res(3, '用户名不存在,请检查后重试'))
else:
return json.dumps(make_common_res(4, '用户名或密码错误,请检查后重试'))