-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrest.go
107 lines (93 loc) · 2.36 KB
/
rest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package eevee
import (
"bytes"
"context"
"net/http"
"time"
"github.com/gorilla/mux"
)
type RestRoute struct {
Path string
Method string
}
type RestConnectionConfig struct {
Topics map[string]RestRoute
Port string
Router *mux.Router
Wrapper func(http.Handler) http.Handler
}
type RestConnection struct {
router *mux.Router
config RestConnectionConfig
server *http.Server
in chan Payload
out chan Payload
routeStatus chan RouteStatus
}
func NewRestConnection(config RestConnectionConfig) (*RestConnection, error) {
var router *mux.Router
if config.Router != nil {
router = config.Router
} else {
router = mux.NewRouter()
}
handler := createHandler(config.Wrapper, router)
return &RestConnection{
router: router,
config: config,
server: &http.Server{
Addr: ":" + config.Port,
Handler: handler,
},
in: make(chan Payload),
out: make(chan Payload),
routeStatus: make(chan RouteStatus),
}, nil
}
func (rc *RestConnection) Start(ctx context.Context) {
for topic, route := range rc.config.Topics {
rc.router.Handle(route.Path, rc.requestHandler(topic)).Methods(route.Method)
}
go rc.server.ListenAndServe()
go rc.waitToStopServer(ctx)
}
func (rc *RestConnection) In() <-chan Payload {
return rc.in
}
func (rc *RestConnection) Out() chan<- Payload {
return rc.out
}
func (rc *RestConnection) RouteStatus() chan<- RouteStatus {
return rc.routeStatus
}
func (rc *RestConnection) requestHandler(topic string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Debug("REST received message")
defer r.Body.Close()
msg := new(bytes.Buffer)
msg.ReadFrom(r.Body)
rc.in <- NewPayload(msg.Bytes(), topic)
status := <-rc.routeStatus
if status.Code == RouteOK {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(status.Message))
}
})
}
func (rc *RestConnection) waitToStopServer(ctx context.Context) {
<-ctx.Done()
timeoutCtx, cancelCtx := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelCtx()
log.Info("Stopping REST Client")
rc.server.Shutdown(timeoutCtx)
}
func createHandler(wrapper func(http.Handler) http.Handler, router *mux.Router) (handler http.Handler) {
if wrapper != nil {
handler = wrapper(router)
} else {
handler = router
}
return handler
}