Skip to content

Commit

Permalink
Merge pull request #78 from internetarchive/queue
Browse files Browse the repository at this point in the history
Rewriting the queue
  • Loading branch information
equals215 authored Aug 4, 2024
2 parents 193a78f + b404d69 commit 53dd830
Show file tree
Hide file tree
Showing 59 changed files with 5,400 additions and 989 deletions.
1 change: 0 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ jobs:
run: go build -v ./...

- name: Test
continue-on-error: true
run: go test -race -v ./...

- name: Goroutine leak detector
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ Zeno
*.txt
*.sh
zeno.log
.vscode/
.vscode/
*.py
6 changes: 4 additions & 2 deletions cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func getCMDsFlags(getCmd *cobra.Command) {
getCmd.PersistentFlags().String("job", "", "Job name to use, will determine the path for the persistent queue, seencheck database, and WARC files.")
getCmd.PersistentFlags().IntP("workers", "w", 1, "Number of concurrent workers to run.")
getCmd.PersistentFlags().Int("max-concurrent-assets", 8, "Max number of concurrent assets to fetch PER worker. E.g. if you have 100 workers and this setting at 8, Zeno could do up to 800 concurrent requests at any time.")
getCmd.PersistentFlags().Uint("max-hops", 0, "Maximum number of hops to execute.")
getCmd.PersistentFlags().Uint8("max-hops", 0, "Maximum number of hops to execute.")
getCmd.PersistentFlags().String("cookies", "", "File containing cookies that will be used for requests.")
getCmd.PersistentFlags().Bool("keep-cookies", false, "Keep a global cookie jar")
getCmd.PersistentFlags().Bool("headless", false, "Use headless browsers instead of standard GET requests.")
Expand All @@ -56,6 +56,8 @@ func getCMDsFlags(getCmd *cobra.Command) {
getCmd.PersistentFlags().StringSlice("exclude-string", []string{}, "Discard any (discovered) URLs containing this string.")
getCmd.PersistentFlags().Bool("random-local-ip", false, "Use random local IP for requests. (will be ignored if a proxy is set)")
getCmd.PersistentFlags().Int("min-space-required", 20, "Minimum space required in GB to continue the crawl.")
getCmd.PersistentFlags().Bool("handover", false, "Use handover mechanism to dispatch URLs via a buffer before enqueuing on disk.")
getCmd.PersistentFlags().Bool("batch-write-WAL", false, "Use committed batch writes to the WAL. (can cause more seed loss on crash)")

// Proxy flags
getCmd.PersistentFlags().String("proxy", "", "Proxy to use when requesting pages.")
Expand Down Expand Up @@ -88,7 +90,7 @@ func getCMDsFlags(getCmd *cobra.Command) {
// Aliases shouldn't be used as proper flags nor declared in the config struct
// Aliases should be marked as deprecated to inform the user base
// Alias values should be copied to the proper flag in the config/config.go:handleFlagsAliases() function
getCmd.PersistentFlags().Uint("hops", 0, "Maximum number of hops to execute.")
getCmd.PersistentFlags().Uint8("hops", 0, "Maximum number of hops to execute.")
getCmd.PersistentFlags().MarkDeprecated("hops", "use --max-hops instead")
getCmd.PersistentFlags().MarkHidden("hops")

Expand Down
4 changes: 2 additions & 2 deletions cmd/get_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"

"github.com/internetarchive/Zeno/internal/pkg/crawl"
"github.com/internetarchive/Zeno/internal/pkg/frontier"
"github.com/internetarchive/Zeno/internal/pkg/queue"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -32,7 +32,7 @@ var getListCmd = &cobra.Command{
}

// Initialize initial seed list
crawl.SeedList, err = frontier.IsSeedList(args[0])
crawl.SeedList, err = queue.FileToItems(args[0])
if err != nil || len(crawl.SeedList) <= 0 {
crawl.Log.WithFields(map[string]interface{}{
"input": args[0],
Expand Down
12 changes: 10 additions & 2 deletions cmd/get_url.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"net/url"

"github.com/internetarchive/Zeno/internal/pkg/crawl"
"github.com/internetarchive/Zeno/internal/pkg/frontier"
"github.com/internetarchive/Zeno/internal/pkg/queue"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -43,7 +43,15 @@ var getURLCmd = &cobra.Command{
return err
}

crawl.SeedList = append(crawl.SeedList, *frontier.NewItem(input, nil, "seed", 0, "", false))
item, err := queue.NewItem(input, nil, "seed", 0, "", false)
if err != nil {
crawl.Log.WithFields(map[string]interface{}{
"input_url": arg,
"err": err.Error(),
}).Error("Failed to create new item")
return err
}
crawl.SeedList = append(crawl.SeedList, *item)
}

// Start crawl
Expand Down
8 changes: 5 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ type Config struct {
ElasticSearchURLs []string `mapstructure:"es-url"`
WorkersCount int `mapstructure:"workers"`
MaxConcurrentAssets int `mapstructure:"max-concurrent-assets"`
MaxHops uint `mapstructure:"max-hops"`
MaxRedirect int `mapstructure:"max-redirect"`
MaxRetry int `mapstructure:"max-retry"`
MaxHops uint8 `mapstructure:"max-hops"`
MaxRedirect uint8 `mapstructure:"max-redirect"`
MaxRetry uint8 `mapstructure:"max-retry"`
HTTPTimeout int `mapstructure:"http-timeout"`
MaxConcurrentRequestsPerDomain int `mapstructure:"max-concurrent-per-domain"`
ConcurrentSleepLength int `mapstructure:"concurrent-sleep-length"`
Expand Down Expand Up @@ -74,6 +74,8 @@ type Config struct {
HQContinuousPull bool `mapstructure:"hq-continuous-pull"`
HQRateLimitSendBack bool `mapstructure:"hq-rate-limiting-send-back"`
NoStdoutLogging bool `mapstructure:"no-stdout-log"`
Handover bool `mapstructure:"handover"`
BatchWriteWAL bool `mapstructure:"batch-write-WAL"`
}

var (
Expand Down
11 changes: 5 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/CorentinB/warc v0.8.40
github.com/PuerkitoBio/goquery v1.9.2
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2
github.com/beeker1121/goque v2.1.0+incompatible
github.com/clbanning/mxj/v2 v2.7.0
github.com/dustin/go-humanize v1.0.1
github.com/elastic/go-elasticsearch/v8 v8.14.0
Expand All @@ -25,9 +24,11 @@ require (
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
github.com/telanflow/cookiejar v0.0.0-20190719062046-114449e86aa5
github.com/tomnomnom/linkheader v0.0.0-20180905144013-02ca5825eb80
github.com/zeebo/xxh3 v1.0.2
go.uber.org/goleak v1.3.0
golang.org/x/net v0.26.0
golang.org/x/net v0.27.0
google.golang.org/protobuf v1.34.2
mvdan.cc/xurls/v2 v2.5.0
)

Expand All @@ -52,7 +53,6 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/klauspost/pgzip v1.2.6 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
Expand Down Expand Up @@ -84,12 +84,11 @@ require (
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
20 changes: 6 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPd
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
github.com/beeker1121/goque v2.1.0+incompatible h1:m5pZ5b8nqzojS2DF2ioZphFYQUqGYsDORq6uefUItPM=
github.com/beeker1121/goque v2.1.0+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down Expand Up @@ -78,8 +76,6 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU=
github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
Expand Down Expand Up @@ -183,10 +179,6 @@ github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpP
github.com/telanflow/cookiejar v0.0.0-20190719062046-114449e86aa5 h1:gTQl5nPlc9B53vFOKM8aJHwxB2BW2kM49PVR5526GBg=
github.com/telanflow/cookiejar v0.0.0-20190719062046-114449e86aa5/go.mod h1:qNgA5MKwTh103SxGTooqZMiKxZTaV9UV3KjN7I7Drig=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
Expand All @@ -203,8 +195,8 @@ go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
Expand All @@ -215,8 +207,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -234,8 +226,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
Expand Down
20 changes: 14 additions & 6 deletions internal/pkg/crawl/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ type APIWorkersState struct {

// APIWorkerState represents the state of an API worker.
type APIWorkerState struct {
WorkerID string `json:"worker_id"`
Status string `json:"status"`
LastError string `json:"last_error"`
LastSeen string `json:"last_seen"`
Locked bool `json:"locked"`
WorkerID string `json:"worker_id"`
Status string `json:"status"`
LastError string `json:"last_error"`
LastSeen string `json:"last_seen"`
LastAction string `json:"last_action"`
Locked bool `json:"locked"`
}

// startAPI starts the API server for the crawl
Expand All @@ -42,7 +43,7 @@ func (crawl *Crawl) startAPI() {
"crawled": crawledSeeds + crawledAssets,
"crawledSeeds": crawledSeeds,
"crawledAssets": crawledAssets,
"queued": crawl.Frontier.QueueCount.Value(),
"queued": crawl.Queue.GetStats().TotalElements,
"uptime": time.Since(crawl.StartTime).String(),
}

Expand All @@ -53,12 +54,19 @@ func (crawl *Crawl) startAPI() {
http.HandleFunc("/metrics", setupPrometheus(crawl).ServeHTTP)
}

http.HandleFunc("/queue", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(crawl.Queue.GetStats())
})

http.HandleFunc("/workers", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
workersState := crawl.Workers.GetWorkerStateFromPool("")
json.NewEncoder(w).Encode(workersState)
})

http.HandleFunc("/worker/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
workerID := strings.TrimPrefix(r.URL.Path, "/worker/")
workersState := crawl.Workers.GetWorkerStateFromPool(workerID)
if workersState == nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/crawl/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (

"github.com/PuerkitoBio/goquery"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/cloudflarestream"
"github.com/internetarchive/Zeno/internal/pkg/frontier"
"github.com/internetarchive/Zeno/internal/pkg/queue"
"github.com/internetarchive/Zeno/internal/pkg/utils"
)

func (c *Crawl) extractAssets(base *url.URL, item *frontier.Item, doc *goquery.Document) (assets []*url.URL, err error) {
func (c *Crawl) extractAssets(base *url.URL, item *queue.Item, doc *goquery.Document) (assets []*url.URL, err error) {
var rawAssets []string

// Execute plugins on the response
Expand Down
Loading

0 comments on commit 53dd830

Please sign in to comment.