cross_doctor/proto/phase_grpc.py

178 lines
6.4 KiB
Python
Raw Normal View History

import logging
import os
import sys
from typing import List, Optional

import grpc
from google.protobuf.json_format import MessageToDict

from app.global_source import g_config

# Make the sibling ``proto`` directory importable before loading the generated stubs.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../proto')))
from proto import phase_server_pb2_grpc, phase_server_pb2
def channel_stub():
    """Open an insecure gRPC channel to the phase service and build its stub.

    The target address is assembled from ``g_config['rpc_host']`` and
    ``g_config['rpc_port']``.

    Returns:
        A ``(stub, channel)`` tuple. This function does not close the
        channel; the caller is expected to call ``channel.close()``.
    """
    target = f"{g_config['rpc_host']}:{g_config['rpc_port']}"
    rpc_channel = grpc.insecure_channel(target)
    return phase_server_pb2_grpc.PhaseServiceStub(rpc_channel), rpc_channel
def get_exception_phase_cross():
    """Call the ``ExceptionPhaseCross`` RPC with an empty request.

    Returns:
        The RPC response. If the call raises an ``Exception``, it is logged
        via ``logging.error`` and an empty ``ExceptionPhaseCrossResponse``
        is returned instead.
    """
    stub, channel = channel_stub()
    try:
        empty_request = phase_server_pb2.EmptyRequest()
        result = stub.ExceptionPhaseCross(empty_request, timeout=30)
    except Exception as err:
        logging.error(err)
        # Fall back to a default-constructed response so callers always get a message object.
        result = phase_server_pb2.ExceptionPhaseCrossResponse()
    finally:
        channel.close()
    return result
def get_exception_phase_cross_info(citycode):
    """Call the ``ExceptionPhaseCrossInfo`` RPC for one city.

    Args:
        citycode: City code placed in the request's ``citycode`` field.

    Returns:
        ``(response, None)`` on success, or ``(None, exception)`` if building
        the request or performing the call raises.
    """
    stub, channel = channel_stub()
    try:
        request = phase_server_pb2.ExceptionPhaseCrossInfoRequest(citycode=citycode)
        reply = stub.ExceptionPhaseCrossInfo(request, timeout=30)
    except Exception as err:
        return None, err
    else:
        return reply, None
    finally:
        # Runs on both the success and the error path.
        channel.close()
def get_green_cross_phase(citycode: int = 0, crossids: Optional[List[str]] = None):
    """Call the ``ExceptionPhaseByCrossIDs`` RPC for the given cross IDs.

    Args:
        citycode: City code forwarded in the request; defaults to 0.
        crossids: Cross (intersection) IDs forwarded as the request's
            ``crossid`` field. ``None`` (the default) is sent as an empty
            list.

    Returns:
        ``(response, None)`` on success, or ``(None, exception)`` if building
        the request or performing the call raises.
    """
    # A fresh list per call replaces the former shared mutable ``[]`` default.
    cross_id_list = [] if crossids is None else crossids
    stub, channel = channel_stub()
    try:
        request_params = phase_server_pb2.ExceptionPhaseByCrossIDsRequest(
            citycode=citycode, crossid=cross_id_list)
        return stub.ExceptionPhaseByCrossIDs(request_params, timeout=30), None
    except Exception as e:
        return None, e
    finally:
        channel.close()
def GetCrossPhaseDetailByCrossIDs(citycode: int = 0, crossids: Optional[List[str]] = None, gw_src_dir='',
                                  tp_start='', tp_end='', week='', wave_id=''):
    """Call the ``GetCrossPhaseDetailByCrossIDs`` RPC.

    All arguments are forwarded unchanged into the
    ``GetCrossPhaseDetailByCrossIDsRequest`` fields of the same name.

    Args:
        citycode: City code; defaults to 0.
        crossids: Cross (intersection) IDs. ``None`` (the default) is sent as
            an empty list.
        gw_src_dir: Forwarded as ``gw_src_dir``.
        tp_start: Forwarded as ``tp_start``.
        tp_end: Forwarded as ``tp_end``.
        week: Forwarded as ``week``.
        wave_id: Forwarded as ``wave_id``.

    Returns:
        ``(response, None)`` on success, or ``(None, exception)`` if building
        the request or performing the call raises.
    """
    # A fresh list per call replaces the former shared mutable ``[]`` default.
    cross_id_list = [] if crossids is None else crossids
    stub, channel = channel_stub()
    try:
        request_params = phase_server_pb2.GetCrossPhaseDetailByCrossIDsRequest(
            citycode=citycode,
            crossids=cross_id_list,
            gw_src_dir=gw_src_dir,
            tp_start=tp_start,
            tp_end=tp_end,
            week=week,
            wave_id=wave_id,
        )
        return stub.GetCrossPhaseDetailByCrossIDs(request_params, timeout=30), None
    except Exception as e:
        return None, e
    finally:
        channel.close()
