Skip to content

Commit

Permalink
Fix nil client and subscription using new API (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Oct 7, 2018
1 parent f91361a commit e0726be
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 89 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

[[constraint]]
name = "github.com/centrifugal/centrifuge-go"
branch = "master"
version = "0.2.0"

[prune]
go-tests = true
Expand Down
27 changes: 9 additions & 18 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
// Client to connect to Centrifuge-based server or Centrifugo.
type Client struct {
client *gocentrifuge.Client
events *EventHub
}

// Config defaults.
Expand Down Expand Up @@ -38,17 +39,17 @@ func DefaultConfig() *Config {
}

// New initializes Client.
func New(u string, events *EventHub, config *Config) *Client {
func New(u string, config *Config) *Client {
c := gocentrifuge.Config{
ReadTimeout: time.Duration(config.ReadTimeoutMilliseconds) * time.Millisecond,
WriteTimeout: time.Duration(config.WriteTimeoutMilliseconds) * time.Millisecond,
PingInterval: time.Duration(config.PingIntervalMilliseconds) * time.Millisecond,
PrivateChannelPrefix: config.PrivateChannelPrefix,
}
client := &Client{
client: gocentrifuge.New(u, events.eventHub, c),
client: gocentrifuge.New(u, c),
events: NewEventHub(),
}
events.setClient(client)
return client
}

Expand Down Expand Up @@ -99,24 +100,14 @@ func (c *Client) Publish(channel string, data []byte) error {
return c.client.Publish(channel, data)
}

// Subscribe allows to subscribe on channel.
func (c *Client) Subscribe(channel string, events *SubscriptionEventHub) (*Subscription, error) {
sub, err := c.client.Subscribe(channel, events.subEventHub)
// NewSubscription allows to create new Subscription to channel.
func (c *Client) NewSubscription(channel string) (*Subscription, error) {
sub, err := c.client.NewSubscription(channel)
if err != nil {
return nil, err
}
return &Subscription{
s := &Subscription{
sub: sub,
}, nil
}

// SubscribeSync allows to subscribe on channel and wait until subscribe success or error.
func (c *Client) SubscribeSync(channel string, events *SubscriptionEventHub) (*Subscription, error) {
sub, err := c.client.SubscribeSync(channel, events.subEventHub)
if err != nil {
return nil, err
}
return &Subscription{
sub: sub,
}, nil
return s, nil
}
84 changes: 41 additions & 43 deletions events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package centrifuge

import (
"fmt"

gocentrifuge "github.com/centrifugal/centrifuge-go"
)

Expand Down Expand Up @@ -77,9 +79,7 @@ type EventHub struct {

// NewEventHub ...
func NewEventHub() *EventHub {
return &EventHub{
eventHub: gocentrifuge.NewEventHub(),
}
return &EventHub{}
}

func (h *EventHub) setClient(c *Client) {
Expand Down Expand Up @@ -144,39 +144,39 @@ func (p *eventProxy) OnMessage(c *gocentrifuge.Client, e gocentrifuge.MessageEve
}

// OnConnect is a function to handle connect event.
func (h *EventHub) OnConnect(handler ConnectHandler) {
proxy := &eventProxy{client: h.client, onConnect: handler}
h.eventHub.OnConnect(proxy)
func (c *Client) OnConnect(handler ConnectHandler) {
proxy := &eventProxy{client: c, onConnect: handler}
c.client.OnConnect(proxy)
}

// OnDisconnect is a function to handle disconnect event.
func (h *EventHub) OnDisconnect(handler DisconnectHandler) {
proxy := &eventProxy{client: h.client, onDisconnect: handler}
h.eventHub.OnDisconnect(proxy)
func (c *Client) OnDisconnect(handler DisconnectHandler) {
proxy := &eventProxy{client: c, onDisconnect: handler}
c.client.OnDisconnect(proxy)
}

// OnPrivateSub needed to handle private channel subscriptions.
func (h *EventHub) OnPrivateSub(handler PrivateSubHandler) {
proxy := &eventProxy{client: h.client, onPrivateSub: handler}
h.eventHub.OnPrivateSub(proxy)
func (c *Client) OnPrivateSub(handler PrivateSubHandler) {
proxy := &eventProxy{client: c, onPrivateSub: handler}
c.client.OnPrivateSub(proxy)
}

// OnRefresh handles refresh event when client's credentials expired and must be refreshed.
func (h *EventHub) OnRefresh(handler RefreshHandler) {
proxy := &eventProxy{client: h.client, onRefresh: handler}
h.eventHub.OnRefresh(proxy)
func (c *Client) OnRefresh(handler RefreshHandler) {
proxy := &eventProxy{client: c, onRefresh: handler}
c.client.OnRefresh(proxy)
}

// OnError is a function that will receive unhandled errors for logging.
func (h *EventHub) OnError(handler ErrorHandler) {
proxy := &eventProxy{client: h.client, onError: handler}
h.eventHub.OnError(proxy)
func (c *Client) OnError(handler ErrorHandler) {
proxy := &eventProxy{client: c, onError: handler}
c.client.OnError(proxy)
}

// OnMessage allows to process async message from server to client.
func (h *EventHub) OnMessage(handler MessageHandler) {
proxy := &eventProxy{client: h.client, onMessage: handler}
h.eventHub.OnMessage(proxy)
func (c *Client) OnMessage(handler MessageHandler) {
proxy := &eventProxy{client: c, onMessage: handler}
c.client.OnMessage(proxy)
}

// SubscribeSuccessEvent is a subscribe success event context passed
Expand Down Expand Up @@ -258,9 +258,7 @@ type SubscriptionEventHub struct {

// NewSubscriptionEventHub initializes new SubscriptionEventHub.
func NewSubscriptionEventHub() *SubscriptionEventHub {
return &SubscriptionEventHub{
subEventHub: gocentrifuge.NewSubscriptionEventHub(),
}
return &SubscriptionEventHub{}
}

func (h *SubscriptionEventHub) setSub(s *Subscription) {
Expand All @@ -279,6 +277,7 @@ type subEventProxy struct {
}

func (p *subEventProxy) OnPublish(s *gocentrifuge.Subscription, e gocentrifuge.PublishEvent) {
fmt.Printf("%#v\n", p)
pub := Publication{
UID: e.UID,
Data: e.Data,
Expand All @@ -304,7 +303,6 @@ func (p *subEventProxy) OnPublish(s *gocentrifuge.Subscription, e gocentrifuge.P
ChanInfo: e.Info.ChanInfo,
}
}

p.onPublish.OnPublish(p.sub, event)
}

Expand Down Expand Up @@ -344,37 +342,37 @@ func (p *subEventProxy) OnSubscribeError(s *gocentrifuge.Subscription, e gocentr
}

// OnPublish allows to set PublishHandler to SubEventHandler.
func (h *SubscriptionEventHub) OnPublish(handler PublishHandler) {
proxy := &subEventProxy{sub: h.sub, onPublish: handler}
h.subEventHub.OnPublish(proxy)
func (s *Subscription) OnPublish(handler PublishHandler) {
proxy := &subEventProxy{sub: s, onPublish: handler}
s.sub.OnPublish(proxy)
}

// OnJoin allows to set JoinHandler to SubEventHandler.
func (h *SubscriptionEventHub) OnJoin(handler JoinHandler) {
proxy := &subEventProxy{sub: h.sub, onJoin: handler}
h.subEventHub.OnPublish(proxy)
func (s *Subscription) OnJoin(handler JoinHandler) {
proxy := &subEventProxy{sub: s, onJoin: handler}
s.sub.OnJoin(proxy)
}

// OnLeave allows to set LeaveHandler to SubEventHandler.
func (h *SubscriptionEventHub) OnLeave(handler LeaveHandler) {
proxy := &subEventProxy{sub: h.sub, onLeave: handler}
h.subEventHub.OnPublish(proxy)
func (s *Subscription) OnLeave(handler LeaveHandler) {
proxy := &subEventProxy{sub: s, onLeave: handler}
s.sub.OnLeave(proxy)
}

// OnUnsubscribe allows to set UnsubscribeHandler to SubEventHandler.
func (h *SubscriptionEventHub) OnUnsubscribe(handler UnsubscribeHandler) {
proxy := &subEventProxy{sub: h.sub, onUnsubscribe: handler}
h.subEventHub.OnPublish(proxy)
func (s *Subscription) OnUnsubscribe(handler UnsubscribeHandler) {
proxy := &subEventProxy{sub: s, onUnsubscribe: handler}
s.sub.OnUnsubscribe(proxy)
}

// OnSubscribeSuccess allows to set SubscribeSuccessHandler to SubEventHandler.
func (h *SubscriptionEventHub) OnSubscribeSuccess(handler SubscribeSuccessHandler) {
proxy := &subEventProxy{sub: h.sub, onSubscribeSuccess: handler}
h.subEventHub.OnPublish(proxy)
func (s *Subscription) OnSubscribeSuccess(handler SubscribeSuccessHandler) {
proxy := &subEventProxy{sub: s, onSubscribeSuccess: handler}
s.sub.OnSubscribeSuccess(proxy)
}

// OnSubscribeError allows to set SubscribeErrorHandler to SubEventHandler.
func (h *SubscriptionEventHub) OnSubscribeError(handler SubscribeErrorHandler) {
proxy := &subEventProxy{sub: h.sub, onSubscribeError: handler}
h.subEventHub.OnPublish(proxy)
func (s *Subscription) OnSubscribeError(handler SubscribeErrorHandler) {
proxy := &subEventProxy{sub: s, onSubscribeError: handler}
s.sub.OnSubscribeError(proxy)
}
24 changes: 0 additions & 24 deletions events_test.go

This file was deleted.

77 changes: 77 additions & 0 deletions examples/go/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package main

// Connect, subscribe on channel, publish into channel, read presence and history info.

import (
"encoding/json"
"fmt"
"log"

centrifuge "github.com/centrifugal/centrifuge-mobile"
)

type testMessage struct {
Input string `json:"input"`
}

type eventHandler struct{}

func (h *eventHandler) OnConnect(c *centrifuge.Client, e *centrifuge.ConnectEvent) {
log.Printf("client connected")
}

func (h *eventHandler) OnDisconnect(c *centrifuge.Client, e *centrifuge.DisconnectEvent) {
log.Println("client diconnected")
}

type subEventHandler struct{}

func (h *subEventHandler) OnPublish(sub *centrifuge.Subscription, e *centrifuge.PublishEvent) {
log.Println(fmt.Sprintf("New publication received from channel %s: %s", sub.Channel(), string(e.Data)))
}

func (h *subEventHandler) OnJoin(sub *centrifuge.Subscription, e *centrifuge.JoinEvent) {
log.Println(fmt.Sprintf("User %s (client ID %s) joined channel %s", e.User, e.Client, sub.Channel()))
}

func (h *subEventHandler) OnLeave(sub *centrifuge.Subscription, e *centrifuge.LeaveEvent) {
log.Println(fmt.Sprintf("User %s (client ID %s) left channel %s", e.User, e.Client, sub.Channel()))
}

func main() {
url := "ws://localhost:8000/connection/websocket"

c := centrifuge.New(url, centrifuge.DefaultConfig())
defer c.Close()

eventHandler := &eventHandler{}
c.OnConnect(eventHandler)
c.OnDisconnect(eventHandler)

err := c.Connect()
if err != nil {
log.Fatalln(err)
}

sub, err := c.NewSubscription("chat:index")
if err != nil {
log.Fatalln(err)
}

subEventHandler := &subEventHandler{}
sub.OnPublish(subEventHandler)
sub.OnJoin(subEventHandler)
sub.OnLeave(subEventHandler)

err = sub.Subscribe()
if err != nil {
log.Fatalln(err)
}

data := testMessage{Input: "example input"}
dataBytes, _ := json.Marshal(data)
err = sub.Publish(dataBytes)
if err != nil {
log.Fatalln(err)
}
}

0 comments on commit e0726be

Please sign in to comment.