Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add cluster/statistics endpoint handling. #1441

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/myint/autoflake
rev: v1.4
rev: v2.2.1 # autoflake v2.2.1 is the latest version that supports Python 3.12
hooks:
- id: autoflake
args: [--in-place, --remove-all-unused-imports, --exclude=weaviate/proto/*]


- repo: https://github.com/PyCQA/flake8
rev: 7.1.0
hooks:
Expand All @@ -36,13 +35,13 @@ repos:
]
files: '^weaviate/collections'

- repo: local
hooks:
- repo: local
hooks:
- id: mypy
name: mypy
entry: ./run-mypy.sh
language: python
language_version: "3.11"
language_version: "3.12"
# use require_serial so that script
# is only called once per commit
require_serial: true
Expand Down
67 changes: 67 additions & 0 deletions integration_v3/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,70 @@ def test_get_nodes_status_with_data(client: weaviate.Client):
assert shards[0]["class"] == class_name1
assert shards[0]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT
assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT


def test_get_cluster_statistics(client: weaviate.Client):

if not client._connection._weaviate_version.is_lower_than(1, 25, 0):
pytest.skip("Cluster statistics are supported in versions higher than 1.25.0")

"""Test getting cluster statistics."""
stats = client.cluster.get_cluster_statistics()

# Check top level structure
assert "statistics" in stats
assert "synchronized" in stats
assert isinstance(stats["synchronized"], bool)

# Check statistics array
assert isinstance(stats["statistics"], list)
assert len(stats["statistics"]) >= 1 # At least one node

# Check first node's statistics
node = stats["statistics"][0]
# bootstrapped is optional
if "bootstrapped" in node:
assert isinstance(node["bootstrapped"], bool)
assert isinstance(node["candidates"], dict)
# Check candidates structure if not empty
if node["candidates"]:
for node_name, address in node["candidates"].items():
assert isinstance(node_name, str)
assert isinstance(address, str)
assert ":" in address # Address should be in format IP:PORT
assert isinstance(node["dbLoaded"], bool)
assert isinstance(node["isVoter"], bool)
assert isinstance(node["leaderAddress"], str)
assert isinstance(node["leaderId"], str)
assert isinstance(node["name"], str)
assert isinstance(node["open"], bool) # API returns 'open', not 'open_'
assert isinstance(node["ready"], bool)
assert isinstance(node["status"], str)

# Check Raft statistics
raft = node["raft"]
assert isinstance(raft["appliedIndex"], str)
assert isinstance(raft["commitIndex"], str)
assert isinstance(raft["fsmPending"], str)
assert isinstance(raft["lastContact"], str)
assert isinstance(raft["lastLogIndex"], str)
assert isinstance(raft["lastLogTerm"], str)
assert isinstance(raft["lastSnapshotIndex"], str)
assert isinstance(raft["lastSnapshotTerm"], str)
assert isinstance(raft["latestConfiguration"], list)
assert isinstance(raft["latestConfigurationIndex"], str)
assert isinstance(raft["numPeers"], str)
assert isinstance(raft["protocolVersion"], str)
assert isinstance(raft["protocolVersionMax"], str)
assert isinstance(raft["protocolVersionMin"], str)
assert isinstance(raft["snapshotVersionMax"], str)
assert isinstance(raft["snapshotVersionMin"], str)
assert isinstance(raft["state"], str)
assert isinstance(raft["term"], str)

# Check at least one peer in the configuration
assert len(raft["latestConfiguration"]) >= 1
peer = raft["latestConfiguration"][0]
assert isinstance(peer["address"], str)
assert isinstance(peer["id"], str) # API returns 'id', not 'id_'
assert isinstance(peer["suffrage"], int)
114 changes: 114 additions & 0 deletions test/cluster/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,117 @@ def test_get_nodes_status(self):
result = Cluster(mock_conn).get_nodes_status()
self.assertListEqual(result, expected_resp.get("nodes"))
mock_conn.get.assert_called_with(path="/nodes")

def test_get_cluster_statistics(self):
# error messages
unexpected_err_msg = "Cluster statistics"
empty_response_err_msg = "Cluster statistics response returned empty"
connection_err_msg = "Get cluster statistics failed due to connection error"

# expected failure
mock_conn = mock_connection_func("get", status_code=500)
with self.assertRaises(UnexpectedStatusCodeException) as error:
Cluster(mock_conn).get_cluster_statistics()
check_startswith_error_message(self, error, unexpected_err_msg)

mock_conn = mock_connection_func("get", status_code=200, return_json=None)
with self.assertRaises(EmptyResponseException) as error:
Cluster(mock_conn).get_cluster_statistics()
check_error_message(self, error, empty_response_err_msg)

mock_conn = mock_connection_func("get", side_effect=RequestsConnectionError)
with self.assertRaises(RequestsConnectionError) as error:
Cluster(mock_conn).get_cluster_statistics()
check_error_message(self, error, connection_err_msg)

# expected success
expected_resp = {
"statistics": [
{
"candidates": {
"weaviate-0": "10.244.2.3:8300",
"weaviate-1": "10.244.1.3:8300",
},
"dbLoaded": True,
"isVoter": True,
"leaderAddress": "10.244.3.3:8300",
"leaderId": "weaviate-2",
"name": "weaviate-0",
"open_": True,
"raft": {
"appliedIndex": "3",
"commitIndex": "3",
"fsmPending": "0",
"lastContact": "29.130625ms",
"lastLogIndex": "3",
"lastLogTerm": "2",
"lastSnapshotIndex": "0",
"lastSnapshotTerm": "0",
"latestConfiguration": [
{"address": "10.244.1.3:8300", "id_": "weaviate-1", "suffrage": 0},
{"address": "10.244.3.3:8300", "id_": "weaviate-2", "suffrage": 0},
{"address": "10.244.2.3:8300", "id_": "weaviate-0", "suffrage": 0},
],
"latestConfigurationIndex": "0",
"numPeers": "2",
"protocolVersion": "3",
"protocolVersionMax": "3",
"protocolVersionMin": "0",
"snapshotVersionMax": "1",
"snapshotVersionMin": "0",
"state": "Follower",
"term": "2",
},
"ready": True,
"status": "HEALTHY",
},
{
"bootstrapped": True,
"candidates": {},
"dbLoaded": True,
"isVoter": True,
"leaderAddress": "10.244.3.3:8300",
"leaderId": "weaviate-2",
"name": "weaviate-1",
"open_": True,
"raft": {
"appliedIndex": "3",
"commitIndex": "3",
"fsmPending": "0",
"lastContact": "41.289833ms",
"lastLogIndex": "3",
"lastLogTerm": "2",
"lastSnapshotIndex": "0",
"lastSnapshotTerm": "0",
"latestConfiguration": [
{"address": "10.244.1.3:8300", "id_": "weaviate-1", "suffrage": 0},
{"address": "10.244.3.3:8300", "id_": "weaviate-2", "suffrage": 0},
{"address": "10.244.2.3:8300", "id_": "weaviate-0", "suffrage": 0},
],
"latestConfigurationIndex": "0",
"numPeers": "2",
"protocolVersion": "3",
"protocolVersionMax": "3",
"protocolVersionMin": "0",
"snapshotVersionMax": "1",
"snapshotVersionMin": "0",
"state": "Follower",
"term": "2",
},
"ready": True,
"status": "HEALTHY",
},
],
"synchronized": True,
}
mock_conn = mock_connection_func("get", status_code=200, return_json=expected_resp)
result = Cluster(mock_conn).get_cluster_statistics()

# Convert the response to match our type definitions with renamed fields
for node in result["statistics"]:
node["open_"] = node.pop("open_")
for peer in node["raft"]["latestConfiguration"]:
peer["id_"] = peer.pop("id_")

self.assertEqual(result, expected_resp)
mock_conn.get.assert_called_with(path="/cluster/statistics")
32 changes: 31 additions & 1 deletion weaviate/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from requests.exceptions import ConnectionError as RequestsConnectionError

from weaviate.cluster.types import Node
from weaviate.cluster.types import Node, ClusterStats
from weaviate.connect import Connection
from weaviate.exceptions import (
EmptyResponseException,
Expand Down Expand Up @@ -79,3 +79,33 @@ def get_nodes_status(
if nodes is None or nodes == []:
raise EmptyResponseException("Nodes status response returned empty")
return cast(List[Node], nodes)

def get_cluster_statistics(self) -> ClusterStats:
"""
Get the cluster statistics including Raft consensus information.

Returns
-------
ClusterStats
Statistics about the cluster including Raft consensus information.

Raises
------
requests.ConnectionError
If the network connection to weaviate fails.
weaviate.UnexpectedStatusCodeException
If weaviate reports a none OK status.
weaviate.EmptyResponseException
If the response is empty.
"""
try:
response = self._connection.get(path="/cluster/statistics")
except RequestsConnectionError as conn_err:
raise RequestsConnectionError(
"Get cluster statistics failed due to connection error"
) from conn_err

response_typed = _decode_json_response_dict(response, "Cluster statistics")
if response_typed is None:
raise EmptyResponseException("Cluster statistics response returned empty")
return cast(ClusterStats, response_typed)
49 changes: 48 additions & 1 deletion weaviate/cluster/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Literal, Optional, TypedDict
from typing import List, Literal, Optional, TypedDict, Dict


class BatchStats(TypedDict):
Expand Down Expand Up @@ -34,3 +34,50 @@ class Node(TypedDict):
stats: Stats
status: str
version: str


class RaftPeer(TypedDict):
address: str
id_: str
suffrage: int


class RaftStats(TypedDict):
appliedIndex: str
commitIndex: str
fsmPending: str
lastContact: str
lastLogIndex: str
lastLogTerm: str
lastSnapshotIndex: str
lastSnapshotTerm: str
latestConfiguration: List[RaftPeer]
latestConfigurationIndex: str
numPeers: str
protocolVersion: str
protocolVersionMax: str
protocolVersionMin: str
snapshotVersionMax: str
snapshotVersionMin: str
state: str
term: str


# total=False is used to make handle some of the optional fields
class ClusterNodeStats(TypedDict, total=False):
bootstrapped: bool
candidates: Dict[str, str]
dbLoaded: bool
isVoter: bool
leaderAddress: str
leaderId: str
name: str
open_: bool
raft: RaftStats
ready: bool
status: str


class ClusterStats(TypedDict):
statistics: List[ClusterNodeStats]
synchronized: bool
Loading