Rewriting the queue #78

Merged 107 commits on Aug 4, 2024

Commits
fd106db
Initial commit for frontier rewrite
CorentinB Jul 12, 2024
96a4e38
Replace mmap files with "regular" files
CorentinB Jul 12, 2024
1090690
Remove useless stat
CorentinB Jul 12, 2024
1c41124
Remove dequeue recursion
CorentinB Jul 12, 2024
a0045d3
Fix: dequeue timeout mechanism
CorentinB Jul 12, 2024
487319d
Big brain modifications of queue & enqueue with signal
CorentinB Jul 12, 2024
3a70670
Add: enqueue tests
CorentinB Jul 12, 2024
a2fb09d
Increment Prometheus counter only when --api is specified
CorentinB Jul 12, 2024
52d751a
WIP: Dequeue test
CorentinB Jul 12, 2024
5bc73f1
Remove timeout for Dequeue
CorentinB Jul 12, 2024
307cc9c
Remove any mention of 'frontier'
CorentinB Jul 12, 2024
2abaf8e
Move stats updates to separate functions
CorentinB Jul 12, 2024
ea748ea
Remove a test that doesn't make sense in a root env (like in a Docker…
CorentinB Jul 12, 2024
2da5d78
Add: large scale test (doesn't pass yet)
CorentinB Jul 12, 2024
61b7b9c
Restore previous version of Dequeue
CorentinB Jul 12, 2024
d81a5ff
chore: align structures to save space
equals215 Jul 13, 2024
a1f0efd
feat: remove the dequeue recusion
equals215 Jul 13, 2024
de41b1d
fix: crash when --api is not set (#79)
NGTmeaty Jul 15, 2024
7423728
Increment Prometheus counter only when --api is specified
CorentinB Jul 12, 2024
cc9eed1
WIP - Switching to Protobuf
CorentinB Jul 16, 2024
2786d62
Fix: dequeue properly
CorentinB Jul 16, 2024
bbdfa78
Use new NewItem function
CorentinB Jul 16, 2024
cea09f0
Remove Gob queue encoder/decoder
CorentinB Jul 16, 2024
7cb11da
Move item stuff to a separate file
CorentinB Jul 16, 2024
9427e39
chore: moved protobuf to a new package to respect conventions
equals215 Jul 16, 2024
9c1a579
Fix: cond wait lock problem
CorentinB Jul 16, 2024
25602e7
WIP fix parallel test
CorentinB Jul 16, 2024
42386e5
item encoding revamped
equals215 Jul 16, 2024
1ea1232
reflect changes on crawl.go
equals215 Jul 16, 2024
c07731a
Removing useless boolean comparison
CorentinB Jul 19, 2024
90bc8bd
Merge branch 'main' into queue
equals215 Jul 19, 2024
53b329c
chore: second pass for merge
equals215 Jul 19, 2024
708fd86
Merge branch 'queue' of github.com:internetarchive/Zeno into queue
equals215 Jul 19, 2024
5ccc9e1
chore: removed relicate
equals215 Jul 19, 2024
19cb96d
Fix TestParallelEnqueueDequeue
CorentinB Jul 19, 2024
ab683ee
Remove the very useless LoggingChan
CorentinB Jul 19, 2024
c0ad1c9
Remove sync.Cond & rewrite parallel test
CorentinB Jul 20, 2024
a7ac043
Fix: TestNewItem
CorentinB Jul 20, 2024
2081aa3
Typo: TestNewItem
CorentinB Jul 20, 2024
56ab6ef
Merge branch 'main' into queue
equals215 Jul 21, 2024
e50b979
fix: some leftovers unseen errors from main merge
equals215 Jul 21, 2024
48a2817
WIP on index
equals215 Jul 22, 2024
ac520c4
index MVP - to be tested __thoroughly__
equals215 Jul 22, 2024
2998afc
WIP changes
equals215 Jul 23, 2024
1e0db1a
enhancements for WAL + start of tests
equals215 Jul 23, 2024
2a3075b
fix: corrected WAL fd inconsistencies thanks to new tests
equals215 Jul 23, 2024
a39c0d5
add more tests and corrected truncate accordingly
equals215 Jul 23, 2024
7c6b106
feat: index can recover after non-graceful exit (still need to check …
equals215 Jul 23, 2024
0812bed
prepare to merge and apply Corentin live review
equals215 Jul 23, 2024
d0092e4
feat: remove index/file_io.go:performDump() printf
equals215 Jul 23, 2024
ed7f6cf
implemented requested changes
equals215 Jul 23, 2024
f11170c
feat: enhance periodicDump routine
equals215 Jul 23, 2024
3221141
fix: periodicDump() missing a stat check
equals215 Jul 23, 2024
c602dcf
Persist & load queue stats (#93)
CorentinB Jul 23, 2024
7ce49a5
fix: queue-stats merge typo
equals215 Jul 24, 2024
85682e7
Merge remote-tracking branch 'origin/main' into queue
CorentinB Jul 25, 2024
3000dfa
tests: add some benchmarking for queue and index
equals215 Jul 25, 2024
11d67e9
fix: temp files not contained in the same directory as Zeno & index f…
equals215 Jul 25, 2024
2d56d81
fix: edge-case of queue fatigue: queue depletes over time when not en…
equals215 Jul 26, 2024
8e724c4
add queue empty bool state to live stats
equals215 Jul 26, 2024
67c3384
Implement host rotation and Enqueue/Dequeue access regulation via ato…
equals215 Jul 26, 2024
98ec9fa
fix: open the WAL file as O_SYNC and get rid of file.Sync() calls con…
equals215 Jul 27, 2024
ccb6ac4
fix: reduce sleeps in worker
equals215 Jul 27, 2024
65c0cf3
feat: make Dequeue non-blocking and manage the wait on the worker sid…
equals215 Jul 27, 2024
73ac185
fix: TestDequeue expected return of a deprecated error
equals215 Jul 27, 2024
1f4d8a6
fix: TestPersistentGroupedQueue_Close expected return of a deprecated…
equals215 Jul 27, 2024
49f4989
feat: add `TEMP_DIR` env variable support for queue benchmarks
equals215 Jul 29, 2024
2a8aa48
add: TestBatchEnqueue
CorentinB Jul 29, 2024
d2e0503
modify HQConsumer to batch enqueue
CorentinB Jul 29, 2024
97c64ef
change HQ consumption logic
CorentinB Jul 29, 2024
cb68078
fix: TestEnqueue & TestBatchEnqueue
CorentinB Jul 29, 2024
d78a187
rework HQConsumer pause logic a bit
CorentinB Jul 29, 2024
63bafd7
fix: many tests
CorentinB Jul 29, 2024
2ed7b9c
fix: resuming queue couldn't start due to broken logic: queue.Empty w…
equals215 Jul 29, 2024
6c0006d
enhancement: close all workers in parallel
CorentinB Jul 29, 2024
ae2056d
fix: TestQueueEmptyBool
CorentinB Jul 29, 2024
fae5403
fix: index.Pop logic error: WAL writing unnamed goroutine couldn't re…
equals215 Jul 29, 2024
ca10b08
feat: implemented a handover process so that batch enqueue process ca…
equals215 Jul 30, 2024
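Commit ca10b08's handover lets a batch enqueue hand items straight to waiting consumers instead of round-tripping them through the on-disk queue. A rough sketch of the idea using a buffered channel; the Handover type and method names are assumptions for illustration, not Zeno's API:

```go
package main

import "fmt"

// Handover is a bounded fast path between producers and consumers: items
// that fit go straight to a consumer; items that don't fall back to the
// regular (on-disk) queue.
type Handover struct {
	ch chan string
}

func NewHandover(capacity int) *Handover {
	return &Handover{ch: make(chan string, capacity)}
}

// TryPut offers an item to a consumer without blocking; false means the
// fast path is full and the caller should enqueue normally.
func (h *Handover) TryPut(item string) bool {
	select {
	case h.ch <- item:
		return true
	default:
		return false
	}
}

// TryGet takes an item from the handover without blocking.
func (h *Handover) TryGet() (string, bool) {
	select {
	case item := <-h.ch:
		return item, true
	default:
		return "", false
	}
}

func main() {
	h := NewHandover(1)
	if h.TryPut("fast-path-item") {
		if item, ok := h.TryGet(); ok {
			fmt.Println("handed over:", item)
		}
	}
}
```

The non-blocking try semantics on both sides are what keep the handover from stalling either the batch enqueue or the dequeuing workers.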
d6480c9
fix: infinite for loop on BatchEnqueue()
equals215 Jul 30, 2024
12e9aea
make workers yield more frequently
equals215 Jul 30, 2024
01a0e22
feat: make outlinks batch enqueueable
equals215 Jul 30, 2024
e8ed999
feat: add handover stats
equals215 Jul 30, 2024
c9c2a2c
optimize `get list` loading performance (#104)
yzqzss Aug 1, 2024
4ce3cde
Remove `runtime.Gosched()` in polling (#105)
yzqzss Aug 1, 2024
09ddbc7
Queue handover v2 (#106)
equals215 Aug 1, 2024
7029d7e
slight enhancements of handover and adjustments of workers sleeps
equals215 Aug 2, 2024
6a34cac
fix: infinite for loop when finishing
equals215 Aug 2, 2024
beac4a4
chore: Set `Content-Type` header for json response
yzqzss Aug 1, 2024
bc6f381
Improve WAL concurrency performance through the commit mechanism
yzqzss Aug 2, 2024
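Commit bc6f381 groups multiple WAL writes under a single commit so one sync covers a whole batch rather than paying a sync per write. A simplified, in-memory sketch of that amortization; the commitLog type is illustrative (the real WAL writes to disk):

```go
package main

import (
	"fmt"
	"sync"
)

// commitLog sketches batched WAL commits: writers append records under a
// lock, and a single syncer flushes the accumulated batch, so N writes
// cost one sync instead of N.
type commitLog struct {
	mu      sync.Mutex
	pending [][]byte
	synced  int
}

func (c *commitLog) Append(rec []byte) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.pending = append(c.pending, rec)
}

// Commit flushes every pending record in one pass and reports how many
// records the single "sync" covered.
func (c *commitLog) Commit() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	n := len(c.pending)
	c.synced += n
	c.pending = c.pending[:0] // one sync for the whole batch
	return n
}

func main() {
	log := &commitLog{}
	for i := 0; i < 3; i++ {
		log.Append([]byte(fmt.Sprintf("record-%d", i)))
	}
	fmt.Println("records committed in one sync:", log.Commit())
}
```

Later commits in this PR make the behavior optional via a flag and skip the commit entirely when nothing was written since the last one.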
538e2ab
tests: update
yzqzss Aug 2, 2024
37744a7
fix: QueueStats data race
yzqzss Aug 2, 2024
7a64613
fix: deadlock :(
yzqzss Aug 3, 2024
1c80c19
push the seed list to the queue in batches
yzqzss Aug 3, 2024
be8b9b4
typo: committed
yzqzss Aug 3, 2024
04e02e6
Gracefully shutdown walCommitsSyncer()
yzqzss Aug 3, 2024
5f81a4d
chore
yzqzss Aug 3, 2024
f56c463
Merge pull request #107 from saveweb/queue-commit-await
equals215 Aug 3, 2024
9d179ab
feat: made @yzqzss commit feature optional by flag
equals215 Aug 3, 2024
f633a8f
fix: forgot to separate Dequeue() function
equals215 Aug 3, 2024
5eaefe1
chore: rm uint64 init to 0
equals215 Aug 3, 2024
48dc14b
feat: check if items have been written before trying to commit WAL
equals215 Aug 3, 2024
c1ff8f0
forgot to save before commit
equals215 Aug 3, 2024
03b2877
fix: changes asked by yzqzss
equals215 Aug 3, 2024
15df42d
Merge pull request #109 from internetarchive/queue-WAL-write-batches
equals215 Aug 3, 2024
2c68252
fix: hanging on index.Close()
yzqzss Aug 3, 2024
e09bf35
Merge pull request #110 from yzqzss/patch-3
equals215 Aug 4, 2024
b404d69
Merge branch 'main' into queue
equals215 Aug 4, 2024
3 changes: 1 addition & 2 deletions cmd/get/list.go
@@ -3,7 +3,6 @@ package get
import (
"github.com/internetarchive/Zeno/cmd"
"github.com/internetarchive/Zeno/config"
"github.com/internetarchive/Zeno/internal/pkg/frontier"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
@@ -30,7 +29,7 @@ func cmdGetList(c *cli.Context) error {
crawl := cmd.InitCrawlWithCMD(config.App.Flags)

// Initialize initial seed list
crawl.SeedList, err = frontier.IsSeedList(c.Args().Get(0))
crawl.SeedList, err = isSeedList(c.Args().Get(0))
if err != nil || len(crawl.SeedList) <= 0 {
logrus.WithFields(logrus.Fields{
"input": c.Args().Get(0),
4 changes: 2 additions & 2 deletions cmd/get/url.go
@@ -5,7 +5,7 @@ import (

"github.com/internetarchive/Zeno/cmd"
"github.com/internetarchive/Zeno/config"
"github.com/internetarchive/Zeno/internal/pkg/frontier"
"github.com/internetarchive/Zeno/internal/pkg/queue"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)
@@ -40,7 +40,7 @@ func cmdGetURL(c *cli.Context) error {
return err
}

crawl.SeedList = append(crawl.SeedList, *frontier.NewItem(input, nil, "seed", 0, "", false))
crawl.SeedList = append(crawl.SeedList, *queue.NewItem(input, nil, "seed", 0, "", false))

// Start crawl
err = crawl.Start()
48 changes: 4 additions & 44 deletions internal/pkg/frontier/utils.go → cmd/get/utils.go
@@ -1,21 +1,18 @@
package frontier
package get

import (
"bufio"
"encoding/gob"
"errors"
"fmt"
"net/url"
"os"
"sync"

"github.com/gosuri/uilive"
"github.com/internetarchive/Zeno/internal/pkg/queue"
"github.com/sirupsen/logrus"
)

// IsSeedList validates if the path is a seed list, and return an array of
// frontier.Item made of the seeds if it can
func IsSeedList(path string) (seeds []Item, err error) {
func isSeedList(path string) (seeds []queue.Item, err error) {
var totalCount, validCount int
writer := uilive.New()
writer.Start()
@@ -51,7 +48,7 @@ func IsSeedList(path string) (seeds []Item, err error) {
continue
}

item := NewItem(URL, nil, "seed", 0, "", false)
item := queue.NewItem(URL, nil, "seed", 0, "", false)
seeds = append(seeds, *item)
validCount++
fmt.Fprintf(writer, "\t Reading input list.. Found %d valid URLs out of %d URLs read.\n", validCount, totalCount)
@@ -69,40 +66,3 @@ func IsSeedList(path string) (seeds []Item, err error) {

return seeds, nil
}

type Pair struct {
Key, Value interface{}
}

func SyncMapEncode(m *sync.Map, file *os.File) error {
var pairs []Pair

m.Range(func(key, value interface{}) bool {
pairs = append(pairs, Pair{key, value})
return true
})

gob.Register(PoolItem{})

enc := gob.NewEncoder(file)
err := enc.Encode(pairs)

return err
}

func SyncMapDecode(m *sync.Map, file *os.File) error {
var pairs []Pair
gob.Register(PoolItem{})
dec := gob.NewDecoder(file)
err := dec.Decode(&pairs)

if err != nil {
return err
}

for _, p := range pairs {
m.Store(p.Key, p.Value.(PoolItem))
}

return nil
}
7 changes: 1 addition & 6 deletions cmd/utils.go
@@ -12,7 +12,6 @@ import (
"github.com/google/uuid"
"github.com/internetarchive/Zeno/config"
"github.com/internetarchive/Zeno/internal/pkg/crawl"
"github.com/internetarchive/Zeno/internal/pkg/frontier"
"github.com/internetarchive/Zeno/internal/pkg/log"
"github.com/internetarchive/Zeno/internal/pkg/utils"
"github.com/paulbellamy/ratecounter"
@@ -73,10 +72,6 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl {

c.LiveStats = flags.LiveStats

// Frontier
c.Frontier = new(frontier.Frontier)
c.Frontier.Log = c.Log

// If the job name isn't specified, we generate a random name
if flags.Job == "" {
if flags.HQProject != "" {
@@ -101,7 +96,7 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl {
c.MaxConcurrentAssets = flags.MaxConcurrentAssets
c.WorkerStopSignal = make(chan bool)

c.Seencheck = flags.Seencheck
c.UseSeencheck = flags.Seencheck
c.HTTPTimeout = flags.HTTPTimeout
c.MaxConcurrentRequestsPerDomain = flags.MaxConcurrentRequestsPerDomain
c.RateLimitDelay = flags.RateLimitDelay
6 changes: 5 additions & 1 deletion internal/pkg/crawl/api.go
@@ -43,7 +43,7 @@ func (crawl *Crawl) startAPI() {
"crawled": crawledSeeds + crawledAssets,
"crawledSeeds": crawledSeeds,
"crawledAssets": crawledAssets,
"queued": crawl.Frontier.QueueCount.Value(),
"queued": crawl.Queue.GetStats().TotalElements,
"uptime": time.Since(crawl.StartTime).String(),
}

@@ -52,6 +52,10 @@

http.HandleFunc("/metrics", setupPrometheus(crawl).ServeHTTP)

http.HandleFunc("/queue", func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(crawl.Queue.GetStats())
})

http.HandleFunc("/workers", func(w http.ResponseWriter, r *http.Request) {
workersState := crawl.GetWorkerState(-1)
json.NewEncoder(w).Encode(workersState)
4 changes: 2 additions & 2 deletions internal/pkg/crawl/assets.go
@@ -7,11 +7,11 @@ import (

"github.com/PuerkitoBio/goquery"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/cloudflarestream"
"github.com/internetarchive/Zeno/internal/pkg/frontier"
"github.com/internetarchive/Zeno/internal/pkg/queue"
"github.com/internetarchive/Zeno/internal/pkg/utils"
)

func (c *Crawl) extractAssets(base *url.URL, item *frontier.Item, doc *goquery.Document) (assets []*url.URL, err error) {
func (c *Crawl) extractAssets(base *url.URL, item *queue.Item, doc *goquery.Document) (assets []*url.URL, err error) {
var rawAssets []string

// Execute plugins on the response
63 changes: 34 additions & 29 deletions internal/pkg/crawl/capture.go
@@ -20,23 +20,24 @@ import (
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/tiktok"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/truthsocial"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/vk"
"github.com/internetarchive/Zeno/internal/pkg/queue"
"github.com/internetarchive/Zeno/internal/pkg/utils"
"github.com/remeh/sizedwaitgroup"
"github.com/tomnomnom/linkheader"

"github.com/internetarchive/Zeno/internal/pkg/frontier"
)

func (c *Crawl) executeGET(item *frontier.Item, req *http.Request, isRedirection bool) (resp *http.Response, err error) {
func (c *Crawl) executeGET(item *queue.Item, req *http.Request, isRedirection bool) (resp *http.Response, err error) {
var (
executionStart = time.Now()
newItem *frontier.Item
newItem *queue.Item
newReq *http.Request
URL *url.URL
)

defer func() {
c.PrometheusMetrics.DownloadedURI.Inc()
if c.API {
c.PrometheusMetrics.DownloadedURI.Inc()
}

c.URIsPerSecond.Incr(1)

@@ -52,17 +53,17 @@ func (c *Crawl) executeGET(item *frontier.Item, req *http.Request, isRedirection
time.Sleep(time.Second)
}

// TODO: re-implement host limitation
// Temporarily pause crawls for individual hosts if they are over our configured maximum concurrent requests per domain.
// If the request is a redirection, we do not pause the crawl because we want to follow the redirection.
if !isRedirection {
for c.shouldPause(item.Host) {
time.Sleep(time.Millisecond * time.Duration(c.RateLimitDelay))
}
// if !isRedirection {
// for c.shouldPause(item.Host) {
// time.Sleep(time.Millisecond * time.Duration(c.RateLimitDelay))
// }

c.Frontier.IncrHostActive(item.Host)

defer c.Frontier.DecrHostActive(item.Host)
}
// c.Queue.IncrHostActive(item.Host)
// defer c.Frontier.DecrHostActive(item.Host)
//}

// Retry on 429 error
for retry := 0; retry < c.MaxRetry; retry++ {
@@ -149,7 +150,7 @@ func (c *Crawl) executeGET(item *frontier.Item, req *http.Request, isRedirection
}

// Seencheck the URL
if c.Seencheck {
if c.UseSeencheck {
found := c.seencheckURL(utils.URLToString(URL), "seed")
if found {
return nil, errors.New("URL from redirection has already been seen")
@@ -165,7 +166,7 @@ func (c *Crawl) executeGET(item *frontier.Item, req *http.Request, isRedirection
}
}

newItem = frontier.NewItem(URL, item, item.Type, item.Hop, item.ID, false)
newItem = queue.NewItem(URL, item, item.Type, item.Hop, item.ID, false)
newItem.Redirect = item.Redirect + 1

// Prepare GET request
@@ -184,7 +185,7 @@ func (c *Crawl) executeGET(item *frontier.Item, req *http.Request, isRedirection
return resp, nil
}

func (c *Crawl) captureAsset(item *frontier.Item, cookies []*http.Cookie) error {
func (c *Crawl) captureAsset(item *queue.Item, cookies []*http.Cookie) error {
var resp *http.Response

// Prepare GET request
@@ -216,13 +217,13 @@ func (c *Crawl) captureAsset(item *frontier.Item, cookies []*http.Cookie) error
}

// Capture capture the URL and return the outlinks
func (c *Crawl) Capture(item *frontier.Item) error {
func (c *Crawl) Capture(item *queue.Item) error {
var (
resp *http.Response
waitGroup sync.WaitGroup
)

defer func(i *frontier.Item) {
defer func(i *queue.Item) {
waitGroup.Wait()

if c.UseHQ && i.ID != "" {
@@ -254,7 +255,7 @@ func (c *Crawl) Capture(item *frontier.Item) error {
c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while generating API URL")
} else {
// Then we create an item
apiItem := frontier.NewItem(apiURL, item, item.Type, item.Hop, item.ID, false)
apiItem := queue.NewItem(apiURL, item, item.Type, item.Hop, item.ID, false)

// And capture it
c.Capture(apiItem)
@@ -267,7 +268,7 @@ func (c *Crawl) Capture(item *frontier.Item) error {
} else {
for _, embedURL := range embedURLs {
// Create the embed item
embedItem := frontier.NewItem(embedURL, item, item.Type, item.Hop, item.ID, false)
embedItem := queue.NewItem(embedURL, item, item.Type, item.Hop, item.ID, false)

// Capture the embed item
c.Capture(embedItem)
@@ -284,7 +285,7 @@ func (c *Crawl) Capture(item *frontier.Item) error {
c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while generating Facebook embed URL")
} else {
// Create the embed item
embedItem := frontier.NewItem(embedURL, item, item.Type, item.Hop, item.ID, false)
embedItem := queue.NewItem(embedURL, item, item.Type, item.Hop, item.ID, false)

// Capture the embed item
err = c.Capture(embedItem)
@@ -302,7 +303,7 @@ func (c *Crawl) Capture(item *frontier.Item) error {
if highwindsURL == nil {
c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while generating libsyn URL")
} else {
c.Capture(frontier.NewItem(highwindsURL, item, item.Type, item.Hop, item.ID, false))
c.Capture(queue.NewItem(highwindsURL, item, item.Type, item.Hop, item.ID, false))
}
}
} else if tiktok.IsTikTokURL(utils.URLToString(item.URL)) {
@@ -312,7 +313,7 @@ func (c *Crawl) Capture(item *frontier.Item) error {
telegram.TransformURL(item.URL)

// Then we create an item
embedItem := frontier.NewItem(item.URL, item, item.Type, item.Hop, item.ID, false)
embedItem := queue.NewItem(item.URL, item, item.Type, item.Hop, item.ID, false)

// And capture it
c.Capture(embedItem)
@@ -325,7 +326,7 @@ func (c *Crawl) Capture(item *frontier.Item) error {
if err != nil && err.Error() == "URL from redirection has already been seen" {
return err
} else if err != nil && err.Error() == "URL is being rate limited, sending back to HQ" {
c.HQProducerChannel <- frontier.NewItem(item.URL, item.ParentItem, item.Type, item.Hop, "", true)
c.HQProducerChannel <- queue.NewItem(item.URL, item.ParentItem, item.Type, item.Hop, "", true)
c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("URL is being rate limited, sending back to HQ")
return err
} else if err != nil {
@@ -428,7 +429,7 @@ func (c *Crawl) Capture(item *frontier.Item) error {
// Seencheck the URLs we captured, we ignore the returned value here
// because we already archived the URLs, we just want them to be added
// to the seencheck table.
if c.Seencheck {
if c.UseSeencheck {
for _, cfstreamURL := range cfstreamURLs {
c.seencheckURL(cfstreamURL, "asset")
}
@@ -505,7 +506,7 @@ func (c *Crawl) Capture(item *frontier.Item) error {
// If --local-seencheck is enabled, then we check if the assets are in the
// seencheck DB. If they are, then they are skipped.
// Else, if we use HQ, then we use HQ's seencheck.
if c.Seencheck {
if c.UseSeencheck {
seencheckedBatch := []*url.URL{}

for _, URL := range assets {
@@ -541,12 +542,16 @@ func (c *Crawl) Capture(item *frontier.Item) error {
}
}

c.Frontier.QueueCount.Incr(int64(len(assets)))
// TODO: implement a counter for the number of assets
// currently being processed
// c.Frontier.QueueCount.Incr(int64(len(assets)))
swg := sizedwaitgroup.New(c.MaxConcurrentAssets)
excluded := false

for _, asset := range assets {
c.Frontier.QueueCount.Incr(-1)
// TODO: implement a counter for the number of assets
// currently being processed
// c.Frontier.QueueCount.Incr(-1)

// Just making sure we do not over archive by archiving the original URL
if utils.URLToString(item.URL) == utils.URLToString(asset) {
Expand Down Expand Up @@ -579,7 +584,7 @@ func (c *Crawl) Capture(item *frontier.Item) error {
defer swg.Done()

// Create the asset's item
newAsset := frontier.NewItem(asset, item, "asset", item.Hop, "", false)
newAsset := queue.NewItem(asset, item, "asset", item.Hop, "", false)

// Capture the asset
err = c.captureAsset(newAsset, resp.Cookies())