enhance: add broadcast operation for msgstream (#39040)
issue: #38399

- make the broadcast service available for msgstream by reusing the
architecture of the streaming service

---------

Signed-off-by: chyezh <[email protected]>
chyezh authored Jan 14, 2025
1 parent da1b786 commit fd84ed8
Showing 27 changed files with 435 additions and 158 deletions.
7 changes: 2 additions & 5 deletions cmd/roles/roles.go
@@ -43,7 +43,6 @@ import (
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/internal/util/initcore"
internalmetrics "github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@@ -389,10 +388,8 @@ func (mr *MilvusRoles) Run() {
tracer.Init()

// Initialize streaming service if enabled.
if streamingutil.IsStreamingServiceEnabled() {
streaming.Init()
defer streaming.Release()
}
streaming.Init()
defer streaming.Release()

coordclient.EnableLocalClientRole(&coordclient.LocalClientRoleConfig{
ServerType: mr.ServerType,
2 changes: 2 additions & 0 deletions configs/milvus.yaml
@@ -1121,6 +1121,8 @@ streaming:
# It's OK to set it as a duration string, such as 30s or 1m30s; see time.ParseDuration
backoffInitialInterval: 50ms
backoffMultiplier: 2 # The multiplier of balance task trigger backoff, 2 by default
walBroadcaster:
concurrencyRatio: 1 # The concurrency ratio based on the number of CPUs for the wal broadcaster, 1 by default.
txn:
defaultKeepaliveTimeout: 10s # The default keepalive timeout for wal txn, 10s by default

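As a side note on the new walBroadcaster knob: a minimal sketch of how a CPU-based concurrency ratio is typically turned into a worker count. The scaling by GOMAXPROCS and the floor of one worker are assumptions for illustration, not the actual Milvus logic.

package main

import (
	"fmt"
	"runtime"
)

// broadcasterConcurrency turns a concurrency ratio into a worker-pool size
// by scaling the usable CPU count, flooring at a single worker.
// (Illustrative only; the real broadcaster's rounding may differ.)
func broadcasterConcurrency(ratio float64) int {
	n := int(ratio * float64(runtime.GOMAXPROCS(0)))
	if n < 1 {
		n = 1
	}
	return n
}

func main() {
	fmt.Println(broadcasterConcurrency(1)) // prints the CPU count, e.g. 8
}
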
3 changes: 3 additions & 0 deletions internal/distributed/streaming/streaming.go
@@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
@@ -17,6 +18,8 @@ var singleton WALAccesser = nil
func Init() {
c, _ := kvfactory.GetEtcdAndPath()
singleton = newWALAccesser(c)
// Add the wal accesser to the broadcaster registry to make broadcast operations available.
registry.Register(registry.AppendOperatorTypeStreaming, singleton)
}

// Release releases the resources of the wal accesser.
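The registry call above is the glue between the two worlds: the streaming WAL accesser (here) and the legacy msgstream operator (in rootcoord, below) each register under a type key, and the broadcaster resolves one of them. A rough sketch of that pattern, assuming a map-backed registry; the type and constant names match the diff, but the storage, locking, and selection policy are illustrative only and are not shown in this commit.

package registry

import (
	"context"
	"sync"

	"github.com/milvus-io/milvus/pkg/streaming/util/message"
	"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

// AppendOperatorType distinguishes the two possible broadcast backends.
type AppendOperatorType int

const (
	AppendOperatorTypeMsgstream AppendOperatorType = iota + 1
	AppendOperatorTypeStreaming
)

// AppendOperator is the contract both backends satisfy.
type AppendOperator interface {
	AppendMessages(ctx context.Context, msgs ...message.MutableMessage) types.AppendResponses
}

var (
	mu        sync.Mutex
	operators = make(map[AppendOperatorType]AppendOperator)
)

// Register stores an operator under its type key. Which key the broadcaster
// actually resolves at runtime is decided elsewhere and not shown here.
func Register(typ AppendOperatorType, op AppendOperator) {
	mu.Lock()
	defer mu.Unlock()
	operators[typ] = op
}
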
5 changes: 5 additions & 0 deletions internal/distributed/streaming/streaming_test.go
@@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
@@ -34,6 +35,10 @@ func TestStreamingProduce(t *testing.T) {
PartitionIds: []int64{1, 2, 3},
}).
WithBody(&msgpb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateCollection,
Timestamp: 1,
},
CollectionID: 1,
}).
WithBroadcast(vChannels).
79 changes: 14 additions & 65 deletions internal/distributed/streaming/util.go
@@ -8,6 +8,11 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

type (
AppendResponses = types.AppendResponses
AppendResponse = types.AppendResponse
)

// AppendMessages appends messages to the wal.
// It is a helper utility function to append messages to the wal.
// If the messages belong to one vchannel, they will be sent as a transaction.
@@ -26,7 +31,7 @@ func (u *walAccesserImpl) AppendMessages(ctx context.Context, msgs ...message.Mu

// Otherwise append the messages concurrently.
mu := &sync.Mutex{}
resp := newAppendResponseN(len(msgs))
resp := types.NewAppendResponseN(len(msgs))

wg := &sync.WaitGroup{}
wg.Add(len(dispatchedMessages))
@@ -39,7 +44,7 @@ func (u *walAccesserImpl) AppendMessages(ctx context.Context, msgs ...message.Mu
singleResp := u.appendToVChannel(ctx, vchannel, msgs...)
mu.Lock()
for i, idx := range idxes {
resp.fillResponseAtIdx(singleResp.Responses[i], idx)
resp.FillResponseAtIdx(singleResp.Responses[i], idx)
}
mu.Unlock()
return struct{}{}, nil
@@ -76,17 +81,17 @@ func (u *walAccesserImpl) dispatchMessages(msgs ...message.MutableMessage) (map[
// appendToVChannel appends the messages to the specified vchannel.
func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string, msgs ...message.MutableMessage) AppendResponses {
if len(msgs) == 0 {
return newAppendResponseN(0)
return types.NewAppendResponseN(0)
}
resp := newAppendResponseN(len(msgs))
resp := types.NewAppendResponseN(len(msgs))

// If there is only one message, append it directly; no extra goroutine is needed.
// Most of the time, there is only one message here.
// TODO: only the partition-key case with many partitions generates multiple messages at once on the same pchannel;
// we should optimize the message format to merge them into one, rather than growing the goroutine count.
if len(msgs) == 1 {
appendResult, err := u.appendToWAL(ctx, msgs[0])
resp.fillResponseAtIdx(AppendResponse{
resp.FillResponseAtIdx(AppendResponse{
AppendResult: appendResult,
Error: err,
}, 0)
@@ -99,7 +104,7 @@ func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string,
VChannel: vchannel,
})
if err != nil {
resp.fillAllError(err)
resp.FillAllError(err)
return resp
}

@@ -115,7 +120,7 @@ func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string,
defer wg.Done()
if err := txn.Append(ctx, msg); err != nil {
mu.Lock()
resp.fillResponseAtIdx(AppendResponse{
resp.FillResponseAtIdx(AppendResponse{
Error: err,
}, i)
mu.Unlock()
@@ -129,75 +134,19 @@ func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string,
// and fill the error with the first error.
if err := resp.UnwrapFirstError(); err != nil {
_ = txn.Rollback(ctx) // rollback failure can be ignored.
resp.fillAllError(err)
resp.FillAllError(err)
return resp
}

// commit the transaction and fill the response.
appendResult, err := txn.Commit(ctx)
resp.fillAllResponse(AppendResponse{
resp.FillAllResponse(AppendResponse{
AppendResult: appendResult,
Error: err,
})
return resp
}

// newAppendResponseN creates a new append response.
func newAppendResponseN(n int) AppendResponses {
return AppendResponses{
Responses: make([]AppendResponse, n),
}
}

// AppendResponse is the response of one append operation.
type AppendResponse struct {
AppendResult *types.AppendResult
Error error
}

// AppendResponses is the response of append operation.
type AppendResponses struct {
Responses []AppendResponse
}

func (a AppendResponses) MaxTimeTick() uint64 {
var maxTimeTick uint64
for _, r := range a.Responses {
if r.AppendResult != nil && r.AppendResult.TimeTick > maxTimeTick {
maxTimeTick = r.AppendResult.TimeTick
}
}
return maxTimeTick
}

// UnwrapFirstError returns the first error in the responses.
func (a AppendResponses) UnwrapFirstError() error {
for _, r := range a.Responses {
if r.Error != nil {
return r.Error
}
}
return nil
}

// fillAllError fills all the responses with the same error.
func (a *AppendResponses) fillAllError(err error) {
for i := range a.Responses {
a.Responses[i].Error = err
}
}

// fillResponseAtIdx fill the response at idx
func (a *AppendResponses) fillResponseAtIdx(resp AppendResponse, idx int) {
a.Responses[idx] = resp
}

func (a *AppendResponses) fillAllResponse(resp AppendResponse) {
for i := range a.Responses {
a.Responses[i] = resp
}
}

// applyOpt applies the append options to the message.
func applyOpt(msg message.MutableMessage, opts ...AppendOption) message.MutableMessage {
if len(opts) == 0 {
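Since the response helpers now live in the types package, here is a caller-side sketch of how the pieces in this file compose. It assumes streaming.Init() has already run, that the package exposes its singleton via a WAL() accessor, and that msgs were prebuilt; only AppendMessages, UnwrapFirstError, and MaxTimeTick come from this diff.

package example

import (
	"context"

	"github.com/milvus-io/milvus/internal/distributed/streaming"
	"github.com/milvus-io/milvus/pkg/streaming/util/message"
)

// appendAndReport appends a batch and surfaces the first error, mirroring
// the UnwrapFirstError/MaxTimeTick helpers used throughout util.go.
func appendAndReport(ctx context.Context, msgs ...message.MutableMessage) (uint64, error) {
	resp := streaming.WAL().AppendMessages(ctx, msgs...)
	if err := resp.UnwrapFirstError(); err != nil {
		return 0, err
	}
	// MaxTimeTick is the largest time tick among the successful appends.
	return resp.MaxTimeTick(), nil
}
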
7 changes: 6 additions & 1 deletion internal/distributed/streaming/wal.go
@@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/distributed/streaming/internal/producer"
"github.com/milvus-io/milvus/internal/streamingcoord/client"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
@@ -27,7 +28,11 @@ func newWALAccesser(c *clientv3.Client) *walAccesserImpl {
// Create a new streaming coord client.
streamingCoordClient := client.NewClient(c)
// Create a new streamingnode handler client.
handlerClient := handler.NewHandlerClient(streamingCoordClient.Assignment())
var handlerClient handler.HandlerClient
if streamingutil.IsStreamingServiceEnabled() {
// streaming service is enabled, create the handler client for the streaming service.
handlerClient = handler.NewHandlerClient(streamingCoordClient.Assignment())
}
return &walAccesserImpl{
lifetime: typeutil.NewLifetime(),
streamingCoordClient: streamingCoordClient,
105 changes: 105 additions & 0 deletions internal/rootcoord/broadcast_task.go
@@ -0,0 +1,105 @@
package rootcoord

import (
"context"

"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

var _ task = (*broadcastTask)(nil)

// newBroadcastTask creates a new broadcast task.
func newBroadcastTask(ctx context.Context, core *Core, msgs []message.MutableMessage) *broadcastTask {
return &broadcastTask{
baseTask: newBaseTask(ctx, core),
msgs: msgs,
}
}

// broadcastTask implements the broadcast operation on top of the msgstream
// by using the streaming service interface.
// The msgstream will be deprecated in 2.6.0 in favor of the streaming service, so this code will be removed in a future version.
type broadcastTask struct {
baseTask
msgs []message.MutableMessage // The messages waiting to be broadcast
walName string
resultFuture *syncutil.Future[types.AppendResponses]
}

func (b *broadcastTask) Execute(ctx context.Context) error {
result := types.NewAppendResponseN(len(b.msgs))
defer func() {
b.resultFuture.Set(result)
}()

for idx, msg := range b.msgs {
tsMsg, err := adaptor.NewMsgPackFromMutableMessageV1(msg)
if err != nil {
result.FillResponseAtIdx(types.AppendResponse{Error: err}, idx)
return err
}
pchannel := funcutil.ToPhysicalChannel(msg.VChannel())
msgID, err := b.core.chanTimeTick.broadcastMarkDmlChannels([]string{pchannel}, &msgstream.MsgPack{
BeginTs: b.ts,
EndTs: b.ts,
Msgs: []msgstream.TsMsg{tsMsg},
})
if err != nil {
result.FillResponseAtIdx(types.AppendResponse{Error: err}, idx)
continue
}
result.FillResponseAtIdx(types.AppendResponse{
AppendResult: &types.AppendResult{
MessageID: adaptor.MustGetMessageIDFromMQWrapperIDBytes(b.walName, msgID[pchannel]),
TimeTick: b.ts,
},
}, idx)
}
return result.UnwrapFirstError()
}

func newMsgStreamAppendOperator(c *Core) *msgstreamAppendOperator {
return &msgstreamAppendOperator{
core: c,
walName: util.MustSelectWALName(),
}
}

// msgstreamAppendOperator is the streamingcoord hook that makes broadcast available on the legacy msgstream.
// Because the msgstream is bound to rootcoord tasks, each broadcast operation is transferred into a DDL task
// to preserve the timetick ordering rule.
// The msgstream will be deprecated in 2.6.0, so this is kept in a single module.
type msgstreamAppendOperator struct {
core *Core
walName string
}

// AppendMessages implements the AppendOperator interface for the broadcaster service of the streaming service.
func (m *msgstreamAppendOperator) AppendMessages(ctx context.Context, msgs ...message.MutableMessage) types.AppendResponses {
t := &broadcastTask{
baseTask: newBaseTask(ctx, m.core),
msgs: msgs,
walName: m.walName,
resultFuture: syncutil.NewFuture[types.AppendResponses](),
}

if err := m.core.scheduler.AddTask(t); err != nil {
resp := types.NewAppendResponseN(len(msgs))
resp.FillAllError(err)
return resp
}

result, err := t.resultFuture.GetWithContext(ctx)
if err != nil {
resp := types.NewAppendResponseN(len(msgs))
resp.FillAllError(err)
return resp
}
return result
}
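
An illustrative wrapper, not part of the commit, showing how the operator above is wired and exercised. It mirrors the registration done in startInternal (next file) and assumes a running Core with a task scheduler; only newMsgStreamAppendOperator, registry.Register, and AppendMessages come from this diff.

package rootcoord

import (
	"context"

	"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
	"github.com/milvus-io/milvus/pkg/streaming/util/message"
	"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

// broadcastViaMsgstream registers the legacy operator and pushes a batch of
// messages through it; each message becomes a scheduled DDL task, so the
// msgstream timetick ordering is preserved.
func broadcastViaMsgstream(ctx context.Context, c *Core, msgs ...message.MutableMessage) types.AppendResponses {
	op := newMsgStreamAppendOperator(c)
	registry.Register(registry.AppendOperatorTypeMsgstream, op)
	return op.AppendMessages(ctx, msgs...)
}
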
3 changes: 3 additions & 0 deletions internal/rootcoord/dml_channels.go
@@ -327,6 +327,9 @@ func (d *dmlChannels) broadcastMark(chanNames []string, pack *msgstream.MsgPack)
result[cn] = id.Serialize()
}
}
} else {
dms.mutex.RUnlock()
return nil, errors.Newf("channel not in use: %s", chanName)
}
dms.mutex.RUnlock()
}
5 changes: 5 additions & 0 deletions internal/rootcoord/root_coord.go
@@ -46,6 +46,7 @@ import (
kvmetestore "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
tso2 "github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
@@ -766,6 +767,10 @@ func (c *Core) startInternal() error {
sessionutil.SaveServerInfo(typeutil.RootCoordRole, c.session.ServerID)
log.Info("rootcoord startup successfully")

// Register the core as an append operator for the broadcast service by
// adding the msgstream append operator to the broadcaster registry.
// TODO: should be removed in 2.6.0 when the msgstream is deprecated.
registry.Register(registry.AppendOperatorTypeMsgstream, newMsgStreamAppendOperator(c))
return nil
}

3 changes: 3 additions & 0 deletions internal/rootcoord/root_coord_test.go
@@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
"github.com/milvus-io/milvus/internal/util/dependency"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/internal/util/proxyutil"
@@ -1356,6 +1357,7 @@ func TestCore_startTimeTickLoop(t *testing.T) {
func TestRootcoord_EnableActiveStandby(t *testing.T) {
randVal := rand.Int()
paramtable.Init()
registry.ResetRegistration()
Params.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal))
// Need to reset global etcd to follow new path
kvfactory.CloseEtcdClient()
@@ -1416,6 +1418,7 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) {
func TestRootcoord_DisableActiveStandby(t *testing.T) {
randVal := rand.Int()
paramtable.Init()
registry.ResetRegistration()
Params.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal))
// Need to reset global etcd to follow new path
kvfactory.CloseEtcdClient()