diff --git a/internal/coordinator/coordclient/registry.go b/internal/coordinator/coordclient/registry.go index a7a9e65a02a26..2b039db02de46 100644 --- a/internal/coordinator/coordclient/registry.go +++ b/internal/coordinator/coordclient/registry.go @@ -50,9 +50,6 @@ type LocalClientRoleConfig struct { // EnableLocalClientRole init localable roles func EnableLocalClientRole(cfg *LocalClientRoleConfig) { - if !paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { - return - } if cfg.ServerType != typeutil.StandaloneRole && cfg.ServerType != typeutil.MixtureRole { return } @@ -61,9 +58,6 @@ func EnableLocalClientRole(cfg *LocalClientRoleConfig) { // RegisterQueryCoordServer register query coord server func RegisterQueryCoordServer(server querypb.QueryCoordServer) { - if !enableLocal.EnableQueryCoord { - return - } newLocalClient := grpcclient.NewLocalGRPCClient(&querypb.QueryCoord_ServiceDesc, server, querypb.NewQueryCoordClient) glocalClient.queryCoordClient.Set(&nopCloseQueryCoordClient{newLocalClient}) log.Ctx(context.TODO()).Info("register query coord server", zap.Any("enableLocalClient", enableLocal)) @@ -71,9 +65,6 @@ func RegisterQueryCoordServer(server querypb.QueryCoordServer) { // RegsterDataCoordServer register data coord server func RegisterDataCoordServer(server datapb.DataCoordServer) { - if !enableLocal.EnableDataCoord { - return - } newLocalClient := grpcclient.NewLocalGRPCClient(&datapb.DataCoord_ServiceDesc, server, datapb.NewDataCoordClient) glocalClient.dataCoordClient.Set(&nopCloseDataCoordClient{newLocalClient}) log.Ctx(context.TODO()).Info("register data coord server", zap.Any("enableLocalClient", enableLocal)) @@ -81,9 +72,6 @@ func RegisterDataCoordServer(server datapb.DataCoordServer) { // RegisterRootCoordServer register root coord server func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) { - if !enableLocal.EnableRootCoord { - return - } newLocalClient := grpcclient.NewLocalGRPCClient(&rootcoordpb.RootCoord_ServiceDesc, server, rootcoordpb.NewRootCoordClient) glocalClient.rootCoordClient.Set(&nopCloseRootCoordClient{newLocalClient}) log.Ctx(context.TODO()).Info("register root coord server", zap.Any("enableLocalClient", enableLocal)) @@ -93,7 +81,7 @@ func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) { func GetQueryCoordClient(ctx context.Context) types.QueryCoordClient { var client types.QueryCoordClient var err error - if enableLocal.EnableQueryCoord { + if enableLocal.EnableQueryCoord && paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { client, err = glocalClient.queryCoordClient.GetWithContext(ctx) } else { // TODO: we should make a singleton here. but most unittest rely on a dedicated client. @@ -109,7 +97,7 @@ func GetQueryCoordClient(ctx context.Context) types.QueryCoordClient { func GetDataCoordClient(ctx context.Context) types.DataCoordClient { var client types.DataCoordClient var err error - if enableLocal.EnableDataCoord { + if enableLocal.EnableDataCoord && paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { client, err = glocalClient.dataCoordClient.GetWithContext(ctx) } else { // TODO: we should make a singleton here. but most unittest rely on a dedicated client. @@ -125,7 +113,7 @@ func GetDataCoordClient(ctx context.Context) types.DataCoordClient { func GetRootCoordClient(ctx context.Context) types.RootCoordClient { var client types.RootCoordClient var err error - if enableLocal.EnableRootCoord { + if enableLocal.EnableRootCoord && paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { client, err = glocalClient.rootCoordClient.GetWithContext(ctx) } else { // TODO: we should make a singleton here. but most unittest rely on a dedicated client. @@ -137,6 +125,15 @@ func GetRootCoordClient(ctx context.Context) types.RootCoordClient { return client } +// MustGetLocalRootCoordClientFuture return root coord client future, +// panic if root coord client is not enabled +func MustGetLocalRootCoordClientFuture() *syncutil.Future[types.RootCoordClient] { + if !enableLocal.EnableRootCoord { + panic("root coord client is not enabled") + } + return glocalClient.rootCoordClient +} + type nopCloseQueryCoordClient struct { querypb.QueryCoordClient } diff --git a/internal/coordinator/coordclient/registry_test.go b/internal/coordinator/coordclient/registry_test.go index 6752b637cc4f4..b392937a5f3de 100644 --- a/internal/coordinator/coordclient/registry_test.go +++ b/internal/coordinator/coordclient/registry_test.go @@ -31,13 +31,6 @@ func TestRegistry(t *testing.T) { assert.False(t, enableLocal.EnableDataCoord) assert.False(t, enableLocal.EnableRootCoord) - RegisterRootCoordServer(&rootcoordpb.UnimplementedRootCoordServer{}) - RegisterDataCoordServer(&datapb.UnimplementedDataCoordServer{}) - RegisterQueryCoordServer(&querypb.UnimplementedQueryCoordServer{}) - assert.False(t, glocalClient.dataCoordClient.Ready()) - assert.False(t, glocalClient.queryCoordClient.Ready()) - assert.False(t, glocalClient.rootCoordClient.Ready()) - enableLocal = &LocalClientRoleConfig{} EnableLocalClientRole(&LocalClientRoleConfig{ diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index ada34adad3c63..383be97b77c9b 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -33,7 +33,6 @@ import ( "github.com/tikv/client-go/v2/txnkv" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" - "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -49,7 +48,6 @@ import ( "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" - streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/sessionutil" @@ -163,9 +161,6 @@ type Server struct { // manage ways that data coord access other coord broker broker.Broker - // streamingcoord server is embedding in datacoord now. - streamingCoord *streamingcoord.Server - metricsRequest *metricsinfo.MetricsRequest } @@ -312,12 +307,6 @@ func (s *Server) Init() error { if err := s.initKV(); err != nil { return err } - if streamingutil.IsStreamingServiceEnabled() { - s.streamingCoord = streamingcoord.NewServerBuilder(). - WithETCD(s.etcdCli). - WithMetaKV(s.kv). - WithSession(s.session).Build() - } if s.enableActiveStandBy { s.activateFunc = func() error { log.Info("DataCoord switch from standby to active, activating") @@ -327,11 +316,6 @@ func (s *Server) Init() error { } s.startDataCoord() log.Info("DataCoord startup success") - - if s.streamingCoord != nil { - s.streamingCoord.Start() - log.Info("StreamingCoord stratup successfully at standby mode") - } return nil } s.stateCode.Store(commonpb.StateCode_StandBy) @@ -342,10 +326,6 @@ func (s *Server) Init() error { return s.initDataCoord() } -func (s *Server) RegisterStreamingCoordGRPCService(server *grpc.Server) { - s.streamingCoord.RegisterGRPCService(server) -} - func (s *Server) initDataCoord() error { log := log.Ctx(s.ctx) s.stateCode.Store(commonpb.StateCode_Initializing) @@ -376,15 +356,6 @@ func (s *Server) initDataCoord() error { return err } - // Initialize streaming coordinator. - if streamingutil.IsStreamingServiceEnabled() { - - if err = s.streamingCoord.Init(context.TODO()); err != nil { - return err - } - log.Info("init streaming coordinator done") - } - s.handler = newServerHandler(s) // check whether old node exist, if yes suspend auto balance until all old nodes down @@ -445,10 +416,6 @@ func (s *Server) Start() error { if !s.enableActiveStandBy { s.startDataCoord() log.Info("DataCoord startup successfully") - if s.streamingCoord != nil { - s.streamingCoord.Start() - log.Info("StreamingCoord stratup successfully") - } } return nil @@ -1102,12 +1069,6 @@ func (s *Server) Stop() error { s.garbageCollector.close() log.Info("datacoord garbage collector stopped") - if s.streamingCoord != nil { - log.Info("StreamingCoord stoping...") - s.streamingCoord.Stop() - log.Info("StreamingCoord stopped") - } - s.stopServerLoop() s.importScheduler.Close() diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index 59343de041d35..eee57efe9423c 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -41,8 +41,6 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" _ "github.com/milvus-io/milvus/internal/util/grpcclient" - "github.com/milvus-io/milvus/internal/util/streamingutil" - streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util" @@ -190,7 +188,6 @@ func (s *Server) startGrpcLoop() { } return s.serverID.Load() }), - streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(), )), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( logutil.StreamTraceLoggerInterceptor, @@ -201,7 +198,6 @@ func (s *Server) startGrpcLoop() { } return s.serverID.Load() }), - streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(), )), grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), } @@ -210,10 +206,6 @@ func (s *Server) startGrpcLoop() { s.grpcServer = grpc.NewServer(grpcOpts...) indexpb.RegisterIndexCoordServer(s.grpcServer, s) datapb.RegisterDataCoordServer(s.grpcServer, s) - // register the streaming coord grpc service. - if streamingutil.IsStreamingServiceEnabled() { - s.dataCoord.RegisterStreamingCoordGRPCService(s.grpcServer) - } coordclient.RegisterDataCoordServer(s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) if err := s.grpcServer.Serve(s.listener); err != nil { diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index 14daf1eb66be3..d86b8dcb22799 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -40,6 +40,7 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" _ "github.com/milvus-io/milvus/internal/util/grpcclient" + streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util" @@ -214,12 +215,6 @@ func (s *Server) init() error { log.Info("Connected to tikv. Using tikv as metadata storage.") } - err = s.startGrpc() - if err != nil { - return err - } - log.Info("grpc init done ...") - if s.newDataCoordClient != nil { log.Info("RootCoord start to create DataCoord client") dataCoord := s.newDataCoordClient(s.ctx) @@ -238,7 +233,17 @@ func (s *Server) init() error { } } - return s.rootCoord.Init() + if err := s.rootCoord.Init(); err != nil { + return err + } + log.Info("RootCoord init done ...") + + err = s.startGrpc() + if err != nil { + return err + } + log.Info("grpc init done ...") + return nil } func (s *Server) startGrpc() error { @@ -281,6 +286,7 @@ func (s *Server) startGrpcLoop() { } return s.serverID.Load() }), + streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(), )), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( logutil.StreamTraceLoggerInterceptor, @@ -291,6 +297,7 @@ func (s *Server) startGrpcLoop() { } return s.serverID.Load() }), + streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(), )), grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), } @@ -298,6 +305,7 @@ func (s *Server) startGrpcLoop() { grpcOpts = append(grpcOpts, utils.EnableInternalTLS("RootCoord")) s.grpcServer = grpc.NewServer(grpcOpts...) rootcoordpb.RegisterRootCoordServer(s.grpcServer, s) + s.rootCoord.RegisterStreamingCoordGRPCService(s.grpcServer) coordclient.RegisterRootCoordServer(s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 708e365ae7920..9ce81bbf74893 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -33,12 +33,14 @@ import ( "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/coordinator/coordclient" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/kv/tikv" "github.com/milvus-io/milvus/internal/metastore" @@ -48,6 +50,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server" tso2 "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" @@ -85,7 +88,7 @@ var Params *paramtable.ComponentParam = paramtable.Get() type Opt func(*Core) -type metaKVCreator func() (kv.MetaKv, error) +type metaKVCreator func() kv.MetaKv // Core root coordinator core type Core struct { @@ -131,6 +134,8 @@ type Core struct { activateFunc func() error metricsRequest *metricsinfo.MetricsRequest + + streamingCoord *streamingcoord.Server } // --------------------- function -------------------------- @@ -328,19 +333,28 @@ func (c *Core) initSession() error { func (c *Core) initKVCreator() { if c.metaKVCreator == nil { if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV { - c.metaKVCreator = func() (kv.MetaKv, error) { + c.metaKVCreator = func() kv.MetaKv { return tikv.NewTiKV(c.tikvCli, Params.TiKVCfg.MetaRootPath.GetValue(), - tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond))), nil + tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond))) } } else { - c.metaKVCreator = func() (kv.MetaKv, error) { + c.metaKVCreator = func() kv.MetaKv { return etcdkv.NewEtcdKV(c.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(), - etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))), nil + etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))) } } } } +func (c *Core) initStreamingCoord() { + c.streamingCoord = streamingcoord.NewServerBuilder(). + WithETCD(c.etcdCli). + WithMetaKV(c.metaKVCreator()). + WithSession(c.session). + WithRootCoordClient(coordclient.MustGetLocalRootCoordClientFuture()). + Build() +} + func (c *Core) initMetaTable() error { log := log.Ctx(c.ctx) fn := func() error { @@ -350,28 +364,20 @@ func (c *Core) initMetaTable() error { switch Params.MetaStoreCfg.MetaStoreType.GetValue() { case util.MetaStoreTypeEtcd: log.Info("Using etcd as meta storage.") - var metaKV kv.MetaKv var ss *kvmetestore.SuffixSnapshot var err error - if metaKV, err = c.metaKVCreator(); err != nil { - return err - } - + metaKV := c.metaKVCreator() if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.EtcdCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil { return err } catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss} case util.MetaStoreTypeTiKV: log.Info("Using tikv as meta storage.") - var metaKV kv.MetaKv var ss *kvmetestore.SuffixSnapshot var err error - if metaKV, err = c.metaKVCreator(); err != nil { - return err - } - + metaKV := c.metaKVCreator() if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.TiKVCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil { return err } @@ -441,7 +447,6 @@ func (c *Core) initTSOAllocator() error { func (c *Core) initInternal() error { log := log.Ctx(c.ctx) c.UpdateStateCode(commonpb.StateCode_Initializing) - c.initKVCreator() if err := c.initIDAllocator(); err != nil { return err @@ -469,6 +474,10 @@ func (c *Core) initInternal() error { c.garbageCollector = newBgGarbageCollector(c) c.stepExecutor = newBgStepExecutor(c.ctx) + if err := c.streamingCoord.Start(c.ctx); err != nil { + log.Info("start streaming coord failed", zap.Error(err)) + return err + } if !streamingutil.IsStreamingServiceEnabled() { c.proxyWatcher = proxyutil.NewProxyWatcher( c.etcdCli, @@ -522,6 +531,8 @@ func (c *Core) Init() error { if err := c.initSession(); err != nil { return err } + c.initKVCreator() + c.initStreamingCoord() if c.enableActiveStandBy { c.activateFunc = func() error { @@ -814,6 +825,8 @@ func (c *Core) Stop() error { c.UpdateStateCode(commonpb.StateCode_Abnormal) c.stopExecutor() c.stopScheduler() + + c.streamingCoord.Stop() if c.proxyWatcher != nil { c.proxyWatcher.Stop() } @@ -3422,3 +3435,8 @@ func (c *Core) getPrivilegeGroups(ctx context.Context) ([]*milvuspb.PrivilegeGro } return allGroups, nil } + +// RegisterStreamingCoordGRPCService registers the grpc service of streaming coordinator. +func (s *Core) RegisterStreamingCoordGRPCService(server *grpc.Server) { + s.streamingCoord.RegisterGRPCService(server) +} diff --git a/internal/streamingcoord/client/client.go b/internal/streamingcoord/client/client.go index 4b6186186b388..83a55fd107159 100644 --- a/internal/streamingcoord/client/client.go +++ b/internal/streamingcoord/client/client.go @@ -44,8 +44,8 @@ type Client interface { // NewClient creates a new client. func NewClient(etcdCli *clientv3.Client) Client { // StreamingCoord is deployed on DataCoord node. - role := sessionutil.GetSessionPrefixByRole(typeutil.DataCoordRole) - rb := resolver.NewSessionBuilder(etcdCli, role) + role := sessionutil.GetSessionPrefixByRole(typeutil.RootCoordRole) + rb := resolver.NewSessionExclusiveBuilder(etcdCli, role) dialTimeout := paramtable.Get().StreamingCoordGrpcClientCfg.DialTimeout.GetAsDuration(time.Millisecond) dialOptions := getDialOptions(rb) conn := lazygrpc.NewConn(func(ctx context.Context) (*grpc.ClientConn, error) { @@ -53,7 +53,7 @@ func NewClient(etcdCli *clientv3.Client) Client { defer cancel() return grpc.DialContext( ctx, - resolver.SessionResolverScheme+":///"+typeutil.DataCoordRole, + resolver.SessionResolverScheme+":///"+typeutil.RootCoordRole, dialOptions..., ) }) diff --git a/internal/streamingcoord/server/balancer/balancer_impl.go b/internal/streamingcoord/server/balancer/balancer_impl.go index f40bbcb3c62b4..7a263a210054f 100644 --- a/internal/streamingcoord/server/balancer/balancer_impl.go +++ b/internal/streamingcoord/server/balancer/balancer_impl.go @@ -32,7 +32,7 @@ func RecoverBalancer( } b := &balancerImpl{ lifetime: typeutil.NewLifetime(), - logger: log.With(zap.String("policy", policy)), + logger: resource.Resource().Logger().With(log.FieldComponent("balancer"), zap.String("policy", policy)), channelMetaManager: manager, policy: mustGetPolicy(policy), reqCh: make(chan *request, 5), diff --git a/internal/streamingcoord/server/builder.go b/internal/streamingcoord/server/builder.go index f15ca49ccdbc7..4d2215b6df638 100644 --- a/internal/streamingcoord/server/builder.go +++ b/internal/streamingcoord/server/builder.go @@ -7,17 +7,18 @@ import ( "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer" "github.com/milvus-io/milvus/internal/streamingcoord/server/resource" "github.com/milvus-io/milvus/internal/streamingcoord/server/service" - "github.com/milvus-io/milvus/internal/util/componentutil" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/kv" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/syncutil" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) type ServerBuilder struct { - etcdClient *clientv3.Client - metaKV kv.MetaKv - session sessionutil.SessionInterface + etcdClient *clientv3.Client + metaKV kv.MetaKv + session sessionutil.SessionInterface + rootCoordClient *syncutil.Future[types.RootCoordClient] } func NewServerBuilder() *ServerBuilder { @@ -34,6 +35,11 @@ func (b *ServerBuilder) WithMetaKV(metaKV kv.MetaKv) *ServerBuilder { return b } +func (b *ServerBuilder) WithRootCoordClient(rootCoordClient *syncutil.Future[types.RootCoordClient]) *ServerBuilder { + b.rootCoordClient = rootCoordClient + return b +} + func (b *ServerBuilder) WithSession(session sessionutil.SessionInterface) *ServerBuilder { b.session = session return b @@ -43,12 +49,13 @@ func (s *ServerBuilder) Build() *Server { resource.Init( resource.OptETCD(s.etcdClient), resource.OptStreamingCatalog(streamingcoord.NewCataLog(s.metaKV)), + resource.OptRootCoordClient(s.rootCoordClient), ) balancer := syncutil.NewFuture[balancer.Balancer]() return &Server{ - session: s.session, - componentStateService: componentutil.NewComponentStateService(typeutil.StreamingCoordRole), - assignmentService: service.NewAssignmentService(balancer), - balancer: balancer, + logger: resource.Resource().Logger().With(log.FieldComponent("server")), + session: s.session, + assignmentService: service.NewAssignmentService(balancer), + balancer: balancer, } } diff --git a/internal/streamingcoord/server/resource/resource.go b/internal/streamingcoord/server/resource/resource.go index 722a0342db514..89b8dee5730c1 100644 --- a/internal/streamingcoord/server/resource/resource.go +++ b/internal/streamingcoord/server/resource/resource.go @@ -7,6 +7,10 @@ import ( "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/streamingnode/client/manager" + "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/syncutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) var r *resourceImpl // singleton resource instance @@ -21,6 +25,13 @@ func OptETCD(etcd *clientv3.Client) optResourceInit { } } +// OptRootCoordClient provides the root coordinator client to the resource. +func OptRootCoordClient(rootCoordClient *syncutil.Future[types.RootCoordClient]) optResourceInit { + return func(r *resourceImpl) { + r.rootCoordClient = rootCoordClient + } +} + // OptStreamingCatalog provides streaming catalog to the resource. func OptStreamingCatalog(catalog metastore.StreamingCoordCataLog) optResourceInit { return func(r *resourceImpl) { @@ -31,10 +42,13 @@ func OptStreamingCatalog(catalog metastore.StreamingCoordCataLog) optResourceIni // Init initializes the singleton of resources. // Should be call when streaming node startup. func Init(opts ...optResourceInit) { - newR := &resourceImpl{} + newR := &resourceImpl{ + logger: log.With(log.FieldModule(typeutil.StreamingCoordRole)), + } for _, opt := range opts { opt(newR) } + assertNotNil(newR.RootCoordClient()) assertNotNil(newR.ETCD()) assertNotNil(newR.StreamingCatalog()) newR.streamingNodeManagerClient = manager.NewManagerClient(newR.etcdClient) @@ -50,9 +64,16 @@ func Resource() *resourceImpl { // resourceImpl is a basic resource dependency for streamingnode server. // All utility on it is concurrent-safe and singleton. type resourceImpl struct { + rootCoordClient *syncutil.Future[types.RootCoordClient] etcdClient *clientv3.Client streamingCatalog metastore.StreamingCoordCataLog streamingNodeManagerClient manager.ManagerClient + logger *log.MLogger +} + +// RootCoordClient returns the root coordinator client. +func (r *resourceImpl) RootCoordClient() *syncutil.Future[types.RootCoordClient] { + return r.rootCoordClient } // StreamingCatalog returns the StreamingCatalog client. @@ -70,6 +91,10 @@ func (r *resourceImpl) StreamingNodeManagerClient() manager.ManagerClient { return r.streamingNodeManagerClient } +func (r *resourceImpl) Logger() *log.MLogger { + return r.logger +} + // assertNotNil panics if the resource is nil. func assertNotNil(v interface{}) { iv := reflect.ValueOf(v) diff --git a/internal/streamingcoord/server/resource/test_utility.go b/internal/streamingcoord/server/resource/test_utility.go index 6ac82884d1edb..fad9918eb16a6 100644 --- a/internal/streamingcoord/server/resource/test_utility.go +++ b/internal/streamingcoord/server/resource/test_utility.go @@ -5,6 +5,7 @@ package resource import ( "github.com/milvus-io/milvus/internal/streamingnode/client/manager" + "github.com/milvus-io/milvus/pkg/log" ) // OptStreamingManagerClient provides streaming manager client to the resource. @@ -16,7 +17,9 @@ func OptStreamingManagerClient(c manager.ManagerClient) optResourceInit { // InitForTest initializes the singleton of resources for test. func InitForTest(opts ...optResourceInit) { - r = &resourceImpl{} + r = &resourceImpl{ + logger: log.With(), + } for _, opt := range opts { opt(r) } diff --git a/internal/streamingcoord/server/server.go b/internal/streamingcoord/server/server.go index 9e3db0ae4172f..2b9e50f3c2be4 100644 --- a/internal/streamingcoord/server/server.go +++ b/internal/streamingcoord/server/server.go @@ -9,72 +9,76 @@ import ( "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer" _ "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/policy" // register the balancer policy "github.com/milvus-io/milvus/internal/streamingcoord/server/service" - "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/internal/util/streamingutil/util" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" + "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/syncutil" ) // Server is the streamingcoord server. type Server struct { + logger *log.MLogger + // session of current server. session sessionutil.SessionInterface // service level variables. - assignmentService service.AssignmentService - componentStateService *componentutil.ComponentStateService // state. + assignmentService service.AssignmentService // basic component variables can be used at service level. balancer *syncutil.Future[balancer.Balancer] } // Init initializes the streamingcoord server. -func (s *Server) Init(ctx context.Context) (err error) { - log.Info("init streamingcoord server...") - - // Init all underlying component of streamingcoord server. +func (s *Server) Start(ctx context.Context) (err error) { + s.logger.Info("init streamingcoord...") if err := s.initBasicComponent(ctx); err != nil { - log.Error("init basic component of streamingcoord server failed", zap.Error(err)) + s.logger.Warn("init basic component of streamingcoord failed", zap.Error(err)) return err } // Init all grpc service of streamingcoord server. - s.componentStateService.OnInitialized(s.session.GetServerID()) - log.Info("streamingcoord server initialized") + s.logger.Info("streamingcoord initialized") return nil } // initBasicComponent initialize all underlying dependency for streamingcoord. -func (s *Server) initBasicComponent(ctx context.Context) error { - // Init balancer - var err error - // Read new incoming topics from configuration, and register it into balancer. - newIncomingTopics := util.GetAllTopicsFromConfiguration() - balancer, err := balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...) - if err != nil { - return err +func (s *Server) initBasicComponent(ctx context.Context) (err error) { + if streamingutil.IsStreamingServiceEnabled() { + fBalancer := conc.Go(func() (struct{}, error) { + s.logger.Info("start recovery balancer...") + // Read new incoming topics from configuration, and register it into balancer. + newIncomingTopics := util.GetAllTopicsFromConfiguration() + balancer, err := balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...) + if err != nil { + s.logger.Warn("recover balancer failed", zap.Error(err)) + return struct{}{}, err + } + s.balancer.Set(balancer) + s.logger.Info("recover balancer done") + return struct{}{}, nil + }) + return conc.AwaitAll(fBalancer) } - s.balancer.Set(balancer) - return err + return nil } -// registerGRPCService register all grpc service to grpc server. +// RegisterGRPCService register all grpc service to grpc server. func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) { - streamingpb.RegisterStreamingCoordAssignmentServiceServer(grpcServer, s.assignmentService) - streamingpb.RegisterStreamingCoordStateServiceServer(grpcServer, s.componentStateService) -} - -// Start starts the streamingcoord server. -func (s *Server) Start() { - // Just do nothing now. - log.Info("start streamingcoord server") + if streamingutil.IsStreamingServiceEnabled() { + streamingpb.RegisterStreamingCoordAssignmentServiceServer(grpcServer, s.assignmentService) + } } -// Stop stops the streamingcoord server. +// Close closes the streamingcoord server. func (s *Server) Stop() { - s.componentStateService.OnStopping() - log.Info("close balancer...") - s.balancer.Get().Close() - log.Info("streamingcoord server stopped") + if s.balancer.Ready() { + s.logger.Info("start close balancer...") + s.balancer.Get().Close() + } else { + s.logger.Info("balancer not ready, skip close") + } + s.logger.Info("streamingcoord server stopped") } diff --git a/internal/streamingcoord/server/service/discover/discover_server.go b/internal/streamingcoord/server/service/discover/discover_server.go index ed60650f9b02a..d4aaa48b4ebf2 100644 --- a/internal/streamingcoord/server/service/discover/discover_server.go +++ b/internal/streamingcoord/server/service/discover/discover_server.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer" + "github.com/milvus-io/milvus/internal/streamingcoord/server/resource" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/types" @@ -27,7 +28,7 @@ func NewAssignmentDiscoverServer( streamServer: discoverGrpcServerHelper{ streamServer, }, - logger: log.With(), + logger: resource.Resource().Logger().With(log.FieldComponent("assignment-discover-server")), } } diff --git a/internal/streamingcoord/server/server_test.go b/internal/streamingcoord/server/service_test.go similarity index 80% rename from internal/streamingcoord/server/server_test.go rename to internal/streamingcoord/server/service_test.go index e14907dacd4f7..f303851c82362 100644 --- a/internal/streamingcoord/server/server_test.go +++ b/internal/streamingcoord/server/service_test.go @@ -8,9 +8,11 @@ import ( "github.com/stretchr/testify/assert" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/syncutil" ) func TestServer(t *testing.T) { @@ -27,16 +29,15 @@ func TestServer(t *testing.T) { b := NewServerBuilder() metaKV := etcdkv.NewEtcdKV(c, "test") s := sessionutil.NewMockSession(t) - s.EXPECT().GetServerID().Return(1) - + f := syncutil.NewFuture[types.RootCoordClient]() newServer := b.WithETCD(c). WithMetaKV(metaKV). WithSession(s). + WithRootCoordClient(f). Build() ctx := context.Background() - err = newServer.Init(ctx) + err = newServer.Start(ctx) assert.NoError(t, err) - newServer.Start() newServer.Stop() } diff --git a/internal/types/types.go b/internal/types/types.go index d13ea19a60858..fb48de650e3ec 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -121,8 +121,6 @@ type DataCoord interface { type DataCoordComponent interface { DataCoord - RegisterStreamingCoordGRPCService(s *grpc.Server) - SetAddress(address string) // SetEtcdClient set EtcdClient for DataCoord // `etcdClient` is a client of etcd @@ -213,6 +211,8 @@ type RootCoordComponent interface { // GetMetrics notifies RootCoordComponent to collect metrics for specified component GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) + + RegisterStreamingCoordGRPCService(server *grpc.Server) } // ProxyClient is the client interface for proxy server diff --git a/internal/util/streamingutil/service/discoverer/session_discoverer.go b/internal/util/streamingutil/service/discoverer/session_discoverer.go index d93c412348fb3..d770f84393236 100644 --- a/internal/util/streamingutil/service/discoverer/session_discoverer.go +++ b/internal/util/streamingutil/service/discoverer/session_discoverer.go @@ -17,12 +17,13 @@ import ( ) // NewSessionDiscoverer returns a new Discoverer for the milvus session registration. -func NewSessionDiscoverer(etcdCli *clientv3.Client, prefix string, minimumVersion string) Discoverer { +func NewSessionDiscoverer(etcdCli *clientv3.Client, prefix string, exclusive bool, minimumVersion string) Discoverer { return &sessionDiscoverer{ etcdCli: etcdCli, prefix: prefix, + exclusive: exclusive, versionRange: semver.MustParseRange(">=" + minimumVersion), - logger: log.With(zap.String("prefix", prefix), zap.String("expectedVersion", minimumVersion)), + logger: log.With(zap.String("prefix", prefix), zap.Bool("exclusive", exclusive), zap.String("expectedVersion", minimumVersion)), revision: 0, peerSessions: make(map[string]*sessionutil.SessionRaw), } @@ -32,6 +33,7 @@ func NewSessionDiscoverer(etcdCli *clientv3.Client, prefix string, minimumVersio type sessionDiscoverer struct { etcdCli *clientv3.Client prefix string + exclusive bool // if exclusive, only one session is allowed, not use the prefix, only use the role directly. logger *log.MLogger versionRange semver.Range revision int64 @@ -64,12 +66,15 @@ func (sw *sessionDiscoverer) Discover(ctx context.Context, cb func(VersionedStat // watch performs the watch on etcd. func (sw *sessionDiscoverer) watch(ctx context.Context, cb func(VersionedState) error) error { + opts := []clientv3.OpOption{clientv3.WithRev(sw.revision + 1)} + if !sw.exclusive { + opts = append(opts, clientv3.WithPrefix()) + } // start a watcher at background. eventCh := sw.etcdCli.Watch( ctx, sw.prefix, - clientv3.WithPrefix(), - clientv3.WithRev(sw.revision+1), + opts..., ) for { @@ -124,7 +129,11 @@ func (sw *sessionDiscoverer) handleETCDEvent(resp clientv3.WatchResponse) error // initDiscover initializes the discoverer if needed. func (sw *sessionDiscoverer) initDiscover(ctx context.Context) error { - resp, err := sw.etcdCli.Get(ctx, sw.prefix, clientv3.WithPrefix(), clientv3.WithSerializable()) + opts := []clientv3.OpOption{clientv3.WithSerializable()} + if !sw.exclusive { + opts = append(opts, clientv3.WithPrefix()) + } + resp, err := sw.etcdCli.Get(ctx, sw.prefix, opts...) if err != nil { return err } diff --git a/internal/util/streamingutil/service/discoverer/session_discoverer_test.go b/internal/util/streamingutil/service/discoverer/session_discoverer_test.go index 723bcc29e910e..aaeb726e6afe8 100644 --- a/internal/util/streamingutil/service/discoverer/session_discoverer_test.go +++ b/internal/util/streamingutil/service/discoverer/session_discoverer_test.go @@ -25,7 +25,7 @@ func TestSessionDiscoverer(t *testing.T) { etcdClient, err := etcd.GetEmbedEtcdClient() assert.NoError(t, err) targetVersion := "0.1.0" - d := NewSessionDiscoverer(etcdClient, "session/", targetVersion) + d := NewSessionDiscoverer(etcdClient, "session/", false, targetVersion) s := d.NewVersionedState() assert.True(t, s.Version.EQ(typeutil.VersionInt64(-1))) @@ -95,7 +95,7 @@ func TestSessionDiscoverer(t *testing.T) { assert.ErrorIs(t, err, io.EOF) // Do a init discover here. - d = NewSessionDiscoverer(etcdClient, "session/", targetVersion) + d = NewSessionDiscoverer(etcdClient, "session/", false, targetVersion) err = d.Discover(ctx, func(state VersionedState) error { // balance attributes sessions := state.Sessions() diff --git a/internal/util/streamingutil/service/resolver/builder.go b/internal/util/streamingutil/service/resolver/builder.go index 9412d84e9c6a5..ca6af04552aec 100644 --- a/internal/util/streamingutil/service/resolver/builder.go +++ b/internal/util/streamingutil/service/resolver/builder.go @@ -28,9 +28,15 @@ func NewChannelAssignmentBuilder(w types.AssignmentDiscoverWatcher) Builder { } // NewSessionBuilder creates a new resolver builder. +// Multiple sessions are allowed, use the role as prefix. func NewSessionBuilder(c *clientv3.Client, role string) Builder { - // TODO: use 2.5.0 after 2.5.0 released. - return newBuilder(SessionResolverScheme, discoverer.NewSessionDiscoverer(c, role, "2.4.0")) + return newBuilder(SessionResolverScheme, discoverer.NewSessionDiscoverer(c, role, false, "2.4.0")) +} + +// NewSessionExclusiveBuilder creates a new resolver builder with exclusive. +// Only one session is allowed, not use the prefix, only use the role directly. +func NewSessionExclusiveBuilder(c *clientv3.Client, role string) Builder { + return newBuilder(SessionResolverScheme, discoverer.NewSessionDiscoverer(c, role, true, "2.4.0")) } // newBuilder creates a new resolver builder. diff --git a/pkg/log/fields.go b/pkg/log/fields.go new file mode 100644 index 0000000000000..ed3100c4f5039 --- /dev/null +++ b/pkg/log/fields.go @@ -0,0 +1,18 @@ +package log + +import "go.uber.org/zap" + +const ( + FieldNameModule = "module" + FieldNameComponent = "component" +) + +// FieldModule returns a zap field with the module name. +func FieldModule(module string) zap.Field { + return zap.String(FieldNameModule, module) +} + +// FieldComponent returns a zap field with the component name. +func FieldComponent(component string) zap.Field { + return zap.String(FieldNameComponent, component) +} diff --git a/pkg/log/global.go b/pkg/log/global.go index 6a00a8d4d25ba..2fabc4e7c3f00 100644 --- a/pkg/log/global.go +++ b/pkg/log/global.go @@ -141,7 +141,7 @@ func WithReqID(ctx context.Context, reqID int64) context.Context { // WithModule adds given module field to the logger in ctx func WithModule(ctx context.Context, module string) context.Context { - fields := []zap.Field{zap.String("module", module)} + fields := []zap.Field{zap.String(FieldNameModule, module)} return WithFields(ctx, fields...) }