Skip to content

Commit

Permalink
refactor mediator interface
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jun 21, 2015
1 parent 05bc0ce commit 3251d12
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 46 deletions.
10 changes: 4 additions & 6 deletions libcentrifugo/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,7 @@ func newMessage(ch Channel, data []byte, client ConnID, info *ClientInfo) (Messa
return message, nil
}

// Publish sends a message to all clients subscribed on project channel with provided data, client
// and client info.
// Publish sends a message to all clients subscribed on project channel with provided data, client and ClientInfo.
func (app *Application) Publish(pk ProjectKey, ch Channel, data []byte, client ConnID, info *ClientInfo) error {

if string(ch) == "" || len(data) == 0 {
Expand Down Expand Up @@ -350,11 +349,10 @@ func (app *Application) publish(pk ProjectKey, ch Channel, data []byte, client C
if app.mediator != nil {
// If mediator is set then we don't need to publish message
// immediately as mediator will decide itself what to do with it.
ok := app.mediator.Message(pk, ch, data, client, info, fromClient)
if !ok {
return ErrRejected
pass := app.mediator.Message(pk, ch, data, client, info)
if !pass {
return nil
}
return nil
}

err = app.pubClient(pk, ch, chOpts, data, client, info)
Expand Down
43 changes: 15 additions & 28 deletions libcentrifugo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,26 +217,20 @@ func (c *client) clean() error {
}
}

if pk != "" && c.app.mediator != nil {
c.app.mediator.Disconnect(c.Project, c.info(Channel("")))
}

close(c.closeChan)
c.messages.Close()

if pk != "" && c.app.mediator != nil {
c.app.mediator.Disconnect(pk, c.Uid, c.User)
}

return nil
}

func (c *client) info(ch Channel) ClientInfo {
var channelInfo []byte
var ok bool
if string(ch) == "" {
channelInfo, ok := c.channelInfo[ch]
if !ok {
channelInfo = []byte{}
} else {
channelInfo, ok = c.channelInfo[ch]
if !ok {
channelInfo = []byte{}
}
}
var rawDefaultInfo *json.RawMessage
var rawChannelInfo *json.RawMessage
Expand Down Expand Up @@ -504,13 +498,6 @@ func (c *client) connectCmd(cmd *connectClientCommand) (*response, error) {
c.Channels = map[Channel]bool{}
c.channelInfo = map[Channel][]byte{}

if c.app.mediator != nil {
ok := c.app.mediator.Connect(c.Project, c.info(Channel("")))
if !ok {
return nil, ErrRejected
}
}

go c.presencePing()

err := c.app.addConn(c)
Expand All @@ -519,6 +506,10 @@ func (c *client) connectCmd(cmd *connectClientCommand) (*response, error) {
return nil, ErrInternalServerError
}

if c.app.mediator != nil {
c.app.mediator.Connect(c.Project, c.Uid, c.User)
}

if timeToExpire > 0 {
duration := time.Duration(timeToExpire+closeDelay) * time.Second
c.expireTimer = time.AfterFunc(duration, c.expire)
Expand Down Expand Up @@ -651,14 +642,6 @@ func (c *client) subscribeCmd(cmd *subscribeClientCommand) (*response, error) {

info := c.info(channel)

if c.app.mediator != nil {
ok := c.app.mediator.Subscribe(c.Project, channel, info)
if !ok {
resp.Err(ErrRejected)
return resp, nil
}
}

err = c.app.addSub(c.Project, channel, c)
if err != nil {
logger.ERROR.Println(err)
Expand All @@ -680,6 +663,10 @@ func (c *client) subscribeCmd(cmd *subscribeClientCommand) (*response, error) {
}
}

if c.app.mediator != nil {
c.app.mediator.Subscribe(c.Project, channel, c.Uid, c.User)
}

return resp, nil
}

Expand Down Expand Up @@ -732,7 +719,7 @@ func (c *client) unsubscribeCmd(cmd *unsubscribeClientCommand) (*response, error
}

if c.app.mediator != nil {
c.app.mediator.Unsubscribe(c.Project, channel, info)
c.app.mediator.Unsubscribe(c.Project, channel, c.Uid, c.User)
}

return resp, nil
Expand Down
10 changes: 5 additions & 5 deletions libcentrifugo/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ package libcentrifugo
// Go code. Implemented Mediator must be set to Application via
// corresponding Application method SetMediator.
type Mediator interface {
Connect(pk ProjectKey, info ClientInfo) bool
Subscribe(pk ProjectKey, ch Channel, info ClientInfo) bool
Unsubscribe(pk ProjectKey, ch Channel, info ClientInfo)
Disconnect(pk ProjectKey, info ClientInfo)
Message(pk ProjectKey, ch Channel, data []byte, client ConnID, info *ClientInfo, fromClient bool) bool
Connect(pk ProjectKey, client ConnID, user UserID)
Subscribe(pk ProjectKey, ch Channel, client ConnID, user UserID)
Unsubscribe(pk ProjectKey, ch Channel, client ConnID, user UserID)
Disconnect(pk ProjectKey, client ConnID, user UserID)
Message(pk ProjectKey, ch Channel, data []byte, client ConnID, info *ClientInfo) bool
}
12 changes: 5 additions & 7 deletions libcentrifugo/mediator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,25 @@ type testMediator struct {
message int
}

func (m *testMediator) Connect(pk ProjectKey, info ClientInfo) bool {
func (m *testMediator) Connect(pk ProjectKey, client ConnID, user UserID) {
m.connect += 1
return true
}

func (m *testMediator) Subscribe(pk ProjectKey, ch Channel, info ClientInfo) bool {
func (m *testMediator) Subscribe(pk ProjectKey, ch Channel, client ConnID, user UserID) {
m.subscribe += 1
return true
}

func (m *testMediator) Unsubscribe(pk ProjectKey, ch Channel, info ClientInfo) {
func (m *testMediator) Unsubscribe(pk ProjectKey, ch Channel, client ConnID, user UserID) {
m.unsubscribe += 1
return
}

func (m *testMediator) Disconnect(pk ProjectKey, info ClientInfo) {
func (m *testMediator) Disconnect(pk ProjectKey, client ConnID, user UserID) {
m.disconnect += 1
return
}

func (m *testMediator) Message(pk ProjectKey, ch Channel, data []byte, client ConnID, info *ClientInfo, fromClient bool) bool {
func (m *testMediator) Message(pk ProjectKey, ch Channel, data []byte, client ConnID, info *ClientInfo) bool {
m.message += 1
return false
}
Expand Down

0 comments on commit 3251d12

Please sign in to comment.