Skip to content

Commit

Permalink
Rename: noAck->noReply, pid->messageId, replyToPid->replyToId for con…
Browse files Browse the repository at this point in the history
…sistency

Signed-off-by: Yilun <[email protected]>
  • Loading branch information
yilunzhang committed Mar 25, 2020
1 parent 34fe636 commit 069939f
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 136 deletions.
22 changes: 11 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,16 +487,16 @@ func (c *Client) handleMessage(msgType int, data []byte) error {
Data: data,
Type: int32(payload.Type),
Encrypted: payloadMsg.Encrypted,
Pid: payload.Pid,
NoAck: payload.NoAck,
MessageId: payload.MessageId,
NoReply: payload.NoReply,
}
}

if len(payload.ReplyToPid) > 0 {
pidString := string(payload.ReplyToPid)
onReply, ok := c.responseChannels.Get(pidString)
if len(payload.ReplyToId) > 0 {
msgIDString := string(payload.ReplyToId)
onReply, ok := c.responseChannels.Get(msgIDString)
if ok {
c.responseChannels.Delete(pidString)
c.responseChannels.Delete(msgIDString)
onReply.(*OnMessage).receive(msg, false)
}
return nil
Expand All @@ -506,13 +506,13 @@ func (c *Client) handleMessage(msgType int, data []byte) error {
return nil
}

if payload.NoAck {
if payload.NoReply {
msg.reply = func(data interface{}) error {
return nil
}
} else {
msg.reply = func(data interface{}) error {
payload, err := newReplyPayload(data, payload.Pid)
payload, err := newReplyPayload(data, payload.MessageId)
if err != nil {
return err
}
Expand Down Expand Up @@ -733,7 +733,7 @@ func (c *Client) Send(dests *StringArray, data interface{}, config *MessageConfi
return nil, err
}

payload, err := newMessagePayload(data, config.MessageID, config.NoAck)
payload, err := newMessagePayload(data, config.MessageID, config.NoReply)
if err != nil {
return nil, err
}
Expand All @@ -743,9 +743,9 @@ func (c *Client) Send(dests *StringArray, data interface{}, config *MessageConfi
}

var onReply *OnMessage
if !config.NoAck {
if !config.NoReply {
onReply = NewOnMessage(1, nil)
c.responseChannels.Add(string(payload.Pid), onReply, cache.DefaultExpiration)
c.responseChannels.Add(string(payload.MessageId), onReply, cache.DefaultExpiration)
}

return onReply, nil
Expand Down
4 changes: 2 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func DefaultClientConfig() *ClientConfig {

type MessageConfig struct {
Unencrypted bool
NoAck bool
NoReply bool
MaxHoldingSeconds int32
MessageID []byte

Expand All @@ -105,7 +105,7 @@ type MessageConfig struct {

var defaultMessageConfig = MessageConfig{
Unencrypted: false,
NoAck: false,
NoReply: false,
MaxHoldingSeconds: 0,
MessageID: nil,
Offset: 0,
Expand Down
56 changes: 28 additions & 28 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type Message struct {
Data []byte
Type int32
Encrypted bool
Pid []byte
NoAck bool
MessageId []byte
NoReply bool
reply func(interface{}) error
}

Expand Down Expand Up @@ -66,28 +66,28 @@ func decrypt(message []byte, nonce [nonceSize]byte, sharedKey *[sharedKeySize]by
return decrypted, nil
}

func newBinaryPayload(data, pid, replyToPid []byte, noAck bool) (*payloads.Payload, error) {
if len(pid) == 0 && len(replyToPid) == 0 {
func newBinaryPayload(data, messageId, replyToId []byte, noReply bool) (*payloads.Payload, error) {
if len(messageId) == 0 && len(replyToId) == 0 {
var err error
pid, err = RandomBytes(MessageIDSize)
messageId, err = RandomBytes(MessageIDSize)
if err != nil {
return nil, err
}
}

return &payloads.Payload{
Type: payloads.BINARY,
Pid: pid,
Data: data,
ReplyToPid: replyToPid,
NoAck: noAck,
Type: payloads.BINARY,
MessageId: messageId,
Data: data,
ReplyToId: replyToId,
NoReply: noReply,
}, nil
}

func newTextPayload(text string, pid, replyToPid []byte, noAck bool) (*payloads.Payload, error) {
if len(pid) == 0 && len(replyToPid) == 0 {
func newTextPayload(text string, messageId, replyToId []byte, noReply bool) (*payloads.Payload, error) {
if len(messageId) == 0 && len(replyToId) == 0 {
var err error
pid, err = RandomBytes(MessageIDSize)
messageId, err = RandomBytes(MessageIDSize)
if err != nil {
return nil, err
}
Expand All @@ -99,40 +99,40 @@ func newTextPayload(text string, pid, replyToPid []byte, noAck bool) (*payloads.
}

return &payloads.Payload{
Type: payloads.TEXT,
Pid: pid,
Data: data,
ReplyToPid: replyToPid,
NoAck: noAck,
Type: payloads.TEXT,
MessageId: messageId,
Data: data,
ReplyToId: replyToId,
NoReply: noReply,
}, nil
}

func newAckPayload(replyToPid []byte) (*payloads.Payload, error) {
func newAckPayload(replyToId []byte) (*payloads.Payload, error) {
return &payloads.Payload{
Type: payloads.ACK,
ReplyToPid: replyToPid,
Type: payloads.ACK,
ReplyToId: replyToId,
}, nil
}

func newMessagePayload(data interface{}, pid []byte, noAck bool) (*payloads.Payload, error) {
func newMessagePayload(data interface{}, messageId []byte, noReply bool) (*payloads.Payload, error) {
switch v := data.(type) {
case []byte:
return newBinaryPayload(v, pid, nil, noAck)
return newBinaryPayload(v, messageId, nil, noReply)
case string:
return newTextPayload(v, pid, nil, noAck)
return newTextPayload(v, messageId, nil, noReply)
default:
return nil, ErrInvalidPayloadType
}
}

func newReplyPayload(data interface{}, replyToPid []byte) (*payloads.Payload, error) {
func newReplyPayload(data interface{}, replyToId []byte) (*payloads.Payload, error) {
switch v := data.(type) {
case []byte:
return newBinaryPayload(v, nil, replyToPid, false)
return newBinaryPayload(v, nil, replyToId, false)
case string:
return newTextPayload(v, nil, replyToPid, false)
return newTextPayload(v, nil, replyToId, false)
case nil:
return newAckPayload(replyToPid)
return newAckPayload(replyToId)
default:
return nil, ErrInvalidPayloadType
}
Expand Down
28 changes: 14 additions & 14 deletions multiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,28 +151,28 @@ func NewMultiClient(account *Account, baseIdentifier string, numSubClients int,
if !msg.Encrypted {
continue
}
err := m.handleSessionMsg(addIdentifier("", i-offset), msg.Src, msg.Pid, msg.Data)
err := m.handleSessionMsg(addIdentifier("", i-offset), msg.Src, msg.MessageId, msg.Data)
if err != nil {
if err != ncp.ErrSessionClosed && err != errAddrNotAllowed {
log.Println(err)
}
continue
}
} else {
cacheKey := string(msg.Pid)
cacheKey := string(msg.MessageId)
if _, ok := msgCache.Get(cacheKey); ok {
continue
}
msgCache.Set(cacheKey, struct{}{}, cache.DefaultExpiration)

msg.Src, _ = removeIdentifier(msg.Src)
if msg.NoAck {
if msg.NoReply {
msg.reply = func(data interface{}) error {
return nil
}
} else {
msg.reply = func(data interface{}) error {
payload, err := newReplyPayload(data, msg.Pid)
payload, err := newReplyPayload(data, msg.MessageId)
if err != nil {
return err
}
Expand Down Expand Up @@ -202,7 +202,7 @@ func (m *MultiClient) SendWithClient(clientID int, dests *StringArray, data inte
return nil, err
}

payload, err := newMessagePayload(data, config.MessageID, config.NoAck)
payload, err := newMessagePayload(data, config.MessageID, config.NoReply)
if err != nil {
return nil, err
}
Expand All @@ -212,9 +212,9 @@ func (m *MultiClient) SendWithClient(clientID int, dests *StringArray, data inte
}

var onReply *OnMessage
if !config.NoAck {
if !config.NoReply {
onReply = NewOnMessage(1, nil)
client.responseChannels.Add(string(payload.Pid), onReply, cache.DefaultExpiration)
client.responseChannels.Add(string(payload.MessageId), onReply, cache.DefaultExpiration)
}

return onReply, nil
Expand Down Expand Up @@ -252,7 +252,7 @@ func (m *MultiClient) Send(dests *StringArray, data interface{}, config *Message
return nil, err
}

payload, err := newMessagePayload(data, config.MessageID, config.NoAck)
payload, err := newMessagePayload(data, config.MessageID, config.NoReply)
if err != nil {
return nil, err
}
Expand All @@ -264,13 +264,13 @@ func (m *MultiClient) Send(dests *StringArray, data interface{}, config *Message
success := make(chan struct{}, 0)
fail := make(chan struct{}, 0)

if !config.NoAck {
if !config.NoReply {
onReply = NewOnMessage(1, nil)
onRawReply = NewOnMessage(1, nil)

// response channel is added first to prevent some client fail to handle response if send finish before receive response
for _, client := range m.Clients {
client.responseChannels.Add(string(payload.Pid), onRawReply, cache.DefaultExpiration)
client.responseChannels.Add(string(payload.MessageId), onRawReply, cache.DefaultExpiration)
}
}

Expand All @@ -296,7 +296,7 @@ func (m *MultiClient) Send(dests *StringArray, data interface{}, config *Message
}
}

if !config.NoAck {
if !config.NoReply {
msg := <-onRawReply.C
msg.Src, _ = removeIdentifier(msg.Src)
onReply.receive(msg, false)
Expand Down Expand Up @@ -379,9 +379,9 @@ func (m *MultiClient) newSession(remoteAddr string, sessionID []byte, config *nc
sort.Strings(clientIDs)
return ncp.NewSession(m.addr, NewClientAddr(remoteAddr), clientIDs, nil, (func(localClientID, remoteClientID string, buf []byte, writeTimeout time.Duration) error {
payload := &payloads.Payload{
Type: payloads.SESSION,
Pid: sessionID,
Data: buf,
Type: payloads.SESSION,
MessageId: sessionID,
Data: buf,
}
client := clients[localClientID]
if writeTimeout > 0 {
Expand Down
Loading

0 comments on commit 069939f

Please sign in to comment.