
Add some live logging of the HQ consumer behavior (#143)
* Add: print HQ consumer status

* chore: bump gocrawlhq to 1.2.6
CorentinB authored Sep 1, 2024
1 parent 349e035 commit 1aef5b6
Showing 3 changed files with 18 additions and 1 deletion.
internal/pkg/crawl/config.go (1 change: 1 addition & 0 deletions)
@@ -115,6 +115,7 @@ type Crawl struct {
HQBatchSize int
HQContinuousPull bool
HQClient *gocrawlhq.Client
HQConsumerState string
HQFinishedChannel chan *queue.Item
HQProducerChannel chan *queue.Item
HQChannelsWg *sync.WaitGroup
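One caveat worth flagging: HQConsumerState is a plain string written by the HQConsumer goroutine and read concurrently by printLiveStats (see stats.go below), with no synchronization around it. A minimal sketch of one way to make such a status field race-free; the consumerStatus type and its set/get methods are hypothetical and not part of this commit:

    package crawl

    import "sync/atomic"

    // Hypothetical alternative to the bare string field: an atomic.Value
    // holding a string, so the stats goroutine never races with the
    // consumer goroutine that updates the status.
    type consumerStatus struct {
        v atomic.Value
    }

    func (s *consumerStatus) set(state string) { s.v.Store(state) }

    func (s *consumerStatus) get() string {
        if v, ok := s.v.Load().(string); ok {
            return v
        }
        return "unknown" // nothing stored yet
    }

The value is purely informational, so a slightly stale read is harmless; the atomic wrapper mainly keeps the race detector quiet.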
internal/pkg/crawl/hq.go (13 changes: 12 additions & 1 deletion)
@@ -148,19 +148,24 @@ func (c *Crawl) HQConsumer() {

func (c *Crawl) HQConsumer() {
for {
c.HQConsumerState = "running"

// This is evaluated on every iteration on purpose, because the
// number of workers may change during the crawl in the future
// (to be implemented)
var HQBatchSize = int(c.Workers.Count)

if c.Finished.Get() {
c.HQConsumerState = "finished"
c.Log.Error("crawl finished, stopping HQ consumer")
break
}

// If HQContinuousPull is set to true, we will pull URLs from HQ continuously,
// otherwise we will only pull URLs when needed (and when the crawl is not paused)
- for (uint64(c.Queue.GetStats().TotalElements) > uint64(HQBatchSize) && !c.HQContinuousPull) || c.Paused.Get() || c.Queue.HandoverOpen.Get() {
+ for (c.Queue.GetStats().TotalElements > HQBatchSize && !c.HQContinuousPull) || c.Paused.Get() || c.Queue.HandoverOpen.Get() {
c.HQConsumerState = "waiting"
c.Log.Info("HQ producer waiting", "paused", c.Paused.Get(), "handoverOpen", c.Queue.HandoverOpen.Get(), "queueSize", c.Queue.GetStats().TotalElements)
time.Sleep(time.Millisecond * 50)
continue
}
@@ -171,6 +176,7 @@
}

// get batch from crawl HQ
c.HQConsumerState = "waitingOnFeed"
batch, err := c.HQClient.Feed(HQBatchSize, c.HQStrategy)
if err != nil {
c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{
@@ -179,11 +185,13 @@
})).Error("error getting new URLs from crawl HQ")
continue
}
c.HQConsumerState = "feedCompleted"

// send all URLs received in the batch to the queue
var items = make([]*queue.Item, 0, len(batch.URLs))
if len(batch.URLs) > 0 {
for _, URL := range batch.URLs {
c.HQConsumerState = "urlParse"
newURL, err := url.Parse(URL.Value)
if err != nil {
c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{
@@ -194,6 +202,7 @@
continue
}

c.HQConsumerState = "newItem"
newItem, err := queue.NewItem(newURL, nil, "seed", uint64(strings.Count(URL.Path, "L")), URL.ID, false)
if err != nil {
c.Log.WithFields(c.genLogFields(err, newURL, map[string]interface{}{
@@ -204,10 +213,12 @@
continue
}

c.HQConsumerState = "append"
items = append(items, newItem)
}
}

c.HQConsumerState = "enqueue"
err = c.Queue.BatchEnqueue(items...)
if err != nil {
c.Log.Error("unable to enqueue URL batch received from crawl HQ, discarding", "error", err)
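Taken together, the new assignments give HQConsumer nine observable states. The grouping below is purely illustrative (the commit writes the literals inline), but it makes the lifecycle easier to scan:

    // States reported by HQConsumer, in the order the loop can reach them.
    // Constant names are illustrative only; the commit uses bare literals.
    const (
        stateRunning       = "running"       // top of every loop iteration
        stateFinished      = "finished"      // crawl done, consumer exits
        stateWaiting       = "waiting"       // queue full enough, crawl paused, or handover open
        stateWaitingOnFeed = "waitingOnFeed" // blocked on HQClient.Feed
        stateFeedCompleted = "feedCompleted" // batch received from HQ
        stateURLParse      = "urlParse"      // parsing a URL from the batch
        stateNewItem       = "newItem"       // building a queue.Item
        stateAppend        = "append"        // adding the item to the batch slice
        stateEnqueue       = "enqueue"       // batch-enqueueing into the local queue
    )

In practice, a consumer pinned in waitingOnFeed points at HQ-side latency, while one pinned in waiting points at local back-pressure; that distinction is exactly what the live stats row is meant to surface.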
internal/pkg/crawl/stats.go (5 changes: 5 additions & 0 deletions)
@@ -45,6 +45,11 @@ func (c *Crawl) printLiveStats() {
stats.AddRow(" - Queue empty bool state:", c.Queue.Empty.Get())
stats.AddRow(" - Can Enqueue:", c.Queue.CanEnqueue())
stats.AddRow(" - Can Dequeue:", c.Queue.CanDequeue())

if c.UseHQ {
stats.AddRow(" - HQ consumer state:", c.HQConsumerState)
}

stats.AddRow(" - Crawled total:", crawledSeeds+crawledAssets)
stats.AddRow(" - Crawled seeds:", crawledSeeds)
stats.AddRow(" - Crawled assets:", crawledAssets)
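With UseHQ set, the live stats table gains one row. A hypothetical excerpt of the output while the consumer is blocked on the feed call:

     - Can Dequeue:          true
     - HQ consumer state:    waitingOnFeed

Crawls that do not use HQ keep the existing table layout, since the row is added only inside the UseHQ check.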
