-
Notifications
You must be signed in to change notification settings - Fork 149
/
Copy pathmanager_shipper.go
127 lines (116 loc) · 3.76 KB
/
manager_shipper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package runtime
import (
"fmt"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent/internal/pkg/core/authority"
"github.com/elastic/elastic-agent/pkg/component"
)
// connectShippers wires shipper components together with the components that
// send to them by injecting connection details into the unit configurations.
//
// It makes three passes:
//  1. For every component with a ShipperSpec, ensure m.shipperConns holds
//     connection information (address and CA) for it, ensure each of its
//     input units has a certificate pair, and inject the connection details
//     into those units. Pairs for units that are no longer present are
//     removed.
//  2. Remove connection information for shippers not seen in pass 1.
//  3. For every component that references a shipper, inject the shipper's
//     connection details (using the pair stored under the component's ID)
//     into its output units.
//
// Units are updated in place inside components. Units whose Config is nil are
// left untouched so that an Err already recorded on them is not overwritten.
//
// An error is returned when a CA or certificate pair cannot be created, or
// when a component references a shipper, or a pair on that shipper, that is
// not known. On error, components and m.shipperConns may already have been
// partially updated.
func (m *Manager) connectShippers(components []component.Component) error {
	// ensure that all shipper components have created connection information (must happen before we connect the units)
	shippersTouched := make(map[string]bool)
	for i, comp := range components {
		if comp.ShipperSpec == nil {
			continue
		}

		// running shipper (ensure connection information is created)
		shippersTouched[comp.ID] = true
		conn, ok := m.shipperConns[comp.ID]
		if !ok {
			ca, err := authority.NewCA()
			if err != nil {
				return fmt.Errorf("failed to create connection CA for shipper %q: %w", comp.ID, err)
			}
			conn = &shipperConn{
				addr:  getShipperAddr(comp.ID),
				ca:    ca,
				pairs: make(map[string]*authority.Pair),
			}
			m.shipperConns[comp.ID] = conn
		}

		// each input unit needs its own certificate pair
		pairsTouched := make(map[string]bool)
		for j, unit := range comp.Units {
			if unit.Type != client.UnitTypeInput {
				continue
			}
			// The pair is created even when the unit has no config, so the
			// pair bookkeeping does not depend on the config being valid.
			pairsTouched[unit.ID] = true
			pair, err := pairGetOrCreate(conn, unit.ID)
			if err != nil {
				return fmt.Errorf("failed to get/create certificate pair for shipper %q/%q: %w", comp.ID, unit.ID, err)
			}
			if unit.Config == nil {
				// Nothing to inject into; keep the unit (and its Err) as-is
				// instead of resetting Err to nil.
				continue
			}
			cfg, cfgErr := injectShipperConn(unit.Config, conn.addr, conn.ca, pair)
			unit.Config = cfg
			unit.Err = cfgErr
			comp.Units[j] = unit
		}

		// cleanup any pairs that are no-longer used
		for pairID := range conn.pairs {
			if !pairsTouched[pairID] {
				delete(conn.pairs, pairID)
			}
		}
		components[i] = comp
	}

	// cleanup any shippers that are no-longer used
	for shipperID := range m.shipperConns {
		if !shippersTouched[shipperID] {
			delete(m.shipperConns, shipperID)
		}
	}

	// connect the output units with the same connection information
	for i, comp := range components {
		if comp.Shipper == nil {
			continue
		}
		conn, ok := m.shipperConns[comp.Shipper.ComponentID]
		if !ok {
			return fmt.Errorf("component %q references a non-existing shipper %q", comp.ID, comp.Shipper.ComponentID)
		}
		// pairs are keyed by the shipper's unit IDs; the lookup here uses the
		// ID of the component that sends to the shipper.
		pair, ok := conn.pairs[comp.ID]
		if !ok {
			return fmt.Errorf("component %q references shipper %q that doesn't know about the component", comp.ID, comp.Shipper.ComponentID)
		}
		for j, unit := range comp.Units {
			if unit.Type != client.UnitTypeOutput {
				continue
			}
			if unit.Config == nil {
				// Nothing to inject into; keep the unit (and its Err) as-is
				// instead of resetting Err to nil.
				continue
			}
			cfg, cfgErr := injectShipperConn(unit.Config, conn.addr, conn.ca, pair)
			unit.Config = cfg
			unit.Err = cfgErr
			comp.Units[j] = unit
		}
		components[i] = comp
	}

	return nil
}
// pairGetOrCreate returns the certificate pair stored in conn.pairs under
// pairID. When no pair is stored yet, one is generated from conn.ca with
// pairID as its name, saved in conn.pairs, and returned. A generation error
// is returned as-is and nothing is stored.
func pairGetOrCreate(conn *shipperConn, pairID string) (*authority.Pair, error) {
	if existing, found := conn.pairs[pairID]; found {
		return existing, nil
	}

	created, err := conn.ca.GeneratePairWithName(pairID)
	if err != nil {
		return nil, err
	}

	// remember the pair so later calls for the same ID reuse it
	conn.pairs[pairID] = created
	return created, nil
}
// injectShipperConn builds a new expected config from cfg with the shipper
// connection details added.
//
// The map form of cfg.Source gets two keys set: "server" (addr) and "ssl"
// (the CA certificate plus the certificate and key from pair, all as
// strings). The result is produced by component.ExpectedConfig.
//
// A nil cfg yields (nil, nil).
func injectShipperConn(cfg *proto.UnitExpectedConfig, addr string, ca *authority.CertificateAuthority, pair *authority.Pair) (*proto.UnitExpectedConfig, error) {
	// unit configuration had an error generating (do nothing)
	if cfg == nil {
		return nil, nil
	}

	settings := cfg.Source.AsMap()

	trusted := []interface{}{string(ca.Crt())}
	tlsSettings := map[string]interface{}{
		"certificate_authorities": trusted,
		"certificate":             string(pair.Crt),
		"key":                     string(pair.Key),
	}

	settings["server"] = addr
	settings["ssl"] = tlsSettings

	return component.ExpectedConfig(settings)
}