Skip to content

Commit

Permalink
Implement NoAck message logic for both sender and receiver
Browse files Browse the repository at this point in the history
Signed-off-by: Yilun <[email protected]>
  • Loading branch information
yilunzhang committed Mar 23, 2020
1 parent 5252579 commit 34fe636
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 136 deletions.
109 changes: 21 additions & 88 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ func (c *Client) handleMessage(msgType int, data []byte) error {
Type: int32(payload.Type),
Encrypted: payloadMsg.Encrypted,
Pid: payload.Pid,
NoAck: payload.NoAck,
}
}

Expand All @@ -505,27 +506,21 @@ func (c *Client) handleMessage(msgType int, data []byte) error {
return nil
}

msg.reply = func(data interface{}) error {
pid := payload.Pid
var payload *payloads.Payload
var err error
switch v := data.(type) {
case []byte:
payload, err = newBinaryPayload(v, nil, pid, false)
case string:
payload, err = newTextPayload(v, nil, pid, false)
case nil:
payload, err = newAckPayload(pid)
default:
err = ErrInvalidPayloadType
if payload.NoAck {
msg.reply = func(data interface{}) error {
return nil
}
if err != nil {
return err
}
if err := c.send([]string{inboundMsg.Src}, payload, payloadMsg.Encrypted, 0); err != nil {
return err
} else {
msg.reply = func(data interface{}) error {
payload, err := newReplyPayload(data, payload.Pid)
if err != nil {
return err
}
if err := c.send([]string{inboundMsg.Src}, payload, payloadMsg.Encrypted, 0); err != nil {
return err
}
return nil
}
return nil
}

c.RLock()
Expand Down Expand Up @@ -732,69 +727,13 @@ func (c *Client) sendReceipt(prevSignature []byte) error {
return err
}

func newBinaryPayload(data, pid, replyToPid []byte, noAck bool) (*payloads.Payload, error) {
if len(pid) == 0 && len(replyToPid) == 0 {
var err error
pid, err = RandomBytes(MessageIDSize)
if err != nil {
return nil, err
}
}

return &payloads.Payload{
Type: payloads.BINARY,
Pid: pid,
Data: data,
ReplyToPid: replyToPid,
NoAck: noAck,
}, nil
}

func newTextPayload(text string, pid, replyToPid []byte, noAck bool) (*payloads.Payload, error) {
if len(pid) == 0 && len(replyToPid) == 0 {
var err error
pid, err = RandomBytes(MessageIDSize)
if err != nil {
return nil, err
}
}

data, err := proto.Marshal(&payloads.TextData{Text: text})
if err != nil {
return nil, err
}

return &payloads.Payload{
Type: payloads.TEXT,
Pid: pid,
Data: data,
ReplyToPid: replyToPid,
NoAck: noAck,
}, nil
}

func newAckPayload(replyToPid []byte) (*payloads.Payload, error) {
return &payloads.Payload{
Type: payloads.ACK,
ReplyToPid: replyToPid,
}, nil
}

func (c *Client) Send(dests *StringArray, data interface{}, config *MessageConfig) (*OnMessage, error) {
config, err := MergeMessageConfig(c.config.MessageConfig, config)
if err != nil {
return nil, err
}

var payload *payloads.Payload
switch v := data.(type) {
case []byte:
payload, err = newBinaryPayload(v, config.MessageID, nil, config.NoAck)
case string:
payload, err = newTextPayload(v, config.MessageID, nil, config.NoAck)
default:
err = ErrInvalidPayloadType
}
payload, err := newMessagePayload(data, config.MessageID, config.NoAck)
if err != nil {
return nil, err
}
Expand All @@ -803,9 +742,11 @@ func (c *Client) Send(dests *StringArray, data interface{}, config *MessageConfi
return nil, err
}

pidString := string(payload.Pid)
onReply := NewOnMessage(1, nil)
c.responseChannels.Add(pidString, onReply, cache.DefaultExpiration)
var onReply *OnMessage
if !config.NoAck {
onReply = NewOnMessage(1, nil)
c.responseChannels.Add(string(payload.Pid), onReply, cache.DefaultExpiration)
}

return onReply, nil
}
Expand Down Expand Up @@ -998,15 +939,7 @@ func publish(c clientInterface, topic string, data interface{}, config *MessageC
return err
}

var payload *payloads.Payload
switch v := data.(type) {
case []byte:
payload, err = newBinaryPayload(v, config.MessageID, nil, true)
case string:
payload, err = newTextPayload(v, config.MessageID, nil, true)
default:
err = ErrInvalidPayloadType
}
payload, err := newMessagePayload(data, config.MessageID, true)
if err != nil {
return err
}
Expand Down
74 changes: 74 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/rand"
"errors"

"github.com/gogo/protobuf/proto"
"github.com/nknorg/nkn-sdk-go/payloads"
"golang.org/x/crypto/nacl/box"
)
Expand All @@ -27,6 +28,7 @@ type Message struct {
Type int32
Encrypted bool
Pid []byte
NoAck bool
reply func(interface{}) error
}

Expand Down Expand Up @@ -63,3 +65,75 @@ func decrypt(message []byte, nonce [nonceSize]byte, sharedKey *[sharedKeySize]by

return decrypted, nil
}

func newBinaryPayload(data, pid, replyToPid []byte, noAck bool) (*payloads.Payload, error) {
if len(pid) == 0 && len(replyToPid) == 0 {
var err error
pid, err = RandomBytes(MessageIDSize)
if err != nil {
return nil, err
}
}

return &payloads.Payload{
Type: payloads.BINARY,
Pid: pid,
Data: data,
ReplyToPid: replyToPid,
NoAck: noAck,
}, nil
}

func newTextPayload(text string, pid, replyToPid []byte, noAck bool) (*payloads.Payload, error) {
if len(pid) == 0 && len(replyToPid) == 0 {
var err error
pid, err = RandomBytes(MessageIDSize)
if err != nil {
return nil, err
}
}

data, err := proto.Marshal(&payloads.TextData{Text: text})
if err != nil {
return nil, err
}

return &payloads.Payload{
Type: payloads.TEXT,
Pid: pid,
Data: data,
ReplyToPid: replyToPid,
NoAck: noAck,
}, nil
}

func newAckPayload(replyToPid []byte) (*payloads.Payload, error) {
return &payloads.Payload{
Type: payloads.ACK,
ReplyToPid: replyToPid,
}, nil
}

func newMessagePayload(data interface{}, pid []byte, noAck bool) (*payloads.Payload, error) {
switch v := data.(type) {
case []byte:
return newBinaryPayload(v, pid, nil, noAck)
case string:
return newTextPayload(v, pid, nil, noAck)
default:
return nil, ErrInvalidPayloadType
}
}

func newReplyPayload(data interface{}, replyToPid []byte) (*payloads.Payload, error) {
switch v := data.(type) {
case []byte:
return newBinaryPayload(v, nil, replyToPid, false)
case string:
return newTextPayload(v, nil, replyToPid, false)
case nil:
return newAckPayload(replyToPid)
default:
return nil, ErrInvalidPayloadType
}
}
83 changes: 35 additions & 48 deletions multiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,27 +166,21 @@ func NewMultiClient(account *Account, baseIdentifier string, numSubClients int,
msgCache.Set(cacheKey, struct{}{}, cache.DefaultExpiration)

msg.Src, _ = removeIdentifier(msg.Src)
msg.reply = func(data interface{}) error {
pid := msg.Pid
var payload *payloads.Payload
var err error
switch v := data.(type) {
case []byte:
payload, err = newBinaryPayload(v, nil, pid, false)
case string:
payload, err = newTextPayload(v, nil, pid, false)
case nil:
payload, err = newAckPayload(pid)
default:
err = ErrInvalidPayloadType
if msg.NoAck {
msg.reply = func(data interface{}) error {
return nil
}
if err != nil {
return err
} else {
msg.reply = func(data interface{}) error {
payload, err := newReplyPayload(data, msg.Pid)
if err != nil {
return err
}
if err := m.send([]string{msg.Src}, payload, msg.Encrypted, 0); err != nil {
return err
}
return nil
}
if err := m.send([]string{msg.Src}, payload, msg.Encrypted, 0); err != nil {
return err
}
return nil
}
m.OnMessage.receive(msg, true)
}
Expand All @@ -208,15 +202,7 @@ func (m *MultiClient) SendWithClient(clientID int, dests *StringArray, data inte
return nil, err
}

var payload *payloads.Payload
switch v := data.(type) {
case []byte:
payload, err = newBinaryPayload(v, config.MessageID, nil, config.NoAck)
case string:
payload, err = newTextPayload(v, config.MessageID, nil, config.NoAck)
default:
err = ErrInvalidPayloadType
}
payload, err := newMessagePayload(data, config.MessageID, config.NoAck)
if err != nil {
return nil, err
}
Expand All @@ -225,9 +211,11 @@ func (m *MultiClient) SendWithClient(clientID int, dests *StringArray, data inte
return nil, err
}

pidString := string(payload.Pid)
onReply := NewOnMessage(1, nil)
client.responseChannels.Add(pidString, onReply, cache.DefaultExpiration)
var onReply *OnMessage
if !config.NoAck {
onReply = NewOnMessage(1, nil)
client.responseChannels.Add(string(payload.Pid), onReply, cache.DefaultExpiration)
}

return onReply, nil
}
Expand Down Expand Up @@ -264,29 +252,26 @@ func (m *MultiClient) Send(dests *StringArray, data interface{}, config *Message
return nil, err
}

var payload *payloads.Payload
switch v := data.(type) {
case []byte:
payload, err = newBinaryPayload(v, config.MessageID, nil, config.NoAck)
case string:
payload, err = newTextPayload(v, config.MessageID, nil, config.NoAck)
default:
err = ErrInvalidPayloadType
}
payload, err := newMessagePayload(data, config.MessageID, config.NoAck)
if err != nil {
return nil, err
}

var lock sync.Mutex
var errMsg []string
onReply := NewOnMessage(1, nil)
onRawReply := NewOnMessage(1, nil)
var onReply, onRawReply *OnMessage

success := make(chan struct{}, 0)
fail := make(chan struct{}, 0)

// response channel is added first to prevent some client fail to handle response if send finish before receive response
for _, client := range m.Clients {
client.responseChannels.Add(string(payload.Pid), onRawReply, cache.DefaultExpiration)
if !config.NoAck {
onReply = NewOnMessage(1, nil)
onRawReply = NewOnMessage(1, nil)

// response channel is added first to prevent some client fail to handle response if send finish before receive response
for _, client := range m.Clients {
client.responseChannels.Add(string(payload.Pid), onRawReply, cache.DefaultExpiration)
}
}

go func() {
Expand All @@ -311,9 +296,11 @@ func (m *MultiClient) Send(dests *StringArray, data interface{}, config *Message
}
}

msg := <-onRawReply.C
msg.Src, _ = removeIdentifier(msg.Src)
onReply.receive(msg, false)
if !config.NoAck {
msg := <-onRawReply.C
msg.Src, _ = removeIdentifier(msg.Src)
onReply.receive(msg, false)
}
}()

select {
Expand Down

0 comments on commit 34fe636

Please sign in to comment.