def GetPhaseCrossID():
    """Get the IDs of intersections that have a signal timing plan.

    Calls the ``GetPhaseCrossID`` RPC with an empty request.

    Returns:
        ``(response, None)`` on success, or ``(None, exception)`` if the call
        raises.
    """
    stub, channel = channel_stub()
    try:
        empty_request = phase_server_pb2.EmptyRequest()
        reply = stub.GetPhaseCrossID(empty_request, timeout=30)
    except Exception as err:
        return None, err
    else:
        return reply, None
    finally:
        channel.close()
def GetPhaseGreenRatio(citycode: int = 0, crossids: Optional[List[str]] = None, gw_src_dir='', tp_start='',
                       tp_end='', week='', wave_id=''):
    """Call the ``PhaseGreenRatio`` RPC.

    All arguments are forwarded unchanged into the ``PhaseGreenRatioRequest``
    fields of the same name.

    Args:
        citycode: City code; defaults to 0.
        crossids: Cross (intersection) IDs. ``None`` (the default) is sent as
            an empty list.
        gw_src_dir: Forwarded as ``gw_src_dir``.
        tp_start: Forwarded as ``tp_start``.
        tp_end: Forwarded as ``tp_end``.
        week: Forwarded as ``week``.
        wave_id: Forwarded as ``wave_id``.

    Returns:
        ``(response, None)`` on success, or ``(None, exception)`` if building
        the request or performing the call raises.
    """
    # A fresh list per call replaces the former shared mutable ``[]`` default.
    cross_id_list = [] if crossids is None else crossids
    stub, channel = channel_stub()
    try:
        request_params = phase_server_pb2.PhaseGreenRatioRequest(
            citycode=citycode,
            crossids=cross_id_list,
            gw_src_dir=gw_src_dir,
            tp_start=tp_start,
            tp_end=tp_end,
            week=week,
            wave_id=wave_id,
        )
        return stub.PhaseGreenRatio(request_params, timeout=30), None
    except Exception as e:
        return None, e
    finally:
        channel.close()
def GetQueryCrossPhaseRelativeOffset(citycode: int = 0, crossids: Optional[List[str]] = None, tp: str = '',
                                     day: str = ''):
    """Call the ``QueryCrossPhaseRelativeOffset`` RPC.

    All arguments are forwarded unchanged into the
    ``QueryCrossPhaseRelativeOffsetRequest`` fields of the same name.

    Args:
        citycode: City code; defaults to 0.
        crossids: Cross (intersection) IDs. ``None`` (the default) is sent as
            an empty list.
        tp: Forwarded as ``tp``.
        day: Forwarded as ``day``.

    Returns:
        ``(response, None)`` on success, or ``(None, exception)`` if building
        the request or performing the call raises.
    """
    # A fresh list per call replaces the former shared mutable ``[]`` default.
    cross_id_list = [] if crossids is None else crossids
    stub, channel = channel_stub()
    try:
        request_params = phase_server_pb2.QueryCrossPhaseRelativeOffsetRequest(
            citycode=citycode, crossids=cross_id_list, tp=tp, day=day)
        return stub.QueryCrossPhaseRelativeOffset(request_params, timeout=30), None
    except Exception as e:
        return None, e
    finally:
        channel.close()
