Skip to content

Commit

Permalink
enhance: move streaming coord from datacoord to rootcoord
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Jan 6, 2025
1 parent b9da6a5 commit 4b10eb3
Show file tree
Hide file tree
Showing 20 changed files with 201 additions and 158 deletions.
27 changes: 12 additions & 15 deletions internal/coordinator/coordclient/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ type LocalClientRoleConfig struct {

// EnableLocalClientRole init localable roles
func EnableLocalClientRole(cfg *LocalClientRoleConfig) {
if !paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() {
return
}
if cfg.ServerType != typeutil.StandaloneRole && cfg.ServerType != typeutil.MixtureRole {
return
}
Expand All @@ -61,29 +58,20 @@ func EnableLocalClientRole(cfg *LocalClientRoleConfig) {

// RegisterQueryCoordServer register query coord server
func RegisterQueryCoordServer(server querypb.QueryCoordServer) {
if !enableLocal.EnableQueryCoord {
return
}
newLocalClient := grpcclient.NewLocalGRPCClient(&querypb.QueryCoord_ServiceDesc, server, querypb.NewQueryCoordClient)
glocalClient.queryCoordClient.Set(&nopCloseQueryCoordClient{newLocalClient})
log.Ctx(context.TODO()).Info("register query coord server", zap.Any("enableLocalClient", enableLocal))
}

// RegsterDataCoordServer register data coord server
func RegisterDataCoordServer(server datapb.DataCoordServer) {
if !enableLocal.EnableDataCoord {
return
}
newLocalClient := grpcclient.NewLocalGRPCClient(&datapb.DataCoord_ServiceDesc, server, datapb.NewDataCoordClient)
glocalClient.dataCoordClient.Set(&nopCloseDataCoordClient{newLocalClient})
log.Ctx(context.TODO()).Info("register data coord server", zap.Any("enableLocalClient", enableLocal))
}

// RegisterRootCoordServer register root coord server
func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) {
if !enableLocal.EnableRootCoord {
return
}
newLocalClient := grpcclient.NewLocalGRPCClient(&rootcoordpb.RootCoord_ServiceDesc, server, rootcoordpb.NewRootCoordClient)
glocalClient.rootCoordClient.Set(&nopCloseRootCoordClient{newLocalClient})
log.Ctx(context.TODO()).Info("register root coord server", zap.Any("enableLocalClient", enableLocal))
Expand All @@ -93,7 +81,7 @@ func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) {
func GetQueryCoordClient(ctx context.Context) types.QueryCoordClient {
var client types.QueryCoordClient
var err error
if enableLocal.EnableQueryCoord {
if enableLocal.EnableQueryCoord && paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() {
client, err = glocalClient.queryCoordClient.GetWithContext(ctx)
} else {
// TODO: we should make a singleton here. but most unittest rely on a dedicated client.
Expand All @@ -109,7 +97,7 @@ func GetQueryCoordClient(ctx context.Context) types.QueryCoordClient {
func GetDataCoordClient(ctx context.Context) types.DataCoordClient {
var client types.DataCoordClient
var err error
if enableLocal.EnableDataCoord {
if enableLocal.EnableDataCoord && paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() {
client, err = glocalClient.dataCoordClient.GetWithContext(ctx)
} else {
// TODO: we should make a singleton here. but most unittest rely on a dedicated client.
Expand All @@ -125,7 +113,7 @@ func GetDataCoordClient(ctx context.Context) types.DataCoordClient {
func GetRootCoordClient(ctx context.Context) types.RootCoordClient {
var client types.RootCoordClient
var err error
if enableLocal.EnableRootCoord {
if enableLocal.EnableRootCoord && paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() {
client, err = glocalClient.rootCoordClient.GetWithContext(ctx)
} else {
// TODO: we should make a singleton here. but most unittest rely on a dedicated client.
Expand All @@ -137,6 +125,15 @@ func GetRootCoordClient(ctx context.Context) types.RootCoordClient {
return client
}

// MustGetLocalRootCoordClientFuture return root coord client future,
// panic if root coord client is not enabled
func MustGetLocalRootCoordClientFuture() *syncutil.Future[types.RootCoordClient] {
if !enableLocal.EnableRootCoord {
panic("root coord client is not enabled")
}
return glocalClient.rootCoordClient
}

type nopCloseQueryCoordClient struct {
querypb.QueryCoordClient
}
Expand Down
7 changes: 0 additions & 7 deletions internal/coordinator/coordclient/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ func TestRegistry(t *testing.T) {
assert.False(t, enableLocal.EnableDataCoord)
assert.False(t, enableLocal.EnableRootCoord)

RegisterRootCoordServer(&rootcoordpb.UnimplementedRootCoordServer{})
RegisterDataCoordServer(&datapb.UnimplementedDataCoordServer{})
RegisterQueryCoordServer(&querypb.UnimplementedQueryCoordServer{})
assert.False(t, glocalClient.dataCoordClient.Ready())
assert.False(t, glocalClient.queryCoordClient.Ready())
assert.False(t, glocalClient.rootCoordClient.Ready())

enableLocal = &LocalClientRoleConfig{}

EnableLocalClientRole(&LocalClientRoleConfig{
Expand Down
39 changes: 0 additions & 39 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/client-go/v2/txnkv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
Expand All @@ -49,7 +48,6 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/sessionutil"
Expand Down Expand Up @@ -163,9 +161,6 @@ type Server struct {
// manage ways that data coord access other coord
broker broker.Broker

// streamingcoord server is embedding in datacoord now.
streamingCoord *streamingcoord.Server

metricsRequest *metricsinfo.MetricsRequest
}

Expand Down Expand Up @@ -312,12 +307,6 @@ func (s *Server) Init() error {
if err := s.initKV(); err != nil {
return err
}
if streamingutil.IsStreamingServiceEnabled() {
s.streamingCoord = streamingcoord.NewServerBuilder().
WithETCD(s.etcdCli).
WithMetaKV(s.kv).
WithSession(s.session).Build()
}
if s.enableActiveStandBy {
s.activateFunc = func() error {
log.Info("DataCoord switch from standby to active, activating")
Expand All @@ -327,11 +316,6 @@ func (s *Server) Init() error {
}
s.startDataCoord()
log.Info("DataCoord startup success")

if s.streamingCoord != nil {
s.streamingCoord.Start()
log.Info("StreamingCoord stratup successfully at standby mode")
}
return nil
}
s.stateCode.Store(commonpb.StateCode_StandBy)
Expand All @@ -342,10 +326,6 @@ func (s *Server) Init() error {
return s.initDataCoord()
}

func (s *Server) RegisterStreamingCoordGRPCService(server *grpc.Server) {
s.streamingCoord.RegisterGRPCService(server)
}

func (s *Server) initDataCoord() error {
log := log.Ctx(s.ctx)
s.stateCode.Store(commonpb.StateCode_Initializing)
Expand Down Expand Up @@ -376,15 +356,6 @@ func (s *Server) initDataCoord() error {
return err
}

// Initialize streaming coordinator.
if streamingutil.IsStreamingServiceEnabled() {

if err = s.streamingCoord.Init(context.TODO()); err != nil {
return err
}
log.Info("init streaming coordinator done")
}

s.handler = newServerHandler(s)

// check whether old node exist, if yes suspend auto balance until all old nodes down
Expand Down Expand Up @@ -445,10 +416,6 @@ func (s *Server) Start() error {
if !s.enableActiveStandBy {
s.startDataCoord()
log.Info("DataCoord startup successfully")
if s.streamingCoord != nil {
s.streamingCoord.Start()
log.Info("StreamingCoord stratup successfully")
}
}

return nil
Expand Down Expand Up @@ -1102,12 +1069,6 @@ func (s *Server) Stop() error {
s.garbageCollector.close()
log.Info("datacoord garbage collector stopped")

if s.streamingCoord != nil {
log.Info("StreamingCoord stoping...")
s.streamingCoord.Stop()
log.Info("StreamingCoord stopped")
}

s.stopServerLoop()

s.importScheduler.Close()
Expand Down
8 changes: 0 additions & 8 deletions internal/distributed/datacoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
_ "github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/streamingutil"
streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util"
Expand Down Expand Up @@ -190,7 +188,6 @@ func (s *Server) startGrpcLoop() {
}
return s.serverID.Load()
}),
streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(),
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
logutil.StreamTraceLoggerInterceptor,
Expand All @@ -201,7 +198,6 @@ func (s *Server) startGrpcLoop() {
}
return s.serverID.Load()
}),
streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}
Expand All @@ -210,10 +206,6 @@ func (s *Server) startGrpcLoop() {
s.grpcServer = grpc.NewServer(grpcOpts...)
indexpb.RegisterIndexCoordServer(s.grpcServer, s)
datapb.RegisterDataCoordServer(s.grpcServer, s)
// register the streaming coord grpc service.
if streamingutil.IsStreamingServiceEnabled() {
s.dataCoord.RegisterStreamingCoordGRPCService(s.grpcServer)
}
coordclient.RegisterDataCoordServer(s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(s.listener); err != nil {
Expand Down
22 changes: 15 additions & 7 deletions internal/distributed/rootcoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
_ "github.com/milvus-io/milvus/internal/util/grpcclient"
streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util"
Expand Down Expand Up @@ -214,12 +215,6 @@ func (s *Server) init() error {
log.Info("Connected to tikv. Using tikv as metadata storage.")
}

err = s.startGrpc()
if err != nil {
return err
}
log.Info("grpc init done ...")

if s.newDataCoordClient != nil {
log.Info("RootCoord start to create DataCoord client")
dataCoord := s.newDataCoordClient(s.ctx)
Expand All @@ -238,7 +233,17 @@ func (s *Server) init() error {
}
}

return s.rootCoord.Init()
if err := s.rootCoord.Init(); err != nil {
return err
}
log.Info("RootCoord init done ...")

err = s.startGrpc()
if err != nil {
return err
}
log.Info("grpc init done ...")
return nil
}

func (s *Server) startGrpc() error {
Expand Down Expand Up @@ -281,6 +286,7 @@ func (s *Server) startGrpcLoop() {
}
return s.serverID.Load()
}),
streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(),
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
logutil.StreamTraceLoggerInterceptor,
Expand All @@ -291,13 +297,15 @@ func (s *Server) startGrpcLoop() {
}
return s.serverID.Load()
}),
streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("RootCoord"))
s.grpcServer = grpc.NewServer(grpcOpts...)
rootcoordpb.RegisterRootCoordServer(s.grpcServer, s)
s.rootCoord.RegisterStreamingCoordGRPCService(s.grpcServer)
coordclient.RegisterRootCoordServer(s)

go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
Expand Down
Loading

0 comments on commit 4b10eb3

Please sign in to comment.