Skip to content

Commit

Permalink
Update based on new salesforce generic subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
kush-elastic committed Jan 11, 2022
1 parent 9f83643 commit 633e946
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 23 deletions.
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# bayeux
Bayeux Client Protocol implemented in Golang (as specified by Salesforce Realtime API)

Fork from zph/bayeux:
Changes to accept both 'payload' and 'sobject'.
All the channels (user-created or generic) should be accepted.
Make it so user will pass chan from main function then they can close it anytime.
User can metioned replay mechanism from Channel method.

# Usage
See `examples/main.go`
```golang
Expand All @@ -9,13 +15,15 @@ package main
import (
"fmt"

bay "github.com/zph/bayeux"
bay "github.com/kush-elastic/bayeux"
)

func Example() {
out := make(chan bay.TriggerEvent)
b := bay.Bayeux{}
creds := bay.GetSalesforceCredentials()
c := b.TopicToChannel(creds, "topicName")
replay := "-1"
c := b.Channel(out, replay, creds, "channel")
for {
select {
case e := <-c:
Expand Down
29 changes: 12 additions & 17 deletions bayeux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ type TriggerEvent struct {
ReplayID int `json:"replayId"`
Type string `json:"type"`
} `json:"event"`
Object json.RawMessage `json:"sobject"`
Object json.RawMessage `json:"sobject"`
Payload json.RawMessage `json:"payload"`
} `json:"data,omitempty"`
Channel string `json:"channel"`
Successful bool `json:"successful,omitempty"`
}

func (t TriggerEvent) topic() string {
func (t TriggerEvent) channel() string {
s := strings.Replace(t.Channel, "/topic/", "", 1)
return s
}
Expand Down Expand Up @@ -157,15 +158,15 @@ type Replay struct {
Value int
}

func (b *Bayeux) subscribe(topic string, replay Replay) Subscription {
func (b *Bayeux) subscribe(channel string, replay string) Subscription {
handshake := fmt.Sprintf(`{
"channel": "/meta/subscribe",
"subscription": "/topic/%s",
"subscription": "%s",
"clientId": "%s",
"ext": {
"replay": {"/topic/%s": "%d"}
"replay": {"%s": "%s"}
}
}`, topic, b.id.clientID, topic, replay)
}`, channel, b.id.clientID, channel, replay)
resp, err := b.call(handshake, b.creds.bayeuxUrl())
if err != nil {
logger.Fatalf("Cannot subscribe %s", err)
Expand All @@ -174,7 +175,6 @@ func (b *Bayeux) subscribe(topic string, replay Replay) Subscription {
defer resp.Body.Close()
if os.Getenv("DEBUG") != "" {
logger.Printf("Response: %+v", resp)
// // Read the content
var b []byte
if resp.Body != nil {
b, _ = ioutil.ReadAll(resp.Body)
Expand All @@ -199,25 +199,21 @@ func (b *Bayeux) subscribe(topic string, replay Replay) Subscription {
sub := h[0]
status.connected = sub.Successful
status.clientID = sub.ClientID
status.channels = append(status.channels, topic)
status.channels = append(status.channels, channel)
logger.Printf("Established connection(s): %+v", status)
return sub
}

func (b *Bayeux) connect() chan TriggerEvent {
out := make(chan TriggerEvent)
func (b *Bayeux) connect(out chan TriggerEvent) chan TriggerEvent {
go func() {
// TODO: add stop chan to bring this thing to halt
for {
postBody := fmt.Sprintf(`{"channel": "/meta/connect", "connectionType": "long-polling", "clientId": "%s"} `, b.id.clientID)
resp, err := b.call(postBody, b.creds.bayeuxUrl())
if err != nil {
logger.Printf("Cannot connect to bayeux %s", err)
logger.Println("Trying again...")
} else {
defer resp.Body.Close()
if os.Getenv("DEBUG") != "" {
// // Read the content
var b []byte
if resp.Body != nil {
b, _ = ioutil.ReadAll(resp.Body)
Expand Down Expand Up @@ -277,15 +273,14 @@ func mustGetEnv(s string) string {
return r
}

func (b *Bayeux) TopicToChannel(creds Credentials, topic string) chan TriggerEvent {
func (b *Bayeux) Channel(out chan TriggerEvent, r string, creds Credentials, channel string) chan TriggerEvent {
b.creds = creds
err := b.getClientID()
if err != nil {
log.Fatal("Unable to get bayeux ClientId")
}
r := Replay{ReplayAll}
b.subscribe(topic, r)
c := b.connect()
b.subscribe(channel, r)
c := b.connect(out)
wg.Add(1)
return c
}
8 changes: 6 additions & 2 deletions example_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package bayeux

import "fmt"
import (
"fmt"
)

func Example() {
out := make(chan TriggerEvent)
replay := "-1"
b := Bayeux{}
creds := GetSalesforceCredentials()
c := b.TopicToChannel(creds, "topicName")
c := b.Channel(out, replay, creds, "channel")
for {
select {
case e := <-c:
Expand Down
6 changes: 4 additions & 2 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package main
import (
"fmt"

bay "github.com/zph/bayeux"
bay "github.com/kush-elastic/bayeux"
)

func Example() {
out := make(chan bay.TriggerEvent)
b := bay.Bayeux{}
creds := bay.GetSalesforceCredentials()
c := b.TopicToChannel(creds, "topicName")
replay := "-1"
c := b.Channel(out, replay, creds, "channel")
for {
select {
case e := <-c:
Expand Down

0 comments on commit 633e946

Please sign in to comment.