def QueryGreenWaveCrossPhaseTpCheck(waveid: str, citycode: int):
    """Update the abnormal status for one green wave's timing-plan periods.

    The status comes from comparing the green wave's timing-plan time
    periods with the intersection timing-plan time periods. Calls the
    ``GreenWaveCrossPhaseTpCheck`` RPC.

    Args:
        waveid: Green wave ID placed in the request's ``waveid`` field.
        citycode: City code placed in the request's ``citycode`` field.

    Returns:
        ``(response, None)`` on success, or ``(None, exception)`` if building
        the request or performing the call raises.
    """
    stub, channel = channel_stub()
    try:
        request = phase_server_pb2.GreenWaveCrossPhaseTpCheckRequest(
            waveid=waveid,
            citycode=citycode,
        )
        reply = stub.GreenWaveCrossPhaseTpCheck(request, timeout=30)
    except Exception as err:
        return None, err
    else:
        return reply, None
    finally:
        channel.close()
def QueryCrossRunningPhase(citycode: int, crossids: List[str], date_list: List, tp=''):
    """Query the timing plans currently running at the given intersections.

    Calls the ``CrossRunningPhase`` RPC; every argument is forwarded
    unchanged into the ``CrossRunningPhaseRequest`` field of the same name.

    Args:
        citycode: City code.
        crossids: Cross (intersection) IDs.
        date_list: Dates to query; the element type is defined by the proto
            schema, which is not visible here.
        tp: Forwarded as ``tp``; defaults to an empty string.

    Returns:
        ``(response, None)`` on success, or ``(None, exception)`` if building
        the request or performing the call raises.
    """
    stub, channel = channel_stub()
    try:
        request_params = phase_server_pb2.CrossRunningPhaseRequest(
            citycode=citycode, crossids=crossids, date_list=date_list, tp=tp)
        return stub.CrossRunningPhase(request_params, timeout=30), None
    except Exception as e:
        return None, e
    finally:
        channel.close()
def QueryCrossPhaseDiagnosis(citycode: int, crossid: str, date_list: List, tp: str, area_id: int):
    """Run the intersection timing-plan diagnosis (``CrossPhaseDiagnosis`` RPC).

    Args:
        citycode: City code.
        crossid: ID of the intersection to diagnose.
        date_list: Dates to diagnose, e.g. ``[20251010]``.
        tp: Time period, e.g. ``"07:00-07:20"``.
        area_id: Area ID, e.g. ``350101``.

    Returns:
        ``(data, None)`` on success, where ``data`` is ``response.data``
        converted to a dict that keeps the original proto field names. When
        the dict is non-empty, a ``total_num`` key holding the number of
        ``values`` entries is added.
        ``(None, error)`` if the call raises or the response ``code`` is
        non-zero; in the latter case the error carries ``response.msg``.

    Example:
        response, error = QueryCrossPhaseDiagnosis(citycode, crossid, [20251010], "07:00-07:20", 350101)
        if error:
            print(error)
            return f"{error}"
        if len(response) > 0:
            print(response)
    """
    stub, channel = channel_stub()
    try:
        request_params = phase_server_pb2.CrossPhaseDiagnosisRequest(
            citycode=citycode, crossid=crossid, date_list=date_list, tp=tp, area_id=area_id)
        response = stub.CrossPhaseDiagnosis(request_params, timeout=30)
        if response.code != 0:
            raise Exception(response.msg)
        data = MessageToDict(response.data, preserving_proto_field_name=True)
        if data:
            # MessageToDict omits empty repeated fields by default, so 'values'
            # may be absent even when other fields are present; count it as 0
            # instead of failing with KeyError.
            data['total_num'] = len(data.get('values', []))
        return data, None
    except Exception as e:
        return None, e
    finally:
        channel.close()
def QueryCrossPhaseByCity(citycode: int):
    """Query the IDs of intersections that have a timing plan, by city ID.

    Calls the ``CrossPhaseByCity`` RPC.

    Args:
        citycode: City code placed in the request's ``citycode`` field.

    Returns:
        ``(ids, None)`` on success, where ``ids`` is ``response.data`` copied
        into a list. ``([], error)`` if the call raises or the response
        ``code`` is non-zero; in the latter case the error carries
        ``response.msg``.
    """
    stub, channel = channel_stub()
    try:
        request = phase_server_pb2.CrossPhaseByCityRequest(citycode=citycode)
        reply = stub.CrossPhaseByCity(request, timeout=30)
        if reply.code == 0:
            return list(reply.data), None
        # Non-zero code: surface the server message through the shared error path.
        raise Exception(reply.msg)
    except Exception as err:
        return [], err
    finally:
        channel.close()