diff --git a/event_test.go b/event_test.go index d0a20d0..decdaa4 100644 --- a/event_test.go +++ b/event_test.go @@ -3,6 +3,7 @@ package event_emitter import ( "context" "fmt" + "github.com/lxzan/event_emitter/internal/helper" "github.com/stretchr/testify/assert" "sync" "testing" @@ -102,6 +103,48 @@ func TestEventEmitter_Publish(t *testing.T) { assert.Equal(t, mapping[topic], i+1) } }) + + t.Run("batch3", func(t *testing.T) { + var em = New(&Config{BucketNum: 1}) + var count = 1000 + var mapping1 = make(map[string]int) + var mapping2 = make(map[string]int) + var mu = &sync.Mutex{} + var subjects = make(map[string]uint8) + var wg = &sync.WaitGroup{} + + for i := 0; i < count; i++ { + var topics []string + for j := 0; j < 100; j++ { + topic := fmt.Sprintf("topic-%d", helper.Numeric.Intn(count)) + topics = append(topics, topic) + } + + topics = helper.Uniq(topics) + wg.Add(len(topics)) + for j, _ := range topics { + var topic = topics[j] + mapping1[topic]++ + subjects[topic] = 1 + em.Subscribe(int64(i), topic, func(msg any) { + mu.Lock() + mapping2[topic]++ + mu.Unlock() + wg.Done() + }) + } + } + + for k, _ := range subjects { + var err = em.Publish(context.Background(), k, "hello") + assert.NoError(t, err) + } + + wg.Wait() + for k, _ := range subjects { + assert.Equal(t, mapping1[k], mapping2[k]) + } + }) } func TestEventEmitter_UnSubscribe(t *testing.T) { diff --git a/internal/helper/helper.go b/internal/helper/helper.go new file mode 100644 index 0000000..e2b7cfc --- /dev/null +++ b/internal/helper/helper.go @@ -0,0 +1,13 @@ +package helper + +func Uniq[T comparable](arr []T) []T { + var m = make(map[T]struct{}, len(arr)) + var list = make([]T, 0, len(arr)) + for _, item := range arr { + m[item] = struct{}{} + } + for k, _ := range m { + list = append(list, k) + } + return list +} diff --git a/internal/helper/helper_test.go b/internal/helper/helper_test.go new file mode 100644 index 0000000..91e2f28 --- /dev/null +++ b/internal/helper/helper_test.go @@ -0,0 +1,18 @@ +package helper + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestRandomString(t *testing.T) { + assert.Less(t, Numeric.Intn(10), 10) + assert.Equal(t, len(AlphabetNumeric.Generate(16)), 16) + Numeric.Uint32() + Numeric.Uint64() +} + +func TestUniq(t *testing.T) { + assert.ElementsMatch(t, Uniq([]int{1, 3, 5, 7, 7, 9}), []int{1, 3, 5, 7, 9}) + assert.ElementsMatch(t, Uniq([]string{"ming", "ming", "shi"}), []string{"ming", "shi"}) +} diff --git a/internal/helper/random.go b/internal/helper/random.go new file mode 100644 index 0000000..46659cf --- /dev/null +++ b/internal/helper/random.go @@ -0,0 +1,60 @@ +package helper + +import ( + "math/rand" + "sync" + "time" +) + +type RandomString struct { + mu sync.Mutex + r *rand.Rand + layout string +} + +var ( + AlphabetNumeric = &RandomString{ + layout: "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ", + r: rand.New(rand.NewSource(time.Now().UnixNano())), + mu: sync.Mutex{}, + } + + Numeric = &RandomString{ + layout: "0123456789", + r: rand.New(rand.NewSource(time.Now().UnixNano())), + mu: sync.Mutex{}, + } +) + +func (c *RandomString) Generate(n int) []byte { + c.mu.Lock() + var b = make([]byte, n, n) + var length = len(c.layout) + for i := 0; i < n; i++ { + var idx = c.r.Intn(length) + b[i] = c.layout[idx] + } + c.mu.Unlock() + return b +} + +func (c *RandomString) Intn(n int) int { + c.mu.Lock() + x := c.r.Intn(n) + c.mu.Unlock() + return x +} + +func (c *RandomString) Uint32() uint32 { + c.mu.Lock() + x := c.r.Uint32() + c.mu.Unlock() + return x +} + +func (c *RandomString) Uint64() uint64 { + c.mu.Lock() + x := c.r.Uint64() + c.mu.Unlock() + return x +}