Skip to content

Commit

Permalink
Merge pull request #11 from lightstep/jmacd/inline_heap
Browse files Browse the repository at this point in the history
Inline the large-weight heap to avoid interface conversions
  • Loading branch information
jmacd authored Nov 16, 2019
2 parents 358db24 + f53b1a8 commit 879f6b8
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 49 deletions.
57 changes: 57 additions & 0 deletions internal/sampleheap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2019, LightStep Inc.

package internal

// Vsample pairs a sample item with the weight used to order it in a
// SampleHeap.
type Vsample struct {
// Sample is the caller-supplied item; the heap never inspects it.
Sample interface{}
// Weight is the ordering key compared by SampleHeap's Push and Pop.
Weight float64
}

// SampleHeap is a binary min-heap of Vsample values keyed on Weight, stored
// in a slice with the smallest-weight element at index 0. Its Push and Pop
// methods take and return Vsample directly instead of interface{} values.
type SampleHeap []Vsample

// Push inserts v into the heap, restoring the min-heap ordering on Weight.
//
// The new element is appended at the end and then sifted up toward the
// root, mirroring the up() helper of container/heap.
func (sh *SampleHeap) Push(v Vsample) {
	items := append(*sh, v)

	// Walk from the freshly appended leaf toward the root, swapping with the
	// parent for as long as the parent is strictly heavier.
	child := len(items) - 1
	for child > 0 {
		parent := (child - 1) / 2
		if items[child].Weight >= items[parent].Weight {
			break
		}
		items[parent], items[child] = items[child], items[parent]
		child = parent
	}

	*sh = items
}

// Pop removes and returns the element with the smallest Weight.
//
// The last element is moved to the root and sifted down, mirroring the
// down() helper of container/heap. The slot vacated at the end of the slice
// is zeroed so that the backing array does not keep a stale copy of the
// moved element (and the Sample it references) reachable after the heap
// shrinks.
//
// Pop panics with an index-out-of-range error if the heap is empty.
func (sh *SampleHeap) Pop() Vsample {
	l := *sh
	n := len(l) - 1
	result := l[0]
	l[0] = l[n]
	// Clear the vacated tail slot before truncating; otherwise its Sample
	// would stay reachable through the backing array.
	l[n] = Vsample{}
	l = l[:n]

	// This copies the body of heap.down().
	i := 0
	for {
		j1 := 2*i + 1
		if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
			break
		}
		j := j1 // left child
		if j2 := j1 + 1; j2 < n && l[j2].Weight < l[j1].Weight {
			j = j2 // = 2*i + 2 // right child
		}
		if l[j].Weight >= l[i].Weight {
			break
		}
		l[i], l[j] = l[j], l[i]
		i = j
	}

	*sh = l
	return result
}
58 changes: 58 additions & 0 deletions internal/sampleheap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2019, LightStep Inc.

package internal_test

import (
"container/heap"
"math/rand"
"testing"

"github.com/lightstep/varopt/internal"
"github.com/stretchr/testify/require"
)

// simpleHeap is a reference min-heap of float64 values driven through
// container/heap; the test compares SampleHeap against it.
type simpleHeap []float64

// Len reports how many values are stored.
func (s *simpleHeap) Len() int { return len(*s) }

// Swap exchanges the values at positions i and j.
func (s *simpleHeap) Swap(i, j int) {
	vals := *s
	vals[i], vals[j] = vals[j], vals[i]
}

// Less reports whether the value at i is strictly smaller than the one at j.
func (s *simpleHeap) Less(i, j int) bool {
	vals := *s
	return vals[i] < vals[j]
}

// Push appends x, which must hold a float64, to the end of the slice.
func (s *simpleHeap) Push(x interface{}) {
	value := x.(float64)
	*s = append(*s, value)
}

// Pop removes the final value of the slice and returns it.
func (s *simpleHeap) Pop() interface{} {
	vals := *s
	last := len(vals) - 1
	top := vals[last]
	*s = vals[:last]
	return top
}

// TestLargeHeap feeds the same random weights to internal.SampleHeap and to
// a container/heap-driven simpleHeap, then checks that both yield the
// weights in the same order and that the SampleHeap ends up empty.
func TestLargeHeap(t *testing.T) {
	var (
		inlined   internal.SampleHeap
		reference simpleHeap
	)

	const total = 1e6
	for n := 0; n < total; n++ {
		w := rand.NormFloat64()
		inlined.Push(internal.Vsample{Weight: w})
		heap.Push(&reference, w)
	}

	// Drain both heaps in lockstep; every popped weight must match.
	for len(reference) > 0 {
		want := heap.Pop(&reference).(float64)
		got := inlined.Pop().Weight

		require.Equal(t, want, got)
	}

	require.Equal(t, 0, len(inlined))
}
68 changes: 19 additions & 49 deletions varopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
package varopt

