This repository has been archived by the owner on Jan 26, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
main.go
84 lines (71 loc) · 2.18 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package main
import (
"encoding/json"
"fmt"
zmq "github.com/pebbe/zmq4"
"log"
"os"
"os/signal"
"syscall"
"flag"
)
var SocketRelayName string
var SocketPubName string
func main() {
flag.StringVar(&SocketRelayName, "r", "ipc:///tmp/pushjet-relay.ipc", "Relay socket")
flag.StringVar(&SocketPubName , "p", "ipc:///tmp/pushjet-publisher.ipc", "Publish socket")
flag.Parse()
log.Println("Starting up the publishing server")
context, _ := zmq.NewContext()
socketRelay, err := context.NewSocket(zmq.PULL)
if err != nil {
log.Fatalf("Could not create a ZeroMQ socket: %s", err)
}
socketPub, err := context.NewSocket(zmq.PUB)
if err != nil {
log.Fatalf("Could not create a ZeroMQ socket: %s", err)
}
err = socketRelay.Bind(SocketRelayName)
if err != nil {
log.Fatalf("Could not create the ZeroMQ relay socket: %s", err)
}
err = socketPub.Bind(SocketPubName)
if err != nil {
log.Fatal("Could not create the ZeroMQ publisher socket: %s", err)
}
// Catch signals
signalChannel := make(chan os.Signal, 2)
signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)
go func() {
sig := <-signalChannel
if sig == os.Interrupt || sig == syscall.SIGTERM {
socketRelay.Unbind(SocketRelayName)
socketPub.Unbind(SocketPubName)
log.Fatal("Caught signal!")
}
}()
log.Printf("Listening on '%s' and '%s'", SocketRelayName, SocketPubName)
var apiMessageRaw string
var apiMessage PushjetApiCall
for { // loop forever
apiMessage = PushjetApiCall{}
apiMessageRaw, err = socketRelay.Recv(0)
if err != nil {
continue
}
log.Println("Parsing message... ")
err = json.Unmarshal([]byte(apiMessageRaw), &apiMessage)
if err != nil {
log.Println("ERROR: Could not decode message sent by server. Skipping it; ", err)
continue
}
if apiMessage.Message.Timestamp > 0 {
log.Println("Sending out message for ", apiMessage.Message.Service.Public)
socketPub.Send(fmt.Sprintf("%s %s", apiMessage.Message.Service.Public, apiMessageRaw), 0)
}
if apiMessage.Subscription.Timestamp > 0 {
log.Println("Sending out subscription update for ", apiMessage.Subscription.Uuid)
socketPub.Send(fmt.Sprintf("%s %s", apiMessage.Subscription.Uuid, apiMessageRaw), 0)
}
}
}