diff --git a/cmd/get.go b/cmd/get.go
index 78e1a488..f73bd1dc 100644
--- a/cmd/get.go
+++ b/cmd/get.go
@@ -84,6 +84,10 @@ func getCMDsFlags(getCmd *cobra.Command) {
 	getCmd.PersistentFlags().String("es-password", "", "ElasticSearch password to use for indexing crawl logs.")
 	getCmd.PersistentFlags().String("es-index-prefix", "zeno", "ElasticSearch index prefix to use for indexing crawl logs. Default is : `zeno`, without `-`")
 
+	// Dependencies flags
+	getCmd.PersistentFlags().Bool("no-ytdlp", false, "Disable yt-dlp usage for video extraction.")
+	getCmd.PersistentFlags().String("ytdlp-path", "", "Path to the yt-dlp binary.")
+
 	// Alias support
 	// As cobra doesn't support aliases natively (couldn't find a way to do it), we have to do it manually
 	// This is a workaround to allow users to use `--hops` instead of `--max-hops` for example
diff --git a/config/config.go b/config/config.go
index 298da11d..08745d6d 100644
--- a/config/config.go
+++ b/config/config.go
@@ -76,6 +76,10 @@ type Config struct {
 	NoStdoutLogging bool `mapstructure:"no-stdout-log"`
 	NoHandover      bool `mapstructure:"no-handover"`
 	NoBatchWriteWAL bool `mapstructure:"ultrasafe-queue"`
+
+	// Dependencies
+	NoYTDLP   bool   `mapstructure:"no-ytdlp"`
+	YTDLPPath string `mapstructure:"ytdlp-path"`
 }
 
 var (
diff --git a/internal/pkg/crawl/assets.go b/internal/pkg/crawl/assets.go
index f2783e77..541628cf 100644
--- a/internal/pkg/crawl/assets.go
+++ b/internal/pkg/crawl/assets.go
@@ -1,16 +1,124 @@
 package crawl
 
 import (
+	"io"
+	"net/http"
 	"net/url"
 	"regexp"
 	"strings"
+	"sync/atomic"
 
 	"github.com/PuerkitoBio/goquery"
 	"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/cloudflarestream"
 	"github.com/internetarchive/Zeno/internal/pkg/queue"
 	"github.com/internetarchive/Zeno/internal/pkg/utils"
+	"github.com/remeh/sizedwaitgroup"
 )
 
+func (c *Crawl) captureAsset(item *queue.Item, cookies []*http.Cookie) error {
+	var resp *http.Response
+
+	// Prepare GET request
+	req, err := http.NewRequest("GET", utils.URLToString(item.URL), nil)
+	if err != nil {
+		return err
+	}
+
+	req.Header.Set("Referer", utils.URLToString(item.ParentURL))
+	req.Header.Set("User-Agent", c.UserAgent)
+
+	// Apply cookies obtained from the original URL captured
+	for i := range cookies {
+		req.AddCookie(cookies[i])
+	}
+
+	resp, err = c.executeGET(item, req, false)
+	if err != nil && err.Error() == "URL from redirection has already been seen" {
+		return nil
+	} else if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	// needed for WARC writing
+	io.Copy(io.Discard, resp.Body)
+
+	return nil
+}
+
+func (c *Crawl) captureAssets(item *queue.Item, assets []*url.URL, cookies []*http.Cookie) {
+	// TODO: implement a counter for the number of assets
+	// currently being processed
+	// c.Frontier.QueueCount.Incr(int64(len(assets)))
+	swg := sizedwaitgroup.New(int(c.MaxConcurrentAssets))
+	excluded := false
+
+	for _, asset := range assets {
+		// TODO: implement a counter for the number of assets
+		// currently being processed
+		// c.Frontier.QueueCount.Incr(-1)
+
+		// Just making sure we do not over-archive by archiving the original URL
+		if utils.URLToString(item.URL) == utils.URLToString(asset) {
+			continue
+		}
+
+		// We ban googlevideo.com URLs because they are heavily rate limited by default, and
+		// we don't want the crawler to spend an inappropriate amount of time archiving them
+		if strings.Contains(item.URL.Host, "googlevideo.com") {
+			continue
+		}
+
+		// If the URL matches any excluded string, we ignore it
+		for _, excludedString := range c.ExcludedStrings {
+			if strings.Contains(utils.URLToString(asset), excludedString) {
+				excluded = true
+				break
+			}
+		}
+
+		if excluded {
+			excluded = false
+			continue
+		}
+
+		swg.Add()
+		c.URIsPerSecond.Incr(1)
+
+		go func(asset *url.URL, swg *sizedwaitgroup.SizedWaitGroup) {
+			defer swg.Done()
+
+			// Create the asset's item
+			newAsset, err := queue.NewItem(asset, item.URL, "asset", item.Hop, "", false)
+			if err != nil {
+				c.Log.WithFields(c.genLogFields(err, asset, map[string]interface{}{
+					"parentHop": item.Hop,
+					"parentUrl": utils.URLToString(item.URL),
+					"type":      "asset",
+				})).Error("error while creating asset item")
+				return
+			}
+
+			// Capture the asset
+			err = c.captureAsset(newAsset, cookies)
+			if err != nil {
+				c.Log.WithFields(c.genLogFields(err, asset, map[string]interface{}{
+					"parentHop": item.Hop,
+					"parentUrl": utils.URLToString(item.URL),
+					"type":      "asset",
+				})).Error("error while capturing asset")
+				return
+			}
+
+			// If we made it to this point, it means that the asset has been crawled successfully,
+			// so we can increment the locallyCrawled variable
+			atomic.AddUint64(&item.LocallyCrawled, 1)
+		}(asset, &swg)
+	}
+
+	swg.Wait()
+}
+
 func (c *Crawl) extractAssets(base *url.URL, item *queue.Item, doc *goquery.Document) (assets []*url.URL, err error) {
 	var rawAssets []string
 
diff --git a/internal/pkg/crawl/capture.go b/internal/pkg/crawl/capture.go
index 856d2852..b33a33c3 100644
--- a/internal/pkg/crawl/capture.go
+++ b/internal/pkg/crawl/capture.go
@@ -8,7 +8,6 @@ import (
 	"net/url"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/PuerkitoBio/goquery"
@@ -20,9 +19,9 @@ import (
 	"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/tiktok"
 	"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/truthsocial"
 	"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/vk"
+	"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/youtube"
 	"github.com/internetarchive/Zeno/internal/pkg/queue"
 	"github.com/internetarchive/Zeno/internal/pkg/utils"
-	"github.com/remeh/sizedwaitgroup"
 )
 
 func (c *Crawl) executeGET(item *queue.Item, req *http.Request, isRedirection bool) (resp *http.Response, err error) {
@@ -188,37 +187,6 @@ func (c *Crawl) executeGET(item *queue.Item, req *http.Request, isRedirection bo
 	return resp, nil
 }
 
-func (c *Crawl) captureAsset(item *queue.Item, cookies []*http.Cookie) error {
-	var resp *http.Response
-
-	// Prepare GET request
-	req, err := http.NewRequest("GET", utils.URLToString(item.URL), nil)
-	if err != nil {
-		return err
-	}
-
-	req.Header.Set("Referer", utils.URLToString(item.ParentURL))
-	req.Header.Set("User-Agent", c.UserAgent)
-
-	// Apply cookies obtained from the original URL captured
-	for i := range cookies {
-		req.AddCookie(cookies[i])
-	}
-
-	resp, err = c.executeGET(item, req, false)
-	if err != nil && err.Error() == "URL from redirection has already been seen" {
-		return nil
-	} else if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-
-	// needed for WARC writing
-	io.Copy(io.Discard, resp.Body)
-
-	return nil
-}
-
 // Capture capture the URL and return the outlinks
 func (c *Crawl) Capture(item *queue.Item) error {
 	var (
@@ -370,6 +338,22 @@ func (c *Crawl) Capture(item *queue.Item) error {
 	}
 	defer resp.Body.Close()
 
+	// If it was a YouTube watch page, we potentially want to run it through the YouTube extractor
+	// TODO: support other watch page URLs
+	if strings.Contains(item.URL.Host, "youtube.com") && strings.Contains(item.URL.Path, "/watch") && !c.NoYTDLP {
+		URLs, err := youtube.Parse(resp.Body)
+		if err != nil {
+			c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while parsing YouTube watch page")
+			return err
+		}
+
+		if len(URLs) > 0 {
+			c.captureAssets(item, URLs, resp.Cookies())
+		}
+
+		return nil
+	}
+
 	// Scrape potential URLs from Link HTTP header
 	var (
 		links = Parse(resp.Header.Get("link"))
@@ -577,76 +561,8 @@ func (c *Crawl) Capture(item *queue.Item) error {
 		}
 	}
 
-	// TODO: implement a counter for the number of assets
-	// currently being processed
-	// c.Frontier.QueueCount.Incr(int64(len(assets)))
-	swg := sizedwaitgroup.New(int(c.MaxConcurrentAssets))
-	excluded := false
-
-	for _, asset := range assets {
-		// TODO: implement a counter for the number of assets
-		// currently being processed
-		// c.Frontier.QueueCount.Incr(-1)
-
-		// Just making sure we do not over archive by archiving the original URL
-		if utils.URLToString(item.URL) == utils.URLToString(asset) {
-			continue
-		}
-
-		// We ban googlevideo.com URLs because they are heavily rate limited by default, and
-		// we don't want the crawler to spend an innapropriate amount of time archiving them
-		if strings.Contains(item.URL.Host, "googlevideo.com") {
-			continue
-		}
-
-		// If the URL match any excluded string, we ignore it
-		for _, excludedString := range c.ExcludedStrings {
-			if strings.Contains(utils.URLToString(asset), excludedString) {
-				excluded = true
-				break
-			}
-		}
-
-		if excluded {
-			excluded = false
-			continue
-		}
-
-		swg.Add()
-		c.URIsPerSecond.Incr(1)
-
-		go func(asset *url.URL, swg *sizedwaitgroup.SizedWaitGroup) {
-			defer swg.Done()
-
-			// Create the asset's item
-			newAsset, err := queue.NewItem(asset, item.URL, "asset", item.Hop, "", false)
-			if err != nil {
-				c.Log.WithFields(c.genLogFields(err, asset, map[string]interface{}{
-					"parentHop": item.Hop,
-					"parentUrl": utils.URLToString(item.URL),
-					"type":      "asset",
-				})).Error("error while creating asset item")
-				return
-			}
-
-			// Capture the asset
-			err = c.captureAsset(newAsset, resp.Cookies())
-			if err != nil {
-				c.Log.WithFields(c.genLogFields(err, &asset, map[string]interface{}{
-					"parentHop": item.Hop,
-					"parentUrl": utils.URLToString(item.URL),
-					"type":      "asset",
-				})).Error("error while capturing asset")
-				return
-			}
-
-			// If we made it to this point, it means that the asset have been crawled successfully,
-			// then we can increment the locallyCrawled variable
-			atomic.AddUint64(&item.LocallyCrawled, 1)
-		}(asset, &swg)
-	}
+	c.captureAssets(item, assets, resp.Cookies())
 
-	swg.Wait()
 	return err
 }
 
diff --git a/internal/pkg/crawl/config.go b/internal/pkg/crawl/config.go
index 5e573144..2dff2270 100644
--- a/internal/pkg/crawl/config.go
+++ b/internal/pkg/crawl/config.go
@@ -97,7 +97,7 @@ type Crawl struct {
 	CDXDedupeServer    string
 	WARCFullOnDisk     bool
 	WARCPoolSize       int
-	WARCDedupSize      int
+	WARCDedupeSize     int
 	DisableLocalDedupe bool
 	CertValidation     bool
 	WARCCustomCookie   string
@@ -116,6 +116,10 @@ type Crawl struct {
 	HQProducerChannel      chan *queue.Item
 	HQChannelsWg           *sync.WaitGroup
 	HQRateLimitingSendBack bool
+
+	// Dependencies
+	NoYTDLP   bool
+	YTDLPPath string
 }
 
 func GenerateCrawlConfig(config *config.Config) (*Crawl, error) {
@@ -231,7 +235,7 @@ func GenerateCrawlConfig(config *config.Config) (*Crawl, error) {
 	c.CertValidation = config.CertValidation
 	c.WARCFullOnDisk = config.WARCOnDisk
 	c.WARCPoolSize = config.WARCPoolSize
-	c.WARCDedupSize = config.WARCDedupeSize
+	c.WARCDedupeSize = config.WARCDedupeSize
 	c.WARCCustomCookie = config.CDXCookie
 
 	c.API = config.API
@@ -246,6 +250,10 @@ func GenerateCrawlConfig(config *config.Config) (*Crawl, error) {
 		c.PrometheusMetrics.Prefix = config.PrometheusPrefix
 	}
 
+	// Dependencies
+	c.NoYTDLP = config.NoYTDLP
+	c.YTDLPPath = config.YTDLPPath
+
 	if config.UserAgent != "Zeno" {
 		c.UserAgent = config.UserAgent
 	} else {
diff --git a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go
index 5d8e5af2..7e02983c 100644
--- a/internal/pkg/crawl/crawl.go
+++ b/internal/pkg/crawl/crawl.go
@@ -8,6 +8,7 @@ import (
 
 	"git.archive.org/wb/gocrawlhq"
 	"github.com/CorentinB/warc"
+	"github.com/internetarchive/Zeno/internal/pkg/crawl/dependencies/ytdlp"
 	"github.com/internetarchive/Zeno/internal/pkg/queue"
 	"github.com/internetarchive/Zeno/internal/pkg/seencheck"
 	"github.com/internetarchive/Zeno/internal/pkg/utils"
@@ -67,9 +68,9 @@ func (c *Crawl) Start() (err error) {
 	// Init WARC rotator settings
 	rotatorSettings := c.initWARCRotatorSettings()
 
-	dedupeOptions := warc.DedupeOptions{LocalDedupe: !c.DisableLocalDedupe, SizeThreshold: c.WARCDedupSize}
+	dedupeOptions := warc.DedupeOptions{LocalDedupe: !c.DisableLocalDedupe, SizeThreshold: c.WARCDedupeSize}
 	if c.CDXDedupeServer != "" {
-		dedupeOptions = warc.DedupeOptions{LocalDedupe: !c.DisableLocalDedupe, CDXDedupe: true, CDXURL: c.CDXDedupeServer, CDXCookie: c.WARCCustomCookie, SizeThreshold: c.WARCDedupSize}
+		dedupeOptions = warc.DedupeOptions{LocalDedupe: !c.DisableLocalDedupe, CDXDedupe: true, CDXURL: c.CDXDedupeServer, CDXCookie: c.WARCCustomCookie, SizeThreshold: c.WARCDedupeSize}
 	}
 
 	// Init the HTTP client responsible for recording HTTP(s) requests / responses
@@ -125,6 +126,20 @@ func (c *Crawl) Start() (err error) {
 		go c.startAPI()
 	}
 
+	// Verify that dependencies exist on the system
+	if !c.NoYTDLP {
+		// If a yt-dlp path is specified, we use it,
+		// otherwise we try to find yt-dlp on the system
+		if c.YTDLPPath == "" {
+			path, found := ytdlp.FindPath()
+			if !found {
+				c.Log.Warn("yt-dlp not found on the system, please install it or specify its path in the configuration if you wish to use it")
+			} else {
+				c.YTDLPPath = path
+			}
+		}
+	}
+
 	// Parse input cookie file if specified
 	if c.CookieFile != "" {
 		cookieJar, err := cookiejar.NewFileJar(c.CookieFile, nil)
diff --git a/internal/pkg/crawl/dependencies/ytdlp/model.go b/internal/pkg/crawl/dependencies/ytdlp/model.go
new file mode 100644
index 00000000..51d990d4
--- /dev/null
+++ b/internal/pkg/crawl/dependencies/ytdlp/model.go
@@ -0,0 +1,11 @@
+package ytdlp
+
+type Video struct {
+	IsLive           bool `json:"is_live"`
+	RequestedFormats []struct {
+		URL string `json:"url"`
+	} `json:"requested_formats"`
+	Thumbnails []struct {
+		URL string `json:"url"`
+	} `json:"thumbnails"`
+}
diff --git a/internal/pkg/crawl/dependencies/ytdlp/parse.go b/internal/pkg/crawl/dependencies/ytdlp/parse.go
new file mode 100644
index 00000000..ab49b680
--- /dev/null
+++ b/internal/pkg/crawl/dependencies/ytdlp/parse.go
@@ -0,0 +1,42 @@
+package ytdlp
+
+import (
+	"encoding/json"
+	"fmt"
+)
+
+type SubtitleInfo struct {
+	Ext  string `json:"ext"`
+	URL  string `json:"url"`
+	Name string `json:"name"`
+}
+
+// parseSubtitles parses the subtitles from the yt-dlp JSON output.
+// It is needed because the subtitles are not given as a proper array of objects.
+func parseSubtitles(jsonData string) ([]string, error) {
+	var data map[string]json.RawMessage
+	err := json.Unmarshal([]byte(jsonData), &data)
+	if err != nil {
+		return nil, fmt.Errorf("error unmarshaling outer JSON: %v", err)
+	}
+
+	subtitlesRaw, ok := data["subtitles"]
+	if !ok {
+		return nil, nil
+	}
+
+	var subtitles map[string][]SubtitleInfo
+	err = json.Unmarshal(subtitlesRaw, &subtitles)
+	if err != nil {
+		return nil, fmt.Errorf("error unmarshaling subtitles JSON: %v", err)
+	}
+
+	var URLs []string
+	for _, langSubtitles := range subtitles {
+		for _, subtitle := range langSubtitles {
+			URLs = append(URLs, subtitle.URL)
+		}
+	}
+
+	return URLs, nil
+}
diff --git a/internal/pkg/crawl/dependencies/ytdlp/server.go b/internal/pkg/crawl/dependencies/ytdlp/server.go
new file mode 100644
index 00000000..334c7ee4
--- /dev/null
+++ b/internal/pkg/crawl/dependencies/ytdlp/server.go
@@ -0,0 +1,46 @@
+package ytdlp
+
+import (
+	"io"
+	"net"
+	"net/http"
+	"strings"
+)
+
+func ServeBody(body io.ReadCloser) (port int, stopChan chan struct{}, err error) {
+	stopChan = make(chan struct{})
+	portChan := make(chan int)
+
+	bodyBytes, err := io.ReadAll(body)
+	if err != nil {
+		return 0, nil, err
+	}
+
+	// Start the server
+	go func() {
+		// Serve the body on a random port
+		listener, err := net.Listen("tcp", "127.0.0.1:0")
+		if err != nil {
+			panic(err)
+		}
+		defer listener.Close()
+
+		portChan <- listener.Addr().(*net.TCPAddr).Port
+
+		go func() {
+			<-stopChan
+			listener.Close()
+		}()
+
+		// Create a handler that will serve the body on /
+		handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			w.Write(bodyBytes)
+		})
+
+		if err := http.Serve(listener, handler); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
+			return
+		}
+	}()
+
+	return <-portChan, stopChan, nil
+}
diff --git a/internal/pkg/crawl/dependencies/ytdlp/ytdlp.go b/internal/pkg/crawl/dependencies/ytdlp/ytdlp.go
new file mode 100644
index 00000000..75afc1c0
--- /dev/null
+++ b/internal/pkg/crawl/dependencies/ytdlp/ytdlp.go
@@ -0,0 +1,65 @@
+package ytdlp
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"os/exec"
+	"strconv"
+)
+
+func GetJSON(port int) (URLs []string, err error) {
+	// Prepare the command
+	cmd := exec.Command("yt-dlp", "--dump-json", "http://localhost:"+strconv.Itoa(port))
+
+	// Buffers to capture stdout and stderr
+	var stdout, stderr bytes.Buffer
+	cmd.Stdout = &stdout
+	cmd.Stderr = &stderr
+
+	// Run the command
+	err = cmd.Run()
+	if err != nil {
+		return URLs, fmt.Errorf("yt-dlp error: %v\nstderr: %s", err, stderr.String())
+	}
+
+	output := stdout.String()
+
+	// Find subtitles
+	subtitleURLs, err := parseSubtitles(output)
+	if err != nil {
+		return nil, err
+	}
+
+	// Parse the output as a Video object
+	var video Video
+	err = json.Unmarshal([]byte(output), &video)
+	if err != nil {
+		return nil, fmt.Errorf("error unmarshaling yt-dlp JSON: %v", err)
+	}
+
+	// Get all thumbnail URLs
+	for _, thumbnail := range video.Thumbnails {
+		URLs = append(URLs, thumbnail.URL)
+	}
+
+	// Get the manifest URLs for the best video & audio quality
+	// Note: we do not archive live streams
+	if !video.IsLive {
+		for format := range video.RequestedFormats {
+			URLs = append(URLs, video.RequestedFormats[format].URL)
+		}
+	}
+
+	URLs = append(URLs, subtitleURLs...)
+
+	return URLs, nil
+}
+
+func FindPath() (string, bool) {
+	path, err := exec.LookPath("yt-dlp")
+	if err != nil {
+		return "", false
+	}
+	return path, true
+}
diff --git a/internal/pkg/crawl/sitespecific/youtube/youtube.go b/internal/pkg/crawl/sitespecific/youtube/youtube.go
new file mode 100644
index 00000000..888a5b08
--- /dev/null
+++ b/internal/pkg/crawl/sitespecific/youtube/youtube.go
@@ -0,0 +1,35 @@
+package youtube
+
+import (
+	"io"
+	"net/url"
+
+	"github.com/internetarchive/Zeno/internal/pkg/crawl/dependencies/ytdlp"
+)
+
+func Parse(body io.ReadCloser) (URLs []*url.URL, err error) {
+	// Create a temporary server to serve the body and call ytdlp on it
+	port, stopChan, err := ytdlp.ServeBody(body)
+	if err != nil {
+		return nil, err
+	}
+	defer close(stopChan)
+
+	// Call ytdlp on the temporary server
+	rawURLs, err := ytdlp.GetJSON(port)
+	if err != nil {
+		return nil, err
+	}
+
+	// Parse the URLs
+	for _, urlString := range rawURLs {
+		URL, err := url.Parse(urlString)
+		if err != nil {
+			return nil, err
+		}
+
+		URLs = append(URLs, URL)
+	}
+
+	return URLs, nil
+}
diff --git a/internal/pkg/crawl/sitespecific/youtube/youtube_test.go b/internal/pkg/crawl/sitespecific/youtube/youtube_test.go
new file mode 100644
index 00000000..44449b0d
--- /dev/null
+++ b/internal/pkg/crawl/sitespecific/youtube/youtube_test.go
@@ -0,0 +1,27 @@
+package youtube
+
+import (
+	"os"
+	"testing"
+)
+
+func TestParse(t *testing.T) {
+	// Make io.ReadCloser from the youtube_test.html file
+	f, err := os.Open("youtube_test.html")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer f.Close()
+
+	// Parse the video
+	URLs, err := Parse(f)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Check the number of URLs
+	expected := 146
+	if len(URLs) != expected {
+		t.Fatalf("Expected %d URLs, got %d", expected, len(URLs))
+	}
+}
diff --git a/internal/pkg/crawl/sitespecific/youtube/youtube_test.html b/internal/pkg/crawl/sitespecific/youtube/youtube_test.html
new file mode 100644
index 00000000..77474015
--- /dev/null
+++ b/internal/pkg/crawl/sitespecific/youtube/youtube_test.html
@@ -0,0 +1,88 @@
+
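Reviewer note: the sketch below is not part of the patch; it shows how the new ytdlp helpers are expected to compose. ServeBody re-serves an already-fetched watch-page body on an ephemeral localhost port so yt-dlp does not have to contact YouTube again, and GetJSON shells out to `yt-dlp --dump-json` against that local copy to collect format, thumbnail, and subtitle URLs. This mirrors what youtube.Parse does in the diff; the fixture path is hypothetical and yt-dlp must be available on PATH.

package main

import (
	"fmt"
	"os"

	"github.com/internetarchive/Zeno/internal/pkg/crawl/dependencies/ytdlp"
)

func main() {
	// Hypothetical fixture: a previously saved YouTube watch page.
	f, err := os.Open("watch_page.html")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// Serve the body on an ephemeral 127.0.0.1 port; ServeBody reads it fully
	// into memory, and closing stopChan shuts the listener down.
	port, stopChan, err := ytdlp.ServeBody(f)
	if err != nil {
		panic(err)
	}
	defer close(stopChan)

	// Run `yt-dlp --dump-json` against the local copy and collect the
	// extracted format, thumbnail, and subtitle URLs.
	urls, err := ytdlp.GetJSON(port)
	if err != nil {
		panic(err)
	}

	for _, u := range urls {
		fmt.Println(u)
	}
}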
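Also for reviewers, a hypothetical test sketch (not included in the patch, it would live in the ytdlp package) illustrating the input shape parseSubtitles expects: in the yt-dlp --dump-json output, "subtitles" is an object keyed by language code, each value being a list of {ext, url, name} entries, which is why it cannot be decoded into the Video struct directly. The example URLs are made up.

package ytdlp

import "testing"

// TestParseSubtitlesShape illustrates the per-language subtitle layout
// that parseSubtitles flattens into a list of URLs.
func TestParseSubtitlesShape(t *testing.T) {
	jsonData := `{
		"subtitles": {
			"en": [
				{"ext": "vtt", "url": "https://example.com/subs.en.vtt", "name": "English"}
			],
			"fr": [
				{"ext": "vtt", "url": "https://example.com/subs.fr.vtt", "name": "French"}
			]
		}
	}`

	URLs, err := parseSubtitles(jsonData)
	if err != nil {
		t.Fatal(err)
	}

	// Two languages with one track each: both subtitle URLs should come back.
	if len(URLs) != 2 {
		t.Fatalf("expected 2 subtitle URLs, got %d", len(URLs))
	}
}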