Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into ytdlp
Browse files Browse the repository at this point in the history
  • Loading branch information
CorentinB committed Aug 12, 2024
2 parents 7c95917 + b428ecb commit 578fb6b
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 21 deletions.
42 changes: 28 additions & 14 deletions internal/pkg/crawl/crawl.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,29 +172,43 @@ func (c *Crawl) Start() (err error) {
go c.HQFinisher()
go c.HQWebsocket()
} else {
// Temporarily disable handover as it's not needed
c.Log.Info("Temporarily disabling handover..")
enableBackHandover := make(chan struct{})
syncHandover := make(chan struct{})

go c.Queue.TempDisableHandover(enableBackHandover, syncHandover)

<-syncHandover
// Push the seed list to the queue
c.Log.Info("Pushing seeds in the local queue..")
var seedPointers []*queue.Item
for idx, item := range c.SeedList {
seedPointers = append(seedPointers, &item)

// We enqueue seeds by batch of 100k
// Workers will start processing them as soon as one batch is enqueued
if idx%100000 == 0 {
c.Log.Info("Enqueuing seeds", "index", idx)
if err := c.Queue.BatchEnqueue(seedPointers...); err != nil {
c.Log.Error("unable to enqueue seeds, discarding", "error", err)
}
seedPointers = nil
for i := 0; i < len(c.SeedList); i += 100000 {
end := i + 100000
if end > len(c.SeedList) {
end = len(c.SeedList)
}
}
if len(seedPointers) > 0 {

c.Log.Info("Enqueuing seeds", "index", i)

// Create a slice of pointers to the items in the current batch
seedPointers := make([]*queue.Item, end-i)
for j := range seedPointers {
seedPointers[j] = &c.SeedList[i+j]
}

if err := c.Queue.BatchEnqueue(seedPointers...); err != nil {
c.Log.Error("unable to enqueue seeds, discarding", "error", err)
}
}

c.SeedList = nil

c.Log.Info("Enabling handover..")
enableBackHandover <- struct{}{}
<-syncHandover
close(enableBackHandover)
close(syncHandover)

c.Log.Info("All seeds are now in queue")
}

Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/queue/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (q *PersistentGroupedQueue) batchEnqueueNoCommit(items ...*Item) error {
// Update empty status
defer q.Empty.Set(false)

if !q.useHandover {
if !q.useHandover.Load() {
isHandover = false
} else {
isHandover = q.handover.tryOpen(batchLen)
Expand Down Expand Up @@ -110,7 +110,7 @@ func (q *PersistentGroupedQueue) batchEnqueueNoCommit(items ...*Item) error {
close(itemsChan)

// Wait for handover to finish then close for consumption
for q.useHandover {
for q.useHandover.Load() {
done := <-q.handover.signalConsumerDone
if done {
q.HandoverOpen.Set(false)
Expand Down Expand Up @@ -247,7 +247,7 @@ func (q *PersistentGroupedQueue) batchEnqueueUntilCommitted(items ...*Item) erro
// Update empty status
defer q.Empty.Set(false)

if !q.useHandover {
if !q.useHandover.Load() {
isHandover = false
} else {
isHandover = q.handover.tryOpen(batchLen)
Expand Down Expand Up @@ -313,7 +313,7 @@ func (q *PersistentGroupedQueue) batchEnqueueUntilCommitted(items ...*Item) erro
close(itemsChan)

// Wait for handover to finish then close for consumption
for q.useHandover {
for q.useHandover.Load() {
done := <-q.handover.signalConsumerDone
if done {
q.HandoverOpen.Set(false)
Expand Down
20 changes: 20 additions & 0 deletions internal/pkg/queue/handover.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,23 @@ func (h *handoverChannel) monitorActivity() {
}
}
}

// TempDisableHandover switches the queue's handover flag off and keeps it off
// until the caller signals on enableBack, then attempts to switch it back on.
//
// The two channels form a three-step handshake with the caller:
//
//  1. a value is sent on syncHandover once the flag is known to be off;
//  2. the method waits for a value on enableBack;
//  3. a value is sent on syncHandover once the restore attempt has been made.
//
// Every step blocks until the other side takes part (assuming unbuffered
// channels), so this method is meant to run in its own goroutine while the
// caller drives the handshake.
//
// The handshake is carried out even when handover was already off on entry.
// In that case the flag is left untouched, so a caller waiting on syncHandover
// is never left hanging just because handover was not enabled.
//
// It returns true only if this call both disabled handover and successfully
// re-enabled it; otherwise it returns false.
func (q *PersistentGroupedQueue) TempDisableHandover(enableBack chan struct{}, syncHandover chan struct{}) bool {
	// Remember whether this call is the one that cleared the flag: only then
	// is it allowed to set it again at the end.
	disabledHere := q.useHandover.CompareAndSwap(true, false)

	// Step 1: tell the caller handover is off and it may proceed.
	syncHandover <- struct{}{}

	// Step 2: wait for the caller to finish. There is deliberately no timeout
	// here: giving up early would leave the flag off for good and leave the
	// caller blocked on its send to enableBack.
	<-enableBack

	restored := disabledHere && q.useHandover.CompareAndSwap(false, true)

	// Step 3: tell the caller the restore attempt is complete.
	syncHandover <- struct{}{}

	return restored
}
5 changes: 3 additions & 2 deletions internal/pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type PersistentGroupedQueue struct {
currentHost *atomic.Uint64
mutex sync.RWMutex

useHandover bool
useHandover *atomic.Bool
handover *handoverChannel
HandoverOpen *utils.TAtomBool
handoverMutex sync.Mutex
Expand Down Expand Up @@ -86,7 +86,7 @@ func NewPersistentGroupedQueue(queueDirPath string, useHandover bool, useCommit
closed: new(utils.TAtomBool),
finishing: new(utils.TAtomBool),

useHandover: useHandover,
useHandover: new(atomic.Bool),
HandoverOpen: new(utils.TAtomBool),
handoverCount: new(atomic.Uint64),

Expand Down Expand Up @@ -117,6 +117,7 @@ func NewPersistentGroupedQueue(queueDirPath string, useHandover bool, useCommit
q.currentHost.Store(0)

// Handover
q.useHandover.Store(useHandover)
q.HandoverOpen.Set(false)
q.handoverCount.Store(0)
if useHandover {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/queue/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (q *PersistentGroupedQueue) genStats() {
}

// Calculate handover success get count
if q.useHandover {
if q.useHandover.Load() {
q.stats.HandoverSuccessGetCount = q.handoverCount.Load()
}
}
Expand Down

0 comments on commit 578fb6b

Please sign in to comment.