From b680251252017a5eb97f81fb7c14e63eeb8ee7ab Mon Sep 17 00:00:00 2001 From: lixizan Date: Mon, 6 Nov 2023 14:45:56 +0800 Subject: [PATCH] fix --- event.go | 11 ++++--- event_test.go | 45 ++++++++++++++++++++++++- internal/helper/helper.go | 13 ++++++++ internal/helper/helper_test.go | 18 ++++++++++ internal/helper/random.go | 60 ++++++++++++++++++++++++++++++++++ types.go | 7 ++-- 6 files changed, 144 insertions(+), 10 deletions(-) create mode 100644 internal/helper/helper.go create mode 100644 internal/helper/helper_test.go create mode 100644 internal/helper/random.go diff --git a/event.go b/event.go index 2ef0ff9..e6c6917 100644 --- a/event.go +++ b/event.go @@ -85,8 +85,10 @@ func (c *EventEmitter) Publish(ctx context.Context, topic string, msg any) error defer t.Unlock() for _, v := range t.subs { - if err := t.Emit(ctx, msg, v.cb); err != nil { - return err + if cb, ok := v.topics[topic]; ok { + if err := t.Emit(ctx, msg, cb); err != nil { + return err + } } } return nil @@ -154,12 +156,11 @@ func (c *bucket) addSubscriber(subId int64, topic string, f func(msg any)) *subs if !ok { sub = &subscriberField{ subId: subId, - cb: f, - topics: make(map[string]struct{}), + topics: make(map[string]eventCallback), } c.Subscribers[subId] = sub } - sub.Add(topic) + sub.Add(topic, f) return sub } diff --git a/event_test.go b/event_test.go index aba23ae..decdaa4 100644 --- a/event_test.go +++ b/event_test.go @@ -3,6 +3,7 @@ package event_emitter import ( "context" "fmt" + "github.com/lxzan/event_emitter/internal/helper" "github.com/stretchr/testify/assert" "sync" "testing" @@ -99,7 +100,49 @@ func TestEventEmitter_Publish(t *testing.T) { wg.Wait() for i := 0; i < count; i++ { topic := fmt.Sprintf("topic%d", i) - assert.Equal(t, mapping[topic], count-i) + assert.Equal(t, mapping[topic], i+1) + } + }) + + t.Run("batch3", func(t *testing.T) { + var em = New(&Config{BucketNum: 1}) + var count = 1000 + var mapping1 = make(map[string]int) + var mapping2 = make(map[string]int) + var mu = &sync.Mutex{} + var subjects = make(map[string]uint8) + var wg = &sync.WaitGroup{} + + for i := 0; i < count; i++ { + var topics []string + for j := 0; j < 100; j++ { + topic := fmt.Sprintf("topic-%d", helper.Numeric.Intn(count)) + topics = append(topics, topic) + } + + topics = helper.Uniq(topics) + wg.Add(len(topics)) + for j, _ := range topics { + var topic = topics[j] + mapping1[topic]++ + subjects[topic] = 1 + em.Subscribe(int64(i), topic, func(msg any) { + mu.Lock() + mapping2[topic]++ + mu.Unlock() + wg.Done() + }) + } + } + + for k, _ := range subjects { + var err = em.Publish(context.Background(), k, "hello") + assert.NoError(t, err) + } + + wg.Wait() + for k, _ := range subjects { + assert.Equal(t, mapping1[k], mapping2[k]) } }) } diff --git a/internal/helper/helper.go b/internal/helper/helper.go new file mode 100644 index 0000000..e2b7cfc --- /dev/null +++ b/internal/helper/helper.go @@ -0,0 +1,13 @@ +package helper + +func Uniq[T comparable](arr []T) []T { + var m = make(map[T]struct{}, len(arr)) + var list = make([]T, 0, len(arr)) + for _, item := range arr { + m[item] = struct{}{} + } + for k, _ := range m { + list = append(list, k) + } + return list +} diff --git a/internal/helper/helper_test.go b/internal/helper/helper_test.go new file mode 100644 index 0000000..91e2f28 --- /dev/null +++ b/internal/helper/helper_test.go @@ -0,0 +1,18 @@ +package helper + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestRandomString(t *testing.T) { + assert.Less(t, Numeric.Intn(10), 10) + assert.Equal(t, len(AlphabetNumeric.Generate(16)), 16) + Numeric.Uint32() + Numeric.Uint64() +} + +func TestUniq(t *testing.T) { + assert.ElementsMatch(t, Uniq([]int{1, 3, 5, 7, 7, 9}), []int{1, 3, 5, 7, 9}) + assert.ElementsMatch(t, Uniq([]string{"ming", "ming", "shi"}), []string{"ming", "shi"}) +} diff --git a/internal/helper/random.go b/internal/helper/random.go new file mode 100644 index 0000000..46659cf --- /dev/null +++ b/internal/helper/random.go @@ -0,0 +1,60 @@ +package helper + +import ( + "math/rand" + "sync" + "time" +) + +type RandomString struct { + mu sync.Mutex + r *rand.Rand + layout string +} + +var ( + AlphabetNumeric = &RandomString{ + layout: "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ", + r: rand.New(rand.NewSource(time.Now().UnixNano())), + mu: sync.Mutex{}, + } + + Numeric = &RandomString{ + layout: "0123456789", + r: rand.New(rand.NewSource(time.Now().UnixNano())), + mu: sync.Mutex{}, + } +) + +func (c *RandomString) Generate(n int) []byte { + c.mu.Lock() + var b = make([]byte, n, n) + var length = len(c.layout) + for i := 0; i < n; i++ { + var idx = c.r.Intn(length) + b[i] = c.layout[idx] + } + c.mu.Unlock() + return b +} + +func (c *RandomString) Intn(n int) int { + c.mu.Lock() + x := c.r.Intn(n) + c.mu.Unlock() + return x +} + +func (c *RandomString) Uint32() uint32 { + c.mu.Lock() + x := c.r.Uint32() + c.mu.Unlock() + return x +} + +func (c *RandomString) Uint64() uint64 { + c.mu.Lock() + x := c.r.Uint64() + c.mu.Unlock() + return x +} diff --git a/types.go b/types.go index 3c301e4..7912cb4 100644 --- a/types.go +++ b/types.go @@ -42,13 +42,12 @@ func (c *topicField) Emit(ctx context.Context, msg any, f func(any)) error { type subscriberField struct { sync.Mutex subId int64 - cb eventCallback - topics map[string]struct{} + topics map[string]eventCallback } -func (c *subscriberField) Add(k string) { +func (c *subscriberField) Add(k string, cb eventCallback) { c.Lock() - c.topics[k] = struct{}{} + c.topics[k] = cb c.Unlock() }