Skip to content

Commit

Permalink
Merge pull request #85 from dapper91/dev
Browse files Browse the repository at this point in the history
- JSON-RPC client requests retry support added
- aio-pika integration and backend updated for aio-pika 8.0
- type aliases for middlewares added
- httpx minimal version updated due to found vulnerability
  • Loading branch information
dapper91 authored Jul 5, 2022
2 parents dc5cb36 + c72bb0d commit 27a9630
Show file tree
Hide file tree
Showing 17 changed files with 821 additions and 42 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install poetry
poetry install --no-root -E aiohttp -E flask -E httpx -E jsonschema -E pydantic -E requests \
-E docstring-parser -E werkzeug -E openapi-ui-bundles
poetry install --no-root -E test
- name: Run pre-commit hooks
run: poetry run pre-commit run --hook-stage merge-commit --all-files
- name: Run tests
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
Changelog
=========

1.6.0 (2022-07-05)
------------------

- JSON-RPC client requests retry support added
- aio-pika integration and backend updated for aio-pika 8.0
- type aliases for middlewares added
- httpx minimal version updated due to found vulnerability


1.5.0 (2022-05-22)
------------------
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@

init:
pip install poetry --upgrade
poetry install --no-root
# Updates poetry.lock in case pyproject.toml was updated for install:
poetry update
poetry install --no-root --extras test

export PYTHONWARNINGS=ignore::DeprecationWarning
test:
poetry run py.test

Expand Down
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ The User Guide
pjrpc/extending
pjrpc/testing
pjrpc/tracing
pjrpc/retries
pjrpc/specification
pjrpc/webui
pjrpc/examples
Expand Down
7 changes: 7 additions & 0 deletions docs/source/pjrpc/api/client.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ Tracer
:members:


Retry
~~~~~

.. automodule:: pjrpc.client.retry
:members:


Integrations
~~~~~~~~~~~~

Expand Down
60 changes: 60 additions & 0 deletions docs/source/pjrpc/retries.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
.. _retires:

Retries
=======

``pjrpc`` supports request retries based on response code or received exception using customizable backoff strategy.
``pjrpc`` provides several built-in backoff algorithms (see :py:mod:`pjrpc.client.retry`), but you can
implement your own one like this:

.. code-block:: python
import dataclasses as dc
import random
from pjrpc.client.retry import Backoff
@dc.dataclass(frozen=True)
class RandomBackoff(Backoff):
def __call__(self) -> Iterator[float]:
return (random.random() for _ in range(self.attempts))
Retry strategy can be configured for all client requests by passing a strategy to a client constructor
as a `retry_strategy` argument or for a particular request as a `_retry_strategy` when calling `send` method.

The following example illustrate request retries api usage:

.. code-block:: python
import asyncio
import random
import pjrpc
from pjrpc.client.backend import aiohttp as pjrpc_client
from pjrpc.client.retry import ExponentialBackoff, PeriodicBackoff, RetryStrategy
async def main():
default_retry_strategy = RetryStrategy(
exceptions={TimeoutError},
backoff=PeriodicBackoff(attempts=3, interval=1.0, jitter=lambda: random.gauss(mu=0.5, sigma=0.1)),
)
async with pjrpc_client.Client('http://localhost/api/v1', retry_strategy=default_retry_strategy) as client:
response = await client.send(
pjrpc.Request('sum', params=[1, 2], id=1),
_retry_strategy=RetryStrategy(
exceptions={TimeoutError},
codes={2001},
backoff=ExponentialBackoff(
attempts=3, base=1.0, factor=2.0, jitter=lambda: random.gauss(mu=0.5, sigma=0.1),
),
),
)
print(f"1 + 2 = {response.result}")
result = await client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")
asyncio.run(main())
32 changes: 32 additions & 0 deletions examples/aiohttp_client_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import asyncio
import random

import pjrpc
from pjrpc.client.backend import aiohttp as pjrpc_client
from pjrpc.client.retry import ExponentialBackoff, PeriodicBackoff, RetryStrategy


async def main():
default_retry_strategy = RetryStrategy(
exceptions={TimeoutError},
backoff=PeriodicBackoff(attempts=3, interval=1.0, jitter=lambda: random.gauss(mu=0.5, sigma=0.1)),
)

async with pjrpc_client.Client('http://localhost/api/v1', retry_strategy=default_retry_strategy) as client:
response = await client.send(
pjrpc.Request('sum', params=[1, 2], id=1),
_retry_strategy=RetryStrategy(
exceptions={TimeoutError},
codes={2001},
backoff=ExponentialBackoff(
attempts=3, base=1.0, factor=2.0, jitter=lambda: random.gauss(mu=0.5, sigma=0.1),
),
),
)
print(f"1 + 2 = {response.result}")

result = await client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")


asyncio.run(main())
14 changes: 11 additions & 3 deletions examples/middlewares.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
from typing import Any

from aiohttp import web

import pjrpc.server
from pjrpc.common import Request
from pjrpc.server.integration import aiohttp
from pjrpc.server.typedefs import AsyncHandlerType, ContextType, MiddlewareResponse

methods = pjrpc.server.MethodRegistry()


