-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtcp_connection.go
106 lines (80 loc) · 1.82 KB
/
tcp_connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package ramix
import (
"net"
)
// TCPConnection is a connection backed by a net.Conn. It embeds
// *netConnection, whose promoted fields and methods (context, lock, message
// channel, server reference, ...) are used by the methods defined on this type.
type TCPConnection struct {
	*netConnection

	// socket is the underlying network connection that reader and writer
	// operate on.
	socket net.Conn
}
// writer forwards every payload received on c.messageChannel to the socket.
// It runs until the connection context is cancelled or the message channel is
// closed, whichever it observes first. open starts it as a goroutine.
func (c *TCPConnection) writer() {
	for {
		select {
		case <-c.ctx.Done():
			debug("TCPConnection %d writer stopped", c.ID())
			return
		case data, ok := <-c.messageChannel:
			if !ok {
				// close() closes the channel right after cancelling the
				// context. A closed channel is always ready and yields the
				// zero value, so without this check the select could pick
				// this case and write empty payloads to a closed socket.
				debug("TCPConnection %d writer stopped", c.ID())
				return
			}
			// Writing stays best effort (the loop keeps running), but the
			// failure is reported instead of being dropped silently.
			if _, err := c.socket.Write(data); err != nil {
				debug("TCPConnection %d write error: %v", c.ID(), err)
			}
		}
	}
}
// reader reads from the socket, splits the received bytes into frames with
// c.frameDecoder, decodes each frame into a message with c.server.decoder and
// hands the resulting request to c.server.handleRequest.
//
// The loop ends when the connection context is cancelled or a read fails; in
// both cases the deferred c.close(true) runs. open starts it as a goroutine.
func (c *TCPConnection) reader() {
	defer c.close(true)

	for {
		// Non-blocking cancellation check before each (blocking) read.
		select {
		case <-c.ctx.Done():
			debug("TCPConnection %d reader stopped", c.ID())
			return
		default:
		}

		// NOTE(review): a fresh buffer is allocated per read. Decoded frames
		// may alias it and be used after this iteration, so confirm that
		// before hoisting or pooling the buffer.
		buffer := make([]byte, c.server.ConnectionReadBufferSize)
		length, readErr := c.socket.Read(buffer)

		// Per the io.Reader contract, bytes returned alongside an error must
		// be processed before the error is considered, and a 0-byte read
		// means nothing happened.
		if length > 0 {
			c.refreshLastActiveTime()

			for _, frame := range c.frameDecoder.Decode(buffer[:length]) {
				message, err := c.server.decoder.Decode(frame)
				if err != nil {
					// A frame that fails to decode is skipped; the remaining
					// frames are still processed.
					debug("Message decode error: %v", err)
					continue
				}
				c.server.handleRequest(c, newRequest(message))
			}
		}

		if readErr != nil {
			debug("TCPSocket read error: %v", readErr)
			return
		}
	}
}
// RemoteAddress returns the remote address reported by the underlying socket.
func (c *TCPConnection) RemoteAddress() net.Addr {
	peer := c.socket.RemoteAddr()
	return peer
}
// open launches the connection's background goroutines (reader, writer and
// the heartbeat checker) and then calls the server's connectionOpen callback
// when one is configured.
func (c *TCPConnection) open() {
	go c.reader()
	go c.writer()
	go c.heartbeatChecker.start()

	onOpen := c.server.connectionOpen
	if onOpen == nil {
		return
	}
	onOpen(c)
}
// close shuts the connection down. The whole sequence runs under c.lock and
// is skipped if the connection is already marked closed, so repeated calls
// are no-ops.
//
// When syncConnectionManager is true the connection is also removed from the
// server's connection manager.
func (c *TCPConnection) close(syncConnectionManager bool) {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.isClosed {
		return
	}

	// The close callback runs first, before the socket is torn down.
	if onClose := c.server.connectionClose; onClose != nil {
		onClose(c)
	}

	// The socket close error is intentionally discarded: the connection is
	// going away regardless of the outcome.
	_ = c.socket.Close()
	c.isClosed = true

	// Signal reader/writer via the context, then close the outgoing queue
	// and stop the heartbeat checker.
	c.cancel()
	close(c.messageChannel)
	c.heartbeatChecker.stop()

	if syncConnectionManager {
		c.server.connectionManager.removeConnection(c)
	}

	// Without a shared worker pool this connection has its own worker, which
	// has to be stopped here.
	if usesPool := c.server.UsingWorkerPool(); !usesPool {
		c.worker.stop()
	}

	debug("TCPConnection %d closed, remote address: %v", c.ID(), c.socket.RemoteAddr())
}