import (
"container/heap"
"fmt"
"math"
"math/rand"

"github.com/lightstep/varopt/internal"
)

// Varopt implements the algorithm from Stream sampling for
Expand All @@ -18,14 +19,14 @@ type Varopt struct {
// Random number generator
rnd *rand.Rand

// Large-weight items
L largeHeap
// Large-weight items stored in a min-heap.
L internal.SampleHeap

// Light-weight items.
T []vsample
T []internal.Vsample

// Temporary buffer.
X []vsample
X []internal.Vsample

// Current threshold
tau float64
Expand All @@ -42,13 +43,6 @@ type Varopt struct {
// passed in separately.
type Sample interface{}

type vsample struct {
sample Sample
weight float64
}

type largeHeap []vsample

var ErrInvalidWeight = fmt.Errorf("Negative, zero, or NaN weight")

// New returns a new Varopt sampler with given capacity (i.e.,
Expand All @@ -64,9 +58,9 @@ func New(capacity int, rnd *rand.Rand) *Varopt {
//
// An error will be returned if the weight is negative, zero, or NaN.
func (s *Varopt) Add(sample Sample, weight float64) error {
individual := vsample{
sample: sample,
weight: weight,
individual := internal.Vsample{
Sample: sample,
Weight: weight,
}

if weight <= 0 || math.IsNaN(weight) {
Expand All @@ -77,7 +71,7 @@ func (s *Varopt) Add(sample Sample, weight float64) error {
s.totalWeight += weight

if s.Size() < s.capacity {
heap.Push(&s.L, individual)
s.L.Push(individual)
return nil
}

Expand All @@ -87,24 +81,24 @@ func (s *Varopt) Add(sample Sample, weight float64) error {
W := s.tau * float64(len(s.T))

if weight > s.tau {
heap.Push(&s.L, individual)
s.L.Push(individual)
} else {
s.X = append(s.X, individual)
W += weight
}

for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].weight {
h := heap.Pop(&s.L).(vsample)
for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].Weight {
h := s.L.Pop()
s.X = append(s.X, h)
W += h.weight
W += h.Weight
}

s.tau = W / float64(len(s.T)+len(s.X)-1)
r := s.uniform()
d := 0

for d < len(s.X) && r >= 0 {
wxd := s.X[d].weight
wxd := s.X[d].Weight
r -= (1 - wxd/s.tau)
d++
}
Expand Down Expand Up @@ -137,21 +131,21 @@ func (s *Varopt) uniform() float64 {
// GetOriginalWeight(i).
func (s *Varopt) Get(i int) (Sample, float64) {
if i < len(s.L) {
return s.L[i].sample, s.L[i].weight
return s.L[i].Sample, s.L[i].Weight
}

return s.T[i-len(s.L)].sample, s.tau
return s.T[i-len(s.L)].Sample, s.tau
}

// GetOriginalWeight returns the original input weight of the sample
// item that was passed to Add(). This can be useful for computing a
// frequency from the adjusted sample weight.
func (s *Varopt) GetOriginalWeight(i int) float64 {
if i < len(s.L) {
return s.L[i].weight
return s.L[i].Weight
}

return s.T[i-len(s.L)].weight
return s.T[i-len(s.L)].Weight
}

// Capacity returns the size of the reservoir. This is the maximum
Expand Down Expand Up @@ -182,27 +176,3 @@ func (s *Varopt) TotalCount() int {
func (s *Varopt) Tau() float64 {
return s.tau
}

// Len reports the number of items in the heap.
func (b largeHeap) Len() int {
return len(b)
}

// Swap exchanges the items at positions i and j.
func (b largeHeap) Swap(i, j int) {
b[i], b[j] = b[j], b[i]
}

// Less orders items by ascending weight, which makes this a min-heap when
// driven by container/heap.
func (b largeHeap) Less(i, j int) bool {
return b[i].weight < b[j].weight
}

// Push appends x, which must hold a vsample, to the end of the slice.
func (b *largeHeap) Push(x interface{}) {
*b = append(*b, x.(vsample))
}

// Pop removes the last item of the slice and returns it boxed as an
// interface{} value.
func (b *largeHeap) Pop() interface{} {
old := *b
n := len(old)
x := old[n-1]
*b = old[0 : n-1]
return x
}

0 comments on commit 879f6b8

Please sign in to comment.