cross_doctor/app/user_db_func.py

108 lines
4.0 KiB
Python

# -*- coding:utf-8 -*-
#import logging
import pymysql
import pymysql.cursors
from datetime import datetime
from flask import g
from app.db_func_base import *
import hashlib
class UserDbHelper(TableDbHelperBase):
def __init__(self, pool):
self.db_pool = pool
self.DB_Name = 'user'
def generate_md5(self, input_str):
md5 = hashlib.md5() # 创建一个md5对象
md5.update(input_str.encode('utf-8')) # 使用utf-8编码
return md5.hexdigest() # 返回十六进制的哈希值
def login(self, userno, password_md5):
sql_query = "select password,token from `user` where userno='%s'" % (userno)
users = self.do_select(sql_query)
if len(users) != 1:
logging.error('query_ledger error! %s' % (sql_query))
return None
str_md5 = self.generate_md5(users[0]['password'])
if str_md5==password_md5:
return users[0]['token']
else:
return ''
def authentication(self, token):
sql_query = "select author.resource ,author.author from `user`,`author` where user.role=author.role and user.token='%s'" % (token)
return self.do_select(sql_query)
def query_user(self, token):
sql_query = "select * from `user` where token='%s'" %(token)
users = self.do_select(sql_query)
if len(users) != 1:
logging.error('query_ledger error! %s' % (sql_query))
return None
return users[0]
# def query_org_id(self, userid):
# sql1 = "select * from user where userid = '%s'" % (userid)
# res = self.do_select(sql1)
# if not res or len(res) == 0:
# orgid_list = []
# logging.error('查询用户当前组织信息失败,请检查当前用户信息是否存在异常! %s' % (sql1))
# elif res[0]['role'] != 'manager':
# orgid_list = []
# sql_query = "select orgid from `org_user` where userid='%s'" %(userid)
# orgs = self.do_select(sql_query)
# for org in orgs:
# orgid_list.append(org['orgid'])
# else:
# orgid_list = list(g_citycode_set)
#
# return orgid_list
def query_org(self, userid):
orgid_list = []
sql_query = "select orgid from `org_user` where userid='%s'" %(userid)
orgs = self.do_select(sql_query)
for org in orgs:
orgid_list.append(int(org['orgid']))
return orgid_list
def query_user_role(self, userid):
sql = "select * from user where userid = '%s'" % (userid)
res = self.do_select(sql)
if not res or len(res) == 0:
logging.error('查询用户当前组织信息失败,请检查当前用户信息是否存在异常! %s' % (sql))
return None
return res[0]
def check_user_info(self, userid, password):
sql = "select * from user where userid = '%s' and status < 1" % (userid)
res = self.do_select(sql)
if not res or len(res) == 0:
logging.error('查询用户当前组织信息失败,请检查当前用户信息是否存在异常! %s' % (sql))
return 1, None, None, None
else:
if self.generate_md5(res[0]['password']) != password:
logging.error('用户密码错误,请检查用户密码是否正确! %s, %s' % (res[0]['password'], password))
return 2, None, None, None
else:
return 0, res[0]['role'], res[0]['last_modify_pwd_time'], res[0]['user_name']
def modify_password(self, userid, new_password):
sql = "update user set password = '%s' where userid = '%s'" % (new_password, userid)
return self.do_execute(sql)
def query_user_areas(self, userid):
sql = "select * from area_user where userno = '%s'" % (userid)
res = self.do_select(sql)
if not res or len(res) == 0:
logging.error('查询用户当前组织信息失败,请检查当前用户信息是否存在异常! %s' % (sql))
return None
return res