Skip to content

Commit

Permalink
feat: create feature toggles if they don't exists on subscription; on…
Browse files Browse the repository at this point in the history
…ly notify clients subscribed to certain feature toggles
  • Loading branch information
flohansen committed Jun 23, 2024
1 parent ab72df1 commit 39230b7
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 66 deletions.
53 changes: 42 additions & 11 deletions internal/notification/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,74 @@ import (

type FeatureStore interface {
GetAll(ctx context.Context) ([]sqlc.Feature, error)
Upsert(ctx context.Context, feature sqlc.Feature) error
}

type FeatureNotifier struct {
proto.UnimplementedFeatureStateServiceServer
store FeatureStore
streams map[proto.FeatureStateService_SubscribeFeatureChangesServer]struct{}
mu *sync.Mutex
store FeatureStore
streams map[proto.FeatureStateService_SubscribeFeatureChangesServer]struct{}
subscriptions map[string]map[proto.FeatureStateService_SubscribeFeatureChangesServer]struct{}
mu *sync.Mutex
}

func NewFeatureNotifier(grpcServer grpc.ServiceRegistrar, store FeatureStore) *FeatureNotifier {
notifier := FeatureNotifier{
streams: make(map[proto.FeatureStateService_SubscribeFeatureChangesServer]struct{}),
store: store,
mu: &sync.Mutex{},
streams: make(map[proto.FeatureStateService_SubscribeFeatureChangesServer]struct{}),
subscriptions: make(map[string]map[proto.FeatureStateService_SubscribeFeatureChangesServer]struct{}),
store: store,
mu: &sync.Mutex{},
}
proto.RegisterFeatureStateServiceServer(grpcServer, &notifier)
return &notifier
}

func (n *FeatureNotifier) SubscribeFeatureChanges(_ *proto.FeatureSubscription, stream proto.FeatureStateService_SubscribeFeatureChangesServer) error {
func (n *FeatureNotifier) SubscribeFeatureChanges(subscription *proto.FeatureSubscription, stream proto.FeatureStateService_SubscribeFeatureChangesServer) error {
n.mu.Lock()
n.streams[stream] = struct{}{}
for _, f := range subscription.FeatureToggles {
if _, ok := n.subscriptions[f.FeatureId]; !ok {
n.subscriptions[f.FeatureId] = make(map[proto.FeatureStateService_SubscribeFeatureChangesServer]struct{})
}

n.subscriptions[f.FeatureId][stream] = struct{}{}
}
n.mu.Unlock()

features, err := n.store.GetAll(stream.Context())
if err != nil {
return err
}

featureLookup := make(map[string]sqlc.Feature)
for _, f := range features {
featureLookup[f.FeatureID] = f
}

var featuresToAdd []sqlc.Feature
for _, f := range subscription.FeatureToggles {
if _, ok := featureLookup[f.FeatureId]; !ok {
featuresToAdd = append(featuresToAdd, sqlc.Feature{
FeatureID: f.FeatureId,
Description: f.Description,
Enabled: false,
})
}
}

for _, f := range featuresToAdd {
n.store.Upsert(stream.Context(), f)
}

for _, feature := range features {
n.Notify(feature)
}

<-stream.Context().Done()

n.mu.Lock()
delete(n.streams, stream)
for _, f := range subscription.FeatureToggles {
delete(n.subscriptions[f.FeatureId], stream)
}
n.mu.Unlock()

return nil
Expand All @@ -57,8 +88,8 @@ func (n *FeatureNotifier) Notify(feature sqlc.Feature) error {
n.mu.Lock()
defer n.mu.Unlock()

for stream := range n.streams {
err := stream.Send(&proto.Feature{
for stream := range n.subscriptions[feature.FeatureID] {
err := stream.Send(&proto.FeatureToggleChange{
FeatureId: feature.FeatureID,
Enabled: feature.Enabled,
})
Expand Down
23 changes: 21 additions & 2 deletions internal/notification/feature_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,34 @@ func TestFeatureNotifier(t *testing.T) {
Return([]sqlc.Feature{}, nil).
Times(2)

store.EXPECT().
Upsert(gomock.Any(), sqlc.Feature{
FeatureID: "SOME_FEATURE_ID",
}).
Return(nil).
Times(2)

ctx := context.Background()
client1 := createClient(t, ctx, bufDialer(lis))
stream1, err := client1.SubscribeFeatureChanges(ctx, &proto.FeatureSubscription{})
stream1, err := client1.SubscribeFeatureChanges(ctx, &proto.FeatureSubscription{
FeatureToggles: []*proto.FeatureToggle{
{
FeatureId: "SOME_FEATURE_ID",
},
},
})
if err != nil {
t.Fatal(err)
}

client2 := createClient(t, ctx, bufDialer(lis))
stream2, err := client2.SubscribeFeatureChanges(ctx, &proto.FeatureSubscription{})
stream2, err := client2.SubscribeFeatureChanges(ctx, &proto.FeatureSubscription{
FeatureToggles: []*proto.FeatureToggle{
{
FeatureId: "SOME_FEATURE_ID",
},
},
})
if err != nil {
t.Fatal(err)
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/dasher/dasher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,18 @@ func Connect(ctx context.Context, addr string) {
errors.Wrap(err, "create new grpc client")
}

featureToggles := make([]*proto.FeatureToggle, 0, len(registeredFeatures))
for featureID, feature := range registeredFeatures {
featureToggles = append(featureToggles, &proto.FeatureToggle{
FeatureId: featureID,
Description: feature.Description,
})
}

client := proto.NewFeatureStateServiceClient(conn)
stream, err := client.SubscribeFeatureChanges(ctx, &proto.FeatureSubscription{})
stream, err := client.SubscribeFeatureChanges(ctx, &proto.FeatureSubscription{
FeatureToggles: featureToggles,
})
if err != nil {
log.Fatal(errors.Wrap(err, "subscribe to feature changes"))
}
Expand Down
Loading

0 comments on commit 39230b7

Please sign in to comment.