forked from envoyproxy/envoy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hot_restarting_parent.cc
231 lines (204 loc) · 9.86 KB
/
hot_restarting_parent.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
#include "source/server/hot_restarting_parent.h"
#include "envoy/server/instance.h"
#include "source/common/memory/stats.h"
#include "source/common/network/utility.h"
#include "source/common/stats/stat_merger.h"
#include "source/common/stats/symbol_table.h"
#include "source/common/stats/utility.h"
namespace Envoy {
namespace Server {
using HotRestartMessage = envoy::HotRestartMessage;
// Prepares the parent side of both hot restart RPC streams.
//
// For the main stream and the UDP forwarding stream alike, this first computes the domain socket
// address of the "child" for the next epoch (restart_epoch + 1), and then binds this process's
// own "parent" domain socket for the current epoch. The UDP forwarding stream uses the same
// socket path with an "_udp" suffix appended.
HotRestartingParent::HotRestartingParent(int base_id, int restart_epoch,
                                         const std::string& socket_path, mode_t socket_mode)
    : HotRestartingBase(base_id), restart_epoch_(restart_epoch) {
  const std::string udp_socket_path = socket_path + "_udp";
  const auto child_epoch = restart_epoch_ + 1;

  // Destination addresses used later when sending messages to the child.
  child_address_ =
      main_rpc_stream_.createDomainSocketAddress(child_epoch, "child", socket_path, socket_mode);
  child_address_udp_forwarding_ = udp_forwarding_rpc_stream_.createDomainSocketAddress(
      child_epoch, "child", udp_socket_path, socket_mode);

  // Our own endpoints for the current epoch.
  main_rpc_stream_.bindDomainSocket(restart_epoch_, "parent", socket_path, socket_mode);
  udp_forwarding_rpc_stream_.bindDomainSocket(restart_epoch_, "parent", udp_socket_path,
                                              socket_mode);
}
// Queues `msg` for delivery to the child over the UDP forwarding RPC stream.
//
// The message is moved into a callback that is posted to the dispatcher stored by initialize();
// the actual send happens when that callback runs. initialize() must therefore have been called
// before this method (asserted below).
//
// @param msg the message to forward; ownership is taken by the posted callback.
void HotRestartingParent::sendHotRestartMessage(envoy::HotRestartMessage&& msg) {
  ASSERT(dispatcher_.has_value());
  dispatcher_->post([this, msg = std::move(msg)]() {
    // The captured `msg` is const inside this non-mutable lambda, so a std::move() here could
    // never actually move anything. Pass the captured message directly.
    udp_forwarding_rpc_stream_.sendHotRestartMessage(child_address_udp_forwarding_, msg);
  });
}
// Network::NonDispatchedUdpPacketHandler
//
// Wraps a received UDP packet into a `forwarded_udp_packet` request and hands it to the UDP
// message sender. The message carries the packet's local and peer addresses (as URLs), its
// receive time in microseconds since the clock's epoch, the payload bytes and the worker index.
//
// @param worker_index index of the worker recorded in the forwarded message.
// @param packet the received datagram to forward.
void HotRestartingParent::Internal::handle(uint32_t worker_index,
                                           const Network::UdpRecvData& packet) {
  const auto receive_time_us = std::chrono::duration_cast<std::chrono::microseconds>(
                                   packet.receive_time_.time_since_epoch())
                                   .count();

  envoy::HotRestartMessage msg;
  auto* forwarded = msg.mutable_request()->mutable_forwarded_udp_packet();
  forwarded->set_local_addr(Network::Utility::urlFromDatagramAddress(*packet.addresses_.local_));
  forwarded->set_peer_addr(Network::Utility::urlFromDatagramAddress(*packet.addresses_.peer_));
  forwarded->set_receive_time_epoch_microseconds(receive_time_us);
  *forwarded->mutable_payload() = packet.buffer_->toString();
  forwarded->set_worker_index(worker_index);

  udp_sender_.sendHotRestartMessage(std::move(msg));
}
// Hooks the main RPC stream's domain socket into `dispatcher` and creates the Internal helper.
//
// An edge-triggered, read-only file event is registered on the main stream's domain socket; each
// time it fires, onSocketEvent() is run. The dispatcher is also remembered for later use by
// sendHotRestartMessage().
//
// @param dispatcher the dispatcher on which the socket event is registered.
// @param server the server instance handed to the Internal helper.
void HotRestartingParent::initialize(Event::Dispatcher& dispatcher, Server::Instance& server) {
  auto on_readable = [this](uint32_t events) {
    // Only read events were requested, so nothing else is expected here.
    ASSERT(events == Event::FileReadyType::Read);
    onSocketEvent();
    return absl::OkStatus();
  };
  socket_event_ =
      dispatcher.createFileEvent(main_rpc_stream_.domain_socket_, std::move(on_readable),
                                 Event::FileTriggerType::Edge, Event::FileReadyType::Read);

  dispatcher_ = dispatcher;
  internal_ = std::make_unique<Internal>(&server, *this);
}
void HotRestartingParent::onSocketEvent() {
std::unique_ptr<HotRestartMessage> wrapped_request;
while ((wrapped_request = main_rpc_stream_.receiveHotRestartMessage(RpcStream::Blocking::No))) {
if (wrapped_request->requestreply_case() == HotRestartMessage::kReply) {
ENVOY_LOG(error, "child sent us a HotRestartMessage reply (we want requests); ignoring.");
HotRestartMessage wrapped_reply;
wrapped_reply.set_didnt_recognize_your_last_message(true);
main_rpc_stream_.sendHotRestartMessage(child_address_, wrapped_reply);
continue;
}
switch (wrapped_request->request().request_case()) {
case HotRestartMessage::Request::kShutdownAdmin: {
main_rpc_stream_.sendHotRestartMessage(child_address_, internal_->shutdownAdmin());
break;
}
case HotRestartMessage::Request::kPassListenSocket: {
main_rpc_stream_.sendHotRestartMessage(
child_address_, internal_->getListenSocketsForChild(wrapped_request->request()));
break;
}
case HotRestartMessage::Request::kStats: {
HotRestartMessage wrapped_reply;
internal_->exportStatsToChild(wrapped_reply.mutable_reply()->mutable_stats());
main_rpc_stream_.sendHotRestartMessage(child_address_, wrapped_reply);
break;
}
case HotRestartMessage::Request::kDrainListeners: {
internal_->drainListeners();
break;
}
case HotRestartMessage::Request::kTerminate: {
ENVOY_LOG(info, "shutting down due to child request");
kill(getpid(), SIGTERM);
break;
}
case HotRestartMessage::Request::kTestConnection: {
break;
}
default: {
ENVOY_LOG(error, "child sent us an unfamiliar type of HotRestartMessage; ignoring.");
HotRestartMessage wrapped_reply;
wrapped_reply.set_didnt_recognize_your_last_message(true);
main_rpc_stream_.sendHotRestartMessage(child_address_, wrapped_reply);
break;
}
}
}
}
// Releases the file event that initialize() registered on the main RPC stream's domain socket.
// NOTE(review): presumably this stops further onSocketEvent() callbacks -- confirm against the
// Event::FileEvent contract.
void HotRestartingParent::shutdown() {
  socket_event_.reset();
}
// Stores the server and the UDP message sender, and increments the hot restart generation gauge
// obtained from the server's root stats scope.
//
// @param server the server instance this helper operates on.
// @param udp_sender the sender used by handle() to forward UDP packets.
HotRestartingParent::Internal::Internal(Server::Instance* server,
                                        HotRestartMessageSender& udp_sender)
    : server_(server), udp_sender_(udp_sender) {
  hotRestartGeneration(*server->stats().rootScope()).inc();
}
// Shuts down the server's admin interface and builds the reply for the child.
//
// @return a reply whose `shutdown_admin` section carries the server's first-epoch start time and
//         its enable-reuse-port default.
HotRestartMessage HotRestartingParent::Internal::shutdownAdmin() {
  server_->shutdownAdmin();

  HotRestartMessage response;
  auto* admin_info = response.mutable_reply()->mutable_shutdown_admin();
  admin_info->set_original_start_time_unix_seconds(server_->startTimeFirstEpoch());
  admin_info->set_enable_reuse_port_default(server_->enableReusePortDefault());
  return response;
}
// Looks up the listen socket that the child asked for and returns its file descriptor.
//
// The requested address URL is resolved (a resolution failure is propagated through
// THROW_OR_RETURN_VALUE) and compared against every listen socket factory of every listener. A
// factory matches when its local address equals the requested address, its listener binds to a
// port, and its socket type equals the type implied by the URL.
//
// @param request the request holding the `pass_listen_socket` address and worker index.
// @return a reply whose `pass_listen_socket.fd` is the matching socket's fd, or -1 when no
//         factory matches or the requested worker index is not below the configured concurrency.
HotRestartMessage
HotRestartingParent::Internal::getListenSocketsForChild(const HotRestartMessage::Request& request) {
  HotRestartMessage wrapped_reply;
  auto* socket_reply = wrapped_reply.mutable_reply()->mutable_pass_listen_socket();
  socket_reply->set_fd(-1);

  const auto& socket_request = request.pass_listen_socket();
  Network::Address::InstanceConstSharedPtr addr =
      THROW_OR_RETURN_VALUE(Network::Utility::resolveUrl(socket_request.address()),
                            Network::Address::InstanceConstSharedPtr);

  // The requested socket type depends only on the request, so derive it once up front instead of
  // re-parsing the URL for every matching factory.
  const StatusOr<Network::Socket::Type> socket_type =
      Network::Utility::socketTypeFromUrl(socket_request.address());
  // socketTypeFromUrl should return a valid value since resolveUrl returned a valid address.
  ASSERT(socket_type.ok());

  // worker_index() will default to 0 if not set which is the behavior before this field
  // was added. Thus, this should be safe for both roll forward and roll back.
  const auto worker_index = socket_request.worker_index();

  for (const auto& listener : server_->listenerManager().listeners()) {
    for (auto& socket_factory : listener.get().listenSocketFactories()) {
      if (!(*socket_factory->localAddress() == *addr && listener.get().bindToPort())) {
        continue;
      }
      if (socket_factory->socketType() != *socket_type) {
        continue;
      }
      if (worker_index < server_->options().concurrency()) {
        socket_reply->set_fd(socket_factory->getListenSocket(worker_index)->ioHandle().fdDoNotUse());
      }
      // NOTE: this leaves only the inner loop. The remaining listeners are still scanned, so a
      // later listener with a matching factory overwrites the fd chosen here.
      break;
    }
  }
  return wrapped_reply;
}
// TODO(fredlas) if there are enough stats for stat name length to become an issue, this current
// implementation can negate the benefit of symbolized stat names by periodically reaching the
// magnitude of memory usage that they are meant to avoid, since this map holds full-string
// names. The problem can be solved by splitting the export up over many chunks.
//
// Fills `stats` with the data handed over to the child:
//  - the value of every used sinked gauge, keyed by name;
//  - the latched delta of every used sinked counter whose delta is non-zero, keyed by name;
//  - dynamic-span information for each exported stat (see recordDynamics());
//  - the currently allocated memory and the number of connections of the listener manager.
//
// @param stats the protobuf message to populate.
void HotRestartingParent::Internal::exportStatsToChild(HotRestartMessage::Reply::Stats* stats) {
  server_->stats().forEachSinkedGauge(nullptr, [this, stats](Stats::Gauge& gauge) {
    if (!gauge.used()) {
      return;
    }
    const std::string gauge_name = gauge.name();
    (*stats->mutable_gauges())[gauge_name] = gauge.value();
    recordDynamics(stats, gauge_name, gauge.statName());
  });

  server_->stats().forEachSinkedCounter(nullptr, [this, stats](Stats::Counter& counter) {
    if (!counter.used()) {
      return;
    }
    // The hot restart parent is expected to have stopped its normal stat exporting (and so
    // latching) by the time it begins exporting to the hot restart child.
    const uint64_t delta = counter.latch();
    if (delta == 0) {
      return;
    }
    const std::string counter_name = counter.name();
    (*stats->mutable_counter_deltas())[counter_name] = delta;
    recordDynamics(stats, counter_name, counter.statName());
  });

  stats->set_memory_allocated(Memory::Stats::totalCurrentlyAllocated());
  stats->set_num_connections(server_->listenerManager().numConnections());
}
// Records which components of `stat_name` are dynamic into `stats->dynamics[name]`.
//
// Compute an array of spans describing which components of the stat name are
// dynamic. This is needed so that when the child recovers the StatName, it
// correlates with how the system generates those stats, with the same exact
// components using a dynamic representation.
//
// See https://github.com/envoyproxy/envoy/issues/9874 for more details.
//
// @param stats the protobuf message receiving the span information.
// @param name the full string name of the stat, used as the map key.
// @param stat_name the symbolized name from which the dynamic spans are derived.
void HotRestartingParent::Internal::recordDynamics(HotRestartMessage::Reply::Stats* stats,
                                                   const std::string& name,
                                                   Stats::StatName stat_name) {
  const Stats::DynamicSpans dynamic_spans =
      server_->stats().symbolTable().getDynamicSpans(stat_name);
  if (dynamic_spans.empty()) {
    // No dynamic components: no entry is written for this stat.
    return;
  }

  // Convert that C++ structure (controlled by stat_merger.cc) into a protobuf
  // for serialization.
  HotRestartMessage::Reply::RepeatedSpan serialized;
  for (const Stats::DynamicSpan& range : dynamic_spans) {
    HotRestartMessage::Reply::Span* entry = serialized.add_spans();
    entry->set_first(range.first);
    entry->set_last(range.second);
  }
  (*stats->mutable_dynamics())[name] = serialized;
}
void HotRestartingParent::Internal::drainListeners() {
Network::ExtraShutdownListenerOptions options;
options.non_dispatched_udp_packet_handler_ = *this;
server_->drainListeners(options);
}
} // namespace Server
} // namespace Envoy