diff --git a/internal/coordinator/coordclient/registry.go b/internal/coordinator/coordclient/registry.go index 2b039db02de46..12c231ae9cda3 100644 --- a/internal/coordinator/coordclient/registry.go +++ b/internal/coordinator/coordclient/registry.go @@ -58,6 +58,9 @@ func EnableLocalClientRole(cfg *LocalClientRoleConfig) { // RegisterQueryCoordServer register query coord server func RegisterQueryCoordServer(server querypb.QueryCoordServer) { + if !enableLocal.EnableQueryCoord { + return + } newLocalClient := grpcclient.NewLocalGRPCClient(&querypb.QueryCoord_ServiceDesc, server, querypb.NewQueryCoordClient) glocalClient.queryCoordClient.Set(&nopCloseQueryCoordClient{newLocalClient}) log.Ctx(context.TODO()).Info("register query coord server", zap.Any("enableLocalClient", enableLocal)) @@ -65,6 +68,9 @@ func RegisterQueryCoordServer(server querypb.QueryCoordServer) { // RegsterDataCoordServer register data coord server func RegisterDataCoordServer(server datapb.DataCoordServer) { + if !enableLocal.EnableDataCoord { + return + } newLocalClient := grpcclient.NewLocalGRPCClient(&datapb.DataCoord_ServiceDesc, server, datapb.NewDataCoordClient) glocalClient.dataCoordClient.Set(&nopCloseDataCoordClient{newLocalClient}) log.Ctx(context.TODO()).Info("register data coord server", zap.Any("enableLocalClient", enableLocal)) @@ -72,6 +78,9 @@ func RegisterDataCoordServer(server datapb.DataCoordServer) { // RegisterRootCoordServer register root coord server func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) { + if !enableLocal.EnableRootCoord { + return + } newLocalClient := grpcclient.NewLocalGRPCClient(&rootcoordpb.RootCoord_ServiceDesc, server, rootcoordpb.NewRootCoordClient) glocalClient.rootCoordClient.Set(&nopCloseRootCoordClient{newLocalClient}) log.Ctx(context.TODO()).Info("register root coord server", zap.Any("enableLocalClient", enableLocal)) @@ -128,9 +137,6 @@ func GetRootCoordClient(ctx context.Context) types.RootCoordClient { // MustGetLocalRootCoordClientFuture return root coord client future, // panic if root coord client is not enabled func MustGetLocalRootCoordClientFuture() *syncutil.Future[types.RootCoordClient] { - if !enableLocal.EnableRootCoord { - panic("root coord client is not enabled") - } return glocalClient.rootCoordClient } diff --git a/internal/coordinator/coordclient/registry_test.go b/internal/coordinator/coordclient/registry_test.go index b392937a5f3de..6752b637cc4f4 100644 --- a/internal/coordinator/coordclient/registry_test.go +++ b/internal/coordinator/coordclient/registry_test.go @@ -31,6 +31,13 @@ func TestRegistry(t *testing.T) { assert.False(t, enableLocal.EnableDataCoord) assert.False(t, enableLocal.EnableRootCoord) + RegisterRootCoordServer(&rootcoordpb.UnimplementedRootCoordServer{}) + RegisterDataCoordServer(&datapb.UnimplementedDataCoordServer{}) + RegisterQueryCoordServer(&querypb.UnimplementedQueryCoordServer{}) + assert.False(t, glocalClient.dataCoordClient.Ready()) + assert.False(t, glocalClient.queryCoordClient.Ready()) + assert.False(t, glocalClient.rootCoordClient.Ready()) + enableLocal = &LocalClientRoleConfig{} EnableLocalClientRole(&LocalClientRoleConfig{ diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go index 5965b47883c2c..d9250966cc26e 100644 --- a/internal/distributed/rootcoord/service_test.go +++ b/internal/distributed/rootcoord/service_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/tikv/client-go/v2/txnkv" clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -97,6 +98,9 @@ func (m *mockCore) SetQueryCoordClient(client types.QueryCoordClient) error { func (m *mockCore) SetProxyCreator(func(ctx context.Context, addr string, nodeID int64) (types.ProxyClient, error)) { } +func (m *mockCore) RegisterStreamingCoordGRPCService(server *grpc.Server) { +} + func (m *mockCore) Register() error { return nil } diff --git a/internal/mocks/mock_datacoord.go b/internal/mocks/mock_datacoord.go index b33f31f3c155f..8d7004e4d5190 100644 --- a/internal/mocks/mock_datacoord.go +++ b/internal/mocks/mock_datacoord.go @@ -10,8 +10,6 @@ import ( datapb "github.com/milvus-io/milvus/internal/proto/datapb" - grpc "google.golang.org/grpc" - indexpb "github.com/milvus-io/milvus/internal/proto/indexpb" internalpb "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -2606,39 +2604,6 @@ func (_c *MockDataCoord_Register_Call) RunAndReturn(run func() error) *MockDataC return _c } -// RegisterStreamingCoordGRPCService provides a mock function with given fields: s -func (_m *MockDataCoord) RegisterStreamingCoordGRPCService(s *grpc.Server) { - _m.Called(s) -} - -// MockDataCoord_RegisterStreamingCoordGRPCService_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterStreamingCoordGRPCService' -type MockDataCoord_RegisterStreamingCoordGRPCService_Call struct { - *mock.Call -} - -// RegisterStreamingCoordGRPCService is a helper method to define mock.On call -// - s *grpc.Server -func (_e *MockDataCoord_Expecter) RegisterStreamingCoordGRPCService(s interface{}) *MockDataCoord_RegisterStreamingCoordGRPCService_Call { - return &MockDataCoord_RegisterStreamingCoordGRPCService_Call{Call: _e.mock.On("RegisterStreamingCoordGRPCService", s)} -} - -func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) Run(run func(s *grpc.Server)) *MockDataCoord_RegisterStreamingCoordGRPCService_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*grpc.Server)) - }) - return _c -} - -func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) Return() *MockDataCoord_RegisterStreamingCoordGRPCService_Call { - _c.Call.Return() - return _c -} - -func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) RunAndReturn(run func(*grpc.Server)) *MockDataCoord_RegisterStreamingCoordGRPCService_Call { - _c.Call.Return(run) - return _c -} - // ReportDataNodeTtMsgs provides a mock function with given fields: _a0, _a1 func (_m *MockDataCoord) ReportDataNodeTtMsgs(_a0 context.Context, _a1 *datapb.ReportDataNodeTtMsgsRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_rootcoord.go b/internal/mocks/mock_rootcoord.go index 0ecf1b9ca01fc..0c261fcc056c4 100644 --- a/internal/mocks/mock_rootcoord.go +++ b/internal/mocks/mock_rootcoord.go @@ -8,6 +8,8 @@ import ( commonpb "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" clientv3 "go.etcd.io/etcd/client/v3" + grpc "google.golang.org/grpc" + internalpb "github.com/milvus-io/milvus/internal/proto/internalpb" milvuspb "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -2663,6 +2665,39 @@ func (_c *RootCoord_Register_Call) RunAndReturn(run func() error) *RootCoord_Reg return _c } +// RegisterStreamingCoordGRPCService provides a mock function with given fields: server +func (_m *RootCoord) RegisterStreamingCoordGRPCService(server *grpc.Server) { + _m.Called(server) +} + +// RootCoord_RegisterStreamingCoordGRPCService_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterStreamingCoordGRPCService' +type RootCoord_RegisterStreamingCoordGRPCService_Call struct { + *mock.Call +} + +// RegisterStreamingCoordGRPCService is a helper method to define mock.On call +// - server *grpc.Server +func (_e *RootCoord_Expecter) RegisterStreamingCoordGRPCService(server interface{}) *RootCoord_RegisterStreamingCoordGRPCService_Call { + return &RootCoord_RegisterStreamingCoordGRPCService_Call{Call: _e.mock.On("RegisterStreamingCoordGRPCService", server)} +} + +func (_c *RootCoord_RegisterStreamingCoordGRPCService_Call) Run(run func(server *grpc.Server)) *RootCoord_RegisterStreamingCoordGRPCService_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*grpc.Server)) + }) + return _c +} + +func (_c *RootCoord_RegisterStreamingCoordGRPCService_Call) Return() *RootCoord_RegisterStreamingCoordGRPCService_Call { + _c.Call.Return() + return _c +} + +func (_c *RootCoord_RegisterStreamingCoordGRPCService_Call) RunAndReturn(run func(*grpc.Server)) *RootCoord_RegisterStreamingCoordGRPCService_Call { + _c.Call.Return(run) + return _c +} + // RenameCollection provides a mock function with given fields: _a0, _a1 func (_m *RootCoord) RenameCollection(_a0 context.Context, _a1 *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 9ce81bbf74893..f1d0dee85bfe2 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -826,7 +826,9 @@ func (c *Core) Stop() error { c.stopExecutor() c.stopScheduler() - c.streamingCoord.Stop() + if c.streamingCoord != nil { + c.streamingCoord.Stop() + } if c.proxyWatcher != nil { c.proxyWatcher.Stop() } diff --git a/internal/streamingcoord/server/service/discover/discover_server_test.go b/internal/streamingcoord/server/service/discover/discover_server_test.go index a4cd2ed2cf0d6..a308028b55282 100644 --- a/internal/streamingcoord/server/service/discover/discover_server_test.go +++ b/internal/streamingcoord/server/service/discover/discover_server_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer" + "github.com/milvus-io/milvus/internal/streamingcoord/server/resource" "github.com/milvus-io/milvus/pkg/mocks/streaming/proto/mock_streamingpb" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/types" @@ -15,6 +16,7 @@ import ( ) func TestAssignmentDiscover(t *testing.T) { + resource.InitForTest() b := mock_balancer.NewMockBalancer(t) b.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) error { versions := []typeutil.VersionInt64Pair{