Skip to content

Commit

Permalink
Merge branch 'hf-reconnect'
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandre menezes committed Mar 27, 2019
2 parents 501c405 + 656856a commit 67c1d02
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 71 deletions.
85 changes: 53 additions & 32 deletions discovery/aioclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,6 @@ def __create_service(self, service_name, service_port, healthcheck_path):

logging.debug(f'Service data: {self.__service}')

def __format_id(self, id):
"""Retrieve consul ID from Consul API: /health/status/<service>.
docs: https://www.consul.io/api/health.html#list-nodes-for-service
"""
return id[Filter.PAYLOAD.value][Filter.FIRST_ITEM.value]['Node']['ID']

def __format_catalog_service(self, services):
servicesfmt = [{"node": svc['Node'],
"address": svc['Address'],
Expand All @@ -65,17 +58,34 @@ def __format_catalog_service(self, services):
async def _reconnect(self):
"""Service re-registration steps."""
await self.__discovery.agent.service.deregister(self.__service['id'])
await self.__discovery.agent.service.register(name=self.__service['name'],
service_id=self.__service['id'],
check=self.__service['healthcheck'],
address=self.__service['application_ip'],
port=self.__service['port'])
current_id = await self.__discovery.health.service('consul')
self.__id = self.__format_id(current_id)

logging.debug(f"Consul ID: {self.__format_id(current_id)}")
await self.__discovery.agent.service.register(
name=self.__service['name'],
service_id=self.__service['id'],
check=self.__service['healthcheck'],
address=self.__service['application_ip'],
port=self.__service['port']
)

self.__id = await self.get_leader_current_id()

logging.debug(f"Consul ID: {self.__id}")
logging.info('Service successfully re-registered')

async def get_leader_current_id(self):
"""Retrieve current ID from consul leader."""
consul_leader = await self.__discovery.status.leader()
consul_instances = await self.__discovery.health.service('consul')
consul_instances = consul_instances[Filter.PAYLOAD.value]

current_id = [instance['Node']['ID']
for instance in consul_instances
if instance['Node']['Address'] == consul_leader.split(':')[0]]

# if len(current_id) > 0:
# current_id = current_id[Filter.FIRST_ITEM.value]

return current_id[Filter.FIRST_ITEM.value]

async def consul_is_healthy(self):
"""Start a loop to monitor consul healthy.
Expand All @@ -87,18 +97,22 @@ async def consul_is_healthy(self):
current_id = await self.__discovery.health.service('consul')

logging.debug('Checking consul health status')
logging.debug(f"Consul ID: {self.__format_id(current_id)}")
logging.debug(f"Consul ID: {current_id}")

if self.__format_id(current_id) != self.__id:
if current_id != self.__id:
await self._reconnect()

except aiohttp.ClientConnectorError:
logging.error('failed to connect to discovery service...')
logging.error(f"reconnect will occur in {self.DEFAULT_TIMEOUT} seconds.")
logging.error(
f"reconnect will occur in {self.DEFAULT_TIMEOUT} seconds."
)
await self.consul_is_healthy()

except aiohttp.ServerDisconnectedError:
logging.error('temporary loss of communication with the discovery server.')
logging.error(
'temporary loss of communication with the discovery server.'
)
asyncio.sleep(self.DEFAULT_TIMEOUT)
await self.consul_is_healthy()

Expand Down Expand Up @@ -131,7 +145,10 @@ async def deregister(self):

logging.info('successfully unregistered application!')

async def register(self, service_name, service_port, healthcheck_path="/manage/health"):
async def register(self,
service_name,
service_port,
healthcheck_path="/manage/health"):
"""Register a new service.
Default values are:
Expand All @@ -141,19 +158,23 @@ async def register(self, service_name, service_port, healthcheck_path="/manage/h
timeout: 5s
"""
try:
self.__create_service(service_name,
service_port,
healthcheck_path)
await self.__discovery.agent.service.register(name=self.__service['name'],
service_id=self.__service['id'],
check=self.__service['healthcheck'],
address=self.__service['application_ip'],
port=self.__service['port'])
current_id = await self.__discovery.health.service('consul')
self.__id = self.__format_id(current_id)
self.__create_service(
service_name,
service_port,
healthcheck_path
)
await self.__discovery.agent.service.register(
name=self.__service['name'],
service_id=self.__service['id'],
check=self.__service['healthcheck'],
address=self.__service['application_ip'],
port=self.__service['port']
)

self.__id = await self.get_leader_current_id()

logging.info('service successfully registered!')
logging.debug(f"Consul ID: {self.__format_id(current_id)}")
logging.debug(f"Consul ID: {self.__id}")

except aiohttp.ClientConnectorError:
logging.error("failed to connect to discovery...")
72 changes: 44 additions & 28 deletions discovery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,6 @@ def __create_service(self, service_name, service_port, healthcheck_path):

logging.debug(f'Service data: {self.__service}')

def __format_id(self, service_id):
"""Retrieve consul ID from Consul API: /health/status/<service>.
docs: https://www.consul.io/api/health.html#list-nodes-for-service
"""
return service_id[Filter.PAYLOAD.value][Filter.FIRST_ITEM.value]['Node']['ID']

def __format_catalog_service(self, services):
servicesfmt = [{"node": svc['Node'],
"address": svc['Address'],
Expand All @@ -64,16 +57,31 @@ def __reconnect(self):
logging.debug('Service reconnect fallback')

self.__discovery.agent.service.deregister(self.__service['id'])
self.__discovery.agent.service.register(name=self.__service['name'],
service_id=self.__service['id'],
check=self.__service['healthcheck'],
address=self.__service['application_ip'],
port=self.__service['port'])
current_id = self.__discovery.health.service('consul')
self.__id = self.__format_id(current_id)
self.__discovery.agent.service.register(
name=self.__service['name'],
service_id=self.__service['id'],
check=self.__service['healthcheck'],
address=self.__service['application_ip'],
port=self.__service['port']
)

self.__id = self.get_leader_current_id()

logging.info('Service successfully re-registered')

def get_leader_current_id(self):
"""Retrieve current ID from consul leader."""
consul_leader = self.__discovery.status.leader()
consul_instances = self.__discovery.health.service('consul')[Filter.PAYLOAD.value]
current_id = [instance['Node']['ID']
for instance in consul_instances
if instance['Node']['Address'] == consul_leader.split(':')[0]]

if len(current_id) > 0:
current_id = current_id[Filter.FIRST_ITEM.value]

return current_id

def consul_is_healthy(self):
"""Start a loop to monitor consul healthy.
Expand All @@ -82,17 +90,18 @@ def consul_is_healthy(self):
while True:
try:
time.sleep(self.DEFAULT_TIMEOUT)
current_id = self.__discovery.health.service('consul')

logging.debug('Checking consul health status')
logging.debug(f"Consul ID: {self.__format_id(current_id)}")
current_id = self.get_leader_current_id()
logging.debug(f"Consul ID: {current_id}")

if self.__format_id(current_id) != self.__id:
if current_id != self.__id:
self.__reconnect()

except requests.exceptions.ConnectionError:
logging.error("Failed to connect to discovery service...")
logging.error(f'Reconnect will occur in {self.DEFAULT_TIMEOUT} seconds.')
logging.error(
f'Reconnect will occur in {self.DEFAULT_TIMEOUT} seconds.'
)

def find_service(self, service_name, method='rr'):
"""Search for a service in the consul's catalog.
Expand All @@ -118,12 +127,17 @@ def find_services(self, service_name):

def deregister(self):
"""Deregister a service registered."""
logging.debug(f"Unregistering service id: {self.__service['id']}")
logging.debug(
f"Unregistering service id: {self.__service['id']}"
)
logging.info('Successfully unregistered application!')

self.__discovery.agent.service.deregister(self.__service['id'])

def register(self, service_name, service_port, healthcheck_path="/manage/health"):
def register(self,
service_name,
service_port,
healthcheck_path="/manage/health"):
"""Register a new service.
Default values are:
Expand All @@ -136,13 +150,15 @@ def register(self, service_name, service_port, healthcheck_path="/manage/health"
self.__create_service(service_name,
service_port,
healthcheck_path)
self.__discovery.agent.service.register(name=self.__service['name'],
service_id=self.__service['id'],
check=self.__service['healthcheck'],
address=self.__service['application_ip'],
port=self.__service['port'])
current_id = self.__discovery.health.service('consul')
self.__id = self.__format_id(current_id)
self.__discovery.agent.service.register(
name=self.__service['name'],
service_id=self.__service['id'],
check=self.__service['healthcheck'],
address=self.__service['application_ip'],
port=self.__service['port']
)

self.__id = self.get_leader_current_id()

logging.info('Service successfully registered!')
logging.debug(f'Consul ID: {self.__id}')
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setuptools.setup(
name="discovery-client",
version="0.2.1",
version="0.2.2",
author="alexandre menezes",
author_email="[email protected]",
description="discovery service client",
Expand Down
44 changes: 40 additions & 4 deletions tests/unit/test_aioclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import os
import unittest

# from aiohttp import ClientConnectorError

import asynctest
from asynctest import CoroutineMock, patch

Expand All @@ -22,7 +24,8 @@ def setUp(self):
self.loop = asyncio.get_event_loop()
self.consul_health_response = (
0, [{'Node': {
'ID': '123456'}}])
'ID': '123456',
'Address': '127.0.0.1'}}])
self.consul_raw_response = (
0, [{'Node': 'localhost',
'Address': '127.0.0.1',
Expand Down Expand Up @@ -130,18 +133,46 @@ async def async_test_find_services(loop):
async_test_find_services(self.loop)
)

@patch('discovery.aioclient.consul.aio.Consul')
def test_get_leader_current_id(self, MockAioClient):
"""Test retrieve the ID from Consul leader."""
async def async_test_get_leader_current_id(loop):
consul_client = MockAioClient(consul.aio.Consul)
consul_client.status.leader = CoroutineMock(
return_value='127.0.0.1:8300'
)
consul_client.health.service = CoroutineMock(
return_value=self.consul_health_response
)

dc = aioclient.Consul('localhost', 8500, app=loop)
current_id = await dc.get_leader_current_id()

self.assertIsNotNone(current_id)
self.assertEqual(
current_id,
self.consul_health_response[1][0]['Node']['ID']
)

self.loop.run_until_complete(
async_test_get_leader_current_id(self.loop)
)

@patch('discovery.aioclient.consul.aio.Consul')
def test_register(self, MockAioConsul):
"""Test registration of a service in the consul's catalog."""
async def async_test_register(loop):
consul_client = MockAioConsul(consul.aio.Consul)
consul_client.agent.service.register = CoroutineMock()
consul_client.catalog.service = CoroutineMock(
return_value=self.myapp_raw_response
consul_client.status.leader = CoroutineMock(
return_value='127.0.0.1:8300'
)
consul_client.health.service = CoroutineMock(
return_value=self.consul_health_response
)
consul_client.catalog.service = CoroutineMock(
return_value=self.myapp_raw_response
)

dc = aioclient.Consul('localhost', 8500, app=loop)
await dc.register('myapp', 5000)
Expand All @@ -164,6 +195,9 @@ async def async_test_deregister(loop):
consul_client.catalog.service = CoroutineMock(
return_value=self.myapp_raw_response
)
consul_client.status.leader = CoroutineMock(
return_value='127.0.0.1:8300'
)
consul_client.health.service = CoroutineMock(
return_value=self.consul_health_response
)
Expand All @@ -177,7 +211,9 @@ async def async_test_deregister(loop):

await dc.deregister()

consul_client.catalog.service = CoroutineMock(return_value=(0, []))
consul_client.catalog.service = CoroutineMock(
return_value=(0, [])
)

with self.assertRaises(IndexError):
await dc.find_service('myapp')
Expand Down
6 changes: 0 additions & 6 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,6 @@ def test_register(self, MockConsul):
consul_client.catalog.service = MagicMock(
return_value=self.myapp_raw_response
)
consul_client.health.service = MagicMock(
return_value=self.consul_health_response
)

dc = client.Consul('localhost', 8500)
dc.register('myapp', 5000)
Expand Down Expand Up @@ -186,9 +183,6 @@ def test_deregister(self, MockConsul):
consul_client.catalog.service = MagicMock(
return_value=self.myapp_raw_response
)
consul_client.health.service = MagicMock(
return_value=self.consul_health_response
)

dc = client.Consul('localhost', 8500)
dc.register('myapp', 5000)
Expand Down

0 comments on commit 67c1d02

Please sign in to comment.