Skip to content

Commit

Permalink
feat: roomproxy
Browse files Browse the repository at this point in the history
  • Loading branch information
hulucc committed May 28, 2021
1 parent be1116f commit 26a0323
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 48 deletions.
2 changes: 1 addition & 1 deletion infra/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func ProvideLocal(ctx context.Context) (*olric.Olric, error) {
c.MemberlistConfig.BindAddr = "127.0.0.1"
c.Started = func() {
defer close(ready)
log.Println("Olric is ready to accept connections")
log.Println("[INFO] Olric is ready to accept connections")
}
db, err := olric.New(c)
if err != nil {
Expand Down
8 changes: 2 additions & 6 deletions proxy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,11 @@ func NewBufferPool(ctx context.Context, mtu int) *Pool {
}

func (it *DefaultPool) Get() interface{} {
if it.Statd != nil {
it.Statd.Pool.Get()
}
it.Statd.Pool().Get()
return it.Pool.Get()
}

func (it *DefaultPool) Put(x interface{}) {
if it.Statd != nil {
it.Statd.Pool.Put()
}
it.Statd.Pool().Put()
it.Pool.Put(x)
}
6 changes: 2 additions & 4 deletions proxy/proxy_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ func BenchRecv() ProxyHandler {
for {
select {
case msg := <-pes.Downstream.Rx():
if s != nil {
s.DRx.Incr(len(msg.Buffer))
}
s.DRx().Incr(len(msg.Buffer))
msg.Drop(ctx.BufferPool)
case <-ctx.Done():
return ctx.Err()
Expand All @@ -30,7 +28,7 @@ func BenchSend(target *net.UDPAddr) ProxyHandler {
select {
case pes.Upstream.Tx() <- msg:
if s != nil {
s.UTx.Incr(1)
s.UTx().Incr(1)
}
case <-ctx.Done():
return ctx.Err()
Expand Down
9 changes: 5 additions & 4 deletions proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ func (it *Server) Run(ctx context.Context) error {
s.Worker = it.n
for {
for i := 0; i < it.n; i++ {
(&s.URxch[i]).Set(len(urx[i]))
(&s.DRxch[i]).Set(len(drx[i]))
s.URxch(i).Set(len(urx[i]))
s.DRxch(i).Set(len(drx[i]))
}
(&s.UTxch[0]).Set(len(utx))
(&s.DTxch[0]).Set(len(dtx))
s.UTxch(0).Set(len(utx))
s.DTxch(0).Set(len(dtx))
if ctx.Err() != nil {
return
}
Expand All @@ -112,6 +112,7 @@ func (it *Server) Run(ctx context.Context) error {
}
go report(ctx)

log.Println("[INFO] Proxy server is ready to accept packets.")
<-ctx.Done()
return ctx.Err()
}
Expand Down
130 changes: 113 additions & 17 deletions proxy/statd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,89 @@ type Poolx struct {
type Statd struct {
Name string
Worker int
URx Traffic
UTx Traffic
DRx Traffic
DTx Traffic
URxch [256]Ch
UTxch [256]Ch
DRxch [256]Ch
DTxch [256]Ch
Pool Poolx
urx Traffic
utx Traffic
drx Traffic
dtx Traffic
urxch [256]Ch
utxch [256]Ch
drxch [256]Ch
dtxch [256]Ch
pool Poolx
udrop Traffic
ddrop Traffic
}

func (it *Statd) URx() *Traffic {
if it == nil {
return nil
}
return &it.urx
}

func (it *Statd) UTx() *Traffic {
if it == nil {
return nil
}
return &it.utx
}

func (it *Statd) DRx() *Traffic {
if it == nil {
return nil
}
return &it.drx
}

func (it *Statd) DTx() *Traffic {
if it == nil {
return nil
}
return &it.dtx
}

func (it *Statd) URxch(i int) *Ch {
if it == nil {
return nil
}
return &it.urxch[i]
}
func (it *Statd) UTxch(i int) *Ch {
if it == nil {
return nil
}
return &it.utxch[i]
}
func (it *Statd) DRxch(i int) *Ch {
if it == nil {
return nil
}
return &it.drxch[i]
}
func (it *Statd) DTxch(i int) *Ch {
if it == nil {
return nil
}
return &it.dtxch[i]
}
func (it *Statd) UDrop() *Traffic {
if it == nil {
return nil
}
return &it.udrop
}
func (it *Statd) DDrop() *Traffic {
if it == nil {
return nil
}
return &it.ddrop
}

func (it *Statd) Pool() *Poolx {
if it == nil {
return nil
}
return &it.pool
}

func WithStatd(ctx context.Context, s *Statd) context.Context {
Expand All @@ -48,40 +122,53 @@ func GetStatd(ctx context.Context) *Statd {

func (it *Statd) String() string {
lines := []string{}
traffic := fmt.Sprintf("drx: %s dtx: %s urx: %s utx: %s", it.DRx.String(), it.DTx.String(), it.URx.String(), it.UTx.String())
traffic := fmt.Sprintf("drx: %s dtx: %s urx: %s utx: %s", it.DRx().String(), it.DTx().String(), it.URx().String(), it.UTx().String())
lines = append(lines, traffic)
tx := fmt.Sprintf("dtxch %s utxch %s", (&it.DTxch[0]).String(), (&it.UTxch[0]).String())
tx := fmt.Sprintf("dtxch %s utxch %s", it.DTxch(0).String(), it.UTxch(0).String())
lines = append(lines, tx)
for i := 0; i < it.Worker; i++ {
rx := fmt.Sprintf("drxch%d %s urxch%d %s", i, (&it.DRxch[i]).String(), i, (&it.URxch[i]).String())
rx := fmt.Sprintf("drxch%d %s urxch%d %s", i, it.DRxch(i).String(), i, it.URxch(i).String())
lines = append(lines, rx)
}
pool := fmt.Sprintf("pool %s", it.Pool.String())
pool := fmt.Sprintf("pool %s", it.Pool().String())
lines = append(lines, pool)
drop := fmt.Sprintf("ddrop: %s udrop: %s", it.DDrop().String(), it.UDrop().String())
lines = append(lines, drop)
return strings.Join(lines, "\n")
}

func (it *Statd) Tick() {
for range time.Tick(time.Second) {
fmt.Printf("%v\n", it.String())
it.URx.Reset()
it.UTx.Reset()
it.DRx.Reset()
it.DTx.Reset()
it.URx().Reset()
it.UTx().Reset()
it.DRx().Reset()
it.DTx().Reset()
it.UDrop().Reset()
it.DDrop().Reset()
}
}

func (it *Traffic) Reset() {
if it == nil {
return
}
atomic.SwapUint64(&it.pps, uint64(0))
atomic.SwapUint64(&it.bw, uint64(0))
}

func (it *Traffic) Incr(size int) {
if it == nil {
return
}
atomic.AddUint64(&it.pps, 1)
atomic.AddUint64(&it.bw, uint64(size))
}

func (it *Traffic) Add(n int, size int) {
if it == nil {
return
}
atomic.AddUint64(&it.pps, uint64(n))
atomic.AddUint64(&it.bw, uint64(size))
}
Expand All @@ -93,6 +180,9 @@ func (it *Traffic) String() string {
}

func (it *Ch) Set(size int) {
if it == nil {
return
}
atomic.StoreUint64(&it.size, uint64(size))
}

Expand All @@ -102,10 +192,16 @@ func (it *Ch) String() string {
}

func (it *Poolx) Get() {
if it == nil {
return
}
atomic.AddInt64(&it.size, 1)
}

func (it *Poolx) Put() {
if it == nil {
return
}
atomic.AddInt64(&it.size, -1)
}

Expand Down
79 changes: 65 additions & 14 deletions services/roomproxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,64 @@ import (
"net"
"time"

"github.com/LilithGames/spiracle/config"
"github.com/LilithGames/spiracle/protocol"
"github.com/LilithGames/spiracle/protocol/heartbeat"
"github.com/LilithGames/spiracle/protocol/kcp"
"github.com/LilithGames/spiracle/protocol/multiplex"
"github.com/LilithGames/spiracle/proxy"
"github.com/LilithGames/spiracle/repos"
"github.com/LilithGames/spiracle/infra/db"
"github.com/buraksezer/olric"
)

type RoomProxy struct {
*roomProxyOptions
ctx context.Context
name string
conf *config.Config
session repos.SessionRepo
router repos.RouterRepo
kcp protocol.Parser
multiplex protocol.Parser
heartbeat protocol.Parser
}

type roomProxyOptions struct {
expire time.Duration
session repos.SessionRepo
router repos.RouterRepo
db *olric.Olric
}

func NewRoomProxy(ctx context.Context, conf *config.Config, name string, router repos.RouterRepo, db *olric.Olric) (*RoomProxy, error) {
session, err := repos.NewSessionRepo(fmt.Sprintf("roomproxy.%s", name), db)
if err != nil {
return nil, fmt.Errorf("NewRoomProxy pstream NewSessionRepo err: %w", err)
func (it *RoomProxy) Routers() repos.RouterRepo {
return it.router
}

func NewRoomProxy(ctx context.Context, name string, opts ...RoomProxyOption) (*RoomProxy, error) {
o := getRoomProxyOptions(opts...)
var err error
if o.db == nil {
o.db, err = db.ProvideLocal(ctx)
if err != nil {
return nil, fmt.Errorf("NewRoomProxy db.ProvideLocal err: %w", err)
}
}
if err != nil {
return nil, err
if o.session == nil {
o.session, err = repos.NewSessionRepo(fmt.Sprintf("roomproxy.%s", name), o.db)
if err != nil {
return nil, fmt.Errorf("NewRoomProxy NewSessionRepo err: %w", err)
}
}
if o.router == nil {
o.router, err = repos.NewRouterRepo(fmt.Sprintf("roomproxy.%s", name), o.db)
if err != nil {
return nil, fmt.Errorf("NewRoomProxy NewRouterRepo err: %w", err)
}
}
rp := &RoomProxy{
roomProxyOptions: o,
ctx: ctx,
name: name,
conf: conf,
session: session,
router: router,
kcp: protocol.NewFuncParser(kcp.Parser()),
multiplex: protocol.NewFuncParser(multiplex.Parser()),
heartbeat: protocol.NewFuncParser(heartbeat.Parser()),
expire: time.Second * 30,
}
return rp, nil
}
Expand Down Expand Up @@ -114,24 +132,57 @@ func (it *RoomProxy) uroute(m *proxy.UdpMsg) error {
}

func (it *RoomProxy) Run(ctx *proxy.ProxyContext, pes *proxy.ProxyEndpoints) error {
s := proxy.GetStatd(ctx.Context)
for {
select {
case m := <-pes.Downstream.Rx():
s.DRx().Incr(len(m.Buffer))
err := it.droute(m)
if err != nil {
s.DDrop().Incr(len(m.Buffer))
m.Drop(ctx.BufferPool)
continue
}
pes.Upstream.Tx() <- m
s.DTx().Incr(len(m.Buffer))
case m := <-pes.Upstream.Rx():
s.URx().Incr(len(m.Buffer))
err := it.uroute(m)
if err != nil {
s.UDrop().Incr(len(m.Buffer))
m.Drop(ctx.BufferPool)
continue
}
pes.Downstream.Tx() <- m
s.UTx().Incr(len(m.Buffer))
case <-ctx.Done():
return ctx.Err()
}
}
}

type RoomProxyOption interface {
apply(*roomProxyOptions)
}

type funcRoomProxyOption struct {
f func(*roomProxyOptions)
}

func (it *funcRoomProxyOption) apply(o *roomProxyOptions) {
it.f(o)
}

func newFuncRoomProxyOption(f func(*roomProxyOptions)) RoomProxyOption {
return &funcRoomProxyOption{f: f}
}
func getRoomProxyOptions(opts ...RoomProxyOption) *roomProxyOptions {
o := &roomProxyOptions{
expire: time.Second*30,
}
for _, opt := range opts {
opt.apply(o)
}
return o
}

Loading

0 comments on commit 26a0323

Please sign in to comment.