-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpublishSubscribe.go
166 lines (141 loc) · 3.04 KB
/
publishSubscribe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package redis
import (
"bufio"
"io"
)
// 发布/订阅
// Message 接收到的消息
type Message struct {
ChannelMatche string // 频道模式
Channel string // 具体频道
Message []byte // 具体消息
}
// Subscribe 订阅频道
func (client *Client) Subscribe(subscribe <-chan string, unscribe <-chan string, psubscribe <-chan string, punscribe <-chan string, messages chan<- Message) error {
cmds := make(chan []string, 0)
data := make(chan interface{}, 0)
// 开个协程处理分析命令模式
go func() {
for {
var channel string
var cmd string
select {
case channel = <-subscribe:
cmd = "SUBSCRIBE"
case channel = <-unscribe:
cmd = "UNSUBSCRIBE"
case channel = <-psubscribe:
cmd = "PSUBSCRIBE"
case channel = <-punscribe:
cmd = "PUNSUBSCRIBE"
}
if channel == "" {
break
} else {
cmds <- []string{cmd, channel}
}
}
close(cmds)
close(data)
}()
// 把频道广播的值放到messages里
go func() {
for response := range data {
db := response.([][]byte)
messageType := string(db[0])
switch messageType {
case "message":
channel, message := string(db[1]), db[2]
messages <- Message{channel, channel, message}
case "subscribe":
// Ignore
case "unsubscribe":
// Ignore
case "pmessage":
channelMatched, channel, message := string(db[1]), string(db[2]), db[3]
messages <- Message{channelMatched, channel, message}
case "psubscribe":
// Ignore
case "punsubscribe":
// Ignore
default:
// log.Printf("Unknown message '%s'", messageType)
}
}
}()
err := client.sendCommands(cmds, data)
return err
}
func (client *Client) sendCommands(cmds <-chan []string, data chan<- interface{}) (err error) {
c, err := client.popCon()
var reader *bufio.Reader
var pong interface{}
var errs chan error
var errClosed = false
if err != nil {
goto End
}
reader = bufio.NewReader(c)
err = writeRequest(c, "PING")
if err == io.EOF {
c, err = client.openConnection()
if err != nil {
goto End
}
reader = bufio.NewReader(c)
} else {
pong, err = readResponse(reader)
if pong != "PONG" {
return RedisError("Unexpected response to PING.")
}
if err != nil {
goto End
}
}
errs = make(chan error) // 无缓存的通道,只要有进入就会阻塞
// 开个协程处理命令
go func() {
for cmd := range cmds {
err = writeRequest(c, cmd[0], cmd[1:]...)
if err != nil {
if !errClosed {
errs <- err
}
break
}
}
if !errClosed {
errClosed = true
close(errs)
}
}()
// 处理返回数据
go func() {
for {
response, err := readResponse(reader)
if err != nil {
if !errClosed {
errs <- err
}
break
}
data <- response
}
if !errClosed {
errClosed = true
close(errs)
}
}()
for e := range errs {
err = e
}
End:
c.Close()
client.pushCon(nil)
return
}
// Publish 频道发布信息
func (client *Client) Publish(channel string, val []byte) error {
_, err := client.sendCommand("PUBLISH", channel, string(val))
return err
}