-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpaymentlog.go
88 lines (77 loc) · 1.98 KB
/
paymentlog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package paymentlog
import (
"context"
"encoding/json"
"fmt"
"os"
"time"
"cloud.google.com/go/storage"
"github.com/xeipuuv/gojsonschema"
)
// PubSubMessage is the envelope handed to ProcessLog for each Pub/Sub
// delivery.
type PubSubMessage struct {
	// Data is the raw message body. Within this file it is validated as a
	// JSON document and then written to storage unchanged.
	Data []byte
}
// schemaData is the JSON Schema that message payloads are checked against.
//
// A conforming payload is an object whose fromCustomer, toCustomer,
// fromAccount and toAccount properties are strings and whose amount is a
// number >= 0; all five are required. transactionDate is optional and, when
// present, must be a string in date-time format.
var schemaData = `
{
"$schema": "http://json-schema.org/draft/2019-09/schema#",
"type": "object",
"properties": {
"fromCustomer": { "type": "string" },
"toCustomer": { "type": "string" },
"fromAccount": { "type": "string" },
"toAccount": { "type": "string" },
"amount": { "type": "number", "minimum": 0 },
"transactionDate": { "type": "string", "format": "date-time" }
},
"required": ["fromCustomer", "toCustomer", "fromAccount", "toAccount", "amount"]
}
`
// ProcessLog is the entry point for messages from the Payments Audit Log
// Pub/Sub topic.
//
// The payload is validated first and saved only if validation succeeds. The
// first error encountered is returned as-is; a nil return means both steps
// completed. ctx is accepted but is not used by this function.
func ProcessLog(ctx context.Context, m PubSubMessage) error {
	if err := m.validate(); err != nil {
		return err
	}
	return m.save()
}
// validate checks that msg.Data is well-formed JSON and that it conforms to
// the JSON Schema held in schemaData.
//
// It returns nil when the payload passes both checks. Otherwise it returns:
//   - "Invalid message" when Data is not syntactically valid JSON;
//   - an error wrapping the underlying cause when the schema validation
//     itself could not be carried out;
//   - an error listing the schema violations when the document was checked
//     and found non-conforming.
func (msg *PubSubMessage) validate() error {
	// Cheap syntactic check first, so malformed input gets a short, fixed
	// message instead of a parser error from the schema library.
	if !json.Valid(msg.Data) {
		return fmt.Errorf("Invalid message")
	}

	schemaLoader := gojsonschema.NewStringLoader(schemaData)
	documentLoader := gojsonschema.NewBytesLoader(msg.Data)

	result, err := gojsonschema.Validate(schemaLoader, documentLoader)
	if err != nil {
		// A non-nil err is distinct from an invalid result: the check did not
		// run to completion. Wrap with %w so callers can still inspect the
		// cause with errors.Is / errors.As.
		return fmt.Errorf("Schema validation error: %w", err)
	}
	if !result.Valid() {
		return fmt.Errorf("The document is not valid %s", result.Errors())
	}
	return nil
}
// save writes msg.Data to Google Cloud Storage as a new object.
//
// The destination bucket is read from the BUCKET environment variable and the
// object is named after the current UTC time in RFC 3339 format with a
// ".json" suffix. A storage client is created for the call and released
// before returning. Any failure (client creation, write, or finalising the
// object on Close) is returned wrapped with the step that failed.
//
// NOTE(review): the object name has one-second resolution, so two messages
// saved within the same second target the same object name and the later
// write would replace the earlier one. Consider a finer-grained or unique
// name if that matters for the audit trail.
func (msg *PubSubMessage) save() error {
	ctx := context.Background()

	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("creating storage client: %w", err)
	}
	// Release the client's underlying connections; the outcome of the upload
	// is already decided by the time this runs, so its error is not reported.
	defer client.Close()

	// The writer gets its own cancellable context: cancelling it is how an
	// unfinished upload is abandoned, which is what must happen if Write
	// fails part-way. After a successful Close the cancel is a no-op.
	writeCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	bucketName := os.Getenv("BUCKET")
	objName := time.Now().UTC().Format(time.RFC3339) + ".json"

	wc := client.Bucket(bucketName).Object(objName).NewWriter(writeCtx)
	if _, err := wc.Write(msg.Data); err != nil {
		return fmt.Errorf("writing object %q to bucket %q: %w", objName, bucketName, err)
	}
	// Close flushes the data and finalises the object; the upload has not
	// succeeded until Close returns nil.
	if err := wc.Close(); err != nil {
		return fmt.Errorf("finalising object %q in bucket %q: %w", objName, bucketName, err)
	}
	return nil
}