import logging
import os
import sys
from contextlib import contextmanager
from typing import List, Optional

import grpc
from app.global_source import g_config
from google.protobuf.json_format import MessageToDict

# Put the sibling ``proto`` directory on sys.path before importing the
# generated modules (presumably so the generated code can resolve its own
# bare-name imports -- verify against the generated files).
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../proto')))
from proto import phase_server_pb2_grpc, phase_server_pb2

# Deadline, in seconds, applied to every RPC issued from this module.
_RPC_TIMEOUT_SECONDS = 30


def channel_stub():
    """Open an insecure gRPC channel and build a ``PhaseServiceStub`` on it.

    The target address is read from ``g_config['rpc_host']`` and
    ``g_config['rpc_port']``.

    Returns:
        A ``(stub, channel)`` tuple. The caller is responsible for calling
        ``channel.close()`` when done.
    """
    channel = grpc.insecure_channel(f"{g_config['rpc_host']}:{g_config['rpc_port']}")
    stub = phase_server_pb2_grpc.PhaseServiceStub(channel)
    return stub, channel


@contextmanager
def _open_stub():
    """Yield a stub from ``channel_stub()`` and close its channel on exit.

    Errors raised by ``channel_stub()`` itself propagate to the caller; they
    are deliberately not captured by the per-function ``try`` blocks.
    """
    stub, channel = channel_stub()
    try:
        yield stub
    finally:
        channel.close()


def _raise_on_error_code(response):
    """Return ``response`` unchanged, or raise ``Exception(response.msg)``
    when ``response.code`` is non-zero."""
    if response.code != 0:
        raise Exception(response.msg)
    return response


def _as_list(values):
    """Map ``None`` (the "not supplied" default) to a fresh empty list."""
    return [] if values is None else values


def get_exception_phase_cross():
    """Call the ``ExceptionPhaseCross`` RPC with an empty request.

    Returns:
        The RPC response. On any exception the error is logged and an empty
        ``ExceptionPhaseCrossResponse`` is returned instead.
    """
    with _open_stub() as stub:
        try:
            return stub.ExceptionPhaseCross(
                phase_server_pb2.EmptyRequest(), timeout=_RPC_TIMEOUT_SECONDS
            )
        except Exception as e:
            logging.error(e)
            return phase_server_pb2.ExceptionPhaseCrossResponse()


def get_exception_phase_cross_info(citycode):
    """Call the ``ExceptionPhaseCrossInfo`` RPC for ``citycode``.

    Returns:
        ``(response, None)`` on success, ``(None, exception)`` on failure.
    """
    with _open_stub() as stub:
        try:
            request_params = phase_server_pb2.ExceptionPhaseCrossInfoRequest(citycode=citycode)
            response = stub.ExceptionPhaseCrossInfo(request_params, timeout=_RPC_TIMEOUT_SECONDS)
            return response, None
        except Exception as e:
            return None, e


def get_green_cross_phase(citycode: int = 0, crossids: Optional[List[str]] = None):
    """Call the ``ExceptionPhaseByCrossIDs`` RPC.

    Args:
        citycode: City code forwarded to the request.
        crossids: Crossing IDs forwarded as the request's ``crossid`` field;
            ``None`` is treated as an empty list.

    Returns:
        ``(response, None)`` on success, ``(None, exception)`` on failure.
    """
    with _open_stub() as stub:
        try:
            request_params = phase_server_pb2.ExceptionPhaseByCrossIDsRequest(
                citycode=citycode, crossid=_as_list(crossids)
            )
            response = stub.ExceptionPhaseByCrossIDs(request_params, timeout=_RPC_TIMEOUT_SECONDS)
            return response, None
        except Exception as e:
            return None, e


def GetCrossPhaseDetailByCrossIDs(citycode: int = 0, crossids: Optional[List[str]] = None,
                                  gw_src_dir='', tp_start='', tp_end='', week='', wave_id=''):
    """Call the ``GetCrossPhaseDetailByCrossIDs`` RPC.

    All arguments are forwarded to ``GetCrossPhaseDetailByCrossIDsRequest``
    under the same field names; ``crossids=None`` is sent as an empty list.

    Returns:
        ``(response, None)`` on success, ``(None, exception)`` on failure.
    """
    with _open_stub() as stub:
        try:
            request_params = phase_server_pb2.GetCrossPhaseDetailByCrossIDsRequest(
                citycode=citycode,
                crossids=_as_list(crossids),
                gw_src_dir=gw_src_dir,
                tp_start=tp_start,
                tp_end=tp_end,
                week=week,
                wave_id=wave_id,
            )
            response = stub.GetCrossPhaseDetailByCrossIDs(
                request_params, timeout=_RPC_TIMEOUT_SECONDS
            )
            return response, None
        except Exception as e:
            return None, e


