From 3cf5b219012515185fe696b822d7a50c09c47ff0 Mon Sep 17 00:00:00 2001 From: alexandre menezes Date: Sun, 11 Dec 2022 18:12:38 -0300 Subject: [PATCH 1/2] fix watch connection error --- discovery/__init__.py | 2 +- discovery/client.py | 24 ++++++++++++++++-------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/discovery/__init__.py b/discovery/__init__.py index 86b35b9..367ee30 100644 --- a/discovery/__init__.py +++ b/discovery/__init__.py @@ -13,7 +13,7 @@ ) from .client import Consul -__version__ = "1.0.0" +__version__ = "1.0.1" __all__ = [ "Consul", "HealthState", diff --git a/discovery/client.py b/discovery/client.py index 77dc19b..71f3edb 100644 --- a/discovery/client.py +++ b/discovery/client.py @@ -50,6 +50,10 @@ def __init__( ) self._leader_id: Optional[str] = None + @property + def current_leader_id(self) -> Optional[str]: + return self._leader_id + async def leader_ip(self, *args, **kwargs) -> str: try: current_leader = await self.status.leader(*args, **kwargs) @@ -89,6 +93,7 @@ async def register( **kwargs, ) -> None: self._leader_id = await self.leader_id(**kwargs) + log.debug(f"Consul leader id: [current='{self.current_leader_id}']") try: await self.agent.service.register(service.dict(), **kwargs) except Exception as err: @@ -97,26 +102,29 @@ async def register( if enable_watch: loop = asyncio.get_running_loop() loop.create_task( - self._watch_connection(service, enable_watch, **kwargs), + self._watch_connection(service, **kwargs), name="discovery-client-watch-connection", ) async def deregister(self, service_id: str, ns: Optional[str] = None) -> None: await self.agent.service.deregister(service_id, ns) - async def reconnect(self, service: Service, *args, **kwargs) -> None: + async def reconnect(self, service: Service, **kwargs) -> None: await self.deregister(service.id) # type: ignore - await self.register(service, *args, **kwargs) + await self.register(service, **kwargs) - async def _watch_connection(self, service: Service, *args, **kwargs) -> None: + async def _watch_connection(self, service: Service, **kwargs) -> None: while True: + await asyncio.sleep(self.reconnect_timeout) try: - await asyncio.sleep(self.reconnect_timeout) current_id = await self.leader_id() - if current_id != self._leader_id: - await self.reconnect(service, *args, **kwargs) + if current_id != self.current_leader_id: + log.debug( + f"Consul leader id changed: [current='{self.current_leader_id}', new='{current_id}']" + ) + await self.reconnect(service, **kwargs) except Exception: - log.error( + log.info( f"Failed to connect to Consul, trying again at {self.reconnect_timeout}/s" ) From 0f0805957519656c327b2dc24cb8ba1a11f542cc Mon Sep 17 00:00:00 2001 From: alexandre menezes Date: Sun, 11 Dec 2022 18:17:53 -0300 Subject: [PATCH 2/2] fix watch connection error --- discovery/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/discovery/client.py b/discovery/client.py index 71f3edb..11d5386 100644 --- a/discovery/client.py +++ b/discovery/client.py @@ -109,9 +109,9 @@ async def register( async def deregister(self, service_id: str, ns: Optional[str] = None) -> None: await self.agent.service.deregister(service_id, ns) - async def reconnect(self, service: Service, **kwargs) -> None: + async def reconnect(self, service: Service, *args, **kwargs) -> None: await self.deregister(service.id) # type: ignore - await self.register(service, **kwargs) + await self.register(service, *args, **kwargs) async def _watch_connection(self, service: Service, **kwargs) -> None: while True: