Skip to content

Commit

Permalink
support server side subs
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Aug 29, 2020
1 parent 808385e commit 3d6f0f1
Showing 1 changed file with 154 additions and 13 deletions.
167 changes: 154 additions & 13 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,68 @@ type ErrorHandler interface {
OnError(*Client, *ErrorEvent)
}

// ServerPublishEvent has info about received channel Publication.
type ServerPublishEvent struct {
Channel string
Offset uint64
Data []byte
Info *ClientInfo
}

type ServerSubscribeEvent struct {
Channel string
Resubscribed bool
Recovered bool
}

// ServerJoinEvent has info about user who left channel.
type ServerJoinEvent struct {
Channel string
Client string
User string
ConnInfo []byte
ChanInfo []byte
}

// ServerLeaveEvent has info about user who joined channel.
type ServerLeaveEvent struct {
Channel string
Client string
User string
ConnInfo []byte
ChanInfo []byte
}

// ServerUnsubscribeEvent is an event passed to unsubscribe event handler.
type ServerUnsubscribeEvent struct {
Channel string
}

// ServerPublishHandler ...
type ServerPublishHandler interface {
OnServerPublish(*Client, *ServerPublishEvent)
}

// ServerSubscribeHandler ...
type ServerSubscribeHandler interface {
OnServerSubscribe(*Client, *ServerSubscribeEvent)
}

// ServerUnsubscribeHandler ...
type ServerUnsubscribeHandler interface {
OnServerUnsubscribe(*Client, *ServerUnsubscribeEvent)
}

// ServerJoinHandler ...
type ServerJoinHandler interface {
OnServerJoin(*Client, *ServerJoinEvent)
}

// ServerLeaveHandler ...
type ServerLeaveHandler interface {
OnServerLeave(*Client, *ServerLeaveEvent)
}

type eventProxy struct {
client *Client

Expand All @@ -78,6 +140,12 @@ type eventProxy struct {
onRefresh RefreshHandler
onError ErrorHandler
onMessage MessageHandler

onServerSubscribe ServerSubscribeHandler
onServerPublish ServerPublishHandler
onServerJoin ServerJoinHandler
onServerLeave ServerLeaveHandler
onServerUnsubscribe ServerUnsubscribeHandler
}

func (p *eventProxy) OnConnect(_ *gocentrifuge.Client, e gocentrifuge.ConnectEvent) {
Expand Down Expand Up @@ -126,6 +194,62 @@ func (p *eventProxy) OnMessage(_ *gocentrifuge.Client, e gocentrifuge.MessageEve
})
}

// OnServerPublish ...
func (p *eventProxy) OnServerPublish(_ *gocentrifuge.Client, e gocentrifuge.ServerPublishEvent) {
event := &ServerPublishEvent{
Channel: e.Channel,
Offset: e.Offset,
Data: e.Data,
}
if e.Info != nil {
event.Info = &ClientInfo{
Client: e.Info.Client,
User: e.Info.User,
ConnInfo: e.Info.ConnInfo,
ChanInfo: e.Info.ChanInfo,
}
}
p.onServerPublish.OnServerPublish(p.client, event)
}

// OnServerSubscribe ...
func (p *eventProxy) OnServerSubscribe(_ *gocentrifuge.Client, e gocentrifuge.ServerSubscribeEvent) {
p.onServerSubscribe.OnServerSubscribe(p.client, &ServerSubscribeEvent{
Channel: e.Channel,
Resubscribed: e.Resubscribed,
Recovered: e.Recovered,
})
}

// OnServerUnsubscribe ...
func (p *eventProxy) OnServerUnsubscribe(_ *gocentrifuge.Client, e gocentrifuge.ServerUnsubscribeEvent) {
p.onServerUnsubscribe.OnServerUnsubscribe(p.client, &ServerUnsubscribeEvent{
Channel: e.Channel,
})
}

// OnServerJoin ...
func (p *eventProxy) OnServerJoin(_ *gocentrifuge.Client, e gocentrifuge.ServerJoinEvent) {
p.onServerJoin.OnServerJoin(p.client, &ServerJoinEvent{
Channel: e.Channel,
User: e.User,
Client: e.Client,
ConnInfo: e.ConnInfo,
ChanInfo: e.ChanInfo,
})
}

// OnServerLeave ...
func (p *eventProxy) OnServerLeave(_ *gocentrifuge.Client, e gocentrifuge.ServerLeaveEvent) {
p.onServerLeave.OnServerLeave(p.client, &ServerLeaveEvent{
Channel: e.Channel,
User: e.User,
Client: e.Client,
ConnInfo: e.ConnInfo,
ChanInfo: e.ChanInfo,
})
}

// OnConnect is a function to handle connect event.
func (c *Client) OnConnect(handler ConnectHandler) {
proxy := &eventProxy{client: c, onConnect: handler}
Expand Down Expand Up @@ -162,6 +286,36 @@ func (c *Client) OnMessage(handler MessageHandler) {
c.client.OnMessage(proxy)
}

// OnServerPublish ...
func (c *Client) OnServerPublish(handler ServerPublishHandler) {
proxy := &eventProxy{client: c, onServerPublish: handler}
c.client.OnServerPublish(proxy)
}

// OnServerSubscribe ...
func (c *Client) OnServerSubscribe(handler ServerSubscribeHandler) {
proxy := &eventProxy{client: c, onServerSubscribe: handler}
c.client.OnServerSubscribe(proxy)
}

// OnServerUnsubscribe ...
func (c *Client) OnServerUnsubscribe(handler ServerUnsubscribeHandler) {
proxy := &eventProxy{client: c, onServerUnsubscribe: handler}
c.client.OnServerUnsubscribe(proxy)
}

// OnServerJoin ...
func (c *Client) OnServerJoin(handler ServerJoinHandler) {
proxy := &eventProxy{client: c, onServerJoin: handler}
c.client.OnServerJoin(proxy)
}

// OnServerLeave ...
func (c *Client) OnServerLeave(handler ServerLeaveHandler) {
proxy := &eventProxy{client: c, onServerLeave: handler}
c.client.OnServerLeave(proxy)
}

// SubscribeSuccessEvent is a subscribe success event context passed
// to event callback.
type SubscribeSuccessEvent struct {
Expand Down Expand Up @@ -245,19 +399,6 @@ type subEventProxy struct {
}

func (p *subEventProxy) OnPublish(_ *gocentrifuge.Subscription, e gocentrifuge.PublishEvent) {
pub := Publication{
Offset: e.Offset,
Data: e.Data,
}
if e.Info != nil {
pub.Info = &ClientInfo{
Client: e.Info.Client,
User: e.Info.User,
ConnInfo: e.Info.ConnInfo,
ChanInfo: e.Info.ChanInfo,
}
}

event := &PublishEvent{
Offset: e.Offset,
Data: e.Data,
Expand Down

0 comments on commit 3d6f0f1

Please sign in to comment.