Skip to content

Commit

Permalink
Merge https://github.com/grpc/grpc into scribs
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Jan 3, 2024
2 parents 858449e + b84d5ef commit d424ea2
Show file tree
Hide file tree
Showing 31 changed files with 295 additions and 1,270 deletions.
2 changes: 0 additions & 2 deletions requirements.bazel.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,3 @@ setuptools==44.1.1
xds-protos==0.0.11
absl-py==1.4.0
googleapis-common-protos==1.61.0
opentelemetry-sdk==1.21.0
opentelemetry-api==1.21.0
31 changes: 0 additions & 31 deletions src/python/grpcio/grpc/_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ class _RPCState(object):
rpc_start_time: Optional[float] # In relative seconds
rpc_end_time: Optional[float] # In relative seconds
method: Optional[str]
target: Optional[str]

def __init__(
self,
Expand Down Expand Up @@ -164,7 +163,6 @@ def __init__(
self.rpc_start_time = None
self.rpc_end_time = None
self.method = None
self.target = None

# The semantics of grpc.Future.cancel and grpc.Future.cancelled are
# slightly wonky, so they have to be tracked separately from the rest of the
Expand Down Expand Up @@ -1050,7 +1048,6 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
_channel: cygrpc.Channel
_managed_call: IntegratedCallFactory
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any
Expand All @@ -1061,14 +1058,12 @@ def __init__(
channel: cygrpc.Channel,
managed_call: IntegratedCallFactory,
method: bytes,
target: bytes,
request_serializer: Optional[SerializingFunction],
response_deserializer: Optional[DeserializingFunction],
):
self._channel = channel
self._managed_call = managed_call
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
Expand Down Expand Up @@ -1128,7 +1123,6 @@ def _blocking(
else:
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._channel.segregated_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
Expand Down Expand Up @@ -1200,7 +1194,6 @@ def future(
event_handler = _event_handler(state, self._response_deserializer)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
Expand All @@ -1220,7 +1213,6 @@ def future(
class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
_channel: cygrpc.Channel
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any
Expand All @@ -1230,13 +1222,11 @@ def __init__(
self,
channel: cygrpc.Channel,
method: bytes,
target: bytes,
request_serializer: SerializingFunction,
response_deserializer: DeserializingFunction,
):
self._channel = channel
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
Expand Down Expand Up @@ -1288,7 +1278,6 @@ def __call__( # pylint: disable=too-many-locals
operations_and_tags = tuple((ops, None) for ops in operations)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._channel.segregated_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
Expand All @@ -1308,7 +1297,6 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
_channel: cygrpc.Channel
_managed_call: IntegratedCallFactory
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any
Expand All @@ -1319,14 +1307,12 @@ def __init__(
channel: cygrpc.Channel,
managed_call: IntegratedCallFactory,
method: bytes,
target: bytes,
request_serializer: SerializingFunction,
response_deserializer: DeserializingFunction,
):
self._channel = channel
self._managed_call = managed_call
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
Expand Down Expand Up @@ -1368,7 +1354,6 @@ def __call__( # pylint: disable=too-many-locals
)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
Expand All @@ -1389,7 +1374,6 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
_channel: cygrpc.Channel
_managed_call: IntegratedCallFactory
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any
Expand All @@ -1400,14 +1384,12 @@ def __init__(
channel: cygrpc.Channel,
managed_call: IntegratedCallFactory,
method: bytes,
target: bytes,
request_serializer: Optional[SerializingFunction],
response_deserializer: Optional[DeserializingFunction],
):
self._channel = channel
self._managed_call = managed_call
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
Expand All @@ -1431,7 +1413,6 @@ def _blocking(
)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._channel.segregated_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
Expand Down Expand Up @@ -1520,7 +1501,6 @@ def future(
)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
Expand Down Expand Up @@ -1550,7 +1530,6 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
_channel: cygrpc.Channel
_managed_call: IntegratedCallFactory
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any
Expand All @@ -1561,14 +1540,12 @@ def __init__(
channel: cygrpc.Channel,
managed_call: IntegratedCallFactory,
method: bytes,
target: bytes,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
):
self._channel = channel
self._managed_call = managed_call
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
Expand Down Expand Up @@ -1602,7 +1579,6 @@ def __call__(
event_handler = _event_handler(state, self._response_deserializer)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
Expand Down Expand Up @@ -1971,7 +1947,6 @@ class Channel(grpc.Channel):
_channel: cygrpc.Channel
_call_state: _ChannelCallState
_connectivity_state: _ChannelConnectivityState
_target: str

def __init__(
self,
Expand Down Expand Up @@ -1999,7 +1974,6 @@ def __init__(
_augment_options(core_options, compression),
credentials,
)
self._target = target
self._call_state = _ChannelCallState(self._channel)
self._connectivity_state = _ChannelConnectivityState(self._channel)
cygrpc.fork_register_channel(self)
Expand Down Expand Up @@ -2039,7 +2013,6 @@ def unary_unary(
self._channel,
_channel_managed_call_management(self._call_state),
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)
Expand All @@ -2058,7 +2031,6 @@ def unary_stream(
return _SingleThreadedUnaryStreamMultiCallable(
self._channel,
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)
Expand All @@ -2067,7 +2039,6 @@ def unary_stream(
self._channel,
_channel_managed_call_management(self._call_state),
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)
Expand All @@ -2082,7 +2053,6 @@ def stream_unary(
self._channel,
_channel_managed_call_management(self._call_state),
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)
Expand All @@ -2097,7 +2067,6 @@ def stream_stream(
self._channel,
_channel_managed_call_management(self._call_state),
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)
Expand Down
3 changes: 1 addition & 2 deletions src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ cdef class _CallState:
cdef set due
# call_tracer_capsule should have type of grpc._observability.ClientCallTracerCapsule
cdef object call_tracer_capsule
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name) except *
cdef void maybe_delete_call_tracer(self) except *


cdef class _ChannelState:

cdef bytes target
cdef object condition
cdef grpc_channel *c_channel
# A boolean field indicating that the channel is open (if True) or is being
Expand Down
11 changes: 5 additions & 6 deletions src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,22 @@ cdef class _CallState:
return
_observability.delete_call_tracer(self.call_tracer_capsule)

cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *:
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name) except *:
# TODO(xuanwn): use channel args to exclude those metrics.
for exclude_prefix in _observability._SERVICES_TO_EXCLUDE:
if exclude_prefix in method_name:
return
with _observability.get_plugin() as plugin:
if not (plugin and plugin.observability_enabled):
return
capsule = plugin.create_client_call_tracer(method_name, target)
capsule = plugin.create_client_call_tracer(method_name)
capsule_ptr = cpython.PyCapsule_GetPointer(capsule, CLIENT_CALL_TRACER)
_set_call_tracer(self.c_call, capsule_ptr)
self.call_tracer_capsule = capsule

cdef class _ChannelState:

def __cinit__(self, target):
self.target = target
def __cinit__(self):
self.condition = threading.Condition()
self.open = True
self.integrated_call_states = {}
Expand Down Expand Up @@ -249,7 +248,7 @@ cdef void _call(
grpc_slice_unref(method_slice)
if host_slice_ptr:
grpc_slice_unref(host_slice)
call_state.maybe_set_client_call_tracer_on_call(method, channel_state.target)
call_state.maybe_set_client_call_tracer_on_call(method)
if context is not None:
set_census_context_on_call(call_state, context)
if credentials is not None:
Expand Down Expand Up @@ -474,7 +473,7 @@ cdef class Channel:
ChannelCredentials channel_credentials):
arguments = () if arguments is None else tuple(arguments)
fork_handlers_and_grpc_init()
self._state = _ChannelState(target)
self._state = _ChannelState()
self._state.c_call_completion_queue = (
grpc_completion_queue_create_for_next(NULL))
self._state.c_connectivity_completion_queue = (
Expand Down
12 changes: 4 additions & 8 deletions src/python/grpcio/grpc/_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ObservabilityPlugin(

@abc.abstractmethod
def create_client_call_tracer(
self, method_name: bytes, target: bytes
self, method_name: bytes
) -> ClientCallTracerCapsule:
"""Creates a ClientCallTracerCapsule.
Expand All @@ -75,7 +75,6 @@ def create_client_call_tracer(
Args:
method_name: The method name of the call in byte format.
target: The channel target of the call in byte format.
Returns:
A PyCapsule which stores a ClientCallTracer object.
Expand Down Expand Up @@ -141,7 +140,7 @@ def create_server_call_tracer_factory(

@abc.abstractmethod
def record_rpc_latency(
self, method: str, target: str, rpc_latency: float, status_code: Any
self, method: str, rpc_latency: float, status_code: Any
) -> None:
"""Record the latency of the RPC.
Expand All @@ -150,8 +149,7 @@ def record_rpc_latency(
Args:
method: The fully-qualified name of the RPC method being invoked.
target: The target name of the RPC method being invoked.
rpc_latency: The latency for the RPC in seconds, equals to the time between
rpc_latency: The latency for the RPC, equals to the time between
when the client invokes the RPC and when the client receives the status.
status_code: An element of grpc.StatusCode in string format representing the
final status for the RPC.
Expand Down Expand Up @@ -282,6 +280,4 @@ def maybe_record_rpc_latency(state: "_channel._RPCState") -> None:
return
rpc_latency_s = state.rpc_end_time - state.rpc_start_time
rpc_latency_ms = rpc_latency_s * 1000
plugin.record_rpc_latency(
state.method, state.target, rpc_latency_ms, state.code
)
plugin.record_rpc_latency(state.method, rpc_latency_ms, state.code)
12 changes: 2 additions & 10 deletions src/python/grpcio_observability/_parallel_compile_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file has been automatically generated from a template file.
# Please make modifications to
# `$REPO_ROOT/templates/src/python/grpcio/_parallel_compile_patch.py.template`
# instead. This file can be regenerated from the template by running
# `tools/buildgen/generate_projects.sh`.

"""Patches the compile() to allow enable parallel compilation of C/C++.
build_ext has lots of C/C++ files and normally them one by one.
Expand All @@ -34,11 +27,10 @@
import multiprocessing

BUILD_EXT_COMPILER_JOBS = multiprocessing.cpu_count()
except ValueError:
BUILD_EXT_COMPILER_JOBS = 1


# monkey-patch for parallel compilation
# TODO(xuanwn): Use a template for this file.
def _parallel_compile(
self,
sources,
Expand All @@ -53,7 +45,7 @@ def _parallel_compile(
# setup the same way as distutils.ccompiler.CCompiler
# https://github.com/python/cpython/blob/31368a4f0e531c19affe2a1becd25fc316bc7501/Lib/distutils/ccompiler.py#L564
macros, objects, extra_postargs, pp_opts, build = self._setup_compile(
str(output_dir), macros, include_dirs, sources, depends, extra_postargs
output_dir, macros, include_dirs, sources, depends, extra_postargs
)
cc_args = self._get_cc_args(pp_opts, debug, extra_preargs)

Expand Down
Loading

0 comments on commit d424ea2

Please sign in to comment.