from app.db_func_base import TableDbHelperBase


class TmnetDbHelper(TableDbHelperBase):
    """Query and update helper for the ``tmnet`` database.

    ``do_select``, ``connect`` and ``close`` are not defined in this class;
    they are presumably inherited from ``TableDbHelperBase``.

    NOTE(review): every method that goes through ``do_select`` builds its SQL
    by interpolating arguments straight into the statement text. Callers must
    only pass trusted values until ``do_select`` is confirmed to support
    bound parameters.
    """

    def __init__(self, pool):
        """Remember the connection pool and the database name to use."""
        self.db_pool = pool
        self.DB_Name = 'tmnet'

    def reload_cross_info(self, nodeid, last_reload_time):
        """Return ledger rows of ``nodeid`` updated after ``last_reload_time``.

        Args:
            nodeid: Node identifier interpolated into both queries.
            last_reload_time: Threshold compared against each row's
                ``'update_time'`` value with a strict ``>``.

        Returns:
            A ``(cross_rows, road_rows)`` tuple of lists holding only the rows
            from ``cross_ledger_update_info`` / ``road_ledger_update_info``
            that are newer than the threshold.
        """
        sql1 = "select * from cross_ledger_update_info where nodeid = %s and crossid is not null" % nodeid
        sql2 = "select * from road_ledger_update_info where nodeid = %s" % nodeid
        cross_res_list = [
            row for row in self.do_select(sql1)
            if row['update_time'] > last_reload_time
        ]
        road_res_list = [
            row for row in self.do_select(sql2)
            if row['update_time'] > last_reload_time
        ]
        return cross_res_list, road_res_list

    def query_phase_by_cross_list(self, citycode, cross_list):
        """Return the ``xl_crossid`` values of ``cross_list`` that have a day schedule.

        Args:
            citycode: Value bound to ``cm.nodeid``.
            cross_list: Sequence bound to the ``in %s`` clause.

        Returns:
            ``(rows, None)`` on success or ``(None, exception)`` on failure.
            An empty ``cross_list`` yields ``([], None)`` without touching the
            database.
        """
        # An empty sequence cannot form a valid ``in (...)`` clause, so there
        # is nothing to look up. ``None`` is left on the normal path so that a
        # caller bug is still surfaced as an error.
        if cross_list is not None and not cross_list:
            return [], None

        # NOTE(review): the join condition ``cm.nodeid = cm.nodeid`` compares
        # the column with itself; ``ds.nodeid`` may have been intended.
        # Left untouched because the schema is not visible here.
        sql = (
            "select cm.xl_crossid from phasetable.cross_mapping as cm "
            "join phasetable.day_schedule as ds "
            "on cm.jj_crossid = ds.crossid and cm.nodeid = cm.nodeid "
            "where cm.nodeid = %s and cm.xl_crossid in %s "
            "group by cm.xl_crossid"
        )
        params = (citycode, cross_list)
        conn, cursor = self.connect()
        try:
            print(cursor.mogrify(sql, params))
            cursor.execute(sql, params)
            return cursor.fetchall(), None
        except Exception as error:
            return None, error
        finally:
            # Release the connection on every exit path.
            self.close(conn, cursor)

    def query_cross_info(self, nodeid):
        """Return all rows of the ``cross`` table for ``nodeid`` via ``do_select``."""
        sql = "select * from `cross` where nodeid = %s" % nodeid
        return self.do_select(sql)

    def query_cross_info_new(self, citycode):
        """Return all rows of the ``cross`` table for ``citycode`` using a bound parameter.

        Returns:
            ``(rows, None)`` on success or ``(None, exception)`` on failure.
        """
        sql = "select * from `cross` where nodeid = %s"
        # A real one-element tuple: ``(citycode)`` is just the bare scalar.
        params = (citycode,)
        conn, cursor = self.connect()
        try:
            print(cursor.mogrify(sql, params))
            cursor.execute(sql, params)
            return cursor.fetchall(), None
        except Exception as error:
            return None, error
        finally:
            # Release the connection on every exit path.
            self.close(conn, cursor)

    def query_cross_list_sql(self, nodeid, area_id):
        """Return non-edge crosses of an area, preferring ledger overrides.

        ``name`` and ``location`` come from ``cross_ledger_update_info`` when
        that table has a non-null value, otherwise from ``cross``.
        """
        sql = (
            " select if(t2.name is not null, t2.name, t1.name) as name, t1.crossid, "
            "if (t2.location is not null, t2.location, t1.location) as location, "
            "t1.nodeid, t1.area_id from \n"
            "`cross` as t1 left join cross_ledger_update_info as t2 "
            "on t1.crossid=t2.crossid "
            "where t1.at_edge=0 and t1.nodeid = %s and t1.area_id = %s "
        ) % (nodeid, area_id)
        return self.do_select(sql)

    def query_cross_inroads(self, crossid, nodeid):
        """Return the roads ending at ``crossid``, preferring ledger overrides.

        Rows come from ``road`` (``recordstate=0``, not a supplementary road)
        left-joined with the matching ``road_ledger_update_info`` rows; each
        selected column takes the ledger value when it is not null.
        """
        sql = (
            " select t1.roadid, "
            "if(t2.from_crossid is not null, t2.from_crossid, t1.from_crossid) as from_crossid, "
            "if(t2.to_crossid is not null, t2.to_crossid, t1.to_crossid) as to_crossid, "
            "if(t2.name is not null, t2.name, t1.name) as name, "
            "if(t2.src_direct is not null, t2.src_direct, t1.src_direct) as src_direct "
            "from (select * from road where nodeid = '%s' and recordstate=0 and to_crossid='%s' "
            "and (is_sup_road is null or is_sup_road<>1)) t1 "
            "left join (select * from road_ledger_update_info where nodeid = '%s' and recordstate=0 "
            "and to_crossid='%s' and (is_sup_road is null or is_sup_road<>1)) t2 "
            "on t1.roadid = t2.roadid; "
        ) % (nodeid, crossid, nodeid, crossid)
        return self.do_select(sql)

    def query_base_info(self, nodeid, area_id):
        """Return ``ledger.leger_base_info`` rows for a node and area.

        Both arguments are coerced with ``int()`` before interpolation, so a
        non-numeric value raises ``ValueError``/``TypeError`` here.
        """
        sql = "select * from ledger.leger_base_info where nodeid= %d and area_id = %s" % (int(nodeid), int(area_id))
        return self.do_select(sql)

    def query_city_tp_info(self, nodeid, area_id):
        """Return ``tp_desc`` and ``peak_tp`` from the area time-period config."""
        sql = f"select tp_desc, peak_tp from cross_doctor_config.area_tp_config where nodeid = {nodeid} and area_id = {area_id}"
        return self.do_select(sql)

    def query_all_cross_examine_records(self, yes_str):
        """Return open problem records (``end_date is null``) updated on ``yes_str``."""
        sql = "select * from cross_doctor_matedata.cross_phase_problems_record where date(update_time) = '%s' and end_date is null" % yes_str
        return self.do_select(sql)

    def insert_cross_examine_records(self, insert_list):
        """Insert problem records as one all-or-nothing batch.

        Args:
            insert_list: Sequence of dicts providing the keys listed in
                ``columns`` below.

        Returns:
            ``(row_count, None)`` when the driver reports exactly
            ``len(insert_list)`` rows and the batch is committed;
            ``(0, error)`` after a rollback otherwise. An empty list is a
            no-op and returns ``(0, None)``.
        """
        # Nothing to insert: skip the round trip. Presumably ``executemany``
        # returns None for an empty batch (PyMySQL does), which previously
        # fell into the error branch. ``None`` input is still reported as an
        # error by the code below.
        if insert_list is not None and not insert_list:
            return 0, None

        columns = (
            'start_hm', 'end_hm', 'crossid', 'phase_type', 'phase_detail',
            'cont_times', 'final_state', 'level_color', 'first_date',
            'change_red_daynums',
        )
        sql = (
            "insert into cross_doctor_matedata.cross_phase_problems_record"
            "(start_hm,end_hm,crossid,phase_type, phase_detail,cont_times,final_state,"
            "level_color,first_date,change_red_daynums) "
            "values(%s, %s, %s, %s,%s, %s, %s, %s,%s, %s) "
        )
        conn, cursor = self.connect()
        try:
            try:
                values = [tuple(d[key] for key in columns) for d in insert_list]
                ret = cursor.executemany(sql, values)
                if ret == len(insert_list):
                    conn.commit()
                    return ret, None
                conn.rollback()
                return 0, "insert_cross_examine_records error"
            except Exception as e:
                conn.rollback()
                return 0, e
        finally:
            # Runs even if the rollback itself fails, so the connection is
            # always handed back.
            self.close(conn, cursor)

    def update_cross_examine_records(self, update_list):
        """Update problem records as one all-or-nothing batch.

        Rows are matched on ``crossid``, ``start_hm`` and ``first_date``.

        Args:
            update_list: Sequence of dicts providing the keys listed in
                ``columns`` below.

        Returns:
            ``(row_count, None)`` when the driver reports exactly
            ``len(update_list)`` rows and the batch is committed;
            ``(row_count, None)`` after a rollback when the count differs;
            ``(0, exception)`` after a rollback on failure. An empty list is
            a no-op and returns ``(0, None)``.
        """
        # Nothing to update: skip the round trip. ``None`` input is still
        # reported as an error by the code below.
        if update_list is not None and not update_list:
            return 0, None

        columns = (
            'phase_type', 'phase_detail', 'cont_times', 'final_state',
            'level_color', 'change_red_daynums', 'end_date', 'crossid',
            'start_hm', 'first_date',
        )
        sql = (
            " update cross_doctor_matedata.cross_phase_problems_record "
            "set phase_type = %s, phase_detail = %s, cont_times = %s, final_state = %s, "
            "level_color = %s, change_red_daynums = %s, end_date = %s "
            "where crossid = %s and start_hm = %s and first_date = %s "
        )
        conn, cursor = self.connect()
        try:
            try:
                values = [tuple(d[key] for key in columns) for d in update_list]
                ret = cursor.executemany(sql, values)
                if ret == len(update_list):
                    conn.commit()
                    return ret, None
                # NOTE(review): the batch is rolled back here but no error is
                # returned; callers can only detect it by comparing the count
                # with len(update_list). Kept as-is for compatibility.
                conn.rollback()
                return ret, None
            except Exception as e:
                conn.rollback()
                return 0, e
        finally:
            # Runs even if the rollback itself fails, so the connection is
            # always handed back.
            self.close(conn, cursor)