# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! """Client and server classes corresponding to protobuf-defined services.""" import grpc import warnings import phase_server_pb2 as phase__server__pb2 GRPC_GENERATED_VERSION = '1.71.0' GRPC_VERSION = grpc.__version__ _version_not_supported = False try: from grpc._utilities import first_version_is_lower _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) except ImportError: _version_not_supported = True if _version_not_supported: raise RuntimeError( f'The grpc package installed is at version {GRPC_VERSION},' + f' but the generated code in phase_server_pb2_grpc.py depends on' + f' grpcio>={GRPC_GENERATED_VERSION}.' + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' ) class PhaseServiceStub(object): """Missing associated documentation comment in .proto file.""" def __init__(self, channel): """Constructor. Args: channel: A grpc.Channel. """ self.ExceptionPhaseCross = channel.unary_unary( '/phase_server.PhaseService/ExceptionPhaseCross', request_serializer=phase__server__pb2.EmptyRequest.SerializeToString, response_deserializer=phase__server__pb2.ExceptionPhaseCrossResponse.FromString, _registered_method=True) self.ExceptionPhaseCrossInfo = channel.unary_unary( '/phase_server.PhaseService/ExceptionPhaseCrossInfo', request_serializer=phase__server__pb2.ExceptionPhaseCrossInfoRequest.SerializeToString, response_deserializer=phase__server__pb2.ExceptionPhaseCrossInfoResponse.FromString, _registered_method=True) self.ExceptionPhaseByCrossIDs = channel.unary_unary( '/phase_server.PhaseService/ExceptionPhaseByCrossIDs', request_serializer=phase__server__pb2.ExceptionPhaseByCrossIDsRequest.SerializeToString, response_deserializer=phase__server__pb2.ExceptionPhaseByCrossIDsResponse.FromString, _registered_method=True) self.GetCrossPhaseDetailByCrossIDs = channel.unary_unary( '/phase_server.PhaseService/GetCrossPhaseDetailByCrossIDs', request_serializer=phase__server__pb2.GetCrossPhaseDetailByCrossIDsRequest.SerializeToString, response_deserializer=phase__server__pb2.GetCrossPhaseDetailByCrossIDsResponse.FromString, _registered_method=True) self.GetPhaseCrossID = channel.unary_unary( '/phase_server.PhaseService/GetPhaseCrossID', request_serializer=phase__server__pb2.EmptyRequest.SerializeToString, response_deserializer=phase__server__pb2.GetPhaseCrossIDResponse.FromString, _registered_method=True) self.PhaseGreenRatio = channel.unary_unary( '/phase_server.PhaseService/PhaseGreenRatio', request_serializer=phase__server__pb2.PhaseGreenRatioRequest.SerializeToString, response_deserializer=phase__server__pb2.PhaseGreenRatioResponse.FromString, _registered_method=True) self.CrossList = channel.unary_unary( '/phase_server.PhaseService/CrossList', request_serializer=phase__server__pb2.CrossListRequest.SerializeToString, response_deserializer=phase__server__pb2.CrossListResponse.FromString, _registered_method=True) self.QueryCrossPhaseRelativeOffset = channel.unary_unary( '/phase_server.PhaseService/QueryCrossPhaseRelativeOffset', request_serializer=phase__server__pb2.QueryCrossPhaseRelativeOffsetRequest.SerializeToString, response_deserializer=phase__server__pb2.QueryCrossPhaseRelativeOffsetResponse.FromString, _registered_method=True) self.GreenWaveCrossPhaseTpCheck = channel.unary_unary( '/phase_server.PhaseService/GreenWaveCrossPhaseTpCheck', request_serializer=phase__server__pb2.GreenWaveCrossPhaseTpCheckRequest.SerializeToString, response_deserializer=phase__server__pb2.GreenWaveCrossPhaseTpCheckResponse.FromString, _registered_method=True) self.CrossRunningPhase = channel.unary_unary( '/phase_server.PhaseService/CrossRunningPhase', request_serializer=phase__server__pb2.CrossRunningPhaseRequest.SerializeToString, response_deserializer=phase__server__pb2.CrossRunningPhaseResponse.FromString, _registered_method=True) self.CrossPhaseDiagnosis = channel.unary_unary( '/phase_server.PhaseService/CrossPhaseDiagnosis', request_serializer=phase__server__pb2.CrossPhaseDiagnosisRequest.SerializeToString, response_deserializer=phase__server__pb2.CrossPhaseDiagnosisResponse.FromString, _registered_method=True) class PhaseServiceServicer(object): """Missing associated documentation comment in .proto file.""" def ExceptionPhaseCross(self, request, context): """获取异常路口 """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def ExceptionPhaseCrossInfo(self, request, context): """异常路口详情列表 """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def ExceptionPhaseByCrossIDs(self, request, context): """根据路口ID返回异常配时 """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def GetCrossPhaseDetailByCrossIDs(self, request, context): """根据路口ID返回路口配时方案详情 """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def GetPhaseCrossID(self, request, context): """获取有配时方案路口列表 """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def PhaseGreenRatio(self, request, context): """获取配时方案协调/非协调方向绿灯时长比 """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def CrossList(self, request, context): """获取配时方案协调/非协调方向绿灯时长比 """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def QueryCrossPhaseRelativeOffset(self, request, context): """依据日期,时刻查询多路口配时方案与相对相位差 """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def GreenWaveCrossPhaseTpCheck(self, request, context): """单条绿波配时方案时段与路口配时方案时段校验 """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def CrossRunningPhase(self, request, context): """路口运行中的配时方案 """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def CrossPhaseDiagnosis(self, request, context): """路口配时方案诊断列表 """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def add_PhaseServiceServicer_to_server(servicer, server): rpc_method_handlers = { 'ExceptionPhaseCross': grpc.unary_unary_rpc_method_handler( servicer.ExceptionPhaseCross, request_deserializer=phase__server__pb2.EmptyRequest.FromString, response_serializer=phase__server__pb2.ExceptionPhaseCrossResponse.SerializeToString, ), 'ExceptionPhaseCrossInfo': grpc.unary_unary_rpc_method_handler( servicer.ExceptionPhaseCrossInfo, request_deserializer=phase__server__pb2.ExceptionPhaseCrossInfoRequest.FromString, response_serializer=phase__server__pb2.ExceptionPhaseCrossInfoResponse.SerializeToString, ), 'ExceptionPhaseByCrossIDs': grpc.unary_unary_rpc_method_handler( servicer.ExceptionPhaseByCrossIDs, request_deserializer=phase__server__pb2.ExceptionPhaseByCrossIDsRequest.FromString, response_serializer=phase__server__pb2.ExceptionPhaseByCrossIDsResponse.SerializeToString, ), 'GetCrossPhaseDetailByCrossIDs': grpc.unary_unary_rpc_method_handler( servicer.GetCrossPhaseDetailByCrossIDs, request_deserializer=phase__server__pb2.GetCrossPhaseDetailByCrossIDsRequest.FromString, response_serializer=phase__server__pb2.GetCrossPhaseDetailByCrossIDsResponse.SerializeToString, ), 'GetPhaseCrossID': grpc.unary_unary_rpc_method_handler( servicer.GetPhaseCrossID, request_deserializer=phase__server__pb2.EmptyRequest.FromString, response_serializer=phase__server__pb2.GetPhaseCrossIDResponse.SerializeToString, ), 'PhaseGreenRatio': grpc.unary_unary_rpc_method_handler( servicer.PhaseGreenRatio, request_deserializer=phase__server__pb2.PhaseGreenRatioRequest.FromString, response_serializer=phase__server__pb2.PhaseGreenRatioResponse.SerializeToString, ), 'CrossList': grpc.unary_unary_rpc_method_handler( servicer.CrossList, request_deserializer=phase__server__pb2.CrossListRequest.FromString, response_serializer=phase__server__pb2.CrossListResponse.SerializeToString, ), 'QueryCrossPhaseRelativeOffset': grpc.unary_unary_rpc_method_handler( servicer.QueryCrossPhaseRelativeOffset, request_deserializer=phase__server__pb2.QueryCrossPhaseRelativeOffsetRequest.FromString, response_serializer=phase__server__pb2.QueryCrossPhaseRelativeOffsetResponse.SerializeToString, ), 'GreenWaveCrossPhaseTpCheck': grpc.unary_unary_rpc_method_handler( servicer.GreenWaveCrossPhaseTpCheck, request_deserializer=phase__server__pb2.GreenWaveCrossPhaseTpCheckRequest.FromString, response_serializer=phase__server__pb2.GreenWaveCrossPhaseTpCheckResponse.SerializeToString, ), 'CrossRunningPhase': grpc.unary_unary_rpc_method_handler( servicer.CrossRunningPhase, request_deserializer=phase__server__pb2.CrossRunningPhaseRequest.FromString, response_serializer=phase__server__pb2.CrossRunningPhaseResponse.SerializeToString, ), 'CrossPhaseDiagnosis': grpc.unary_unary_rpc_method_handler( servicer.CrossPhaseDiagnosis, request_deserializer=phase__server__pb2.CrossPhaseDiagnosisRequest.FromString, response_serializer=phase__server__pb2.CrossPhaseDiagnosisResponse.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( 'phase_server.PhaseService', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) server.add_registered_method_handlers('phase_server.PhaseService', rpc_method_handlers) # This class is part of an EXPERIMENTAL API. class PhaseService(object): """Missing associated documentation comment in .proto file.""" @staticmethod def ExceptionPhaseCross(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary( request, target, '/phase_server.PhaseService/ExceptionPhaseCross', phase__server__pb2.EmptyRequest.SerializeToString, phase__server__pb2.ExceptionPhaseCrossResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True) @staticmethod def ExceptionPhaseCrossInfo(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary( request, target, '/phase_server.PhaseService/ExceptionPhaseCrossInfo', phase__server__pb2.ExceptionPhaseCrossInfoRequest.SerializeToString, phase__server__pb2.ExceptionPhaseCrossInfoResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True) @staticmethod def ExceptionPhaseByCrossIDs(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary( request, target, '/phase_server.PhaseService/ExceptionPhaseByCrossIDs', phase__server__pb2.ExceptionPhaseByCrossIDsRequest.SerializeToString, phase__server__pb2.ExceptionPhaseByCrossIDsResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True) @staticmethod def GetCrossPhaseDetailByCrossIDs(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary( request, target, '/phase_server.PhaseService/GetCrossPhaseDetailByCrossIDs', phase__server__pb2.GetCrossPhaseDetailByCrossIDsRequest.SerializeToString, phase__server__pb2.GetCrossPhaseDetailByCrossIDsResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True) @staticmethod def GetPhaseCrossID(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary( request, target, '/phase_server.PhaseService/GetPhaseCrossID', phase__server__pb2.EmptyRequest.SerializeToString, phase__server__pb2.GetPhaseCrossIDResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True) @staticmethod def PhaseGreenRatio(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary( request, target, '/phase_server.PhaseService/PhaseGreenRatio', phase__server__pb2.PhaseGreenRatioRequest.SerializeToString, phase__server__pb2.PhaseGreenRatioResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True) @staticmethod def CrossList(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary( request, target, '/phase_server.PhaseService/CrossList', phase__server__pb2.CrossListRequest.SerializeToString, phase__server__pb2.CrossListResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True) @staticmethod def QueryCrossPhaseRelativeOffset(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary( request, target, '/phase_server.PhaseService/QueryCrossPhaseRelativeOffset', phase__server__pb2.QueryCrossPhaseRelativeOffsetRequest.SerializeToString, phase__server__pb2.QueryCrossPhaseRelativeOffsetResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True) @staticmethod def GreenWaveCrossPhaseTpCheck(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary( request, target, '/phase_server.PhaseService/GreenWaveCrossPhaseTpCheck', phase__server__pb2.GreenWaveCrossPhaseTpCheckRequest.SerializeToString, phase__server__pb2.GreenWaveCrossPhaseTpCheckResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True) @staticmethod def CrossRunningPhase(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary( request, target, '/phase_server.PhaseService/CrossRunningPhase', phase__server__pb2.CrossRunningPhaseRequest.SerializeToString, phase__server__pb2.CrossRunningPhaseResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True) @staticmethod def CrossPhaseDiagnosis(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary( request, target, '/phase_server.PhaseService/CrossPhaseDiagnosis', phase__server__pb2.CrossPhaseDiagnosisRequest.SerializeToString, phase__server__pb2.CrossPhaseDiagnosisResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True)