Skip to content

Commit

Permalink
Merge https://github.com/grpc/grpc into v3-server
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Jan 5, 2024
2 parents 9429a46 + a3e24ed commit b59f610
Show file tree
Hide file tree
Showing 34 changed files with 1,339 additions and 299 deletions.
2 changes: 2 additions & 0 deletions requirements.bazel.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ setuptools==44.1.1
xds-protos==0.0.11
absl-py==1.4.0
googleapis-common-protos==1.61.0
opentelemetry-sdk==1.21.0
opentelemetry-api==1.21.0
80 changes: 80 additions & 0 deletions src/python/grpcio/grpc/_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class _RPCState(object):
rpc_start_time: Optional[float] # In relative seconds
rpc_end_time: Optional[float] # In relative seconds
method: Optional[str]
target: Optional[str]

def __init__(
self,
Expand Down Expand Up @@ -163,6 +164,7 @@ def __init__(
self.rpc_start_time = None
self.rpc_end_time = None
self.method = None
self.target = None

# The semantics of grpc.Future.cancel and grpc.Future.cancelled are
# slightly wonky, so they have to be tracked separately from the rest of the
Expand Down Expand Up @@ -1048,22 +1050,35 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
_channel: cygrpc.Channel
_managed_call: IntegratedCallFactory
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any

__slots__ = [
"_channel",
"_managed_call",
"_method",
"_target",
"_request_serializer",
"_response_deserializer",
"_context",
]

# pylint: disable=too-many-arguments
def __init__(
self,
channel: cygrpc.Channel,
managed_call: IntegratedCallFactory,
method: bytes,
target: bytes,
request_serializer: Optional[SerializingFunction],
response_deserializer: Optional[DeserializingFunction],
):
self._channel = channel
self._managed_call = managed_call
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
Expand Down Expand Up @@ -1123,6 +1138,7 @@ def _blocking(
else:
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._channel.segregated_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
Expand Down Expand Up @@ -1194,6 +1210,7 @@ def future(
event_handler = _event_handler(state, self._response_deserializer)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
Expand All @@ -1213,20 +1230,32 @@ def future(
class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
_channel: cygrpc.Channel
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any

__slots__ = [
"_channel",
"_method",
"_target",
"_request_serializer",
"_response_deserializer",
"_context",
]

# pylint: disable=too-many-arguments
def __init__(
self,
channel: cygrpc.Channel,
method: bytes,
target: bytes,
request_serializer: SerializingFunction,
response_deserializer: DeserializingFunction,
):
self._channel = channel
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
Expand Down Expand Up @@ -1278,6 +1307,7 @@ def __call__( # pylint: disable=too-many-locals
operations_and_tags = tuple((ops, None) for ops in operations)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._channel.segregated_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
Expand All @@ -1297,22 +1327,35 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
_channel: cygrpc.Channel
_managed_call: IntegratedCallFactory
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any

__slots__ = [
"_channel",
"_managed_call",
"_method",
"_target",
"_request_serializer",
"_response_deserializer",
"_context",
]

# pylint: disable=too-many-arguments
def __init__(
self,
channel: cygrpc.Channel,
managed_call: IntegratedCallFactory,
method: bytes,
target: bytes,
request_serializer: SerializingFunction,
response_deserializer: DeserializingFunction,
):
self._channel = channel
self._managed_call = managed_call
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
Expand Down Expand Up @@ -1354,6 +1397,7 @@ def __call__( # pylint: disable=too-many-locals
)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
Expand All @@ -1374,22 +1418,35 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
_channel: cygrpc.Channel
_managed_call: IntegratedCallFactory
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any

__slots__ = [
"_channel",
"_managed_call",
"_method",
"_target",
"_request_serializer",
"_response_deserializer",
"_context",
]

# pylint: disable=too-many-arguments
def __init__(
self,
channel: cygrpc.Channel,
managed_call: IntegratedCallFactory,
method: bytes,
target: bytes,
request_serializer: Optional[SerializingFunction],
response_deserializer: Optional[DeserializingFunction],
):
self._channel = channel
self._managed_call = managed_call
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
Expand All @@ -1413,6 +1470,7 @@ def _blocking(
)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._channel.segregated_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
Expand Down Expand Up @@ -1501,6 +1559,7 @@ def future(
)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
Expand Down Expand Up @@ -1530,22 +1589,35 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
_channel: cygrpc.Channel
_managed_call: IntegratedCallFactory
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any

__slots__ = [
"_channel",
"_managed_call",
"_method",
"_target",
"_request_serializer",
"_response_deserializer",
"_context",
]

# pylint: disable=too-many-arguments
def __init__(
self,
channel: cygrpc.Channel,
managed_call: IntegratedCallFactory,
method: bytes,
target: bytes,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
):
self._channel = channel
self._managed_call = managed_call
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
Expand Down Expand Up @@ -1579,6 +1651,7 @@ def __call__(
event_handler = _event_handler(state, self._response_deserializer)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
Expand Down Expand Up @@ -1947,6 +2020,7 @@ class Channel(grpc.Channel):
_channel: cygrpc.Channel
_call_state: _ChannelCallState
_connectivity_state: _ChannelConnectivityState
_target: str

def __init__(
self,
Expand Down Expand Up @@ -1974,6 +2048,7 @@ def __init__(
_augment_options(core_options, compression),
credentials,
)
self._target = target
self._call_state = _ChannelCallState(self._channel)
self._connectivity_state = _ChannelConnectivityState(self._channel)
cygrpc.fork_register_channel(self)
Expand Down Expand Up @@ -2013,6 +2088,7 @@ def unary_unary(
self._channel,
_channel_managed_call_management(self._call_state),
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)
Expand All @@ -2031,6 +2107,7 @@ def unary_stream(
return _SingleThreadedUnaryStreamMultiCallable(
self._channel,
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)
Expand All @@ -2039,6 +2116,7 @@ def unary_stream(
self._channel,
_channel_managed_call_management(self._call_state),
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)
Expand All @@ -2053,6 +2131,7 @@ def stream_unary(
self._channel,
_channel_managed_call_management(self._call_state),
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)
Expand All @@ -2067,6 +2146,7 @@ def stream_stream(
self._channel,
_channel_managed_call_management(self._call_state),
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)
Expand Down
3 changes: 2 additions & 1 deletion src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ cdef class _CallState:
cdef set due
# call_tracer_capsule should have type of grpc._observability.ClientCallTracerCapsule
cdef object call_tracer_capsule
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name) except *
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *
cdef void maybe_delete_call_tracer(self) except *


cdef class _ChannelState:

cdef bytes target
cdef object condition
cdef grpc_channel *c_channel
# A boolean field indicating that the channel is open (if True) or is being
Expand Down
11 changes: 6 additions & 5 deletions src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,23 @@ cdef class _CallState:
return
_observability.delete_call_tracer(self.call_tracer_capsule)

cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name) except *:
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *:
# TODO(xuanwn): use channel args to exclude those metrics.
for exclude_prefix in _observability._SERVICES_TO_EXCLUDE:
if exclude_prefix in method_name:
return
with _observability.get_plugin() as plugin:
if not (plugin and plugin.observability_enabled):
return
capsule = plugin.create_client_call_tracer(method_name)
capsule = plugin.create_client_call_tracer(method_name, target)
capsule_ptr = cpython.PyCapsule_GetPointer(capsule, CLIENT_CALL_TRACER)
_set_call_tracer(self.c_call, capsule_ptr)
self.call_tracer_capsule = capsule

cdef class _ChannelState:

def __cinit__(self):
def __cinit__(self, target):
self.target = target
self.condition = threading.Condition()
self.open = True
self.integrated_call_states = {}
Expand Down Expand Up @@ -248,7 +249,7 @@ cdef void _call(
grpc_slice_unref(method_slice)
if host_slice_ptr:
grpc_slice_unref(host_slice)
call_state.maybe_set_client_call_tracer_on_call(method)
call_state.maybe_set_client_call_tracer_on_call(method, channel_state.target)
if context is not None:
set_census_context_on_call(call_state, context)
if credentials is not None:
Expand Down Expand Up @@ -473,7 +474,7 @@ cdef class Channel:
ChannelCredentials channel_credentials):
arguments = () if arguments is None else tuple(arguments)
fork_handlers_and_grpc_init()
self._state = _ChannelState()
self._state = _ChannelState(target)
self._state.c_call_completion_queue = (
grpc_completion_queue_create_for_next(NULL))
self._state.c_connectivity_completion_queue = (
Expand Down
Loading

0 comments on commit b59f610

Please sign in to comment.