Skip to content

Commit

Permalink
fix: Fix load slowly (#37454)
Browse files Browse the repository at this point in the history
When there are many loaded collections, they occupy the target
observer scheduler’s pool. This prevents collections that are still loading
from updating their current target in time, slowing down the load process.
This PR adds a separate target dispatcher for loading collections.

issue: #37166

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Nov 8, 2024
1 parent ce2fa3d commit ff9bdf7
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 12 deletions.
4 changes: 2 additions & 2 deletions internal/querycoordv2/meta/target_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 {
return partition.PartitionID
})
allocatedTarget := NewCollectionTarget(nil, nil, partitionIDs)

channelInfos := make(map[string][]*datapb.VchannelInfo)
segments := make(map[int64]*datapb.SegmentInfo, 0)
Expand Down Expand Up @@ -194,7 +193,8 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
return nil
}

mgr.next.updateCollectionTarget(collectionID, NewCollectionTarget(segments, dmChannels, partitionIDs))
allocatedTarget := NewCollectionTarget(segments, dmChannels, partitionIDs)
mgr.next.updateCollectionTarget(collectionID, allocatedTarget)
log.Debug("finish to update next targets for collection",
zap.Int64("collectionID", collectionID),
zap.Int64s("PartitionIDs", partitionIDs),
Expand Down
28 changes: 20 additions & 8 deletions internal/querycoordv2/observers/target_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,12 @@ type TargetObserver struct {
mut sync.Mutex // Guard readyNotifiers
readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers

dispatcher *taskDispatcher[int64]
keylocks *lock.KeyLock[int64]
// loadingDispatcher updates targets for collections that are loading (also collections without a current target).
loadingDispatcher *taskDispatcher[int64]
// loadedDispatcher updates targets for loaded collections.
loadedDispatcher *taskDispatcher[int64]

keylocks *lock.KeyLock[int64]

startOnce sync.Once
stopOnce sync.Once
Expand All @@ -114,8 +118,8 @@ func NewTargetObserver(
keylocks: lock.NewKeyLock[int64](),
}

dispatcher := newTaskDispatcher(result.check)
result.dispatcher = dispatcher
result.loadingDispatcher = newTaskDispatcher(result.check)
result.loadedDispatcher = newTaskDispatcher(result.check)
return result
}

Expand All @@ -124,7 +128,8 @@ func (ob *TargetObserver) Start() {
ctx, cancel := context.WithCancel(context.Background())
ob.cancel = cancel

ob.dispatcher.Start()
ob.loadingDispatcher.Start()
ob.loadedDispatcher.Start()

ob.wg.Add(1)
go func() {
Expand All @@ -144,7 +149,8 @@ func (ob *TargetObserver) Stop() {
}
ob.wg.Wait()

ob.dispatcher.Stop()
ob.loadingDispatcher.Stop()
ob.loadedDispatcher.Stop()
})
}

Expand All @@ -167,7 +173,13 @@ func (ob *TargetObserver) schedule(ctx context.Context) {

case <-ticker.C:
ob.clean()
ob.dispatcher.AddTask(ob.meta.GetAll()...)
loaded := lo.FilterMap(ob.meta.GetAllCollections(), func(collection *meta.Collection, _ int) (int64, bool) {
if collection.GetStatus() == querypb.LoadStatus_Loaded {
return collection.GetCollectionID(), true
}
return 0, false
})
ob.loadedDispatcher.AddTask(loaded...)

case req := <-ob.updateChan:
log.Info("manually trigger update target",
Expand Down Expand Up @@ -217,7 +229,7 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool {
result := ob.targetMgr.IsCurrentTargetExist(collectionID, partitionID)
if !result {
ob.dispatcher.AddTask(collectionID)
ob.loadingDispatcher.AddTask(collectionID)
}
return result
}
Expand Down
7 changes: 5 additions & 2 deletions internal/querycoordv2/observers/target_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ func (suite *TargetObserverSuite) SetupTest() {
suite.collectionID = int64(1000)
suite.partitionID = int64(100)

err = suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(suite.collectionID, 1))
testCollection := utils.CreateTestCollection(suite.collectionID, 1)
testCollection.Status = querypb.LoadStatus_Loaded
err = suite.meta.CollectionManager.PutCollection(testCollection)
suite.NoError(err)
err = suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collectionID, suite.partitionID))
suite.NoError(err)
Expand Down Expand Up @@ -302,7 +304,8 @@ func (suite *TargetObserverCheckSuite) SetupTest() {
func (s *TargetObserverCheckSuite) TestCheck() {
r := s.observer.Check(context.Background(), s.collectionID, common.AllPartitionsID)
s.False(r)
s.True(s.observer.dispatcher.tasks.Contain(s.collectionID))
s.False(s.observer.loadedDispatcher.tasks.Contain(s.collectionID))
s.True(s.observer.loadingDispatcher.tasks.Contain(s.collectionID))
}

func TestTargetObserver(t *testing.T) {
Expand Down

0 comments on commit ff9bdf7

Please sign in to comment.