From 2774b22eab76fbed66e04f2ab65fdb9064d377c7 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Sat, 13 Jul 2024 13:00:14 -0400 Subject: [PATCH 1/2] chore: align structures to save space --- internal/pkg/queue/metadata.go | 4 ++-- internal/pkg/queue/queue.go | 16 ++++++++-------- internal/pkg/queue/stats.go | 14 +++++++------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/internal/pkg/queue/metadata.go b/internal/pkg/queue/metadata.go index a18abb55..43bd7b81 100644 --- a/internal/pkg/queue/metadata.go +++ b/internal/pkg/queue/metadata.go @@ -16,9 +16,9 @@ func (q *PersistentGroupedQueue) loadMetadata() error { var metadata struct { HostIndex map[string][]uint64 + Stats QueueStats HostOrder []string CurrentHost int - Stats QueueStats } err = q.metadataDecoder.Decode(&metadata) @@ -62,9 +62,9 @@ func (q *PersistentGroupedQueue) saveMetadata() error { metadata := struct { HostIndex map[string][]uint64 + Stats QueueStats HostOrder []string CurrentHost int - Stats QueueStats }{ HostIndex: q.hostIndex, HostOrder: q.hostOrder, diff --git a/internal/pkg/queue/queue.go b/internal/pkg/queue/queue.go index 0b668b32..81501cc7 100644 --- a/internal/pkg/queue/queue.go +++ b/internal/pkg/queue/queue.go @@ -39,27 +39,27 @@ type PersistentGroupedQueue struct { metadataEncoder *gob.Encoder metadataDecoder *gob.Decoder hostIndex map[string][]uint64 + cond *sync.Cond + stats QueueStats hostOrder []string currentHost int mutex sync.RWMutex - hostMutex sync.Mutex statsMutex sync.RWMutex - stats QueueStats - cond *sync.Cond + hostMutex sync.Mutex closed bool } type Item struct { + URL *url.URL + ParentItem *Item ID string - Hash uint64 - Hop uint8 Host string Type string + BypassSeencheck string + Hash uint64 Redirect int - URL *url.URL - ParentItem *Item LocallyCrawled uint64 - BypassSeencheck string + Hop uint8 } func init() { diff --git a/internal/pkg/queue/stats.go b/internal/pkg/queue/stats.go index 7a298072..06a4f57b 100644 --- a/internal/pkg/queue/stats.go +++ b/internal/pkg/queue/stats.go @@ -6,19 +6,19 @@ import ( ) type QueueStats struct { - TotalElements int `json:"total_elements"` - UniqueHosts int `json:"unique_hosts"` - ElementsPerHost map[string]int `json:"elements_per_host"` - EnqueueCount int `json:"enqueue_count"` - DequeueCount int `json:"dequeue_count"` FirstEnqueueTime time.Time `json:"first_enqueue_time"` LastEnqueueTime time.Time `json:"last_enqueue_time"` FirstDequeueTime time.Time `json:"first_dequeue_time"` LastDequeueTime time.Time `json:"last_dequeue_time"` + ElementsPerHost map[string]int `json:"elements_per_host"` + HostDistribution map[string]float64 `json:"host_distribution"` + TopHosts []HostStat `json:"top_hosts"` + TotalElements int `json:"total_elements"` + UniqueHosts int `json:"unique_hosts"` + EnqueueCount int `json:"enqueue_count"` + DequeueCount int `json:"dequeue_count"` AverageTimeBetweenEnqueues time.Duration `json:"average_time_between_enqueues"` AverageTimeBetweenDequeues time.Duration `json:"average_time_between_dequeues"` - TopHosts []HostStat `json:"top_hosts"` - HostDistribution map[string]float64 `json:"host_distribution"` AverageElementsPerHost float64 `json:"average_elements_per_host"` } From 53e4a54b36b3fc6c65e6e8aa7c5c0154ca8688f4 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Sat, 13 Jul 2024 13:30:45 -0400 Subject: [PATCH 2/2] feat: remove the dequeue recusion --- internal/pkg/queue/dequeue.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/pkg/queue/dequeue.go b/internal/pkg/queue/dequeue.go index 8c331c27..1634fe34 100644 --- a/internal/pkg/queue/dequeue.go +++ b/internal/pkg/queue/dequeue.go @@ -7,7 +7,7 @@ import ( // Dequeue removes and returns the next item from the queue // It blocks until an item is available -func (q *PersistentGroupedQueue) Dequeue() (*Item, error) { +func (q *PersistentGroupedQueue) Dequeue() (item *Item, err error) { if q.closed { return nil, ErrQueueClosed } @@ -43,11 +43,10 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) { q.hostIndex[host] = positions[1:] // Seek to position and decode item - _, err := q.queueFile.Seek(int64(position), io.SeekStart) + _, err = q.queueFile.Seek(int64(position), io.SeekStart) if err != nil { return nil, fmt.Errorf("failed to seek to item position: %w", err) } - var item Item err = q.queueDecoder.Decode(&item) if err != nil { return nil, fmt.Errorf("failed to decode item: %w", err) @@ -64,9 +63,9 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) { return nil, fmt.Errorf("failed to save metadata: %w", err) } - return &item, nil + break } // If we've checked all hosts and found no items, loop back to wait again - return q.Dequeue() + return }