Skip to content

Commit

Permalink
Rewrite RPC module and add RPC functions to clients
Browse files Browse the repository at this point in the history
1. Client and Multiclient now have all RPC functions available,
including the ones that needs to send transactions. Clients will
try to use connected nodes as RPC address first, and fallback to
seed node if failed.

2. Transaction related function now accepts transaction config as
last param, which includes fee, nonce and attributes.

Signed-off-by: Yilun <[email protected]>
  • Loading branch information
yilunzhang committed Apr 20, 2020
1 parent 59be53a commit 13d76a0
Show file tree
Hide file tree
Showing 9 changed files with 949 additions and 325 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ balance, err := wallet.BalanceByAddress("NKNxxxxx")
Transfer asset to some address:

```go
txnHash, err := wallet.Transfer(account.WalletAddress(), "100", "0")
txnHash, err := wallet.Transfer(account.WalletAddress(), "100", nil)
```

Open nano pay channel to specified address:
Expand Down Expand Up @@ -323,25 +323,25 @@ txnHash, err := wallet.SendRawTransaction(txn)
Register name for this wallet:

```go
txnHash, err = wallet.RegisterName("somename")
txnHash, err = wallet.RegisterName("somename", nil)
```

Delete name for this wallet:

```go
txnHash, err = wallet.DeleteName("somename")
txnHash, err = wallet.DeleteName("somename", nil)
```

Subscribe to specified topic for this wallet for next 100 blocks:

```go
txnHash, err = wallet.Subscribe("identifier", "topic", 100, "meta", "0")
txnHash, err = wallet.Subscribe("identifier", "topic", 100, "meta", nil)
```

Unsubscribe from specified topic:

```go
txnHash, err = wallet.Unsubscribe("identifier", "topic", "0")
txnHash, err = wallet.Unsubscribe("identifier", "topic", nil)
```

## iOS/Android
Expand Down
220 changes: 195 additions & 25 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/gorilla/websocket"
"github.com/nknorg/nkn-sdk-go/payloads"
"github.com/nknorg/nkn/api/common"
apiCommon "github.com/nknorg/nkn/api/common"
"github.com/nknorg/nkn/common"
"github.com/nknorg/nkn/crypto"
"github.com/nknorg/nkn/crypto/ed25519"
"github.com/nknorg/nkn/pb"
"github.com/nknorg/nkn/transaction"
"github.com/nknorg/nkn/util/address"
"github.com/nknorg/nkn/util/config"
"github.com/patrickmn/go-cache"
Expand Down Expand Up @@ -68,14 +70,15 @@ type Client struct {
closed bool
conn *websocket.Conn
node *Node
urlString string
sharedKeys map[string]*[sharedKeySize]byte
wallet *Wallet
}

// clientInterface is the common interface of client and multiclient.
type clientInterface interface {
getConfig() *ClientConfig
send(dests []string, payload *payloads.Payload, encrypted bool, maxHoldingSeconds int32) error
GetSubscribers(topic string, offset, limit int, meta, txPool bool) (*Subscribers, error)
}

