Skip to content

Commit

Permalink
faet: increased use of custom routine pools with req.
Browse files Browse the repository at this point in the history
  • Loading branch information
dickens7 committed May 12, 2024
1 parent b4444e9 commit 5a1fee4
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
7 changes: 7 additions & 0 deletions server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ func WithCustomPool(pool WorkerPool) OptionFn {
}
}

// WithReqPool uses a custom goroutine pool with req.
func WithReqPool(reqPool ReqWorkerPool) OptionFn {
return func(s *Server) {
s.reqPool = reqPool
}
}

// WithAsyncWrite sets AsyncWrite to true.
func WithAsyncWrite() OptionFn {
return func(s *Server) {
Expand Down
28 changes: 26 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ type WorkerPool interface {
StopAndWait()
}

type ReqWorkerPool interface {
Submit(req *protocol.Message, task func())
StopAndWaitFor(deadline time.Duration)
Stop()
StopAndWait()
}

// Server is rpcx server that use TCP or UDP.
type Server struct {
ln net.Listener
Expand All @@ -90,6 +97,7 @@ type Server struct {
EnableProfile bool // enable profile and statsview or not
AsyncWrite bool // set true if your server only serves few clients
pool WorkerPool
reqPool ReqWorkerPool

serviceMapMu sync.RWMutex
serviceMap map[string]*service
Expand Down Expand Up @@ -365,7 +373,15 @@ func (s *Server) sendResponse(ctx *share.Context, conn net.Conn, err error, req,

data := res.EncodeSlicePointer()
if s.AsyncWrite {
if s.pool != nil {
if s.reqPool != nil {
s.reqPool.Submit(req, func() {
if s.writeTimeout != 0 {
conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
}
conn.Write(*data)
protocol.PutData(data)
})
} else if s.pool != nil {
s.pool.Submit(func() {
if s.writeTimeout != 0 {
conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
Expand Down Expand Up @@ -514,7 +530,11 @@ func (s *Server) serveConn(conn net.Conn) {
continue
}

if s.pool != nil {
if s.reqPool != nil {
s.reqPool.Submit(req, func() {
s.processOneRequest(ctx, req, conn)
})
} else if s.pool != nil {
s.pool.Submit(func() {
s.processOneRequest(ctx, req, conn)
})
Expand Down Expand Up @@ -921,6 +941,10 @@ func (s *Server) Close() error {
s.pool.StopAndWaitFor(10 * time.Second)
}

if s.reqPool != nil {
s.reqPool.StopAndWaitFor(10 * time.Second)
}

return err
}

Expand Down

0 comments on commit 5a1fee4

Please sign in to comment.