Commit
add: yt-dlp support to gather YouTube URLs from watch pages
CorentinB committed Aug 12, 2024
1 parent 26cc707 commit 7c95917
Showing 13 changed files with 475 additions and 106 deletions.
4 changes: 4 additions & 0 deletions cmd/get.go
@@ -84,6 +84,10 @@ func getCMDsFlags(getCmd *cobra.Command) {
getCmd.PersistentFlags().String("es-password", "", "ElasticSearch password to use for indexing crawl logs.")
getCmd.PersistentFlags().String("es-index-prefix", "zeno", "ElasticSearch index prefix to use for indexing crawl logs. Default is : `zeno`, without `-`")

// Dependencies flags
getCmd.PersistentFlags().Bool("no-ytdlp", false, "Disable youtube-dlp usage for video extraction.")
getCmd.PersistentFlags().String("ytdlp-path", "", "Path to youtube-dlp binary.")

// Alias support
// As cobra doesn't support aliases natively (couldn't find a way to do it), we have to do it manually
// This is a workaround to allow users to use `--hops` instead of `--max-hops` for example
4 changes: 4 additions & 0 deletions config/config.go
@@ -76,6 +76,10 @@ type Config struct {
NoStdoutLogging bool `mapstructure:"no-stdout-log"`
NoHandover bool `mapstructure:"no-handover"`
NoBatchWriteWAL bool `mapstructure:"ultrasafe-queue"`

// Dependencies
NoYTDLP bool `mapstructure:"no-ytdlp"`
YTDLPPath string `mapstructure:"ytdlp-path"`
}

var (
108 changes: 108 additions & 0 deletions internal/pkg/crawl/assets.go
@@ -1,16 +1,124 @@
package crawl

import (
"io"
"net/http"
"net/url"
"regexp"
"strings"
"sync/atomic"

"github.com/PuerkitoBio/goquery"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/cloudflarestream"
"github.com/internetarchive/Zeno/internal/pkg/queue"
"github.com/internetarchive/Zeno/internal/pkg/utils"
"github.com/remeh/sizedwaitgroup"
)

func (c *Crawl) captureAsset(item *queue.Item, cookies []*http.Cookie) error {
var resp *http.Response

// Prepare GET request
req, err := http.NewRequest("GET", utils.URLToString(item.URL), nil)
if err != nil {
return err
}

req.Header.Set("Referer", utils.URLToString(item.ParentURL))
req.Header.Set("User-Agent", c.UserAgent)

// Apply cookies obtained from the original URL captured
for i := range cookies {
req.AddCookie(cookies[i])
}

resp, err = c.executeGET(item, req, false)
if err != nil && err.Error() == "URL from redirection has already been seen" {
return nil
} else if err != nil {
return err
}
defer resp.Body.Close()

// needed for WARC writing
io.Copy(io.Discard, resp.Body)

return nil
}

func (c *Crawl) captureAssets(item *queue.Item, assets []*url.URL, cookies []*http.Cookie) {
// TODO: implement a counter for the number of assets
// currently being processed
// c.Frontier.QueueCount.Incr(int64(len(assets)))
swg := sizedwaitgroup.New(int(c.MaxConcurrentAssets))
excluded := false

for _, asset := range assets {
// TODO: implement a counter for the number of assets
// currently being processed
// c.Frontier.QueueCount.Incr(-1)

// Just making sure we do not over-archive by archiving the original URL again
if utils.URLToString(item.URL) == utils.URLToString(asset) {
continue
}

// We ban googlevideo.com URLs because they are heavily rate limited by default, and
// we don't want the crawler to spend an inappropriate amount of time archiving them
if strings.Contains(item.URL.Host, "googlevideo.com") {
continue
}

// If the URL matches any excluded string, we ignore it
for _, excludedString := range c.ExcludedStrings {
if strings.Contains(utils.URLToString(asset), excludedString) {
excluded = true
break
}
}

if excluded {
excluded = false
continue
}

swg.Add()
c.URIsPerSecond.Incr(1)

go func(asset *url.URL, swg *sizedwaitgroup.SizedWaitGroup) {
defer swg.Done()

// Create the asset's item
newAsset, err := queue.NewItem(asset, item.URL, "asset", item.Hop, "", false)
if err != nil {
c.Log.WithFields(c.genLogFields(err, asset, map[string]interface{}{
"parentHop": item.Hop,
"parentUrl": utils.URLToString(item.URL),
"type": "asset",
})).Error("error while creating asset item")
return
}

// Capture the asset
err = c.captureAsset(newAsset, cookies)
if err != nil {
c.Log.WithFields(c.genLogFields(err, &asset, map[string]interface{}{
"parentHop": item.Hop,
"parentUrl": utils.URLToString(item.URL),
"type": "asset",
})).Error("error while capturing asset")
return
}

// If we made it to this point, it means that the asset has been crawled successfully,
// so we can increment the locallyCrawled variable
atomic.AddUint64(&item.LocallyCrawled, 1)
}(asset, &swg)
}

swg.Wait()
}

func (c *Crawl) extractAssets(base *url.URL, item *queue.Item, doc *goquery.Document) (assets []*url.URL, err error) {
var rawAssets []string

120 changes: 18 additions & 102 deletions internal/pkg/crawl/capture.go
@@ -8,7 +8,6 @@ import (
"net/url"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/PuerkitoBio/goquery"
@@ -20,9 +19,9 @@ import (
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/tiktok"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/truthsocial"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/vk"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/youtube"
"github.com/internetarchive/Zeno/internal/pkg/queue"
"github.com/internetarchive/Zeno/internal/pkg/utils"
"github.com/remeh/sizedwaitgroup"
)

func (c *Crawl) executeGET(item *queue.Item, req *http.Request, isRedirection bool) (resp *http.Response, err error) {
@@ -188,37 +187,6 @@ func (c *Crawl) executeGET(item *queue.Item, req *http.Request, isRedirection bo
return resp, nil
}

func (c *Crawl) captureAsset(item *queue.Item, cookies []*http.Cookie) error {
var resp *http.Response

// Prepare GET request
req, err := http.NewRequest("GET", utils.URLToString(item.URL), nil)
if err != nil {
return err
}

req.Header.Set("Referer", utils.URLToString(item.ParentURL))
req.Header.Set("User-Agent", c.UserAgent)

// Apply cookies obtained from the original URL captured
for i := range cookies {
req.AddCookie(cookies[i])
}

resp, err = c.executeGET(item, req, false)
if err != nil && err.Error() == "URL from redirection has already been seen" {
return nil
} else if err != nil {
return err
}
defer resp.Body.Close()

// needed for WARC writing
io.Copy(io.Discard, resp.Body)

return nil
}

// Capture captures the URL and returns the outlinks
func (c *Crawl) Capture(item *queue.Item) error {
var (
@@ -370,6 +338,22 @@
}
defer resp.Body.Close()

// If it was a YouTube watch page, we potentially want to run it through the YouTube extractor
// TODO: support other watch page URLs
if strings.Contains(item.URL.Host, "youtube.com") && strings.Contains(item.URL.Path, "/watch") && !c.NoYTDLP {
URLs, err := youtube.Parse(resp.Body)
if err != nil {
c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while parsing YouTube watch page")
return err
}

if len(URLs) > 0 {
c.captureAssets(item, URLs, resp.Cookies())
}

return nil
}

// Scrape potential URLs from Link HTTP header
var (
links = Parse(resp.Header.Get("link"))
@@ -577,76 +561,8 @@
}
}

// TODO: implement a counter for the number of assets
// currently being processed
// c.Frontier.QueueCount.Incr(int64(len(assets)))
swg := sizedwaitgroup.New(int(c.MaxConcurrentAssets))
excluded := false

for _, asset := range assets {
// TODO: implement a counter for the number of assets
// currently being processed
// c.Frontier.QueueCount.Incr(-1)

// Just making sure we do not over archive by archiving the original URL
if utils.URLToString(item.URL) == utils.URLToString(asset) {
continue
}

// We ban googlevideo.com URLs because they are heavily rate limited by default, and
// we don't want the crawler to spend an innapropriate amount of time archiving them
if strings.Contains(item.URL.Host, "googlevideo.com") {
continue
}

// If the URL match any excluded string, we ignore it
for _, excludedString := range c.ExcludedStrings {
if strings.Contains(utils.URLToString(asset), excludedString) {
excluded = true
break
}
}

if excluded {
excluded = false
continue
}

swg.Add()
c.URIsPerSecond.Incr(1)

go func(asset *url.URL, swg *sizedwaitgroup.SizedWaitGroup) {
defer swg.Done()

// Create the asset's item
newAsset, err := queue.NewItem(asset, item.URL, "asset", item.Hop, "", false)
if err != nil {
c.Log.WithFields(c.genLogFields(err, asset, map[string]interface{}{
"parentHop": item.Hop,
"parentUrl": utils.URLToString(item.URL),
"type": "asset",
})).Error("error while creating asset item")
return
}

// Capture the asset
err = c.captureAsset(newAsset, resp.Cookies())
if err != nil {
c.Log.WithFields(c.genLogFields(err, &asset, map[string]interface{}{
"parentHop": item.Hop,
"parentUrl": utils.URLToString(item.URL),
"type": "asset",
})).Error("error while capturing asset")
return
}

// If we made it to this point, it means that the asset have been crawled successfully,
// then we can increment the locallyCrawled variable
atomic.AddUint64(&item.LocallyCrawled, 1)
}(asset, &swg)
}
c.captureAssets(item, assets, resp.Cookies())

swg.Wait()
return err
}

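The youtube.Parse helper called in the new watch-page branch above lives in internal/pkg/crawl/sitespecific/youtube, one of the changed files not expanded in this view. The sketch below shows one plausible shape for it, inferred from its call site and from the ytdlp.Video model shown further down; the canonical-link lookup, the GetJSON helper, and the live-stream handling are assumptions, not the committed implementation.

package youtube

import (
    "errors"
    "io"
    "net/url"

    "github.com/PuerkitoBio/goquery"
    "github.com/internetarchive/Zeno/internal/pkg/crawl/dependencies/ytdlp"
)

// Parse recovers the canonical watch URL from the page, asks yt-dlp for the
// video's metadata, and returns the media and thumbnail URLs found in it.
func Parse(body io.Reader) (URLs []*url.URL, err error) {
    doc, err := goquery.NewDocumentFromReader(body)
    if err != nil {
        return nil, err
    }

    // The canonical <link> of a watch page points back at the video itself.
    watchURL, exists := doc.Find("link[rel='canonical']").Attr("href")
    if !exists {
        return nil, errors.New("no canonical URL found on watch page")
    }

    // GetJSON is a hypothetical helper, sketched next to the Video model
    // further down; the real code would presumably pass the configured
    // YTDLPPath rather than the bare binary name.
    video, err := ytdlp.GetJSON("yt-dlp", watchURL)
    if err != nil {
        return nil, err
    }

    // Live broadcasts have no stable media to archive, so skip them.
    if video.IsLive {
        return nil, nil
    }

    // Gather the format and thumbnail URLs so captureAssets can fetch them.
    for _, format := range video.RequestedFormats {
        if parsed, err := url.Parse(format.URL); err == nil {
            URLs = append(URLs, parsed)
        }
    }
    for _, thumbnail := range video.Thumbnails {
        if parsed, err := url.Parse(thumbnail.URL); err == nil {
            URLs = append(URLs, parsed)
        }
    }

    return URLs, nil
}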
12 changes: 10 additions & 2 deletions internal/pkg/crawl/config.go
@@ -97,7 +97,7 @@ type Crawl struct {
CDXDedupeServer string
WARCFullOnDisk bool
WARCPoolSize int
WARCDedupSize int
WARCDedupeSize int
DisableLocalDedupe bool
CertValidation bool
WARCCustomCookie string
@@ -116,6 +116,10 @@
HQProducerChannel chan *queue.Item
HQChannelsWg *sync.WaitGroup
HQRateLimitingSendBack bool

// Dependencies
NoYTDLP bool
YTDLPPath string
}

func GenerateCrawlConfig(config *config.Config) (*Crawl, error) {
@@ -231,7 +235,7 @@ func GenerateCrawlConfig(config *config.Config) (*Crawl, error) {
c.CertValidation = config.CertValidation
c.WARCFullOnDisk = config.WARCOnDisk
c.WARCPoolSize = config.WARCPoolSize
c.WARCDedupSize = config.WARCDedupeSize
c.WARCDedupeSize = config.WARCDedupeSize
c.WARCCustomCookie = config.CDXCookie

c.API = config.API
Expand All @@ -246,6 +250,10 @@ func GenerateCrawlConfig(config *config.Config) (*Crawl, error) {
c.PrometheusMetrics.Prefix = config.PrometheusPrefix
}

// Dependencies
c.NoYTDLP = config.NoYTDLP
c.YTDLPPath = config.YTDLPPath

if config.UserAgent != "Zeno" {
c.UserAgent = config.UserAgent
} else {
19 changes: 17 additions & 2 deletions internal/pkg/crawl/crawl.go
@@ -8,6 +8,7 @@ import (

"git.archive.org/wb/gocrawlhq"
"github.com/CorentinB/warc"
"github.com/internetarchive/Zeno/internal/pkg/crawl/dependencies/ytdlp"
"github.com/internetarchive/Zeno/internal/pkg/queue"
"github.com/internetarchive/Zeno/internal/pkg/seencheck"
"github.com/internetarchive/Zeno/internal/pkg/utils"
@@ -67,9 +68,9 @@ func (c *Crawl) Start() (err error) {
// Init WARC rotator settings
rotatorSettings := c.initWARCRotatorSettings()

dedupeOptions := warc.DedupeOptions{LocalDedupe: !c.DisableLocalDedupe, SizeThreshold: c.WARCDedupSize}
dedupeOptions := warc.DedupeOptions{LocalDedupe: !c.DisableLocalDedupe, SizeThreshold: c.WARCDedupeSize}
if c.CDXDedupeServer != "" {
dedupeOptions = warc.DedupeOptions{LocalDedupe: !c.DisableLocalDedupe, CDXDedupe: true, CDXURL: c.CDXDedupeServer, CDXCookie: c.WARCCustomCookie, SizeThreshold: c.WARCDedupSize}
dedupeOptions = warc.DedupeOptions{LocalDedupe: !c.DisableLocalDedupe, CDXDedupe: true, CDXURL: c.CDXDedupeServer, CDXCookie: c.WARCCustomCookie, SizeThreshold: c.WARCDedupeSize}
}

// Init the HTTP client responsible for recording HTTP(s) requests / responses
@@ -125,6 +126,20 @@ func (c *Crawl) Start() (err error) {
go c.startAPI()
}

// Verify that dependencies exist on the system
if !c.NoYTDLP {
// If a yt-dlp path is specified, we use it,
// otherwise we try to find yt-dlp on the system
if c.YTDLPPath == "" {
path, found := ytdlp.FindPath()
if !found {
c.Log.Warn("yt-dlp not found on the system, please install it or specify the path in the configuration if you wish to use it")
} else {
c.YTDLPPath = path
}
}
}

// Parse input cookie file if specified
if c.CookieFile != "" {
cookieJar, err := cookiejar.NewFileJar(c.CookieFile, nil)
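The ytdlp.FindPath helper used above belongs to the new internal/pkg/crawl/dependencies/ytdlp package, which is not expanded in this view. A minimal sketch of what it likely amounts to, assuming it simply searches the system PATH for a yt-dlp binary:

package ytdlp

import "os/exec"

// FindPath looks for a yt-dlp binary on the system PATH and reports whether
// one was found. (Sketch only; the real helper may do more, such as checking
// the binary's version.)
func FindPath() (string, bool) {
    path, err := exec.LookPath("yt-dlp")
    if err != nil {
        return "", false
    }
    return path, true
}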
11 changes: 11 additions & 0 deletions internal/pkg/crawl/dependencies/ytdlp/model.go
@@ -0,0 +1,11 @@
package ytdlp

type Video struct {
IsLive bool `json:"is_live"`
RequestedFormats []struct {
URL string `json:"url"`
} `json:"requested_formats"`
Thumbnails []struct {
URL string `json:"url"`
} `json:"thumbnails"`
}
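The commit view does not show how this model is populated. The hypothetical GetJSON helper below illustrates one way the struct could be filled from yt-dlp's --dump-json output; the function name and signature are assumptions, not code from this commit.

package ytdlp

import (
    "encoding/json"
    "os/exec"
)

// GetJSON asks the yt-dlp binary at the given path for a video's metadata as
// JSON (without downloading the media) and decodes it into the Video model.
func GetJSON(ytdlpPath string, videoURL string) (video Video, err error) {
    // --dump-json prints the video's metadata as a single JSON document.
    out, err := exec.Command(ytdlpPath, "--dump-json", videoURL).Output()
    if err != nil {
        return video, err
    }

    err = json.Unmarshal(out, &video)
    return video, err
}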
(The remaining 6 changed files are not shown in this view.)
