enhance: add broadcast for streaming service
- Add a new RPC for transferring broadcast messages to the streaming coord
- Add a broadcast service on the streaming coord that dispatches broadcast
  messages

Signed-off-by: chyezh <[email protected]>
chyezh committed Jan 7, 2025
1 parent 52a6ad0 commit 1a4b760
Showing 49 changed files with 1,595 additions and 93 deletions.
12 changes: 8 additions & 4 deletions internal/.mockery.yaml
@@ -12,12 +12,16 @@ packages:
github.com/milvus-io/milvus/internal/streamingcoord/server/balancer:
interfaces:
Balancer:
-github.com/milvus-io/milvus/internal/streamingnode/client/manager:
+github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster:
   interfaces:
-    ManagerClient:
+    AppendOperator:
 github.com/milvus-io/milvus/internal/streamingcoord/client:
   interfaces:
     Client:
+    BroadcastService:
+github.com/milvus-io/milvus/internal/streamingnode/client/manager:
+  interfaces:
+    ManagerClient:
github.com/milvus-io/milvus/internal/streamingnode/client/handler:
interfaces:
HandlerClient:
@@ -46,10 +50,10 @@ packages:
InterceptorWithReady:
InterceptorBuilder:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector:
interfaces:
SealOperator:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector:
interfaces:
TimeTickSyncOperator:
google.golang.org/grpc:
interfaces:
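Each entry under packages: tells mockery which interfaces to generate mocks for, so the new BroadcastService entry is what produces the mock_client.NewMockBroadcastService constructor exercised in wal_test.go further down. A minimal sketch of wiring that mock, assuming standard mockery expecter output (the test name and the zero-value result are illustrative; mock_client, mock, and types refer to the same packages used in wal_test.go below):

func TestBroadcastServiceMock(t *testing.T) {
	// The generated coord-client mock hands out the generated
	// broadcast-service mock, mirroring the wiring in wal_test.go.
	broadcastService := mock_client.NewMockBroadcastService(t)
	broadcastService.EXPECT().
		Broadcast(mock.Anything, mock.Anything).
		Return(&types.BroadcastAppendResult{}, nil)

	coordClient := mock_client.NewMockClient(t)
	coordClient.EXPECT().Broadcast().Return(broadcastService)
}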
17 changes: 14 additions & 3 deletions internal/distributed/streaming/append.go
@@ -17,6 +17,12 @@ func (w *walAccesserImpl) appendToWAL(ctx context.Context, msg message.MutableMe
return p.Produce(ctx, msg)
}

func (w *walAccesserImpl) broadcastToWAL(ctx context.Context, msg message.BroadcastMutableMessage) (*types.BroadcastAppendResult, error) {
// The broadcast operation will be sent to the coordinator.
// The coordinator will dispatch the message to all the vchannels with an eventual consistency guarantee.
return w.streamingCoordClient.Broadcast().Broadcast(ctx, msg)
}
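The returned *types.BroadcastAppendResult carries one AppendResult per target vchannel, as the mock in wal_test.go below constructs. A small caller-side sketch of inspecting it (the helper name and logging are illustrative only):

func logBroadcastResult(result *types.BroadcastAppendResult) {
	// One entry per target vchannel: where and when the message
	// landed in that vchannel's WAL.
	for vchannel, r := range result.AppendResults {
		log.Printf("vchannel=%s messageID=%v timeTick=%d", vchannel, r.MessageID, r.TimeTick)
	}
}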

// getProducer creates or gets a producer.
// vchannels in the same pchannel can share the same producer.
func (w *walAccesserImpl) getProducer(pchannel string) *producer.ResumableProducer {
@@ -40,14 +46,19 @@ func assertValidMessage(msgs ...message.MutableMessage) {
if msg.MessageType().IsSystem() {
panic("system message is not allowed to append from client")
}
}
for _, msg := range msgs {
if msg.VChannel() == "" {
panic("vchannel is empty")
panic("we don't support sent all vchannel message at client now")

}
}
}

// assertValidBroadcastMessage asserts that the message is not a system message.
func assertValidBroadcastMessage(msg message.BroadcastMutableMessage) {
if msg.MessageType().IsSystem() {
panic("system message is not allowed to broadcast append from client")

}
}

// We only support delete and insert messages in a txn for now.
func assertIsDmlMessage(msgs ...message.MutableMessage) {
for _, msg := range msgs {
4 changes: 4 additions & 0 deletions internal/distributed/streaming/streaming.go
@@ -85,6 +85,10 @@ type WALAccesser interface {
// RawAppend writes a record to the log.
RawAppend(ctx context.Context, msgs message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error)

// BroadcastAppend sends a broadcast message to all target vchannels.
// BroadcastAppend guarantees atomic writes of the message and eventual consistency across vchannels.
BroadcastAppend(ctx context.Context, msg message.BroadcastMutableMessage) (*types.BroadcastAppendResult, error)

// Read returns a scanner for reading records from the wal.
Read(ctx context.Context, opts ReadOption) Scanner

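Taken together with the broadcast builder used in streaming_test.go below, a caller-side sketch of the new API looks roughly like this (a minimal sketch; import paths are assumptions based on the Milvus layout):

import (
	"context"

	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
	"github.com/milvus-io/milvus/internal/distributed/streaming"
	"github.com/milvus-io/milvus/pkg/streaming/util/message"
)

func broadcastDropCollection(ctx context.Context, vchannels []string) error {
	// A broadcast message addresses all target vchannels at once.
	msg, err := message.NewDropCollectionMessageBuilderV1().
		WithBroadcast(vchannels).
		WithHeader(&message.DropCollectionMessageHeader{}).
		WithBody(&msgpb.DropCollectionRequest{}).
		BuildBroadcast()
	if err != nil {
		return err
	}
	// The streaming coordinator fans the message out to every vchannel
	// with an atomic-write, eventually-consistent guarantee.
	_, err = streaming.WAL().BroadcastAppend(ctx, msg)
	return err
}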
38 changes: 21 additions & 17 deletions internal/distributed/streaming/streaming_test.go
@@ -14,7 +14,10 @@ import (
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

-const vChannel = "by-dev-rootcoord-dml_4"
+var vChannels = []string{
+	"by-dev-rootcoord-dml_4",
+	"by-dev-rootcoord-dml_5",
+}

func TestMain(m *testing.M) {
paramtable.Init()
@@ -33,10 +36,11 @@ func TestStreamingProduce(t *testing.T) {
WithBody(&msgpb.CreateCollectionRequest{
CollectionID: 1,
}).
-		WithVChannel(vChannel).
-		BuildMutable()
-	resp, err := streaming.WAL().RawAppend(context.Background(), msg)
-	fmt.Printf("%+v\t%+v\n", resp, err)
+		WithBroadcast(vChannels).
+		BuildBroadcast()
+
+	resp, err := streaming.WAL().BroadcastAppend(context.Background(), msg)
+	t.Logf("CreateCollection: %+v\t%+v\n", resp, err)

for i := 0; i < 500; i++ {
time.Sleep(time.Millisecond * 1)
@@ -47,17 +51,17 @@
WithBody(&msgpb.InsertRequest{
CollectionID: 1,
}).
-			WithVChannel(vChannel).
+			WithVChannel(vChannels[0]).
BuildMutable()
resp, err := streaming.WAL().RawAppend(context.Background(), msg)
-		fmt.Printf("%+v\t%+v\n", resp, err)
+		t.Logf("Insert: %+v\t%+v\n", resp, err)
}

for i := 0; i < 500; i++ {
time.Sleep(time.Millisecond * 1)
txn, err := streaming.WAL().Txn(context.Background(), streaming.TxnOption{
-			VChannel:  vChannel,
-			Keepalive: 100 * time.Millisecond,
+			VChannel:  vChannels[0],
+			Keepalive: 500 * time.Millisecond,
})
if err != nil {
t.Errorf("txn failed: %v", err)
@@ -71,7 +75,7 @@ func TestStreamingProduce(t *testing.T) {
WithBody(&msgpb.InsertRequest{
CollectionID: 1,
}).
-				WithVChannel(vChannel).
+				WithVChannel(vChannels[0]).
BuildMutable()
err := txn.Append(context.Background(), msg)
fmt.Printf("%+v\n", err)
@@ -80,7 +84,7 @@ func TestStreamingProduce(t *testing.T) {
if err != nil {
t.Errorf("txn failed: %v", err)
}
-		fmt.Printf("%+v\n", result)
+		t.Logf("txn commit: %+v\n", result)
}

msg, _ = message.NewDropCollectionMessageBuilderV1().
@@ -90,10 +94,10 @@ func TestStreamingProduce(t *testing.T) {
WithBody(&msgpb.DropCollectionRequest{
CollectionID: 1,
}).
-		WithVChannel(vChannel).
-		BuildMutable()
-	resp, err = streaming.WAL().RawAppend(context.Background(), msg)
-	fmt.Printf("%+v\t%+v\n", resp, err)
+		WithBroadcast(vChannels).
+		BuildBroadcast()
+	resp, err = streaming.WAL().BroadcastAppend(context.Background(), msg)
+	t.Logf("DropCollection: %+v\t%+v\n", resp, err)
}

func TestStreamingConsume(t *testing.T) {
Expand All @@ -102,7 +106,7 @@ func TestStreamingConsume(t *testing.T) {
defer streaming.Release()
ch := make(message.ChanMessageHandler, 10)
s := streaming.WAL().Read(context.Background(), streaming.ReadOption{
-		VChannel:      vChannel,
+		VChannel:      vChannels[0],
DeliverPolicy: options.DeliverPolicyAll(),
MessageHandler: ch,
})
@@ -115,7 +119,7 @@ func TestStreamingConsume(t *testing.T) {
time.Sleep(10 * time.Millisecond)
select {
case msg := <-ch:
fmt.Printf("msgID=%+v, msgType=%+v, tt=%d, lca=%+v, body=%s, idx=%d\n",
t.Logf("msgID=%+v, msgType=%+v, tt=%d, lca=%+v, body=%s, idx=%d\n",
msg.MessageID(),
msg.MessageType(),
msg.TimeTick(),
26 changes: 18 additions & 8 deletions internal/distributed/streaming/wal.go
@@ -28,11 +28,11 @@ func newWALAccesser(c *clientv3.Client) *walAccesserImpl {
// Create a new streamingnode handler client.
handlerClient := handler.NewHandlerClient(streamingCoordClient.Assignment())
return &walAccesserImpl{
-		lifetime:                       typeutil.NewLifetime(),
-		streamingCoordAssignmentClient: streamingCoordClient,
-		handlerClient:                  handlerClient,
-		producerMutex:                  sync.Mutex{},
-		producers:                      make(map[string]*producer.ResumableProducer),
+		lifetime:             typeutil.NewLifetime(),
+		streamingCoordClient: streamingCoordClient,
+		handlerClient:        handlerClient,
+		producerMutex:        sync.Mutex{},
+		producers:            make(map[string]*producer.ResumableProducer),

// TODO: optimize the pool size, use the streaming api but not goroutines.
appendExecutionPool: conc.NewPool[struct{}](10),
@@ -45,8 +45,8 @@ type walAccesserImpl struct {
lifetime *typeutil.Lifetime

// All services
-	streamingCoordAssignmentClient client.Client
-	handlerClient                  handler.HandlerClient
+	streamingCoordClient client.Client
+	handlerClient        handler.HandlerClient

producerMutex sync.Mutex
producers map[string]*producer.ResumableProducer
@@ -66,6 +66,16 @@ func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMess
return w.appendToWAL(ctx, msg)
}

func (w *walAccesserImpl) BroadcastAppend(ctx context.Context, msg message.BroadcastMutableMessage) (*types.BroadcastAppendResult, error) {
assertValidBroadcastMessage(msg)
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, ErrWALAccesserClosed
}
defer w.lifetime.Done()

return w.broadcastToWAL(ctx, msg)
}

// Read returns a scanner for reading records from the wal.
func (w *walAccesserImpl) Read(_ context.Context, opts ReadOption) Scanner {
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
@@ -144,7 +154,7 @@ func (w *walAccesserImpl) Close() {
w.producerMutex.Unlock()

w.handlerClient.Close()
-	w.streamingCoordAssignmentClient.Close()
+	w.streamingCoordClient.Close()
}

// newErrScanner creates a scanner that returns an error.
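RawAppend, BroadcastAppend, and Read above all share the same lifetime guard: register as a working caller, fail fast once the accesser is closed, and release on return. A condensed sketch of the pattern (guardedCall is a hypothetical helper, not part of this commit):

func (w *walAccesserImpl) guardedCall(do func() error) error {
	// Reject new work once Close() has been called.
	if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
		return ErrWALAccesserClosed
	}
	// Release the working reference so Close() can drain callers.
	defer w.lifetime.Done()
	return do()
}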
54 changes: 46 additions & 8 deletions internal/distributed/streaming/wal_test.go
@@ -30,19 +30,33 @@ const (
func TestWAL(t *testing.T) {
coordClient := mock_client.NewMockClient(t)
coordClient.EXPECT().Close().Return()
broadcastService := mock_client.NewMockBroadcastService(t)
broadcastService.EXPECT().Broadcast(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, bmm message.BroadcastMutableMessage) (*types.BroadcastAppendResult, error) {
result := make(map[string]*types.AppendResult)
for idx, msg := range bmm.SplitIntoMutableMessage() {
result[msg.VChannel()] = &types.AppendResult{
MessageID: walimplstest.NewTestMessageID(int64(idx)),
TimeTick: uint64(time.Now().UnixMilli()),
}
}
return &types.BroadcastAppendResult{
AppendResults: result,
}, nil
})
coordClient.EXPECT().Broadcast().Return(broadcastService)
handler := mock_handler.NewMockHandlerClient(t)
handler.EXPECT().Close().Return()

w := &walAccesserImpl{
-		lifetime:                       typeutil.NewLifetime(),
-		streamingCoordAssignmentClient: coordClient,
-		handlerClient:                  handler,
-		producerMutex:                  sync.Mutex{},
-		producers:                      make(map[string]*producer.ResumableProducer),
-		appendExecutionPool:            conc.NewPool[struct{}](10),
-		dispatchExecutionPool:          conc.NewPool[struct{}](10),
+		lifetime:              typeutil.NewLifetime(),
+		streamingCoordClient:  coordClient,
+		handlerClient:         handler,
+		producerMutex:         sync.Mutex{},
+		producers:             make(map[string]*producer.ResumableProducer),
+		appendExecutionPool:   conc.NewPool[struct{}](10),
+		dispatchExecutionPool: conc.NewPool[struct{}](10),
}
-	defer w.Close()

ctx := context.Background()

@@ -114,6 +128,18 @@ func TestWAL(t *testing.T) {
newInsertMessage(vChannel3),
)
assert.NoError(t, resp.UnwrapFirstError())

r, err := w.BroadcastAppend(ctx, newBroadcastMessage([]string{vChannel1, vChannel2, vChannel3}))
assert.NoError(t, err)
assert.Len(t, r.AppendResults, 3)

w.Close()

resp = w.AppendMessages(ctx, newInsertMessage(vChannel1))
assert.Error(t, resp.UnwrapFirstError())
r, err = w.BroadcastAppend(ctx, newBroadcastMessage([]string{vChannel1, vChannel2, vChannel3}))
assert.Error(t, err)
assert.Nil(t, r)
}

func newInsertMessage(vChannel string) message.MutableMessage {
@@ -127,3 +153,15 @@ func newInsertMessage(vChannel string) message.MutableMessage {
}
return msg
}

func newBroadcastMessage(vchannels []string) message.BroadcastMutableMessage {
msg, err := message.NewDropCollectionMessageBuilderV1().
WithBroadcast(vchannels).
WithHeader(&message.DropCollectionMessageHeader{}).
WithBody(&msgpb.DropCollectionRequest{}).
BuildBroadcast()
if err != nil {
panic(err)
}
return msg
}
9 changes: 9 additions & 0 deletions internal/metastore/catalog.go
@@ -210,6 +210,15 @@ type StreamingCoordCataLog interface {

// SavePChannel save a pchannel info to metastore.
SavePChannels(ctx context.Context, info []*streamingpb.PChannelMeta) error

// ListBroadcastTask lists all broadcast tasks.
// It is used to recover broadcast tasks after a restart.
ListBroadcastTask(ctx context.Context) ([]*streamingpb.BroadcastTask, error)

// SaveBroadcastTask saves the broadcast task to the metastore,
// making the task recoverable after a restart.
// Once a broadcast task is done, it is removed from the metastore.
SaveBroadcastTask(ctx context.Context, task *streamingpb.BroadcastTask) error
}

// StreamingNodeCataLog is the interface for streamingnode catalog
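The two methods define a small recovery protocol: SaveBroadcastTask persists each pending broadcast, and ListBroadcastTask replays whatever was still pending when the coordinator restarts. A sketch of that startup path (recoverBroadcastTasks and resumeBroadcast are hypothetical names, not part of this commit):

func recoverBroadcastTasks(ctx context.Context, catalog metastore.StreamingCoordCataLog) error {
	// Only unfinished tasks are listed; a finished task is removed
	// from the metastore when it completes.
	tasks, err := catalog.ListBroadcastTask(ctx)
	if err != nil {
		return err
	}
	for _, task := range tasks {
		// Hypothetical: re-dispatch the pending broadcast.
		resumeBroadcast(task)
	}
	return nil
}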
5 changes: 3 additions & 2 deletions internal/metastore/kv/streamingcoord/constant.go
@@ -1,6 +1,7 @@
package streamingcoord

const (
MetaPrefix = "streamingcoord-meta"
PChannelMeta = MetaPrefix + "/pchannel"
MetaPrefix = "streamingcoord-meta/"
PChannelMetaPrefix = MetaPrefix + "pchannel/"
BroadcastTaskPrefix = MetaPrefix + "broadcast-task/"
)
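With the separators folded into the prefixes, metastore keys now compose by plain concatenation. A sketch of the resulting key layout (the task-ID suffix format is an assumption):

func exampleKeys() (string, string) {
	// "streamingcoord-meta/pchannel/by-dev-rootcoord-dml_4"
	pchannelKey := PChannelMetaPrefix + "by-dev-rootcoord-dml_4"
	// "streamingcoord-meta/broadcast-task/1" (task-ID suffix assumed)
	taskKey := BroadcastTaskPrefix + "1"
	return pchannelKey, taskKey
}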