type setClientResult struct {
Expand All @@ -100,6 +103,11 @@ func NewClient(account *Account, identifier string, config *ClientConfig) (*Clie
copy(sk[:], account.PrivKey())
curveSecretKey := ed25519.PrivateKeyToCurve25519PrivateKey(&sk)

w, err := NewWallet(account, &WalletConfig{SeedRPCServerAddr: config.SeedRPCServerAddr})
if err != nil {
return nil, err
}

c := Client{
config: config,
account: account,
Expand All @@ -112,6 +120,7 @@ func NewClient(account *Account, identifier string, config *ClientConfig) (*Clie
reconnectChan: make(chan struct{}, 1),
responseChannels: cache.New(time.Duration(config.MsgCacheExpiration)*time.Millisecond, time.Duration(config.MsgCacheExpiration)*time.Millisecond),
sharedKeys: make(map[string]*[sharedKeySize]byte),
wallet: w,
}

go c.handleReconnect()
Expand Down Expand Up @@ -353,12 +362,12 @@ func (c *Client) handleMessage(msgType int, data []byte) error {
if err := json.Unmarshal(*msg["Action"], &action); err != nil {
return err
}
var errCode common.ErrCode
var errCode apiCommon.ErrCode
if err := json.Unmarshal(*msg["Error"], &errCode); err != nil {
return err
}
if errCode != common.SUCCESS {
if errCode == common.WRONG_NODE {
if errCode != apiCommon.SUCCESS {
if errCode == apiCommon.WRONG_NODE {
var node Node
if err := json.Unmarshal(*msg["Result"], &node); err != nil {
return err
Expand All @@ -373,7 +382,7 @@ func (c *Client) handleMessage(msgType int, data []byte) error {
c.Close()
}
return errorWithCode{
err: errors.New(common.ErrMessage[errCode]),
err: errors.New(apiCommon.ErrMessage[errCode]),
code: int32(errCode),
}
}
Expand Down Expand Up @@ -508,20 +517,29 @@ func (c *Client) handleMessage(msgType int, data []byte) error {
}

func (c *Client) connectToNode(node *Node) error {
urlString := (&url.URL{Scheme: "ws", Host: node.Address}).String()
wsAddr := (&url.URL{Scheme: "ws", Host: node.Addr}).String()
dialer := websocket.DefaultDialer
dialer.HandshakeTimeout = time.Duration(c.config.WsHandshakeTimeout) * time.Millisecond

conn, _, err := dialer.Dial(urlString, nil)
conn, _, err := dialer.Dial(wsAddr, nil)
if err != nil {
return err
}

var rpcAddr string
if len(node.RPCAddr) > 0 {
rpcAddr = (&url.URL{Scheme: "http", Host: node.RPCAddr}).String()
}

c.lock.Lock()
prevConn := c.conn
c.conn = conn
c.node = node
c.urlString = urlString
if len(rpcAddr) > 0 {
c.wallet.config.SeedRPCServerAddr = NewStringArray(rpcAddr)
} else {
c.wallet.config.SeedRPCServerAddr = NewStringArray()
}
c.lock.Unlock()

if prevConn != nil {
Expand Down Expand Up @@ -748,7 +766,7 @@ func (c *Client) processDest(dest string) (string, error) {
}
addr := strings.Split(dest, ".")
if len(addr[len(addr)-1]) < 2*ed25519.PublicKeySize {
reg, err := GetRegistrant(addr[len(addr)-1], c.config)
reg, err := c.GetRegistrant(addr[len(addr)-1])
if err != nil {
return "", err
}
Expand Down Expand Up @@ -806,7 +824,7 @@ func (c *Client) newOutboundMessage(dests []string, plds [][]byte, encrypted boo
MaxHoldingSeconds: uint32(maxHoldingSeconds),
}

nodePk, err := hex.DecodeString(c.node.PublicKey)
nodePk, err := hex.DecodeString(c.node.PubKey)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -998,7 +1016,7 @@ func publish(c clientInterface, topic string, data interface{}, config *MessageC

offset := int(config.Offset)
limit := int(config.Limit)
res, err := GetSubscribers(topic, offset, limit, false, config.TxPool, c.getConfig())
res, err := c.GetSubscribers(topic, offset, limit, false, config.TxPool)
if err != nil {
return err
}
Expand All @@ -1013,7 +1031,7 @@ func publish(c clientInterface, topic string, data interface{}, config *MessageC

for len(subscribers) >= limit {
offset += limit
res, err = GetSubscribers(topic, offset, limit, false, false, c.getConfig())
res, err = c.GetSubscribers(topic, offset, limit, false, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -1063,27 +1081,179 @@ func (c *Client) getConfig() *ClientConfig {
return c.config
}

// GetSubscribers gets the subscribers of a topic with a offset and max number
// of results (limit). If meta is true, results contain each subscriber's
// metadata. If txPool is true, results contain subscribers in txPool. Enabling
// this will get subscribers sooner after they send subscribe transactions, but
// might affect the correctness of subscribers because transactions in txpool is
// not guaranteed to be packed into a block.
// ProgramHash returns the program hash of this client's account.
func (c *Client) ProgramHash() common.Uint160 {
return c.wallet.ProgramHash()
}

// SignTransaction signs an unsigned transaction using this client's key pair.
func (c *Client) SignTransaction(tx *transaction.Transaction) error {
return c.wallet.SignTransaction(tx)
}

// NewNanoPay is a shortcut for NewNanoPay using this client's wallet address as
// sender.
//
// Offset and limit are changed to signed int for gomobile compatibility
// Duration is changed to signed int for gomobile compatibility.
func (c *Client) NewNanoPay(recipientAddress, fee string, duration int) (*NanoPay, error) {
return NewNanoPay(c.wallet, c, recipientAddress, fee, duration)
}

// NewNanoPayClaimer is a shortcut for NewNanoPayClaimer using this client as
// RPC client.
func (c *Client) NewNanoPayClaimer(recipientAddress string, claimIntervalMs int32, onError *OnError) (*NanoPayClaimer, error) {
if len(recipientAddress) == 0 {
recipientAddress = c.wallet.address
}
return NewNanoPayClaimer(recipientAddress, claimIntervalMs, onError, c)
}

// GetNonce is the same as package level GetNonce, but using connected node as
// the RPC server, followed by this client's SeedRPCServerAddr if failed.
func (c *Client) GetNonce(txPool bool) (int64, error) {
return c.GetNonceByAddress(c.wallet.address, txPool)
}

// GetNonceByAddress is the same as package level GetNonce, but using connected
// node as the RPC server, followed by this client's SeedRPCServerAddr if
// failed.
func (c *Client) GetNonceByAddress(address string, txPool bool) (int64, error) {
if c.wallet.config.SeedRPCServerAddr.Len() > 0 {
res, err := GetNonce(address, txPool, c.wallet.config)
if err == nil {
return res, err
}
}
return GetNonce(address, txPool, c.config)
}

// GetHeight is the same as package level GetHeight, but using connected node as
// the RPC server, followed by this client's SeedRPCServerAddr if failed.
func (c *Client) GetHeight() (int32, error) {
if c.wallet.config.SeedRPCServerAddr.Len() > 0 {
res, err := GetHeight(c.wallet.config)
if err == nil {
return res, err
}
}
return GetHeight(c.config)
}

// Balance is the same as package level GetBalance, but using connected node as
// the RPC server, followed by this client's SeedRPCServerAddr if failed.
func (c *Client) Balance() (*Amount, error) {
return c.BalanceByAddress(c.wallet.address)
}

// BalanceByAddress is the same as package level GetBalance, but using connected
// node as the RPC server, followed by this client's SeedRPCServerAddr if
// failed.
func (c *Client) BalanceByAddress(address string) (*Amount, error) {
if c.wallet.config.SeedRPCServerAddr.Len() > 0 {
res, err := GetBalance(address, c.wallet.config)
if err == nil {
return res, err
}
}
return GetBalance(address, c.config)
}

// GetSubscribers is the same as package level GetSubscribers, but using
// connected node as the RPC server, followed by this client's SeedRPCServerAddr
// if failed.
func (c *Client) GetSubscribers(topic string, offset, limit int, meta, txPool bool) (*Subscribers, error) {
if c.wallet.config.SeedRPCServerAddr.Len() > 0 {
res, err := GetSubscribers(topic, offset, limit, meta, txPool, c.wallet.config)
if err == nil {
return res, err
}
}
return GetSubscribers(topic, offset, limit, meta, txPool, c.config)
}

// GetSubscription gets the subscription details of a subscriber in a topic.
// GetSubscription is the same as package level GetSubscription, but using
// connected node as the RPC server, followed by this client's SeedRPCServerAddr
// if failed.
func (c *Client) GetSubscription(topic string, subscriber string) (*Subscription, error) {
if c.wallet.config.SeedRPCServerAddr.Len() > 0 {
res, err := GetSubscription(topic, subscriber, c.wallet.config)
if err == nil {
return res, err
}
}
return GetSubscription(topic, subscriber, c.config)
}

// GetSubscribersCount returns the number of subscribers of a topic (not
// including txPool).
//
// Count is changed to signed int for gomobile compatibility
// GetSubscribersCount is the same as package level GetSubscribersCount, but
// using connected node as the RPC server, followed by this client's
// SeedRPCServerAddr if failed.
func (c *Client) GetSubscribersCount(topic string) (int, error) {
if c.wallet.config.SeedRPCServerAddr.Len() > 0 {
res, err := GetSubscribersCount(topic, c.wallet.config)
if err == nil {
return res, err
}
}
return GetSubscribersCount(topic, c.config)
}

// GetRegistrant is the same as package level GetRegistrant, but using connected
// node as the RPC server, followed by this client's SeedRPCServerAddr if
// failed.
func (c *Client) GetRegistrant(name string) (*Registrant, error) {
if c.wallet.config.SeedRPCServerAddr.Len() > 0 {
res, err := GetRegistrant(name, c.wallet.config)
if err == nil {
return res, err
}
}
return GetRegistrant(name, c.config)
}

// SendRawTransaction is the same as package level SendRawTransaction, but using
// connected node as the RPC server, followed by this client's SeedRPCServerAddr
// if failed.
func (c *Client) SendRawTransaction(txn *transaction.Transaction) (string, error) {
if c.wallet.config.SeedRPCServerAddr.Len() > 0 {
res, err := SendRawTransaction(txn, c.wallet.config)
if err == nil {
return res, err
}
}
return SendRawTransaction(txn, c.config)
}

// Transfer is a shortcut for Transfer using this client as SignerRPCClient.
func (c *Client) Transfer(address, amount string, config *TransactionConfig) (string, error) {
return Transfer(c, address, amount, config)
}

// RegisterName is a shortcut for RegisterName using this client as
// SignerRPCClient.
func (c *Client) RegisterName(name string, config *TransactionConfig) (string, error) {
return RegisterName(c, name, config)
}

// TransferName is a shortcut for TransferName using this client as
// SignerRPCClient.
func (c *Client) TransferName(name string, recipientPubKey []byte, config *TransactionConfig) (string, error) {
return TransferName(c, name, recipientPubKey, config)
}

// DeleteName is a shortcut for DeleteName using this client as SignerRPCClient.
func (c *Client) DeleteName(name string, config *TransactionConfig) (string, error) {
return DeleteName(c, name, config)
}

// Subscribe is a shortcut for Subscribe using this client as SignerRPCClient.
//
// Duration is changed to signed int for gomobile compatibility.
func (c *Client) Subscribe(identifier, topic string, duration int, meta string, config *TransactionConfig) (string, error) {
return Subscribe(c, identifier, topic, duration, meta, config)
}

// Unsubscribe is a shortcut for Unsubscribe using this client as
// SignerRPCClient.
func (c *Client) Unsubscribe(identifier, topic string, config *TransactionConfig) (string, error) {
return Unsubscribe(c, identifier, topic, config)
}
Loading

0 comments on commit 13d76a0

Please sign in to comment.