-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.go
60 lines (46 loc) · 1.17 KB
/
app.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"time"
eventhub "github.com/Azure/azure-event-hubs-go"
)
// main runs the Event Hub listener and exits with status 1 if it fails.
func main() {
	if err := run(); err != nil {
		// Diagnostics go to stderr so they do not mix with event data on stdout.
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

// run connects to the Event Hub identified by the -connection-string flag,
// falling back to the EVENTHUB_CONNECTION_STRING environment variable when the
// flag is empty. It starts a receiver on every partition, prints the data of
// each received event to stdout, and blocks until the process is interrupted.
//
// The hub is closed before run returns on every path after it was created.
// The returned error is the first failure encountered, or the close error if
// everything else succeeded.
func run() (err error) {
	connStr := flag.String("connection-string", "", "connection string of eventhub")
	flag.Parse()

	if *connStr == "" {
		*connStr = os.Getenv("EVENTHUB_CONNECTION_STRING")
	}
	if *connStr == "" {
		return fmt.Errorf("no connection string: pass -connection-string or set EVENTHUB_CONNECTION_STRING")
	}

	hub, err := eventhub.NewHubFromConnectionString(*connStr)
	if err != nil {
		return fmt.Errorf("creating hub: %v", err)
	}
	defer func() {
		// Use a fresh, bounded context: the setup context below may already
		// have expired, and an unbounded close could hang forever while
		// SIGINT is being captured by this process.
		closeCtx, cancelClose := context.WithTimeout(context.Background(), 20*time.Second)
		defer cancelClose()
		if closeErr := hub.Close(closeCtx); closeErr != nil && err == nil {
			err = fmt.Errorf("closing hub: %v", closeErr)
		}
	}()

	// This context is passed to the runtime-information lookup and to each
	// Receive call below.
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	handler := func(c context.Context, event *eventhub.Event) error {
		fmt.Println(string(event.Data))
		return nil
	}

	// listen to each partition of the Event Hub
	runtimeInfo, err := hub.GetRuntimeInformation(ctx)
	if err != nil {
		return fmt.Errorf("getting runtime information: %v", err)
	}
	for _, partitionID := range runtimeInfo.PartitionIDs {
		_, recvErr := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithPrefetchCount(10000), eventhub.ReceiveWithLatestOffset())
		if recvErr != nil {
			return fmt.Errorf("receiving from partition %s: %v", partitionID, recvErr)
		}
	}

	// signal.Notify never blocks when sending, so the channel must be buffered
	// or a signal arriving before the receive below could be dropped.
	// os.Kill is intentionally not registered: SIGKILL cannot be trapped.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt)
	<-signalChan
	return nil
}