def GetPhaseCrossID():
    """Get the IDs of crossings that have a signal-timing plan.

    Calls the ``GetPhaseCrossID`` RPC with an empty request.

    Returns:
        ``(response, None)`` on success, ``(None, exception)`` on failure.
    """
    with _open_stub() as stub:
        try:
            response = stub.GetPhaseCrossID(
                phase_server_pb2.EmptyRequest(), timeout=_RPC_TIMEOUT_SECONDS
            )
            return response, None
        except Exception as e:
            return None, e


def GetPhaseGreenRatio(citycode: int = 0, crossids: Optional[List[str]] = None,
                       gw_src_dir='', tp_start='', tp_end='', week='', wave_id=''):
    """Call the ``PhaseGreenRatio`` RPC.

    All arguments are forwarded to ``PhaseGreenRatioRequest`` under the same
    field names; ``crossids=None`` is sent as an empty list.

    Returns:
        ``(response, None)`` on success, ``(None, exception)`` on failure.
    """
    with _open_stub() as stub:
        try:
            request_params = phase_server_pb2.PhaseGreenRatioRequest(
                citycode=citycode,
                crossids=_as_list(crossids),
                gw_src_dir=gw_src_dir,
                tp_start=tp_start,
                tp_end=tp_end,
                week=week,
                wave_id=wave_id,
            )
            response = stub.PhaseGreenRatio(request_params, timeout=_RPC_TIMEOUT_SECONDS)
            return response, None
        except Exception as e:
            return None, e


def GetQueryCrossPhaseRelativeOffset(citycode: int = 0, crossids: Optional[List[str]] = None,
                                     tp: str = '', day: str = ''):
    """Call the ``QueryCrossPhaseRelativeOffset`` RPC.

    All arguments are forwarded to ``QueryCrossPhaseRelativeOffsetRequest``
    under the same field names; ``crossids=None`` is sent as an empty list.

    Returns:
        ``(response, None)`` on success, ``(None, exception)`` on failure.
    """
    with _open_stub() as stub:
        try:
            request_params = phase_server_pb2.QueryCrossPhaseRelativeOffsetRequest(
                citycode=citycode, crossids=_as_list(crossids), tp=tp, day=day
            )
            response = stub.QueryCrossPhaseRelativeOffset(
                request_params, timeout=_RPC_TIMEOUT_SECONDS
            )
            return response, None
        except Exception as e:
            return None, e


def QueryGreenWaveCrossPhaseTpCheck(waveid: str, citycode: int):
    """Update the abnormal status obtained by comparing a single green wave's
    timing-plan periods with the crossing timing-plan periods.

    Calls the ``GreenWaveCrossPhaseTpCheck`` RPC.

    Returns:
        ``(response, None)`` on success, ``(None, exception)`` on failure.
    """
    with _open_stub() as stub:
        try:
            request_params = phase_server_pb2.GreenWaveCrossPhaseTpCheckRequest(
                waveid=waveid, citycode=citycode
            )
            response = stub.GreenWaveCrossPhaseTpCheck(
                request_params, timeout=_RPC_TIMEOUT_SECONDS
            )
            return response, None
        except Exception as e:
            return None, e


def QueryCrossRunningPhase(citycode: int, crossids: List[str], date_list: List, tp=''):
    """Query the timing plans currently running at crossings.

    Calls the ``CrossRunningPhase`` RPC; every argument is forwarded to
    ``CrossRunningPhaseRequest`` under the same field name.

    Returns:
        ``(response, None)`` on success, ``(None, exception)`` on failure.
    """
    with _open_stub() as stub:
        try:
            request_params = phase_server_pb2.CrossRunningPhaseRequest(
                citycode=citycode, crossids=crossids, date_list=date_list, tp=tp
            )
            response = stub.CrossRunningPhase(request_params, timeout=_RPC_TIMEOUT_SECONDS)
            return response, None
        except Exception as e:
            return None, e


