diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9c6a4d4..6a067c7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -25,8 +25,7 @@ jobs: run: | python -m pip install --upgrade pip pip install poetry - poetry install --no-root -E aiohttp -E flask -E httpx -E jsonschema -E pydantic -E requests \ - -E docstring-parser -E werkzeug -E openapi-ui-bundles + poetry install --no-root -E test - name: Run pre-commit hooks run: poetry run pre-commit run --hook-stage merge-commit --all-files - name: Run tests diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 2196bc8..6046182 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,14 @@ Changelog ========= +1.6.0 (2022-07-05) +------------------ + +- JSON-RPC client requests retry support added +- aio-pika integration and backend updated for aio-pika 8.0 +- type aliases for middlewares added +- httpx minimal version updated due to found vulnerability + 1.5.0 (2022-05-22) ------------------ diff --git a/Makefile b/Makefile index b1315d0..21f4536 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,11 @@ init: pip install poetry --upgrade - poetry install --no-root + # Updates poetry.lock in case pyproject.toml was updated for install: + poetry update + poetry install --no-root --extras test +export PYTHONWARNINGS=ignore::DeprecationWarning test: poetry run py.test diff --git a/docs/source/index.rst b/docs/source/index.rst index 4f46a6b..4e2f8db 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -75,6 +75,7 @@ The User Guide pjrpc/extending pjrpc/testing pjrpc/tracing + pjrpc/retries pjrpc/specification pjrpc/webui pjrpc/examples diff --git a/docs/source/pjrpc/api/client.rst b/docs/source/pjrpc/api/client.rst index 6f73a1f..2833db5 100644 --- a/docs/source/pjrpc/api/client.rst +++ b/docs/source/pjrpc/api/client.rst @@ -29,6 +29,13 @@ Tracer :members: +Retry +~~~~~ + +.. automodule:: pjrpc.client.retry + :members: + + Integrations ~~~~~~~~~~~~ diff --git a/docs/source/pjrpc/retries.rst b/docs/source/pjrpc/retries.rst new file mode 100644 index 0000000..859f88b --- /dev/null +++ b/docs/source/pjrpc/retries.rst @@ -0,0 +1,60 @@ +.. _retires: + +Retries +======= + +``pjrpc`` supports request retries based on response code or received exception using customizable backoff strategy. +``pjrpc`` provides several built-in backoff algorithms (see :py:mod:`pjrpc.client.retry`), but you can +implement your own one like this: + +.. code-block:: python + + import dataclasses as dc + import random + from pjrpc.client.retry import Backoff + + @dc.dataclass(frozen=True) + class RandomBackoff(Backoff): + def __call__(self) -> Iterator[float]: + return (random.random() for _ in range(self.attempts)) + + +Retry strategy can be configured for all client requests by passing a strategy to a client constructor +as a `retry_strategy` argument or for a particular request as a `_retry_strategy` when calling `send` method. + +The following example illustrate request retries api usage: + +.. code-block:: python + + import asyncio + import random + + import pjrpc + from pjrpc.client.backend import aiohttp as pjrpc_client + from pjrpc.client.retry import ExponentialBackoff, PeriodicBackoff, RetryStrategy + + + async def main(): + default_retry_strategy = RetryStrategy( + exceptions={TimeoutError}, + backoff=PeriodicBackoff(attempts=3, interval=1.0, jitter=lambda: random.gauss(mu=0.5, sigma=0.1)), + ) + + async with pjrpc_client.Client('http://localhost/api/v1', retry_strategy=default_retry_strategy) as client: + response = await client.send( + pjrpc.Request('sum', params=[1, 2], id=1), + _retry_strategy=RetryStrategy( + exceptions={TimeoutError}, + codes={2001}, + backoff=ExponentialBackoff( + attempts=3, base=1.0, factor=2.0, jitter=lambda: random.gauss(mu=0.5, sigma=0.1), + ), + ), + ) + print(f"1 + 2 = {response.result}") + + result = await client.proxy.sum(1, 2) + print(f"1 + 2 = {result}") + + + asyncio.run(main()) diff --git a/examples/aiohttp_client_retry.py b/examples/aiohttp_client_retry.py new file mode 100644 index 0000000..72c54fa --- /dev/null +++ b/examples/aiohttp_client_retry.py @@ -0,0 +1,32 @@ +import asyncio +import random + +import pjrpc +from pjrpc.client.backend import aiohttp as pjrpc_client +from pjrpc.client.retry import ExponentialBackoff, PeriodicBackoff, RetryStrategy + + +async def main(): + default_retry_strategy = RetryStrategy( + exceptions={TimeoutError}, + backoff=PeriodicBackoff(attempts=3, interval=1.0, jitter=lambda: random.gauss(mu=0.5, sigma=0.1)), + ) + + async with pjrpc_client.Client('http://localhost/api/v1', retry_strategy=default_retry_strategy) as client: + response = await client.send( + pjrpc.Request('sum', params=[1, 2], id=1), + _retry_strategy=RetryStrategy( + exceptions={TimeoutError}, + codes={2001}, + backoff=ExponentialBackoff( + attempts=3, base=1.0, factor=2.0, jitter=lambda: random.gauss(mu=0.5, sigma=0.1), + ), + ), + ) + print(f"1 + 2 = {response.result}") + + result = await client.proxy.sum(1, 2) + print(f"1 + 2 = {result}") + + +asyncio.run(main()) diff --git a/examples/middlewares.py b/examples/middlewares.py index 6fd2dea..78bec00 100644 --- a/examples/middlewares.py +++ b/examples/middlewares.py @@ -1,17 +1,23 @@ +from typing import Any + from aiohttp import web import pjrpc.server +from pjrpc.common import Request from pjrpc.server.integration import aiohttp +from pjrpc.server.typedefs import AsyncHandlerType, ContextType, MiddlewareResponse methods = pjrpc.server.MethodRegistry() @methods.add(context='request') -async def method(request): +async def method(request: Any) -> None: print("method") -async def middleware1(request, context, handler): +async def middleware1( + request: Request, context: ContextType, handler: AsyncHandlerType, +) -> MiddlewareResponse: print("middleware1 started") result = await handler(request, context) print("middleware1 finished") @@ -19,7 +25,9 @@ async def middleware1(request, context, handler): return result -async def middleware2(request, context, handler): +async def middleware2( + request: Request, context: ContextType, handler: AsyncHandlerType, +) -> MiddlewareResponse: print("middleware2 started") result = await handler(request, context) print("middleware2 finished") diff --git a/examples/rabbitmq/aio_pika/client_response_queue.py b/examples/rabbitmq/aio_pika/client_response_queue.py new file mode 100755 index 0000000..8206760 --- /dev/null +++ b/examples/rabbitmq/aio_pika/client_response_queue.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +"""By default, RabbitMQ JSON-RPC clients generate a temporary result queue +for their requests, but in very special cases, the client may want to choose +a specific result queue. + +This example shows using a specific queue with specific properties as well.""" +import asyncio +import logging + +from yarl import URL + +import pjrpc.client.backend.aio_pika + + +async def client_with_specific_queue() -> None: + """aio_pika client demonstrating the use of a specific result_queue""" + logging.basicConfig(level=logging.INFO, format="%(message)s") + + client = pjrpc.client.backend.aio_pika.Client( + broker_url=URL("amqp://guest:guest@localhost:5672/v1"), + queue_name="jsonrpc", + result_queue_name="pjrpc-aio_pika-example-jsonrpc-results", + result_queue_args={ + "exclusive": True, + "auto_delete": True, + "durable": True, + "arguments": None, + }, + ) + await client.connect() + + result = await client.proxy.sum(1, 2) + print(f"1 + 2 = {result}") + + await client.notify("tick") + await client.notify("schedule_shutdown") + await client.close() + + +if __name__ == "__main__": + asyncio.run(client_with_specific_queue()) diff --git a/pjrpc/__about__.py b/pjrpc/__about__.py index 8a2a4ae..2805763 100644 --- a/pjrpc/__about__.py +++ b/pjrpc/__about__.py @@ -2,7 +2,7 @@ __description__ = 'Extensible JSON-RPC library' __url__ = 'https://github.com/dapper91/pjrpc' -__version__ = '1.5.0' +__version__ = '1.6.0' __author__ = 'Dmitry Pershin' __email__ = 'dapper91@mail.ru' diff --git a/pjrpc/client/backend/aio_pika.py b/pjrpc/client/backend/aio_pika.py index 5d2af2d..cddc0c8 100644 --- a/pjrpc/client/backend/aio_pika.py +++ b/pjrpc/client/backend/aio_pika.py @@ -4,6 +4,8 @@ from typing import Any, Dict, Optional, cast import aio_pika +from aio_pika.abc import AbstractIncomingMessage +from yarl import URL import pjrpc from pjrpc.client import AbstractAsyncClient @@ -28,7 +30,7 @@ class Client(AbstractAsyncClient): def __init__( self, - broker_url: str, + broker_url: URL, queue_name: Optional[str] = None, conn_args: Optional[Dict[str, Any]] = None, exchange_name: Optional[str] = None, @@ -42,19 +44,19 @@ def __init__( super().__init__(**kwargs) self._connection = aio_pika.connection.Connection(broker_url, **(conn_args or {})) - self._channel: Optional[aio_pika.Channel] = None + self._channel: Optional[aio_pika.abc.AbstractChannel] = None self._exchange_name = exchange_name self._exchange_args = exchange_args - self._exchange: Optional[aio_pika.Exchange] = None + self._exchange: Optional[aio_pika.abc.AbstractExchange] = None self._routing_key = cast(str, routing_key or queue_name) self._result_queue_name = result_queue_name self._result_queue_args = result_queue_args - self._result_queue: Optional[aio_pika.Queue] = None + self._result_queue: Optional[aio_pika.abc.AbstractQueue] = None self._consumer_tag: Optional[str] = None - self._futures: Dict[str, asyncio.Future] = {} + self._futures: Dict[str, asyncio.Future[str]] = {} async def connect(self) -> None: """ @@ -65,16 +67,13 @@ async def connect(self) -> None: self._channel = channel = await self._connection.channel() if self._exchange_name: - self._exchange = aio_pika.Exchange( - self._connection, channel, self._exchange_name, **(self._exchange_args or {}) - ) - await self._exchange.declare() + self._exchange = await channel.declare_exchange(self._exchange_name, **(self._exchange_args or {})) if self._result_queue_name: - self._result_queue = aio_pika.Queue( - self._connection, channel, self._result_queue_name, **(self._result_queue_args or {}) + assert channel + self._result_queue = await channel.declare_queue( + self._result_queue_name, **(self._result_queue_args or {}) ) - await self._result_queue.declare() self._consumer_tag = await self._result_queue.consume(self._on_result_message, no_ack=True) async def close(self) -> None: @@ -99,8 +98,9 @@ async def close(self) -> None: future.set_exception(asyncio.CancelledError) - async def _on_result_message(self, message: aio_pika.IncomingMessage) -> None: + async def _on_result_message(self, message: AbstractIncomingMessage) -> None: correlation_id = message.correlation_id + assert correlation_id future = self._futures.pop(correlation_id, None) if future is None: @@ -147,7 +147,7 @@ async def _request(self, request_text: str, is_notification: bool = False, **kwa **kwargs, ) - future: asyncio.Future = asyncio.Future() + future: asyncio.Future[str] = asyncio.Future() self._futures[request_id] = future try: diff --git a/pjrpc/client/client.py b/pjrpc/client/client.py index d4c7aeb..c768c40 100644 --- a/pjrpc/client/client.py +++ b/pjrpc/client/client.py @@ -6,7 +6,8 @@ from typing import Any, Awaitable, Callable, Dict, Generator, Iterable, Optional, Tuple, Type, Union, cast from pjrpc import AbstractRequest, AbstractResponse, BatchRequest, BatchResponse, Request, Response, common -from pjrpc.common import exceptions, generators, v20 +from pjrpc.client import retry +from pjrpc.common import UNSET, UnsetType, exceptions, generators, v20 from pjrpc.common.typedefs import JsonRpcRequestId, MethodType from .tracer import Tracer @@ -264,13 +265,16 @@ class BaseAbstractClient(abc.ABC): :param response_class: response class :param batch_request_class: batch request class :param batch_response_class: batch response class + :param error_cls: JSON-RPC error base class :param id_gen_impl: identifier generator :param json_loader: json loader :param json_dumper: json dumper :param json_encoder: json encoder :param json_decoder: json decoder - :param error_cls: JSON-RPC error base class :param strict: if ``True`` checks that a request and a response identifiers match + :param request_args: backend request argument + :param tracers: request tracers list + :param retry_strategy: request retry strategy """ class Proxy: @@ -301,6 +305,7 @@ def __init__( strict: bool = True, request_args: Optional[Dict[str, Any]] = None, tracers: Iterable[Tracer] = (), + retry_strategy: Optional[retry.RetryStrategy] = None, ): self.request_class = request_class self.response_class = response_class @@ -315,6 +320,7 @@ def __init__( self.strict = strict self._request_args = request_args or {} self._tracers = tracers + self._retry_strategy = retry_strategy def __call__( self, @@ -448,7 +454,11 @@ def call( return response.result def send( - self, request: Request, _trace_ctx: SimpleNamespace = SimpleNamespace(), **kwargs: Any, + self, + request: Request, + _trace_ctx: SimpleNamespace = SimpleNamespace(), + _retry_strategy: Union[UnsetType, retry.RetryStrategy] = UNSET, + **kwargs: Any, ) -> Optional[Response]: """ Sends a JSON-RPC request. @@ -456,6 +466,7 @@ def send( :param request: request instance :param kwargs: additional client request argument :param _trace_ctx: tracers request context + :param _retry_strategy: request retry strategy :returns: response instance """ @@ -465,6 +476,7 @@ def send( response_class=self.response_class, validator=self._relate, _trace_ctx=_trace_ctx, + _retry_strategy=_retry_strategy, **kwargs, ), ) @@ -477,6 +489,10 @@ def wrapper( _trace_ctx: SimpleNamespace, **kwargs: Any, ) -> Optional[AbstractResponse]: + """ + Adds tracing logic to the method. + """ + for tracer in self._tracers: tracer.on_request_begin(_trace_ctx, request) @@ -494,6 +510,31 @@ def wrapper( return wrapper + def retried(method: Callable[..., Any]) -> Callable[..., Any]: + @ft.wraps(method) + def wrapper( + self: 'AbstractClient', + request: AbstractRequest, + _retry_strategy: Union[UnsetType, retry.RetryStrategy] = UNSET, + **kwargs: Any, + ) -> Optional[AbstractResponse]: + """ + Adds retrying logic to the method. + """ + + retry_strategy = self._retry_strategy if isinstance(_retry_strategy, UnsetType) else _retry_strategy + if retry_strategy: + wrapped_method = retry.retry(method, retry_strategy) + else: + wrapped_method = method + + response = wrapped_method(self, request, **kwargs) + + return response + + return wrapper + + @retried @traced def _send( self, @@ -545,7 +586,11 @@ async def _request(self, request_text: str, is_notification: bool = False, **kwa """ async def send( - self, request: Request, _trace_ctx: SimpleNamespace = SimpleNamespace(), **kwargs: Any, + self, + request: Request, + _trace_ctx: SimpleNamespace = SimpleNamespace(), + _retry_strategy: Union[UnsetType, retry.RetryStrategy] = UNSET, + **kwargs: Any, ) -> Optional[Response]: """ Sends a JSON-RPC request. @@ -553,12 +598,18 @@ async def send( :param request: request instance :param kwargs: additional client request argument :param _trace_ctx: tracers request context + :param _retry_strategy: request retry strategy :returns: response instance """ return cast( Response, await self._send( - request, _trace_ctx=_trace_ctx, response_class=self.response_class, validator=self._relate, **kwargs, + request, + _trace_ctx=_trace_ctx, + _retry_strategy=_retry_strategy, + response_class=self.response_class, + validator=self._relate, + **kwargs, ), ) @@ -570,6 +621,10 @@ async def wrapper( _trace_ctx: SimpleNamespace = SimpleNamespace(), **kwargs: Any, ) -> Response: + """ + Adds tracing logic to the method. + """ + for tracer in self._tracers: tracer.on_request_begin(_trace_ctx, request) @@ -587,6 +642,31 @@ async def wrapper( return wrapper + def retried(method: Callable[..., Awaitable[Any]]) -> Callable[..., Any]: + @ft.wraps(method) + async def wrapper( + self: 'AbstractClient', + request: AbstractRequest, + _retry_strategy: Union[UnsetType, retry.RetryStrategy] = UNSET, + **kwargs: Any, + ) -> Optional[AbstractResponse]: + """ + Adds retrying logic to the method. + """ + + retry_strategy = self._retry_strategy if isinstance(_retry_strategy, UnsetType) else _retry_strategy + if retry_strategy: + wrapped_method = retry.retry_async(method, retry_strategy) + else: + wrapped_method = method + + response = await wrapped_method(self, request, **kwargs) + + return response + + return wrapper + + @retried @traced async def _send( self, diff --git a/pjrpc/client/retry.py b/pjrpc/client/retry.py new file mode 100644 index 0000000..d70ce77 --- /dev/null +++ b/pjrpc/client/retry.py @@ -0,0 +1,195 @@ +import asyncio +import dataclasses as dc +import itertools as it +import logging +import time +from typing import Any, Awaitable, Callable, Generator, Iterator, Optional, Set, Type + +from pjrpc.common import AbstractResponse + +logger = logging.getLogger(__package__) + +Jitter = Callable[[], float] + + +@dc.dataclass(frozen=True) +class Backoff: + """ + JSON-RPC request retry strategy. + + :param attempts: retries number + :param jitter: retry delay jitter generator + """ + + attempts: int + jitter: Jitter = lambda: 0.0 + + def __call__(self) -> Iterator[float]: + """ + Returns delay iterator. + """ + + raise NotImplementedError + + +@dc.dataclass(frozen=True) +class PeriodicBackoff(Backoff): + """ + Periodic request retry strategy. + + :param interval: retry delay + """ + + interval: float = 1.0 + + def __call__(self) -> Iterator[float]: + def gen() -> Generator[float, None, None]: + for _ in range(self.attempts): + yield self.interval + self.jitter() + + return gen() + + +@dc.dataclass(frozen=True) +class ExponentialBackoff(Backoff): + """ + Exponential request retry strategy. + + :param base: exponentially growing delay base + :param factor: exponentially growing delay factor (multiplier) + :param max_value: delay max value + """ + + base: float = 1.0 + factor: float = 2.0 + max_value: Optional[float] = None + + def __call__(self) -> Iterator[float]: + def gen() -> Generator[float, None, None]: + for n, base in enumerate(it.repeat(self.base, self.attempts)): + value = base * (self.factor ** n) + self.jitter() + yield min(self.max_value, value) if self.max_value is not None else value + + return gen() + + +@dc.dataclass(frozen=True) +class FibonacciBackoff(Backoff): + """ + Fibonacci request retry strategy. + + :param multiplier: fibonacci interval sequence multiplier + :param max_value: delay max value + """ + + multiplier: float = 1.0 + max_value: float = 1.0 + + def __call__(self) -> Iterator[float]: + def gen() -> Generator[float, None, None]: + prev, cur = 1, 1 + + for _ in range(self.attempts): + value = cur * self.multiplier + self.jitter() + yield min(self.max_value, value) if self.max_value is not None else value + + tmp = cur + cur = prev + cur + prev = tmp + + return gen() + + +@dc.dataclass(frozen=True) +class RetryStrategy: + """ + JSON-RPC request retry strategy. + + :param backoff: backoff delay generator + :param codes: JSON-RPC response codes receiving which the request will be retried + :param exceptions: exceptions catching which the request will be retried + """ + + backoff: Backoff + codes: Optional[Set[int]] = None + exceptions: Optional[Set[Type[Exception]]] = None + + +def retry( + func: Callable[..., AbstractResponse], + retry_strategy: RetryStrategy, +) -> Callable[..., AbstractResponse]: + """ + Synchronous function retry decorator. + + :param func: function to be retried + :param retry_strategy: retry strategy to be applied + :return: decorated function + """ + + def wrapped(*args: Any, **kwargs: Any) -> AbstractResponse: + delays = retry_strategy.backoff() + + for attempt in it.count(start=1): + try: + response = func(*args, **kwargs) + if response.is_error and retry_strategy.codes and response.get_error().code in retry_strategy.codes: + delay = next(delays, None) + if delay is not None: + logger.debug("retrying request: attempt=%d, code=%s", attempt, response.error) + time.sleep(delay) + continue + + return response + + except tuple(retry_strategy.exceptions or {}) as e: + delay = next(delays, None) + if delay is not None: + logger.debug("retrying request: attempt=%d, exception=%r", attempt, e) + time.sleep(delay) + else: + raise e + else: + raise AssertionError("unreachable") + + return wrapped + + +def retry_async( + func: Callable[..., Awaitable[AbstractResponse]], + retry_strategy: RetryStrategy, +) -> Callable[..., Awaitable[AbstractResponse]]: + """ + Asynchronous function retry decorator. + + :param func: function to be retried + :param retry_strategy: retry strategy to be applied + :return: decorated function + """ + + async def wrapped(*args: Any, **kwargs: Any) -> AbstractResponse: + delays = retry_strategy.backoff() + + for attempt in it.count(start=1): + try: + response = await func(*args, **kwargs) + if response.is_error and retry_strategy.codes and response.get_error().code in retry_strategy.codes: + delay = next(delays, None) + if delay is not None: + logger.debug("retrying request: attempt=%d, code=%s", attempt, response.error) + await asyncio.sleep(delay) + continue + + return response + + except tuple(retry_strategy.exceptions or {}) as e: + delay = next(delays, None) + if delay is not None: + logger.debug("retrying request: attempt=%d, exception=%r", attempt, e) + await asyncio.sleep(delay) + else: + raise e + else: + raise AssertionError("unreachable") + + return wrapped diff --git a/pjrpc/server/integration/aio_pika.py b/pjrpc/server/integration/aio_pika.py index 936f3b2..ca3fa97 100644 --- a/pjrpc/server/integration/aio_pika.py +++ b/pjrpc/server/integration/aio_pika.py @@ -2,6 +2,7 @@ from typing import Any, Dict, Optional import aio_pika +from yarl import URL import pjrpc.server @@ -18,15 +19,15 @@ class Executor: :param kwargs: dispatcher additional arguments """ - def __init__(self, broker_url: str, queue_name: str, prefetch_count: int = 0, **kwargs: Any): + def __init__(self, broker_url: URL, queue_name: str, prefetch_count: int = 0, **kwargs: Any): self._broker_url = broker_url self._queue_name = queue_name self._prefetch_count = prefetch_count self._connection = aio_pika.connection.Connection(broker_url) - self._channel: Optional[aio_pika.Channel] = None + self._channel: Optional[aio_pika.abc.AbstractChannel] = None - self._queue: Optional[aio_pika.Queue] = None + self._queue: Optional[aio_pika.abc.AbstractQueue] = None self._consumer_tag: Optional[str] = None self._dispatcher = pjrpc.server.AsyncDispatcher(**kwargs) @@ -65,7 +66,7 @@ async def shutdown(self) -> None: await self._connection.close() - async def _rpc_handle(self, message: aio_pika.IncomingMessage) -> None: + async def _rpc_handle(self, message: aio_pika.abc.AbstractIncomingMessage) -> None: """ Handles JSON-RPC request. diff --git a/pjrpc/server/typedefs.py b/pjrpc/server/typedefs.py index 094af34..75db74c 100644 --- a/pjrpc/server/typedefs.py +++ b/pjrpc/server/typedefs.py @@ -6,33 +6,53 @@ from pjrpc.common import Request, Response, UnsetType __all__ = [ - 'AsyncMiddlewareType', 'AsyncErrorHandlerType', + 'AsyncMiddlewareType', + 'AsyncHandlerType', + 'MiddlewareResponse', 'MiddlewareType', 'ErrorHandlerType', + 'ResponseOrUnset', + 'ContextType', ] + +ContextType = Optional[Any] +'''Context argument for RPC methods and middlewares''' # for sphinx autodoc + +ResponseOrUnset = Union[UnsetType, Response] +'''Return value of RPC handlers and middlewares''' # for sphinx autodoc + +AsyncHandlerType = Callable[[Request, ContextType], Awaitable[ResponseOrUnset]] +'''Async RPC handler method, passed to middlewares''' # for sphinx autodoc + +HandlerType = Callable[[Request, ContextType], ResponseOrUnset] +'''Blocking RPC handler method, passed to middlewares''' # for sphinx autodoc + +MiddlewareResponse = Union[UnsetType, Response] +'''middlewares and handlers return Response or UnsetType''' # for sphinx autodoc + AsyncMiddlewareType = Callable[ - [Request, Optional[Any], Callable[[Request, Optional[Any]], Union[UnsetType, Response]]], - Awaitable[Union[UnsetType, Response]], + [Request, ContextType, AsyncHandlerType], + Awaitable[MiddlewareResponse], ] '''Asynchronous middleware type''' # for sphinx autodoc AsyncErrorHandlerType = Callable[ - [Request, Optional[Any], pjrpc.exceptions.JsonRpcError], + [Request, ContextType, pjrpc.exceptions.JsonRpcError], Awaitable[pjrpc.exceptions.JsonRpcError], ] '''Asynchronous server error handler''' # for sphinx autodoc MiddlewareType = Callable[ - [Request, Optional[Any], Callable[[Request, Optional[Any]], Union[UnsetType, Response]]], - Union[UnsetType, Response], + [Request, ContextType, HandlerType], + MiddlewareResponse, ] '''Synchronous middleware type''' # for sphinx autodoc ErrorHandlerType = Callable[ - [Request, Optional[Any], pjrpc.exceptions.JsonRpcError], + [Request, ContextType, pjrpc.exceptions.JsonRpcError], pjrpc.exceptions.JsonRpcError, ] '''Synchronous server error handler''' # for sphinx autodoc diff --git a/pyproject.toml b/pyproject.toml index 96d9e3b..aba2b6f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pjrpc" -version = "1.5.0" +version = "1.6.0" description = "Extensible JSON-RPC library" authors = ["Dmitry Pershin "] license = "Unlicense" @@ -35,13 +35,13 @@ classifiers = [ [tool.poetry.dependencies] python = "^3.7" -aio-pika = { version = "^6.8", optional = true } +aio-pika = { version = "^8.0", optional = true } aiofiles = { version = "^0.7", optional = true } aiohttp = { version = "^3.7", optional = true } django = { version = "^3.0", optional = true } docstring-parser = { version = "^0.8", optional = true } flask = { version = ">=1.1.3", optional = true } -httpx = { version = "^0.17", optional = true } +httpx = { version = "^0.23.0", optional = true } jsonschema = { version = "^3.0", optional = true } kombu = { version = "^5.1", optional = true } markupsafe = { version = "==2.0.1", optional = true } @@ -58,13 +58,14 @@ aiohttp = ['aiohttp'] django = ['django'] docstring-parser = ['docstring-parser'] flask = ['flask', 'markupsafe'] -httpx = ['requests'] +httpx = ['httpx'] jsonschema = ['jsonschema'] kombu = ['kombu'] openapi-ui-bundles = ['openapi-ui-bundles'] pydantic = ['pydantic'] requests = ['requests'] starlette = ['starlette', 'aiofiles'] +test = ['docstring-parser', 'flask', 'jsonschema', 'openapi-ui-bundles', 'pydantic', 'werkzeug'] werkzeug = ['werkzeug'] docgen = ['sphinx', 'aiohttp', 'aio-pika', 'flask', 'jsonschema', 'pydantic', 'requests', 'kombu'] @@ -77,7 +78,7 @@ pytest-aiohttp = "^0.3" pytest-cov = "^2.0" pytest-mock = "^1.0" responses = "^0.14" -respx = "^0.16" +respx = "^0.19.2" mypy = "^0.942" pre-commit = "^2.19" @@ -151,3 +152,7 @@ ignore_missing_imports = true [[tool.mypy.overrides]] module = "requests.*" ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "yarl.*" +ignore_missing_imports = true diff --git a/tests/client/test_retry.py b/tests/client/test_retry.py new file mode 100644 index 0000000..714ff0a --- /dev/null +++ b/tests/client/test_retry.py @@ -0,0 +1,319 @@ +import pytest +import responses +from aioresponses import aioresponses + +import pjrpc +from pjrpc.client import retry +from pjrpc.client.backend import aiohttp as aiohttp_backend +from pjrpc.client.backend import requests as requests_backend +from pjrpc.common import UNSET + + +@pytest.mark.parametrize( + 'strategy, expected', + [ + ( + retry.PeriodicBackoff(attempts=5, interval=1.0, jitter=lambda: 0.1), + (1.1, 1.1, 1.1, 1.1, 1.1), + ), + ( + retry.ExponentialBackoff(attempts=5, base=1.0, factor=2.0, max_value=10.0, jitter=lambda: -0.2), + (0.8, 1.8, 3.8, 7.8, 10.0), + ), + ( + retry.FibonacciBackoff(attempts=5, multiplier=2, max_value=10.0), + (2.0, 4.0, 6.0, 10.0, 10.0), + ), + ], +) +def test_retry_strategies(strategy, expected): + assert tuple(strategy()) == expected + + +@pytest.mark.parametrize( + 'resp_code, resp_errors, retry_codes, retry_attempts, success', + [ + (2001, 2, {2000, 2001}, 2, True), + (2000, 2, {2000}, 2, True), + (2000, 2, {2001}, 2, False), + (2000, 1, {2000}, 2, True), + (2000, 3, {2000}, 2, False), + (2000, 1, {}, 2, False), + (2000, 0, {}, 0, True), + ], +) +async def test_async_client_retry_strategy_by_code(resp_code, resp_errors, retry_codes, retry_attempts, success): + with aioresponses() as mock: + test_url = 'http://test.com/api' + expected_result = 'result' + + resp_success = dict( + url=test_url, + payload={"jsonrpc": "2.0", "result": expected_result, "id": 1}, + ) + resp_error = dict( + url=test_url, + payload={"jsonrpc": "2.0", "error": {"code": resp_code, "message": "error"}, "id": 1}, + ) + + client = aiohttp_backend.Client( + url=test_url, + retry_strategy=retry.RetryStrategy( + codes=retry_codes, + backoff=retry.PeriodicBackoff(attempts=retry_attempts, interval=0.0), + ), + ) + + for _ in range(resp_errors): + mock.post(**resp_error) + mock.post(**resp_success) + + if success: + actual_result = await client.proxy.method() + assert actual_result == expected_result + else: + with pytest.raises(pjrpc.exceptions.JsonRpcError) as err: + await client.proxy.method() + + assert err.value.code == resp_code + + +@pytest.mark.parametrize( + 'resp_exc, resp_errors, retry_exc, retry_attempts, success', + [ + (ConnectionError, 2, {TimeoutError, ConnectionError}, 2, True), + (TimeoutError, 2, {TimeoutError}, 2, True), + (TimeoutError, 2, {ConnectionError}, 2, False), + (TimeoutError, 1, {TimeoutError}, 2, True), + (TimeoutError, 3, {TimeoutError}, 2, False), + (TimeoutError, 1, {}, 2, False), + (TimeoutError, 0, {}, 0, True), + ], +) +async def test_async_client_retry_strategy_by_exception(resp_exc, resp_errors, retry_exc, retry_attempts, success): + with aioresponses() as mock: + test_url = 'http://test.com/api' + expected_result = 'result' + + resp_success = dict( + url=test_url, + payload={"jsonrpc": "2.0", "result": expected_result, "id": 1}, + ) + resp_error = dict( + url=test_url, + exception=resp_exc(), + ) + + client = aiohttp_backend.Client( + url=test_url, + retry_strategy=retry.RetryStrategy( + exceptions=retry_exc, + backoff=retry.PeriodicBackoff(attempts=retry_attempts, interval=0.0), + ), + ) + + for _ in range(resp_errors): + mock.post(**resp_error) + mock.post(**resp_success) + + if success: + actual_result = await client.proxy.method() + assert actual_result == expected_result + else: + with pytest.raises(resp_exc): + await client.proxy.method() + + +@pytest.mark.parametrize( + 'resp_code, resp_errors, retry_codes, retry_attempts, success', + [ + (2001, 2, {2000, 2001}, 2, True), + (2000, 2, {2000}, 2, True), + (2000, 2, {2001}, 2, False), + (2000, 1, {2000}, 2, True), + (2000, 3, {2000}, 2, False), + (2000, 1, {}, 2, False), + (2000, 0, {}, 0, True), + ], +) +@responses.activate +def test_client_retry_strategy_by_code(resp_code, resp_errors, retry_codes, retry_attempts, success): + test_url = 'http://test.com/api' + expected_result = 'result' + + resp_success = responses.Response( + method=responses.POST, + url=test_url, + status=200, + json={"jsonrpc": "2.0", "result": expected_result, "id": 1}, + ) + resp_error = responses.Response( + method=responses.POST, + url=test_url, + status=200, + json={"jsonrpc": "2.0", "error": {"code": resp_code, "message": "error"}, "id": 1}, + ) + + client = requests_backend.Client( + url=test_url, + retry_strategy=retry.RetryStrategy( + codes=retry_codes, + backoff=retry.PeriodicBackoff(attempts=retry_attempts, interval=0.0), + ), + ) + + for _ in range(resp_errors): + responses.add(resp_error) + responses.add(resp_success) + + if success: + actual_result = client.proxy.method() + assert actual_result == expected_result + else: + with pytest.raises(pjrpc.exceptions.JsonRpcError) as err: + client.proxy.method() + + assert err.value.code == resp_code + + +@pytest.mark.parametrize( + 'resp_exc, resp_errors, retry_exc, retry_attempts, success', + [ + (ConnectionError, 2, {TimeoutError, ConnectionError}, 2, True), + (TimeoutError, 2, {TimeoutError}, 2, True), + (TimeoutError, 2, {ConnectionError}, 2, False), + (TimeoutError, 1, {TimeoutError}, 2, True), + (TimeoutError, 3, {TimeoutError}, 2, False), + (TimeoutError, 1, {}, 2, False), + (TimeoutError, 0, {}, 0, True), + ], +) +@responses.activate +def test_client_retry_strategy_by_exception(resp_exc, resp_errors, retry_exc, retry_attempts, success): + test_url = 'http://test.com/api' + expected_result = 'result' + + resp_success = responses.Response( + method=responses.POST, + url=test_url, + status=200, + json={"jsonrpc": "2.0", "result": expected_result, "id": 1}, + ) + resp_error = responses.Response( + method=responses.POST, + url=test_url, + status=200, + body=resp_exc(), + ) + + client = requests_backend.Client( + url=test_url, + retry_strategy=retry.RetryStrategy( + exceptions=retry_exc, + backoff=retry.PeriodicBackoff(attempts=retry_attempts, interval=0.0), + ), + ) + + for _ in range(resp_errors): + responses.add(resp_error) + responses.add(resp_success) + + if success: + actual_result = client.proxy.method() + assert actual_result == expected_result + else: + with pytest.raises(resp_exc): + client.proxy.method() + + +@responses.activate +def test_client_retry_strategy_by_code_and_exception(): + test_url = 'http://test.com/api' + expected_result = 'result' + + client = requests_backend.Client( + url=test_url, + retry_strategy=retry.RetryStrategy( + codes={2000}, + exceptions={TimeoutError}, + backoff=retry.PeriodicBackoff(attempts=2, interval=0.0), + ), + ) + + responses.add( + responses.Response( + method=responses.POST, + url=test_url, + status=200, + body=TimeoutError(), + ), + ) + responses.add( + responses.Response( + method=responses.POST, + url=test_url, + status=200, + json={"jsonrpc": "2.0", "error": {"code": 2000, "message": "error"}, "id": 1}, + ), + ) + responses.add( + responses.Response( + method=responses.POST, + url=test_url, + status=200, + json={"jsonrpc": "2.0", "result": expected_result, "id": 1}, + ), + ) + + actual_result = client.proxy.method() + assert actual_result == expected_result + + +@pytest.mark.parametrize( + 'resp_code, default_retry_codes, request_retry_codes, success', + [ + (2001, None, None, False), + (2001, {2001}, None, True), + (2001, None, {2001}, True), + (2001, {2001}, {2002}, False), + (2001, {2002}, {2001}, True), + ], +) +@responses.activate +def test_request_retry_strategy(resp_code, default_retry_codes, request_retry_codes, success): + test_url = 'http://test.com/api' + expected_result = 'result' + + resp_success = responses.Response( + method=responses.POST, + url=test_url, + status=200, + json={"jsonrpc": "2.0", "result": expected_result, "id": 1}, + ) + resp_error = responses.Response( + method=responses.POST, + url=test_url, + status=200, + json={"jsonrpc": "2.0", "error": {"code": resp_code, "message": "error"}, "id": 1}, + ) + + default_retry_strategy = retry.RetryStrategy( + codes=default_retry_codes, + backoff=retry.PeriodicBackoff(attempts=1, interval=0.0), + ) if default_retry_codes else None + + client = requests_backend.Client(url=test_url, retry_strategy=default_retry_strategy) + + responses.add(resp_error) + responses.add(resp_success) + + request_retry_strategy = retry.RetryStrategy( + codes=request_retry_codes, + backoff=retry.PeriodicBackoff(attempts=1, interval=0.0), + ) if request_retry_codes else UNSET + + actual_result = client.send(pjrpc.Request('method', id=1), _retry_strategy=request_retry_strategy) + if success: + assert actual_result == pjrpc.Response(id=1, result=expected_result) + else: + assert actual_result == pjrpc.Response(id=1, error=pjrpc.exc.JsonRpcError(code=resp_code, message="error"))