@methods.add(context='request')
async def method(request):
async def method(request: Any) -> None:
print("method")


async def middleware1(request, context, handler):
async def middleware1(
request: Request, context: ContextType, handler: AsyncHandlerType,
) -> MiddlewareResponse:
print("middleware1 started")
result = await handler(request, context)
print("middleware1 finished")

return result


async def middleware2(request, context, handler):
async def middleware2(
request: Request, context: ContextType, handler: AsyncHandlerType,
) -> MiddlewareResponse:
print("middleware2 started")
result = await handler(request, context)
print("middleware2 finished")
Expand Down
41 changes: 41 additions & 0 deletions examples/rabbitmq/aio_pika/client_response_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env python
"""By default, RabbitMQ JSON-RPC clients generate a temporary result queue
for their requests, but in very special cases, the client may want to choose
a specific result queue.
This example shows using a specific queue with specific properties as well."""
import asyncio
import logging

from yarl import URL

import pjrpc.client.backend.aio_pika


async def client_with_specific_queue() -> None:
"""aio_pika client demonstrating the use of a specific result_queue"""
logging.basicConfig(level=logging.INFO, format="%(message)s")

client = pjrpc.client.backend.aio_pika.Client(
broker_url=URL("amqp://guest:guest@localhost:5672/v1"),
queue_name="jsonrpc",
result_queue_name="pjrpc-aio_pika-example-jsonrpc-results",
result_queue_args={
"exclusive": True,
"auto_delete": True,
"durable": True,
"arguments": None,
},
)
await client.connect()

result = await client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")

await client.notify("tick")
await client.notify("schedule_shutdown")
await client.close()


if __name__ == "__main__":
asyncio.run(client_with_specific_queue())
2 changes: 1 addition & 1 deletion pjrpc/__about__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
__description__ = 'Extensible JSON-RPC library'
__url__ = 'https://github.com/dapper91/pjrpc'

__version__ = '1.5.0'
__version__ = '1.6.0'

__author__ = 'Dmitry Pershin'
__email__ = '[email protected]'
Expand Down
28 changes: 14 additions & 14 deletions pjrpc/client/backend/aio_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing import Any, Dict, Optional, cast

import aio_pika
from aio_pika.abc import AbstractIncomingMessage
from yarl import URL

import pjrpc
from pjrpc.client import AbstractAsyncClient
Expand All @@ -28,7 +30,7 @@ class Client(AbstractAsyncClient):

def __init__(
self,
broker_url: str,
broker_url: URL,
queue_name: Optional[str] = None,
conn_args: Optional[Dict[str, Any]] = None,
exchange_name: Optional[str] = None,
Expand All @@ -42,19 +44,19 @@ def __init__(

super().__init__(**kwargs)
self._connection = aio_pika.connection.Connection(broker_url, **(conn_args or {}))
self._channel: Optional[aio_pika.Channel] = None
self._channel: Optional[aio_pika.abc.AbstractChannel] = None

self._exchange_name = exchange_name
self._exchange_args = exchange_args
self._exchange: Optional[aio_pika.Exchange] = None
self._exchange: Optional[aio_pika.abc.AbstractExchange] = None

self._routing_key = cast(str, routing_key or queue_name)
self._result_queue_name = result_queue_name
self._result_queue_args = result_queue_args
self._result_queue: Optional[aio_pika.Queue] = None
self._result_queue: Optional[aio_pika.abc.AbstractQueue] = None
self._consumer_tag: Optional[str] = None

self._futures: Dict[str, asyncio.Future] = {}
self._futures: Dict[str, asyncio.Future[str]] = {}

async def connect(self) -> None:
"""
Expand All @@ -65,16 +67,13 @@ async def connect(self) -> None:
self._channel = channel = await self._connection.channel()

if self._exchange_name:
self._exchange = aio_pika.Exchange(
self._connection, channel, self._exchange_name, **(self._exchange_args or {})
)
await self._exchange.declare()
self._exchange = await channel.declare_exchange(self._exchange_name, **(self._exchange_args or {}))

if self._result_queue_name:
self._result_queue = aio_pika.Queue(
self._connection, channel, self._result_queue_name, **(self._result_queue_args or {})
assert channel
self._result_queue = await channel.declare_queue(
self._result_queue_name, **(self._result_queue_args or {})
)
await self._result_queue.declare()
self._consumer_tag = await self._result_queue.consume(self._on_result_message, no_ack=True)

async def close(self) -> None:
Expand All @@ -99,8 +98,9 @@ async def close(self) -> None:

future.set_exception(asyncio.CancelledError)

async def _on_result_message(self, message: aio_pika.IncomingMessage) -> None:
async def _on_result_message(self, message: AbstractIncomingMessage) -> None:
correlation_id = message.correlation_id
assert correlation_id
future = self._futures.pop(correlation_id, None)

if future is None:
Expand Down Expand Up @@ -147,7 +147,7 @@ async def _request(self, request_text: str, is_notification: bool = False, **kwa
**kwargs,
)

future: asyncio.Future = asyncio.Future()
future: asyncio.Future[str] = asyncio.Future()
self._futures[request_id] = future

try:
Expand Down
Loading

0 comments on commit 27a9630

Please sign in to comment.