Skip to content
This repository has been archived by the owner on Mar 19, 2022. It is now read-only.

Commit

Permalink
First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
damnever committed Oct 29, 2017
0 parents commit fe1b9b2
Show file tree
Hide file tree
Showing 20 changed files with 4,447 additions and 0 deletions.
22 changes: 22 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Binaries for programs and plugins
*.exe
*.dll
*.so
*.dylib

# Test binary, build with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Ignore dependencies
vendor/

# node
node_modules/
dist/

bin/
data/
.DS_Store
25 changes: 25 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
Copyright (c) 2017, XiaoChao Dong (@damnever) <[email protected]>

All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the <organization> nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
160 changes: 160 additions & 0 deletions birpc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package birpc

import (
"crypto/tls"
"fmt"
"net"
"runtime"
"time"

"github.com/damnever/sunflower/msg"
"github.com/damnever/sunflower/msg/msgpb"
"github.com/damnever/sunflower/pkg/retry"
"github.com/damnever/sunflower/pkg/util"
"github.com/damnever/sunflower/version"
)

// TODO(damnever):
// - more details about device information
// - report client side stats during heartbeat
// - runtime: goroutines, trace, profile, gc, etc.

var deviceInfo = fmt.Sprintf("%v/%v", runtime.GOOS, runtime.GOARCH)

type ClientHandler interface {
HandlePingResponse(req *msgpb.PingResponse)
HandleNewTunnelRequest(req *msgpb.NewTunnelRequest) *msgpb.NewTunnelResponse
HandleCloseTunnelRequest(req *msgpb.CloseTunnelRequest) *msgpb.CloseTunnelResponse
HandleShutdownRequest(req *msgpb.ShutdownRequest) bool
HandleUnknownMessage(m interface{})
HandleError(err error)
}

type ClientConfig struct {
ID string
Hash string
RemoteAddr string
HeartbeatInterval time.Duration
Timeout util.TimeoutConfig
Retrier *retry.Retrier
TLSConf *tls.Config
}

type Client struct {
config *ClientConfig
conn net.Conn
closed chan struct{}
}

func NewClient(conf *ClientConfig) (*Client, error) {
conn, err := connectAndHandshake(conf)
if err != nil {
return nil, err
}
return &Client{
config: conf,
conn: conn,
closed: make(chan struct{}),
}, nil
}

// Run starts communicating with server, do reconnecting and requests dispatching logic.
func (cli *Client) Run(handler ClientHandler) error {
pingReq := &msgpb.PingRequest{}

conf := cli.config
conn := NewConn(cli.conn, conf.Timeout.Read, conf.Timeout.Write)
conn.Go()
defer func() { conn.Close() }()

ticker := time.NewTicker(conf.HeartbeatInterval)
defer ticker.Stop()
tryConnect := tryConnectFunc(conf, cli.closed)

for {
select {
case <-cli.closed:
return nil
case err := <-conn.Err():
handler.HandleError(err)
conn.Close()
rawConn, fatalErr := tryConnect()
if fatalErr != nil {
if fatalErr == retry.ErrCanceled {
return nil
}
return fatalErr
}
conn = NewConn(rawConn, conf.Timeout.Read, conf.Timeout.Write)
conn.Go()

case m := <-conn.In():
switch x := m.(type) {
case *msgpb.PingResponse:
go handler.HandlePingResponse(x)
case *msgpb.NewTunnelRequest:
go func() { conn.Out() <- handler.HandleNewTunnelRequest(x) }()
case *msgpb.CloseTunnelRequest:
go func() { conn.Out() <- handler.HandleCloseTunnelRequest(x) }()
case *msgpb.ShutdownRequest:
if handler.HandleShutdownRequest(x) {
return nil
}
default:
go handler.HandleUnknownMessage(x)
}

case <-ticker.C:
conn.Out() <- pingReq
}
}
}

func (cli *Client) Close() error {
close(cli.closed)
return cli.conn.Close()
}

func tryConnectFunc(conf *ClientConfig, closed chan struct{}) func() (net.Conn, error) {
return func() (conn net.Conn, err error) {
conf.Retrier.Run(func() error {
select {
case <-closed:
err = retry.ErrCanceled
return err
default:
}
conn, err = connectAndHandshake(conf)
return err
})
return
}
}

func connectAndHandshake(conf *ClientConfig) (net.Conn, error) {
conn, err := net.DialTimeout("tcp", conf.RemoteAddr, conf.Timeout.Connect)
if err != nil {
return nil, err
}

req := &msgpb.HandshakeRequest{
ID: conf.ID,
Hash: conf.Hash,
Version: version.Info(),
Device: deviceInfo,
}
conn.SetWriteDeadline(time.Now().Add(conf.Timeout.Write))
if err := msg.Write(conn, req); err != nil {
return nil, err
}

conn.SetReadDeadline(time.Now().Add(conf.Timeout.Read))
var resp msgpb.HandshakeResponse
if err = msg.ReadTo(conn, &resp); err != nil {
return nil, err
}
if err = msg.CodeToError(resp.ErrCode); err != nil {
return nil, err
}
return conn, nil
}
107 changes: 107 additions & 0 deletions birpc/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package birpc

import (
"fmt"
"net"
"sync/atomic"
"time"

"github.com/damnever/sunflower/msg"
"github.com/damnever/sunflower/pkg/util"
)

const (
inChSize = 16
outChSize = 16
)

type Conn struct {
conn net.Conn
rdTimeout time.Duration
wrTimeout time.Duration
closeFlag int32
closed chan struct{}
errCh chan error
in chan interface{}
out chan interface{}
}

func NewConn(conn net.Conn, rdTimeout time.Duration, wrTimeout time.Duration) *Conn {
c := &Conn{
conn: conn,
rdTimeout: rdTimeout,
wrTimeout: wrTimeout,
closeFlag: 0,
closed: make(chan struct{}),
errCh: make(chan error, 2),
in: make(chan interface{}, inChSize),
out: make(chan interface{}, outChSize),
}
return c
}

func (c *Conn) In() <-chan interface{} {
return c.in
}

func (c *Conn) Out() chan<- interface{} {
return c.out
}

func (c *Conn) Err() <-chan error {
return c.errCh
}

func (c *Conn) Go() {
go c.push()
go c.pull()
}

func (c *Conn) Close() error {
if !atomic.CompareAndSwapInt32(&c.closeFlag, 0, 1) {
return nil
}
close(c.closed)
return c.conn.Close()
}

func (c *Conn) push() {
defer c.recoverPanic()
PUSH_LOOP:
for {
select {
case <-c.closed:
break PUSH_LOOP
case v := <-c.out:
c.conn.SetWriteDeadline(time.Now().Add(c.wrTimeout))
util.Must(msg.Write(c.conn, v))
}
}
}

func (c *Conn) pull() {
defer c.recoverPanic()
PULL_LOOP:
for {
c.conn.SetReadDeadline(time.Now().Add(c.rdTimeout))
m, err := msg.Read(c.conn)
util.Must(err)

select {
case <-c.closed:
break PULL_LOOP
case c.in <- m:
}
}
}

func (c *Conn) recoverPanic() {
if e := recover(); e != nil {
c.Close()
err, ok := e.(error)
if !ok {
err = fmt.Errorf("panic: %+v\n", e)
}
c.errCh <- err
}
}
Loading

0 comments on commit fe1b9b2

Please sign in to comment.