-
Notifications
You must be signed in to change notification settings - Fork 0
/
protoconfloader.go
334 lines (298 loc) · 10.8 KB
/
protoconfloader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
package protoconf_loader
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"sync"
pc "github.com/protoconf/protoconf/agent/api/proto/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/fsnotify/fsnotify"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
const (
AgentDefaultAddress = ":4300"
)
type Configuration struct {
msg proto.Message
configPath string
logger *slog.Logger
isLoaded *sync.Once
isWatchingFile *sync.Once
isWatchingAgent *sync.Once
configFile string
fsnotifyWatcher *fsnotify.Watcher
mu sync.RWMutex
UnmarshalOptions protojson.UnmarshalOptions
quit chan bool
CancelWatcher context.CancelFunc
Host string
Port int
agentStub pc.ProtoconfServiceClient
onConfigChange func(p proto.Message)
}
type Option func(*Configuration)
func WithAgentStub(stub pc.ProtoconfServiceClient) Option {
return func(c *Configuration) {
c.agentStub = stub
}
}
func WithLogger(logger *slog.Logger) Option {
return func(c *Configuration) {
c.logger = logger
}
}
// NewConfiguration creates a new Configuration instance with the given proto.Message,
// config path and optional options.
// It initializes the fsnotify watcher, sets the unmarshal options, and initializes other fields.
// If any error occurs during the watcher creation, it returns an error.
func NewConfiguration(p proto.Message, configPath string, opts ...Option) (*Configuration, error) {
fsnotifyWatcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
config := &Configuration{
msg: p,
configPath: configPath,
logger: slog.Default(),
isLoaded: &sync.Once{},
isWatchingFile: &sync.Once{},
isWatchingAgent: &sync.Once{},
mu: sync.RWMutex{},
fsnotifyWatcher: fsnotifyWatcher,
UnmarshalOptions: protojson.UnmarshalOptions{
DiscardUnknown: true,
},
quit: make(chan bool),
onConfigChange: nil,
}
for _, opt := range opts {
opt(config)
}
return config, nil
}
// LoadConfig loads the configuration from the specified configPath and configName.
// If the configuration is already loaded, it returns nil without doing anything.
// It sets the configFile field to the joined path of configPath and configName,
// then calls the loadConfig method to actually load the configuration.
// Finally, it sets the isLoaded field to true and returns nil.
// If there is an error during loading the configuration, it returns the error.
func (c *Configuration) LoadConfig(configPath string, configName string) error {
var err error
c.isLoaded.Do(func() {
c.configFile = filepath.Join(configPath, configName)
err = c.loadConfig()
if err != nil {
c.isLoaded = new(sync.Once)
}
})
return err
}
// WatchConfig starts watching the configuration file and the agent for changes.
// It returns an error if the configuration is not loaded yet.
// It watches for file changes and agent updates using separate goroutines.
// The method logs the successful start of watching and returns nil upon successful completion.
func (c *Configuration) WatchConfig(ctx context.Context) error {
// Watch config file changes
var err error
var WatcherCtx context.Context
WatcherCtx, c.CancelWatcher = context.WithCancel(ctx)
c.isWatchingFile.Do(func() {
err = c.watchFileChanges(WatcherCtx)
if err != nil {
c.isWatchingFile = new(sync.Once)
}
})
if err != nil {
return err
}
// Watch agent changes
c.isWatchingAgent.Do(func() {
err = c.listenToChanges(c.configPath, WatcherCtx)
if err != nil {
c.isWatchingAgent = new(sync.Once)
}
})
if err != nil {
return err
}
c.logger.Info(
"Successfully watching config",
slog.String("watching_agent_path", c.configPath),
slog.String("watching_file", c.configFile))
return nil
}
// watchFileChanges is a method of the Configuration struct that starts watching the configuration file for changes.
// It adds the configuration file to the fsnotifyWatcher and starts a goroutine to handle file events.
// When a write event occurs, it calls the loadConfig method to reload the configuration.
// If there is an error while watching or loading the configuration file, it logs the error.
// The method returns an error if there is an error adding the file to the fsnotifyWatcher.
func (c *Configuration) watchFileChanges(ctx context.Context) error {
err := c.fsnotifyWatcher.Add(c.configFile)
if err != nil {
return err
}
go func() {
for {
select {
case <-ctx.Done():
return
case event, ok := <-c.fsnotifyWatcher.Events:
if !ok {
c.logger.Error("error while watching config file", slog.Any("event", event))
c.isWatchingFile = new(sync.Once)
return
}
if event.Op&fsnotify.Write == fsnotify.Write {
if err := c.loadConfig(); err != nil {
c.logger.Error("error while watching and loading config file", slog.Any("error", err))
}
}
case err := <-c.fsnotifyWatcher.Errors:
c.logger.Error("error while watching config file", slog.Any("error", err))
return
}
}
}()
return nil
}
// StopWatching stops watching the configuration file and the agent for changes.
// It closes the fsnotifyWatcher and cancels the context for the agent connection.
func (c *Configuration) StopWatching() {
if c.isWatchingFile != nil {
c.fsnotifyWatcher.Close()
}
c.CancelWatcher()
c.isWatchingAgent = new(sync.Once)
}
// LoadConfig loads the configuration from the specified configPath and configName.
// If the configuration is already loaded, it returns nil.
// It sets the configFile field to the joined path of configPath and configName,
// then calls the loadConfig method to actually load the configuration.
// Finally, it sets the isLoaded field to true and returns nil.
// If there is an error during loading the configuration, it returns the error.
func (c *Configuration) loadConfig() error {
var (
ErrReadConfigFile = errors.New("error reading config file")
ErrUnmarshalConfig = errors.New("error unmarshaling config")
)
configReader, err := os.ReadFile(c.configFile)
if err != nil {
c.logger.Error("error reading config file", slog.Any("error", err))
return ErrReadConfigFile
}
c.mu.Lock()
defer c.mu.Unlock()
err = c.UnmarshalOptions.Unmarshal(configReader, c.msg)
if err != nil {
c.logger.Error("error unmarshaling config file", slog.Any("error", err))
return ErrUnmarshalConfig
}
if c.onConfigChange != nil {
c.onConfigChange(c.msg)
}
return nil
}
// listenToChanges is a method of the Configuration struct that establishes a connection to a server using gRPC and subscribes to receive configuration updates.
// It takes a path string and a context.Context as parameters.
// It first gets the hostname to use for the connection by calling the getHostname method.
// Then it dials the server using the obtained address and insecure transport credentials.
// If there is an error while dialing, it logs the error and returns it.
// It creates a new ProtoconfServiceClient using the connection.
// It creates a new context with cancellation capability using the provided context.
// It subscribes for configuration updates by calling the SubscribeForConfig method of the ProtoconfServiceClient.
// If there is an error while subscribing, it logs the error and returns it.
// It starts a goroutine to handle the received configuration updates by calling the handleConfigUpdates method.
// Finally, it returns nil.
func (c *Configuration) listenToChanges(path string, ctx context.Context) error {
psc := c.agentStub
if psc == nil {
address := c.getHostname()
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
c.logger.Error("Error connecting to server ", slog.String("address", address), slog.Any("error", err))
return err
}
psc = pc.NewProtoconfServiceClient(conn)
}
stream, err := psc.SubscribeForConfig(ctx, &pc.ConfigSubscriptionRequest{Path: path})
if err != nil {
c.logger.Error("Error subscribing for config", slog.String("path", path), slog.Any("error", err))
return err
}
go c.handleConfigUpdates(stream, path)
return nil
}
// handleConfigUpdates listens for changes to the configuration and invokes the OnConfigChange function.
func (c *Configuration) handleConfigUpdates(stream pc.ProtoconfService_SubscribeForConfigClient, path string) {
for {
select {
case <-c.quit:
c.logger.Info("Stopping listening to changes due to quit signal")
return // Exit the goroutine gracefully
default:
// Read the next update from the stream
update, err := stream.Recv()
if err == io.EOF {
c.logger.Error("Connection closed while streaming config path", slog.String("path", path))
return
}
if err != nil {
c.logger.Error("Error unmarshaling config", slog.String("path", path), slog.Any("error", err))
}
// Unmarshal the update into the configuration
c.mu.Lock()
err = update.GetValue().UnmarshalTo(c.msg)
c.mu.Unlock()
if err != nil {
c.logger.Error("Error while streaming config path", slog.String("path", path), slog.Any("error", err))
// Implement appropriate error handling here
continue // Continue to the next iteration
}
// Invoke the OnConfigChange function with the updated configuration
if c.onConfigChange != nil {
c.onConfigChange(c.msg)
}
}
}
}
// OnConfigChange sets the event handler that is called when a config file changes.
func (c *Configuration) OnConfigChange(run func(p proto.Message)) {
c.onConfigChange = run
}
// Atomic executes the given function atomically.
func (c *Configuration) WithLock(f func() error) error {
c.mu.RLock()
defer c.mu.RUnlock()
return f()
}
// Get returns the result of the given function.
func Get[T any](c *Configuration, f func() T) T {
c.mu.Lock()
defer c.mu.Unlock()
return f()
}
func (c *Configuration) Int32(f func() int32) int32 { return Get(c, f) }
func (c *Configuration) Int64(f func() int64) int64 { return Get(c, f) }
func (c *Configuration) UInt32(f func() uint32) uint32 { return Get(c, f) }
func (c *Configuration) UInt64(f func() uint64) uint64 { return Get(c, f) }
func (c *Configuration) Float32(f func() float32) float32 { return Get(c, f) }
func (c *Configuration) Float64(f func() float64) float64 { return Get(c, f) }
func (c *Configuration) Bool(f func() bool) bool { return Get(c, f) }
func (c *Configuration) String(f func() string) string { return Get(c, f) }
func (c *Configuration) Any(f func() any) any { return Get(c, f) }
// getHostname returns the hostname to use for the agent connection.
// It uses the Host and Port fields if they are set, otherwise it uses the default address.
func (c *Configuration) getHostname() string {
address := fmt.Sprintf("%v:%v", c.Host, c.Port)
// Use default if not supplied
if address == ":0" {
address = AgentDefaultAddress
}
return address
}