From e539c9d007a0fe56a9ce9948837ea03dd195d7e6 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Mon, 12 Aug 2024 10:02:11 -0400 Subject: [PATCH 1/4] feat: enqueue seedlist using fixed memory slice for consistency and perf, change the useHandover bool to atomic bool, make a function to temp turn off the handover waiting on a channel --- internal/pkg/crawl/crawl.go | 11 +++++++++-- internal/pkg/queue/enqueue.go | 8 ++++---- internal/pkg/queue/handover.go | 15 +++++++++++++++ internal/pkg/queue/queue.go | 5 +++-- internal/pkg/queue/stats.go | 2 +- 5 files changed, 32 insertions(+), 9 deletions(-) diff --git a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go index 5d8e5af2..3d82f67d 100644 --- a/internal/pkg/crawl/crawl.go +++ b/internal/pkg/crawl/crawl.go @@ -157,9 +157,14 @@ func (c *Crawl) Start() (err error) { go c.HQFinisher() go c.HQWebsocket() } else { + // Temporarily disable handover as it's not needed + c.Log.Info("Temporarily disabling handover..") + enableBackHandover := make(chan struct{}) + go c.Queue.TempDisableHandover(enableBackHandover) + // Push the seed list to the queue c.Log.Info("Pushing seeds in the local queue..") - var seedPointers []*queue.Item + seedPointers := make([]*queue.Item, 0, 100000) for idx, item := range c.SeedList { seedPointers = append(seedPointers, &item) @@ -170,7 +175,7 @@ func (c *Crawl) Start() (err error) { if err := c.Queue.BatchEnqueue(seedPointers...); err != nil { c.Log.Error("unable to enqueue seeds, discarding", "error", err) } - seedPointers = nil + seedPointers = make([]*queue.Item, 0, 100000) } } if len(seedPointers) > 0 { @@ -180,6 +185,8 @@ func (c *Crawl) Start() (err error) { } c.SeedList = nil + c.Log.Info("Enabling handover..") + enableBackHandover <- struct{}{} c.Log.Info("All seeds are now in queue") } diff --git a/internal/pkg/queue/enqueue.go b/internal/pkg/queue/enqueue.go index c13c6084..115cadea 100644 --- a/internal/pkg/queue/enqueue.go +++ b/internal/pkg/queue/enqueue.go @@ -44,7 +44,7 @@ func (q *PersistentGroupedQueue) batchEnqueueNoCommit(items ...*Item) error { // Update empty status defer q.Empty.Set(false) - if !q.useHandover { + if !q.useHandover.Load() { isHandover = false } else { isHandover = q.handover.tryOpen(batchLen) @@ -110,7 +110,7 @@ func (q *PersistentGroupedQueue) batchEnqueueNoCommit(items ...*Item) error { close(itemsChan) // Wait for handover to finish then close for consumption - for q.useHandover { + for q.useHandover.Load() { done := <-q.handover.signalConsumerDone if done { q.HandoverOpen.Set(false) @@ -247,7 +247,7 @@ func (q *PersistentGroupedQueue) batchEnqueueUntilCommitted(items ...*Item) erro // Update empty status defer q.Empty.Set(false) - if !q.useHandover { + if !q.useHandover.Load() { isHandover = false } else { isHandover = q.handover.tryOpen(batchLen) @@ -313,7 +313,7 @@ func (q *PersistentGroupedQueue) batchEnqueueUntilCommitted(items ...*Item) erro close(itemsChan) // Wait for handover to finish then close for consumption - for q.useHandover { + for q.useHandover.Load() { done := <-q.handover.signalConsumerDone if done { q.HandoverOpen.Set(false) diff --git a/internal/pkg/queue/handover.go b/internal/pkg/queue/handover.go index d5fe3e81..8c828934 100644 --- a/internal/pkg/queue/handover.go +++ b/internal/pkg/queue/handover.go @@ -176,3 +176,18 @@ func (h *handoverChannel) monitorActivity() { } } } + +func (q *PersistentGroupedQueue) TempDisableHandover(enableBack chan struct{}) bool { + if !q.useHandover.CompareAndSwap(true, false) { + return false + } + for { + timeout := time.After(1 * time.Minute) + select { + case <-enableBack: + return q.useHandover.CompareAndSwap(false, true) + case <-timeout: + return false + } + } +} diff --git a/internal/pkg/queue/queue.go b/internal/pkg/queue/queue.go index 6e3b6295..afd063ff 100644 --- a/internal/pkg/queue/queue.go +++ b/internal/pkg/queue/queue.go @@ -32,7 +32,7 @@ type PersistentGroupedQueue struct { currentHost *atomic.Uint64 mutex sync.RWMutex - useHandover bool + useHandover *atomic.Bool handover *handoverChannel HandoverOpen *utils.TAtomBool handoverMutex sync.Mutex @@ -86,7 +86,7 @@ func NewPersistentGroupedQueue(queueDirPath string, useHandover bool, useCommit closed: new(utils.TAtomBool), finishing: new(utils.TAtomBool), - useHandover: useHandover, + useHandover: new(atomic.Bool), HandoverOpen: new(utils.TAtomBool), handoverCount: new(atomic.Uint64), @@ -117,6 +117,7 @@ func NewPersistentGroupedQueue(queueDirPath string, useHandover bool, useCommit q.currentHost.Store(0) // Handover + q.useHandover.Store(useHandover) q.HandoverOpen.Set(false) q.handoverCount.Store(0) if useHandover { diff --git a/internal/pkg/queue/stats.go b/internal/pkg/queue/stats.go index 1ec145c2..dcf14af4 100644 --- a/internal/pkg/queue/stats.go +++ b/internal/pkg/queue/stats.go @@ -61,7 +61,7 @@ func (q *PersistentGroupedQueue) genStats() { } // Calculate handover success get count - if q.useHandover { + if q.useHandover.Load() { q.stats.HandoverSuccessGetCount = q.handoverCount.Load() } } From 2f71eccebe190216cc47636a5535789986b2d2ff Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Mon, 12 Aug 2024 10:30:52 -0400 Subject: [PATCH 2/4] feat: make TempDisableHandover() use a waitgroup and make the seedlist processing have a smaller mem footprint --- internal/pkg/crawl/crawl.go | 35 +++++++++++++++++++--------------- internal/pkg/queue/handover.go | 6 +++++- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go index 3d82f67d..06fbd7ac 100644 --- a/internal/pkg/crawl/crawl.go +++ b/internal/pkg/crawl/crawl.go @@ -160,33 +160,38 @@ func (c *Crawl) Start() (err error) { // Temporarily disable handover as it's not needed c.Log.Info("Temporarily disabling handover..") enableBackHandover := make(chan struct{}) - go c.Queue.TempDisableHandover(enableBackHandover) + wg := new(sync.WaitGroup) + + go c.Queue.TempDisableHandover(enableBackHandover, wg) // Push the seed list to the queue c.Log.Info("Pushing seeds in the local queue..") - seedPointers := make([]*queue.Item, 0, 100000) - for idx, item := range c.SeedList { - seedPointers = append(seedPointers, &item) - - // We enqueue seeds by batch of 100k - // Workers will start processing them as soon as one batch is enqueued - if idx%100000 == 0 { - c.Log.Info("Enqueuing seeds", "index", idx) - if err := c.Queue.BatchEnqueue(seedPointers...); err != nil { - c.Log.Error("unable to enqueue seeds, discarding", "error", err) - } - seedPointers = make([]*queue.Item, 0, 100000) + for i := 0; i < len(c.SeedList); i += 100000 { + end := i + 100000 + if end > len(c.SeedList) { + end = len(c.SeedList) } - } - if len(seedPointers) > 0 { + + c.Log.Info("Enqueuing seeds", "index", i) + + // Create a slice of pointers to the items in the current batch + seedPointers := make([]*queue.Item, end-i) + for j := range seedPointers { + seedPointers[j] = &c.SeedList[i+j] + } + if err := c.Queue.BatchEnqueue(seedPointers...); err != nil { c.Log.Error("unable to enqueue seeds, discarding", "error", err) } } c.SeedList = nil + c.Log.Info("Enabling handover..") enableBackHandover <- struct{}{} + wg.Wait() + close(enableBackHandover) + c.Log.Info("All seeds are now in queue") } diff --git a/internal/pkg/queue/handover.go b/internal/pkg/queue/handover.go index 8c828934..0ddcd819 100644 --- a/internal/pkg/queue/handover.go +++ b/internal/pkg/queue/handover.go @@ -1,6 +1,7 @@ package queue import ( + "sync" "sync/atomic" "time" ) @@ -177,7 +178,10 @@ func (h *handoverChannel) monitorActivity() { } } -func (q *PersistentGroupedQueue) TempDisableHandover(enableBack chan struct{}) bool { +func (q *PersistentGroupedQueue) TempDisableHandover(enableBack chan struct{}, wg *sync.WaitGroup) bool { + wg.Add(1) + defer wg.Done() + if !q.useHandover.CompareAndSwap(true, false) { return false } From 8668eb6b3fedf8c2e1313ead94e4a342279e7965 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Mon, 12 Aug 2024 10:36:37 -0400 Subject: [PATCH 3/4] sync using a second channel instead of waitgroup --- internal/pkg/crawl/crawl.go | 7 ++++--- internal/pkg/queue/handover.go | 13 +++++++------ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go index 06fbd7ac..c9c01cdf 100644 --- a/internal/pkg/crawl/crawl.go +++ b/internal/pkg/crawl/crawl.go @@ -160,10 +160,11 @@ func (c *Crawl) Start() (err error) { // Temporarily disable handover as it's not needed c.Log.Info("Temporarily disabling handover..") enableBackHandover := make(chan struct{}) - wg := new(sync.WaitGroup) + syncHandover := make(chan struct{}) - go c.Queue.TempDisableHandover(enableBackHandover, wg) + go c.Queue.TempDisableHandover(enableBackHandover, syncHandover) + <-syncHandover // Push the seed list to the queue c.Log.Info("Pushing seeds in the local queue..") for i := 0; i < len(c.SeedList); i += 100000 { @@ -189,7 +190,7 @@ func (c *Crawl) Start() (err error) { c.Log.Info("Enabling handover..") enableBackHandover <- struct{}{} - wg.Wait() + <-syncHandover close(enableBackHandover) c.Log.Info("All seeds are now in queue") diff --git a/internal/pkg/queue/handover.go b/internal/pkg/queue/handover.go index 0ddcd819..66e28bd7 100644 --- a/internal/pkg/queue/handover.go +++ b/internal/pkg/queue/handover.go @@ -1,7 +1,6 @@ package queue import ( - "sync" "sync/atomic" "time" ) @@ -178,18 +177,20 @@ func (h *handoverChannel) monitorActivity() { } } -func (q *PersistentGroupedQueue) TempDisableHandover(enableBack chan struct{}, wg *sync.WaitGroup) bool { - wg.Add(1) - defer wg.Done() - +func (q *PersistentGroupedQueue) TempDisableHandover(enableBack chan struct{}, syncHandover chan struct{}) bool { if !q.useHandover.CompareAndSwap(true, false) { return false } + + syncHandover <- struct{}{} + for { timeout := time.After(1 * time.Minute) select { case <-enableBack: - return q.useHandover.CompareAndSwap(false, true) + ok := q.useHandover.CompareAndSwap(false, true) + syncHandover <- struct{}{} + return ok case <-timeout: return false } From 46c721c3a2fd497f7e937866edc93630511dee8b Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Mon, 12 Aug 2024 10:43:20 -0400 Subject: [PATCH 4/4] close syncHandover chan --- internal/pkg/crawl/crawl.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go index c9c01cdf..31cbe659 100644 --- a/internal/pkg/crawl/crawl.go +++ b/internal/pkg/crawl/crawl.go @@ -192,6 +192,7 @@ func (c *Crawl) Start() (err error) { enableBackHandover <- struct{}{} <-syncHandover close(enableBackHandover) + close(syncHandover) c.Log.Info("All seeds are now in queue") }