Skip to content

Commit

Permalink
软连接到cwgo-pkg (#2)
Browse files Browse the repository at this point in the history
* consul package wrap

* wrap almost done

* fix test error

* rename import packages

* constant link to cwgo

* remove unused function

* replace utils => common

* softly connect to cwgo-pkg

---------

Co-authored-by: jichen <[email protected]>
  • Loading branch information
smx-Morgan and JiChenSSG authored Oct 19, 2024
1 parent 904b080 commit e7cfa19
Show file tree
Hide file tree
Showing 16 changed files with 162 additions and 1,009 deletions.
87 changes: 3 additions & 84 deletions client/circult_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,96 +15,15 @@
package client

import (
"strings"

"github.com/kitex-contrib/config-consul/consul"
"github.com/kitex-contrib/config-consul/utils"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"

configclient "github.com/cloudwego-contrib/cwgo-pkg/config/consul/client"
)

// WithCircuitBreaker sets the circuit breaker policy from consul configuration center.
func WithCircuitBreaker(dest, src string, consulClient consul.Client, uniqueID int64, opts utils.Options) []client.Option {
param, err := consulClient.ClientConfigParam(&consul.ConfigParamConfig{
Category: circuitBreakerConfigName,
ServerServiceName: dest,
ClientServiceName: src,
})
if err != nil {
panic(err)
}

for _, f := range opts.ConsulCustomFunctions {
f(&param)
}
key := param.Prefix + "/" + param.Path
cbSuite := initCircuitBreaker(param.Type, key, dest, src, consulClient, uniqueID)

return []client.Option{
client.WithCircuitBreaker(cbSuite),
client.WithCloseCallbacks(func() error {
// cancel the configuration listener when client is closed.
consulClient.DeregisterConfig(key, uniqueID)
err = cbSuite.Close()
if err != nil {
return err
}
return nil
}),
}
}

// keep consistent when initialising the circuit breaker suit and updating
// the circuit breaker policy.
func genServiceCBKeyWithRPCInfo(rpcInfo rpcinfo.RPCInfo) string {
if rpcInfo == nil {
return ""
}
return genServiceCBKey(rpcInfo.To().ServiceName(), rpcInfo.To().Method())
}

func genServiceCBKey(toService, method string) string {
sum := len(toService) + len(method) + 2
var buf strings.Builder
buf.Grow(sum)
buf.WriteString(toService)
buf.WriteByte('/')
buf.WriteString(method)
return buf.String()
}

func initCircuitBreaker(configType consul.ConfigType, key, dest, src string,
consulClient consul.Client, uniqueID int64,
) *circuitbreak.CBSuite {
cb := circuitbreak.NewCBSuite(genServiceCBKeyWithRPCInfo)
lcb := utils.ThreadSafeSet{}

onChangeCallback := func(data string, parser consul.ConfigParser) {
set := utils.Set{}
configs := map[string]circuitbreak.CBConfig{}
err := parser.Decode(configType, data, &configs)
if err != nil {
klog.Warnf("[consul] %s client consul circuit breaker: unmarshal data %s failed: %s, skip...", key, data, err)
return
}

for method, config := range configs {
set[method] = true
key := genServiceCBKey(dest, method)
cb.UpdateServiceCBConfig(key, config)
}

for _, method := range lcb.DiffAndEmplace(set) {
key := genServiceCBKey(dest, method)
// For deleted method configs, set to default policy
cb.UpdateServiceCBConfig(key, circuitbreak.GetDefaultCBConfig())
}
}

consulClient.RegisterConfigCallback(key, uniqueID, onChangeCallback)

return cb
return configclient.WithCircuitBreaker(dest, src, consulClient, uniqueID, opts)
}
41 changes: 3 additions & 38 deletions client/degradation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,47 +16,12 @@ package client

import (
"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/kitex-contrib/config-consul/consul"
"github.com/kitex-contrib/config-consul/pkg/degradation"
"github.com/kitex-contrib/config-consul/utils"

configclient "github.com/cloudwego-contrib/cwgo-pkg/config/consul/client"
)

func WithDegradation(dest, src string, consulClient consul.Client, uniqueID int64, opts utils.Options) []client.Option {
param, err := consulClient.ClientConfigParam(&consul.ConfigParamConfig{
Category: degradationConfigName,
ServerServiceName: dest,
ClientServiceName: src,
})
if err != nil {
panic(err)
}
for _, f := range opts.ConsulCustomFunctions {
f(&param)
}
key := param.Prefix + "/" + param.Path
container := initDegradationOptions(param.Type, key, dest, uniqueID, consulClient)
return []client.Option{
client.WithACLRules(container.GetAclRule()),
client.WithCloseCallbacks(func() error {
// cancel the configuration listener when client is closed.
consulClient.DeregisterConfig(key, uniqueID)
return nil
}),
}
}

func initDegradationOptions(configType consul.ConfigType, key, dest string, uniqueID int64, consulClient consul.Client) *degradation.DegradationContainer {
container := degradation.NewDegradationContainer()
onChangeCallback := func(data string, parser consul.ConfigParser) {
config := &degradation.DegradationConfig{}
err := parser.Decode(configType, data, config)
if err != nil {
klog.Warnf("[consul] %s server consul degradation config: unmarshal data %s failed: %s, skip...", key, data, err)
return
}
container.NotifyPolicyChange(config)
}
consulClient.RegisterConfigCallback(key, uniqueID, onChangeCallback)
return container
return configclient.WithDegradation(dest, src, consulClient, uniqueID, opts)
}
64 changes: 3 additions & 61 deletions client/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,69 +19,11 @@ import (
"github.com/kitex-contrib/config-consul/utils"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/retry"

configclient "github.com/cloudwego-contrib/cwgo-pkg/config/consul/client"
)

// WithRetryPolicy sets the retry policy from consul configuration center.
func WithRetryPolicy(dest, src string, consulClient consul.Client, uniqueID int64, opts utils.Options) []client.Option {
param, err := consulClient.ClientConfigParam(&consul.ConfigParamConfig{
Category: retryConfigName,
ServerServiceName: dest,
ClientServiceName: src,
})
if err != nil {
panic(err)
}

for _, f := range opts.ConsulCustomFunctions {
f(&param)
}
key := param.Prefix + "/" + param.Path
rc := initRetryContainer(param.Type, key, dest, consulClient, uniqueID)
return []client.Option{
client.WithRetryContainer(rc),
client.WithCloseCallbacks(func() error {
// cancel the configuration listener when client is closed.
consulClient.DeregisterConfig(key, uniqueID)
return nil
}),
client.WithCloseCallbacks(rc.Close),
}
}

func initRetryContainer(configType consul.ConfigType, key, dest string,
consulClient consul.Client, uniqueID int64,
) *retry.Container {
retryContainer := retry.NewRetryContainerWithPercentageLimit()

ts := utils.ThreadSafeSet{}

onChangeCallback := func(data string, parser consul.ConfigParser) {
// the key is method name, wildcard "*" can match anything.
rcs := map[string]*retry.Policy{}
err := parser.Decode(configType, data, &rcs)
if err != nil {
klog.Warnf("[consul] %s client consul retry: unmarshal data %s failed: %s, skip...", key, data, err)
return
}
set := utils.Set{}
for method, policy := range rcs {
set[method] = true
if policy.Enable && policy.BackupPolicy == nil && policy.FailurePolicy == nil {
klog.Warnf("[consul] %s client policy for method %s BackupPolicy and FailurePolicy must not be empty at same time",
dest, method)
continue
}
retryContainer.NotifyPolicyChange(method, *policy)
}

for _, method := range ts.DiffAndEmplace(set) {
retryContainer.DeletePolicy(method)
}
}

consulClient.RegisterConfigCallback(key, uniqueID, onChangeCallback)

return retryContainer
return configclient.WithRetryPolicy(dest, src, consulClient, uniqueID, opts)
}
48 changes: 3 additions & 45 deletions client/rpc_timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,53 +19,11 @@ import (
"github.com/kitex-contrib/config-consul/utils"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/rpctimeout"

configclient "github.com/cloudwego-contrib/cwgo-pkg/config/consul/client"
)

// WithRPCTimeout sets the RPC timeout policy from consul configuration center.
func WithRPCTimeout(dest, src string, consulClient consul.Client, uniqueID int64, opts utils.Options) []client.Option {
param, err := consulClient.ClientConfigParam(&consul.ConfigParamConfig{
Category: rpcTimeoutConfigName,
ServerServiceName: dest,
ClientServiceName: src,
})
if err != nil {
panic(err)
}

for _, f := range opts.ConsulCustomFunctions {
f(&param)
}
key := param.Prefix + "/" + param.Path
return []client.Option{
client.WithTimeoutProvider(initRPCTimeoutContainer(param.Type, key, dest, consulClient, uniqueID)),
client.WithCloseCallbacks(func() error {
// cancel the configuration listener when client is closed.
consulClient.DeregisterConfig(key, uniqueID)
return nil
}),
}
}

func initRPCTimeoutContainer(configType consul.ConfigType, key, dest string,
consulClient consul.Client, uniqueID int64,
) rpcinfo.TimeoutProvider {
rpcTimeoutContainer := rpctimeout.NewContainer()

onChangeCallback := func(data string, parser consul.ConfigParser) {
configs := map[string]*rpctimeout.RPCTimeout{}
err := parser.Decode(configType, data, &configs)
if err != nil {
klog.Warnf("[consul] %s client consul rpc timeout: unmarshal data %s failed: %s, skip...", key, data, err)
return
}

rpcTimeoutContainer.NotifyPolicyChange(configs)
}

consulClient.RegisterConfigCallback(key, uniqueID, onChangeCallback)

return rpcTimeoutContainer
return configclient.WithRPCTimeout(dest, src, consulClient, uniqueID, opts)
}
44 changes: 4 additions & 40 deletions client/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,12 @@ import (
"github.com/kitex-contrib/config-consul/consul"
"github.com/kitex-contrib/config-consul/utils"

"github.com/cloudwego/kitex/client"
configclient "github.com/cloudwego-contrib/cwgo-pkg/config/consul/client"
)

const (
retryConfigName = "retry"
rpcTimeoutConfigName = "rpc_timeout"
circuitBreakerConfigName = "circuit_break"
degradationConfigName = "degradation"
)

type ConsulClientSuite struct {
uid int64
consulClient consul.Client
service string
client string
opts utils.Options
}
type ConsulClientSuite = configclient.ConsulClientSuite

// NewSuite service is the destination service name and client is the local identity.
func NewSuite(service, client string, cli consul.Client,
opts ...utils.Option,
) *ConsulClientSuite {
uid := consul.AllocateUniqueID()
su := &ConsulClientSuite{
uid: uid,
service: service,
client: client,
consulClient: cli,
}
for _, opt := range opts {
opt.Apply(&su.opts)
}

return su
}

// Options return a list client.Option
func (s *ConsulClientSuite) Options() []client.Option {
opts := make([]client.Option, 0, 7)
opts = append(opts, WithCircuitBreaker(s.service, s.client, s.consulClient, s.uid, s.opts)...)
opts = append(opts, WithRetryPolicy(s.service, s.client, s.consulClient, s.uid, s.opts)...)
opts = append(opts, WithRPCTimeout(s.service, s.client, s.consulClient, s.uid, s.opts)...)
opts = append(opts, WithDegradation(s.service, s.client, s.consulClient, s.uid, s.opts)...)
return opts
func NewSuite(service, client string, cli consul.Client, opts ...utils.Option) *ConsulClientSuite {
return configclient.NewSuite(service, client, cli, opts...)
}
Loading

0 comments on commit e7cfa19

Please sign in to comment.