From 8563ae011b5e09358c624ec2ad466878694cd015 Mon Sep 17 00:00:00 2001 From: alexandre menezes Date: Mon, 25 Mar 2019 17:13:29 -0300 Subject: [PATCH 1/3] [skip travis] fix reconnect flow. --- discovery/aioclient.py | 37 ++++++++++++++++++---------- discovery/client.py | 55 +++++++++++++++++++++++++++--------------- 2 files changed, 60 insertions(+), 32 deletions(-) diff --git a/discovery/aioclient.py b/discovery/aioclient.py index 969b31a..2bc3ede 100644 --- a/discovery/aioclient.py +++ b/discovery/aioclient.py @@ -65,11 +65,13 @@ def __format_catalog_service(self, services): async def _reconnect(self): """Service re-registration steps.""" await self.__discovery.agent.service.deregister(self.__service['id']) - await self.__discovery.agent.service.register(name=self.__service['name'], - service_id=self.__service['id'], - check=self.__service['healthcheck'], - address=self.__service['application_ip'], - port=self.__service['port']) + await self.__discovery.agent.service.register( + name=self.__service['name'], + service_id=self.__service['id'], + check=self.__service['healthcheck'], + address=self.__service['application_ip'], + port=self.__service['port'] + ) current_id = await self.__discovery.health.service('consul') self.__id = self.__format_id(current_id) @@ -94,11 +96,15 @@ async def consul_is_healthy(self): except aiohttp.ClientConnectorError: logging.error('failed to connect to discovery service...') - logging.error(f"reconnect will occur in {self.DEFAULT_TIMEOUT} seconds.") + logging.error( + f"reconnect will occur in {self.DEFAULT_TIMEOUT} seconds." + ) await self.consul_is_healthy() except aiohttp.ServerDisconnectedError: - logging.error('temporary loss of communication with the discovery server.') + logging.error( + 'temporary loss of communication with the discovery server.' + ) asyncio.sleep(self.DEFAULT_TIMEOUT) await self.consul_is_healthy() @@ -131,7 +137,10 @@ async def deregister(self): logging.info('successfully unregistered application!') - async def register(self, service_name, service_port, healthcheck_path="/manage/health"): + async def register(self, + service_name, + service_port, + healthcheck_path="/manage/health"): """Register a new service. Default values are: @@ -144,11 +153,13 @@ async def register(self, service_name, service_port, healthcheck_path="/manage/h self.__create_service(service_name, service_port, healthcheck_path) - await self.__discovery.agent.service.register(name=self.__service['name'], - service_id=self.__service['id'], - check=self.__service['healthcheck'], - address=self.__service['application_ip'], - port=self.__service['port']) + await self.__discovery.agent.service.register( + name=self.__service['name'], + service_id=self.__service['id'], + check=self.__service['healthcheck'], + address=self.__service['application_ip'], + port=self.__service['port'] + ) current_id = await self.__discovery.health.service('consul') self.__id = self.__format_id(current_id) diff --git a/discovery/client.py b/discovery/client.py index 114fba9..21b9c9e 100644 --- a/discovery/client.py +++ b/discovery/client.py @@ -64,16 +64,28 @@ def __reconnect(self): logging.debug('Service reconnect fallback') self.__discovery.agent.service.deregister(self.__service['id']) - self.__discovery.agent.service.register(name=self.__service['name'], - service_id=self.__service['id'], - check=self.__service['healthcheck'], - address=self.__service['application_ip'], - port=self.__service['port']) - current_id = self.__discovery.health.service('consul') - self.__id = self.__format_id(current_id) + self.__discovery.agent.service.register( + name=self.__service['name'], + service_id=self.__service['id'], + check=self.__service['healthcheck'], + address=self.__service['application_ip'], + port=self.__service['port'] + ) + + self.__id = self.get_leader_current_id() logging.info('Service successfully re-registered') + def get_leader_current_id(self): + """Retrieve current ID from consul leader.""" + consul_leader = self.__discovery.status.leader() + consul_instances = self.__discovery.health.service('consul')[Filter.PAYLOAD.value] + current_id = [instance['Node']['ID'] + for instance in consul_instances + if instance['Node']['Address'] == consul_leader.split(':')[0]] + + return current_id[Filter.FIRST_ITEM.value] + def consul_is_healthy(self): """Start a loop to monitor consul healthy. @@ -82,17 +94,18 @@ def consul_is_healthy(self): while True: try: time.sleep(self.DEFAULT_TIMEOUT) - current_id = self.__discovery.health.service('consul') - logging.debug('Checking consul health status') + current_id = self.get_leader_current_id() logging.debug(f"Consul ID: {self.__format_id(current_id)}") - if self.__format_id(current_id) != self.__id: + if current_id != self.__id: self.__reconnect() except requests.exceptions.ConnectionError: logging.error("Failed to connect to discovery service...") - logging.error(f'Reconnect will occur in {self.DEFAULT_TIMEOUT} seconds.') + logging.error( + f'Reconnect will occur in {self.DEFAULT_TIMEOUT} seconds.' + ) def find_service(self, service_name, method='rr'): """Search for a service in the consul's catalog. @@ -118,7 +131,9 @@ def find_services(self, service_name): def deregister(self): """Deregister a service registered.""" - logging.debug(f"Unregistering service id: {self.__service['id']}") + logging.debug( + f"Unregistering service id: {self.__service['id']}" + ) logging.info('Successfully unregistered application!') self.__discovery.agent.service.deregister(self.__service['id']) @@ -136,13 +151,15 @@ def register(self, service_name, service_port, healthcheck_path="/manage/health" self.__create_service(service_name, service_port, healthcheck_path) - self.__discovery.agent.service.register(name=self.__service['name'], - service_id=self.__service['id'], - check=self.__service['healthcheck'], - address=self.__service['application_ip'], - port=self.__service['port']) - current_id = self.__discovery.health.service('consul') - self.__id = self.__format_id(current_id) + self.__discovery.agent.service.register( + name=self.__service['name'], + service_id=self.__service['id'], + check=self.__service['healthcheck'], + address=self.__service['application_ip'], + port=self.__service['port'] + ) + + self.__id = self.get_leader_current_id() logging.info('Service successfully registered!') logging.debug(f'Consul ID: {self.__id}') From eeb46c0793f0276011b4b94d5e037f353ac4ef09 Mon Sep 17 00:00:00 2001 From: amenezes Date: Tue, 26 Mar 2019 01:26:44 -0300 Subject: [PATCH 2/3] pre-release of new reconnect flow. --- discovery/aioclient.py | 46 ++++++++++++++++++++++-------------- discovery/client.py | 19 +++++++-------- setup.py | 2 +- tests/unit/test_aioclient.py | 44 ++++++++++++++++++++++++++++++---- tests/unit/test_client.py | 6 ----- 5 files changed, 78 insertions(+), 39 deletions(-) diff --git a/discovery/aioclient.py b/discovery/aioclient.py index 2bc3ede..633f3e5 100644 --- a/discovery/aioclient.py +++ b/discovery/aioclient.py @@ -46,13 +46,6 @@ def __create_service(self, service_name, service_port, healthcheck_path): logging.debug(f'Service data: {self.__service}') - def __format_id(self, id): - """Retrieve consul ID from Consul API: /health/status/. - - docs: https://www.consul.io/api/health.html#list-nodes-for-service - """ - return id[Filter.PAYLOAD.value][Filter.FIRST_ITEM.value]['Node']['ID'] - def __format_catalog_service(self, services): servicesfmt = [{"node": svc['Node'], "address": svc['Address'], @@ -72,12 +65,27 @@ async def _reconnect(self): address=self.__service['application_ip'], port=self.__service['port'] ) - current_id = await self.__discovery.health.service('consul') - self.__id = self.__format_id(current_id) - logging.debug(f"Consul ID: {self.__format_id(current_id)}") + self.__id = await self.get_leader_current_id() + + logging.debug(f"Consul ID: {self.__id}") logging.info('Service successfully re-registered') + async def get_leader_current_id(self): + """Retrieve current ID from consul leader.""" + consul_leader = await self.__discovery.status.leader() + consul_instances = await self.__discovery.health.service('consul') + consul_instances = consul_instances[Filter.PAYLOAD.value] + + current_id = [instance['Node']['ID'] + for instance in consul_instances + if instance['Node']['Address'] == consul_leader.split(':')[0]] + + # if len(current_id) > 0: + # current_id = current_id[Filter.FIRST_ITEM.value] + + return current_id[Filter.FIRST_ITEM.value] + async def consul_is_healthy(self): """Start a loop to monitor consul healthy. @@ -89,9 +97,9 @@ async def consul_is_healthy(self): current_id = await self.__discovery.health.service('consul') logging.debug('Checking consul health status') - logging.debug(f"Consul ID: {self.__format_id(current_id)}") + logging.debug(f"Consul ID: {current_id}") - if self.__format_id(current_id) != self.__id: + if current_id != self.__id: await self._reconnect() except aiohttp.ClientConnectorError: @@ -150,9 +158,11 @@ async def register(self, timeout: 5s """ try: - self.__create_service(service_name, - service_port, - healthcheck_path) + self.__create_service( + service_name, + service_port, + healthcheck_path + ) await self.__discovery.agent.service.register( name=self.__service['name'], service_id=self.__service['id'], @@ -160,11 +170,11 @@ async def register(self, address=self.__service['application_ip'], port=self.__service['port'] ) - current_id = await self.__discovery.health.service('consul') - self.__id = self.__format_id(current_id) + + self.__id = await self.get_leader_current_id() logging.info('service successfully registered!') - logging.debug(f"Consul ID: {self.__format_id(current_id)}") + logging.debug(f"Consul ID: {self.__id}") except aiohttp.ClientConnectorError: logging.error("failed to connect to discovery...") diff --git a/discovery/client.py b/discovery/client.py index 21b9c9e..0c8090d 100644 --- a/discovery/client.py +++ b/discovery/client.py @@ -43,13 +43,6 @@ def __create_service(self, service_name, service_port, healthcheck_path): logging.debug(f'Service data: {self.__service}') - def __format_id(self, service_id): - """Retrieve consul ID from Consul API: /health/status/. - - docs: https://www.consul.io/api/health.html#list-nodes-for-service - """ - return service_id[Filter.PAYLOAD.value][Filter.FIRST_ITEM.value]['Node']['ID'] - def __format_catalog_service(self, services): servicesfmt = [{"node": svc['Node'], "address": svc['Address'], @@ -84,7 +77,10 @@ def get_leader_current_id(self): for instance in consul_instances if instance['Node']['Address'] == consul_leader.split(':')[0]] - return current_id[Filter.FIRST_ITEM.value] + if len(current_id) > 0: + current_id = current_id[Filter.FIRST_ITEM.value] + + return current_id def consul_is_healthy(self): """Start a loop to monitor consul healthy. @@ -96,7 +92,7 @@ def consul_is_healthy(self): time.sleep(self.DEFAULT_TIMEOUT) current_id = self.get_leader_current_id() - logging.debug(f"Consul ID: {self.__format_id(current_id)}") + logging.debug(f"Consul ID: {current_id}") if current_id != self.__id: self.__reconnect() @@ -138,7 +134,10 @@ def deregister(self): self.__discovery.agent.service.deregister(self.__service['id']) - def register(self, service_name, service_port, healthcheck_path="/manage/health"): + def register(self, + service_name, + service_port, + healthcheck_path="/manage/health"): """Register a new service. Default values are: diff --git a/setup.py b/setup.py index 4083d22..8cf491f 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setuptools.setup( name="discovery-client", - version="0.2.1", + version="0.2.2", author="alexandre menezes", author_email="alexandre.fmenezes@gmail.com", description="discovery service client", diff --git a/tests/unit/test_aioclient.py b/tests/unit/test_aioclient.py index 967a7b8..29ae4da 100644 --- a/tests/unit/test_aioclient.py +++ b/tests/unit/test_aioclient.py @@ -8,6 +8,8 @@ import consul.aio +from aiohttp import ClientConnectorError + from discovery import aioclient @@ -22,7 +24,8 @@ def setUp(self): self.loop = asyncio.get_event_loop() self.consul_health_response = ( 0, [{'Node': { - 'ID': '123456'}}]) + 'ID': '123456', + 'Address': '127.0.0.1'}}]) self.consul_raw_response = ( 0, [{'Node': 'localhost', 'Address': '127.0.0.1', @@ -130,18 +133,46 @@ async def async_test_find_services(loop): async_test_find_services(self.loop) ) + @patch('discovery.aioclient.consul.aio.Consul') + def test_get_leader_current_id(self, MockAioClient): + """Test retrieve the ID from Consul leader.""" + async def async_test_get_leader_current_id(loop): + consul_client = MockAioClient(consul.aio.Consul) + consul_client.status.leader = CoroutineMock( + return_value='127.0.0.1:8300' + ) + consul_client.health.service = CoroutineMock( + return_value=self.consul_health_response + ) + + dc = aioclient.Consul('localhost', 8500, app=loop) + current_id = await dc.get_leader_current_id() + + self.assertIsNotNone(current_id) + self.assertEqual( + current_id, + self.consul_health_response[1][0]['Node']['ID'] + ) + + self.loop.run_until_complete( + async_test_get_leader_current_id(self.loop) + ) + @patch('discovery.aioclient.consul.aio.Consul') def test_register(self, MockAioConsul): """Test registration of a service in the consul's catalog.""" async def async_test_register(loop): consul_client = MockAioConsul(consul.aio.Consul) consul_client.agent.service.register = CoroutineMock() - consul_client.catalog.service = CoroutineMock( - return_value=self.myapp_raw_response + consul_client.status.leader = CoroutineMock( + return_value='127.0.0.1:8300' ) consul_client.health.service = CoroutineMock( return_value=self.consul_health_response ) + consul_client.catalog.service = CoroutineMock( + return_value=self.myapp_raw_response + ) dc = aioclient.Consul('localhost', 8500, app=loop) await dc.register('myapp', 5000) @@ -164,6 +195,9 @@ async def async_test_deregister(loop): consul_client.catalog.service = CoroutineMock( return_value=self.myapp_raw_response ) + consul_client.status.leader = CoroutineMock( + return_value='127.0.0.1:8300' + ) consul_client.health.service = CoroutineMock( return_value=self.consul_health_response ) @@ -177,7 +211,9 @@ async def async_test_deregister(loop): await dc.deregister() - consul_client.catalog.service = CoroutineMock(return_value=(0, [])) + consul_client.catalog.service = CoroutineMock( + return_value=(0, []) + ) with self.assertRaises(IndexError): await dc.find_service('myapp') diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index ffe493c..e9858e2 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -148,9 +148,6 @@ def test_register(self, MockConsul): consul_client.catalog.service = MagicMock( return_value=self.myapp_raw_response ) - consul_client.health.service = MagicMock( - return_value=self.consul_health_response - ) dc = client.Consul('localhost', 8500) dc.register('myapp', 5000) @@ -186,9 +183,6 @@ def test_deregister(self, MockConsul): consul_client.catalog.service = MagicMock( return_value=self.myapp_raw_response ) - consul_client.health.service = MagicMock( - return_value=self.consul_health_response - ) dc = client.Consul('localhost', 8500) dc.register('myapp', 5000) From 656856af333924ba2683b319125a1626fa1f4b04 Mon Sep 17 00:00:00 2001 From: amenezes Date: Tue, 26 Mar 2019 01:29:58 -0300 Subject: [PATCH 3/3] fix flake8 error. --- tests/unit/test_aioclient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_aioclient.py b/tests/unit/test_aioclient.py index 29ae4da..d59162e 100644 --- a/tests/unit/test_aioclient.py +++ b/tests/unit/test_aioclient.py @@ -3,13 +3,13 @@ import os import unittest +# from aiohttp import ClientConnectorError + import asynctest from asynctest import CoroutineMock, patch import consul.aio -from aiohttp import ClientConnectorError - from discovery import aioclient