def QueryCrossPhaseDiagnosis(citycode: int, crossid: str, date_list: List, tp: str, area_id: int):
    """Run the crossing timing-plan diagnosis (``CrossPhaseDiagnosis`` RPC).

    ``response.data`` is converted to a dict with the original proto field
    names. When that dict is non-empty, a ``total_num`` key is added holding
    the length of its ``values`` entry (0 when ``values`` is absent).

    Returns:
        ``(dict, None)`` on success. ``(None, exception)`` if the call fails
        or the server answers with a non-zero ``code``.

    Example:
        response, error = QueryCrossPhaseDiagnosis(citycode, crossid, [20251010],
                                                   "07:00-07:20", 350101)
        if error:
            print(error)
            return f"{error}"
        if len(response) > 0:
            print(response)
    """
    with _open_stub() as stub:
        try:
            request_params = phase_server_pb2.CrossPhaseDiagnosisRequest(
                citycode=citycode, crossid=crossid, date_list=date_list, tp=tp, area_id=area_id
            )
            response = _raise_on_error_code(
                stub.CrossPhaseDiagnosis(request_params, timeout=_RPC_TIMEOUT_SECONDS)
            )
            data = MessageToDict(response.data, preserving_proto_field_name=True)
            if data:
                # MessageToDict drops empty repeated fields by default, so
                # 'values' may be missing even when other keys are present;
                # indexing it directly would turn a valid reply into an error.
                data['total_num'] = len(data.get('values', []))
            return data, None
        except Exception as e:
            return None, e


def QueryCrossPhaseByCity(citycode: int):
    """Query the IDs of crossings that have a timing plan, by city ID.

    Calls the ``CrossPhaseByCity`` RPC.

    Returns:
        ``(list(response.data), None)`` on success. ``([], exception)`` if
        the call fails or the server answers with a non-zero ``code``.
    """
    with _open_stub() as stub:
        try:
            request_params = phase_server_pb2.CrossPhaseByCityRequest(citycode=citycode)
            response = _raise_on_error_code(
                stub.CrossPhaseByCity(request_params, timeout=_RPC_TIMEOUT_SECONDS)
            )
            return list(response.data), None
        except Exception as e:
            return [], e


def QueryCrossPhaseTpStatistics(citycode: int, area_id: int, schedule_week: str,
                                date_list: List[str]):
    """Collect statistics on a city's timing periods.

    Calls the ``CrossPhaseStatistics`` RPC.

    Returns:
        ``(response, None)`` on success. ``(None, exception)`` if the call
        fails or the server answers with a non-zero ``code``.
    """
    with _open_stub() as stub:
        try:
            request_params = phase_server_pb2.CrossPhaseStatisticsRequest(
                citycode=citycode,
                area_id=area_id,
                schedule_week=schedule_week,
                date_list=date_list,
            )
            response = _raise_on_error_code(
                stub.CrossPhaseStatistics(request_params, timeout=_RPC_TIMEOUT_SECONDS)
            )
            return response, None
        except Exception as e:
            return None, e


def ModifyCrossPhaseDiagnosis(crossid: str, citycode: int, area_id: int):
    """Update the crossing timing-plan diagnosis data.

    Calls the ``UpdateCrossPhaseDiagnosis`` RPC.

    Returns:
        ``(response, None)`` on success. ``(None, exception)`` if the call
        fails or the server answers with a non-zero ``code``.
    """
    with _open_stub() as stub:
        try:
            request_params = phase_server_pb2.UpdateCrossPhaseDiagnosisRequest(
                crossid=crossid, citycode=citycode, area_id=area_id
            )
            response = _raise_on_error_code(
                stub.UpdateCrossPhaseDiagnosis(request_params, timeout=_RPC_TIMEOUT_SECONDS)
            )
            return response, None
        except Exception as e:
            return None, e


def GetCrossPhaseDiagnosis(citycode: int, area_id: int, schedule_week: str, tp: str):
    """Query the timing-plan diagnosis by city ID.

    Calls the ``CrossPhaseDiagnosisByCity`` RPC.

    Example argument values:
        schedule_week = '1,2,3,4,5'
        tp = '07:00-07:20'

    Returns:
        ``(list(response.data), None)`` on success. ``(None, exception)`` if
        the call fails or the server answers with a non-zero ``code``.
    """
    with _open_stub() as stub:
        try:
            request_params = phase_server_pb2.CrossPhaseDiagnosisByCityRequest(
                citycode=citycode, area_id=area_id, schedule_week=schedule_week, tp=tp
            )
            response = _raise_on_error_code(
                stub.CrossPhaseDiagnosisByCity(request_params, timeout=_RPC_TIMEOUT_SECONDS)
            )
            return list(response.data), None
        except Exception as e:
            return None, e