Enhance DataCarrier#MultipleChannelsConsumer to add priority #8664

Merged
merged 3 commits into from
Mar 11, 2022
47 changes: 29 additions & 18 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ Release Notes.
* Add Istio 1.13.1 to E2E test matrix for verification.
* Upgrade Apache parent pom version to 25.
* Use the plugin version defined by the Apache maven parent.
* Upgrade maven-dependency-plugin to 3.2.0.
* Upgrade maven-assembly-plugin to 3.3.0.
* Upgrade maven-failsafe-plugin to 2.22.2.
* Upgrade maven-surefire-plugin to 2.22.2.
* Upgrade maven-jar-plugin to 3.2.2.
* Upgrade maven-enforcer-plugin to 3.0.0.
* Upgrade maven-compiler-plugin to 3.10.0.
* Upgrade maven-resources-plugin to 3.2.0.
* Upgrade maven-source-plugin to 3.2.1.
* Upgrade maven-dependency-plugin to 3.2.0.
* Upgrade maven-assembly-plugin to 3.3.0.
* Upgrade maven-failsafe-plugin to 2.22.2.
* Upgrade maven-surefire-plugin to 2.22.2.
* Upgrade maven-jar-plugin to 3.2.2.
* Upgrade maven-enforcer-plugin to 3.0.0.
* Upgrade maven-compiler-plugin to 3.10.0.
* Upgrade maven-resources-plugin to 3.2.0.
* Upgrade maven-source-plugin to 3.2.1.
* Update codeStyle.xml to fix incompatibility on M1's IntelliJ IDEA 2021.3.2.
* Update frontend-maven-plugin to 1.12 and npm to 16.14.0 for booster UI build.

Expand All @@ -40,7 +40,8 @@ Release Notes.
name doesn't exist in TCP traffic.
* Upgrade H2 version to 2.0.206 to fix CVE-2021-23463 and GHSA-h376-j262-vhq6.
* Extend column name override mechanism working for `ValueColumnMetadata`.
* Introduce new concept `Layer` and removed `NodeType`. More details refer to [v9-version-upgrade](https://skywalking.apache.org/docs/main/latest/en/faq/v9-version-upgrade/).
* Introduce new concept `Layer` and removed `NodeType`. More details refer
to [v9-version-upgrade](https://skywalking.apache.org/docs/main/latest/en/faq/v9-version-upgrade/).
* Fix query sort metrics failure in H2 Storage.
* Bump up grpc to 1.43.2 and protobuf to 3.19.2 to fix CVE-2021-22569.
* Add source layer and dest layer to relation.
Expand All @@ -58,8 +59,10 @@ Release Notes.
* Add FreeSql component ID(3017) of dotnet agent.
* E2E: verify OAP cluster model data aggregation.
* Fix `SelfRemoteClient` self observing metrics.
* Add env variables `SW_CLUSTER_INTERNAL_COM_HOST` and `SW_CLUSTER_INTERNAL_COM_PORT` for cluster selectors `zookeeper`,`consul`,`etcd` and `nacos`.
* Doc update: `configuration-vocabulary`,`backend-cluster` about env variables `SW_CLUSTER_INTERNAL_COM_HOST` and `SW_CLUSTER_INTERNAL_COM_PORT`.
* Add env variables `SW_CLUSTER_INTERNAL_COM_HOST` and `SW_CLUSTER_INTERNAL_COM_PORT` for cluster selectors `zookeeper`
,`consul`,`etcd` and `nacos`.
* Doc update: `configuration-vocabulary`,`backend-cluster` about env variables `SW_CLUSTER_INTERNAL_COM_HOST`
and `SW_CLUSTER_INTERNAL_COM_PORT`.
* Add Python MysqlClient component ID(7013) with mapping information.
* Support Java thread pool metrics analysis.
* Fix IoTDB Storage Option insert null index value.
Expand All @@ -70,30 +73,38 @@ Release Notes.
* Add OpenFunction component ID(5013).
* Expose configuration `responseTimeout` of ES client.
* Support datasource metric analysis.
* [Break Change] Keep the endpoint avg resp time meter name the same with others scope. (This may break 3rd party integration and existing alarm rule settings)
* [Break Change] Keep the endpoint avg resp time meter name the same with others scope. (This may break 3rd party
integration and existing alarm rule settings)
* Add Python FastApi component ID(7014).
* Support all metrics from MAL engine in alarm core, including Prometheus, OC receiver, meter receiver.
* Allow updating non-metrics templates when structure changed.
* Set default connection timeout of ElasticSearch to 3000 milliseconds.
* Support ElasticSearch 8 and add it into E2E tests.
* Disable indexing for field `alarm_record.tags_raw_data` of binary type in ElasticSearch storage.
* Fix Zipkin receiver wrong condition for decoding `gzip`.
* Fix Zipkin receiver wrong condition for decoding `gzip`.
* Add a new sampler (`possibility`) in LAL.
* Unify module name `receiver_zipkin` to `receiver-zipkin`, remove `receiver_jaeger` from `application.yaml`.
* Unify module name `receiver_zipkin` to `receiver-zipkin`, remove `receiver_jaeger` from `application.yaml`.
* Introduce the entity of Process type.
* Set the length of event#parameters to 2000.
* Limit the length of Event#parameters.
* Support large service/instance/networkAddressAlias list query by using ElasticSearch scrolling API, add `metadataQueryBatchSize` to configure scrolling page size.
* Support large service/instance/networkAddressAlias list query by using ElasticSearch scrolling API,
add `metadataQueryBatchSize` to configure scrolling page size.
* Change default value of `metadataQueryMaxSize` from `5000` to `10000`
* Replace deprecated Armeria API `BasicToken.of` with `AuthToken.ofBasic`.
* Implement v9 UI template management protocol.
* Implement process metadata query protocol.
* Expose more ElasticSearch health check related logs to help to diagnose `Health check fails. reason: No healthy endpoint`.
* Expose more ElasticSearch health check related logs to help to
diagnose `Health check fails. reason: No healthy endpoint`.
* Add source `event` generated metrics to SERVICE_CATALOG_NAME catalog.
* [Breaking Change] Deprecate `All` from OAL source.
* [Breaking Change] Remove `SRC_ALL: 'All'` from OAL grammar tree.
* Remove `all_heatmap` and `all_percentile` metrics.
* Fix es normal index couldn't apply mapping and update.
* Fix ElasticSearch normal index couldn't apply mapping and update.
* Enhance DataCarrier#MultipleChannelsConsumer to add priority for the channels, which gives the OAP server better
  performance when all analyzers are activated by default.
* Activate `receiver-otel#enabledOcRules` receiver with `k8s-cluster,k8s-node,k8s-service,oap,vm` rules on default.
* Activate `satellite,spring-sleuth` for `agent-analyzer#meterAnalyzerActiveFiles` on default.
* Activate `receiver-zabbix` receiver with `agent` rule on default.

#### UI

Expand Down
13 changes: 10 additions & 3 deletions docs/en/setup/backend/backend-load-balancer.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
# Backend Load Balancer

When set the Agent or Envoy connecting to OAP server directly as in default, OAP server cluster would face the problem of OAP load imbalance. This issue would be very serious in high traffic load scenarios.
Satellite is recommended to be used as a native gateway proxy, to provide load balancing capabilities for data content before the data from Agent/Envoy reaches the OAP. The major difference between Satellite and other general wide used proxy(s), like Envoy, is that, Satellite would routine the data accordingly to contents rather than connection, as gRPC streaming is used widely in SkyWalking.
When the Agent or Envoy connects to the OAP server directly, as in the default setup, the OAP server cluster faces the
problem of load imbalance. This issue becomes very serious in high traffic load scenarios. Satellite is recommended as
a native gateway proxy that provides load balancing for data content before the data from Agent/Envoy
reaches the OAP. The major difference between Satellite and other widely used general-purpose proxies, like Envoy, is
that Satellite routes the data according to its content rather than the connection, as gRPC streaming is used widely in
SkyWalking.

Follow instructions in the [Setup SkyWalking Satellite](https://skywalking.apache.org/docs/#SkyWalkingSatellite)
to deploy Satellite and connect your application to the satellite.
to deploy Satellite and connect your application to the satellite.

The [Scaling with Apache SkyWalking](https://skywalking.apache.org/blog/2022-01-24-scaling-with-apache-skywalking/) blog
introduces the theory and technical details of how to set up a load balancer for the OAP cluster.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public class BulkConsumePool implements ConsumerPool {
private List<MultipleChannelsConsumer> allConsumers;
private volatile boolean isStarted = false;

public BulkConsumePool(String name, int size, long consumeCycle) {
private BulkConsumePool(String name, int size, long consumeCycle) {
size = EnvUtil.getInt(name + "_THREAD", size);
allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
allConsumers = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
multipleChannelsConsumer.setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class MultipleChannelsConsumer extends Thread {

public MultipleChannelsConsumer(String threadName, long consumeCycle) {
super(threadName);
this.consumeTargets = new ArrayList<Group>();
this.consumeTargets = new ArrayList<>();
this.consumeCycle = consumeCycle;
}

Expand All @@ -48,7 +48,7 @@ public void run() {
while (running) {
boolean hasData = false;
for (Group target : consumeTargets) {
boolean consume = consume(target, consumeList);
boolean consume = target.consume(consumeList);
hasData = hasData || consume;
}

Expand All @@ -63,30 +63,8 @@ public void run() {
// consumer thread is going to stop
// consume the last time
for (Group target : consumeTargets) {
consume(target, consumeList);

target.consumer.onExit();
}
}

private boolean consume(Group target, List consumeList) {
for (int i = 0; i < target.channels.getChannelSize(); i++) {
QueueBuffer buffer = target.channels.getBuffer(i);
buffer.obtain(consumeList);
}

if (!consumeList.isEmpty()) {
try {
target.consumer.consume(consumeList);
} catch (Throwable t) {
target.consumer.onError(consumeList, t);
} finally {
consumeList.clear();
}
return true;
target.consume(consumeList);
}
target.consumer.nothingToConsume();
return false;
}

/**
Expand All @@ -96,9 +74,7 @@ public void addNewTarget(Channels channels, IConsumer consumer) {
Group group = new Group(channels, consumer);
// Recreate the new list to avoid change list while the list is used in consuming.
ArrayList<Group> newList = new ArrayList<Group>();
for (Group target : consumeTargets) {
newList.add(target);
}
newList.addAll(consumeTargets);
newList.add(group);
consumeTargets = newList;
size += channels.size();
Expand All @@ -115,10 +91,86 @@ void shutdown() {
private static class Group {
private Channels channels;
private IConsumer consumer;

public Group(Channels channels, IConsumer consumer) {
/**
* Priority determines the consuming strategy. By default, in every period the consumer thread loops over all groups
* and tries to fetch data from their queues. If a queue only contains a few elements, consuming it every time is
* too expensive.
*
* if 'size of last fetched data' > 0
*
* priority = 'size of last fetched data' * 100 / {@link Channels#size()} * {@link Channels#getChannelSize()}
*
* else
*
* priority = priority / 2
*
* Meaning, priority is the load factor of {@link #channels}
*
* After consuming loop, priority = (priority of current loop + priority of last loop) / 2.
*
* If priority >= 50, consuming happens in the next loop; otherwise, priority += 10, and the group waits until
* priority >= 50. In the worst case, for a low traffic group, consuming happens in 1 of 10 loops.
*
* Priority only exists in {@link MultipleChannelsConsumer}, because it has limited threads but has to consume
* from a large set of queues.
*
* @since 9.0.0
*/
private int priority;
private short continuousNoDataCount;

private Group(Channels channels, IConsumer consumer) {
this.channels = channels;
this.consumer = consumer;
this.priority = 0;
this.continuousNoDataCount = 0;
}

/**
* @return false if there is no data to consume, or priority is too low. Read {@link #priority} for more
* details.
* @since 9.0.0
*/
private boolean consume(List consumeList) {
try {
if (priority < 50) {
priority += 10;
return false;
}

for (int i = 0; i < channels.getChannelSize(); i++) {
QueueBuffer buffer = channels.getBuffer(i);
buffer.obtain(consumeList);
}

if (!consumeList.isEmpty()) {
priority = (priority + (int) (consumeList.size() * 100 / channels.getChannelSize() * channels.size())) / 2;
try {
consumer.consume(consumeList);
} catch (Throwable t) {
consumer.onError(consumeList, t);
} finally {
consumeList.clear();
}
continuousNoDataCount = 0;
return true;
} else {
if (continuousNoDataCount < 5) {
continuousNoDataCount++;
// For a low traffic queue (low traffic means occasionally no data),
// cut the priority in half to reduce the consuming frequency.
priority /= 2;
} else {
// For cold queue, the consuming happens in 1/10;
priority = -50;
}
}

consumer.nothingToConsume();
return false;
} finally {
consumer.onExit();
}
}
}
}
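
The priority gate added to `Group#consume` above can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the code from this PR: `PriorityGroupSketch`, `tryConsume`, and `capacity` are hypothetical names, a plain `List<String>` stands in for the queue buffers, and the load factor is assumed to be the fetched size times 100 divided by one total-capacity number.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the priority gate; not the PR's actual Group class. */
class PriorityGroupSketch {
    private final int capacity;            // assumption: total buffer capacity of the group
    private int priority = 0;              // groups start cold and must warm up to 50
    private int continuousNoDataCount = 0; // consecutive loops that found no data

    PriorityGroupSketch(int capacity) {
        this.capacity = capacity;
    }

    int priority() {
        return priority;
    }

    /** One consuming loop; returns true only when data was actually drained. */
    boolean tryConsume(List<String> pending) {
        if (priority < 50) {
            // Too low: skip this loop and raise the priority a little.
            priority += 10;
            return false;
        }
        if (!pending.isEmpty()) {
            // Average the old priority with the load factor of this fetch.
            int loadFactor = pending.size() * 100 / capacity;
            priority = (priority + loadFactor) / 2;
            pending.clear(); // stands in for consumer.consume(...)
            continuousNoDataCount = 0;
            return true;
        }
        if (continuousNoDataCount < 5) {
            // Low traffic: halve the priority so the group is polled less often.
            continuousNoDataCount++;
            priority /= 2;
        } else {
            // Cold queue: ten skipped loops are needed to climb back to 50.
            priority = -50;
        }
        return false;
    }

    public static void main(String[] args) {
        PriorityGroupSketch group = new PriorityGroupSketch(100);
        List<String> pending = new ArrayList<>();
        for (int loop = 1; loop <= 8; loop++) {
            pending.add("metric-" + loop); // light traffic: one element per loop
            boolean consumed = group.tryConsume(pending);
            System.out.println("loop " + loop + ": consumed=" + consumed
                + ", priority=" + group.priority());
        }
    }
}
```

Under light traffic the model skips several loops between drains, so each drain handles a larger batch. That appears to be the effect the Javadoc describes for a consumer pool with few threads and many queues.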
8 changes: 4 additions & 4 deletions oap-server/server-starter/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ agent-analyzer:
# Nginx and Envoy agents can't get the real remote address.
# Exit spans with the component in the list would not generate the client-side instance relation metrics.
noUpstreamRealAddressAgents: ${SW_NO_UPSTREAM_REAL_ADDRESS:6000,9000}
meterAnalyzerActiveFiles: ${SW_METER_ANALYZER_ACTIVE_FILES:datasource,threadpool} # Which files could be meter analyzed, files split by ","
meterAnalyzerActiveFiles: ${SW_METER_ANALYZER_ACTIVE_FILES:datasource,threadpool,satellite,spring-sleuth} # Which files could be meter analyzed, files split by ","

log-analyzer:
selector: ${SW_LOG_ANALYZER:default}
Expand Down Expand Up @@ -332,7 +332,7 @@ receiver-profile:
default:

receiver-zabbix:
selector: ${SW_RECEIVER_ZABBIX:-}
selector: ${SW_RECEIVER_ZABBIX:default}
default:
port: ${SW_RECEIVER_ZABBIX_PORT:10051}
host: ${SW_RECEIVER_ZABBIX_HOST:0.0.0.0}
Expand Down Expand Up @@ -379,10 +379,10 @@ receiver-meter:
default:

receiver-otel:
selector: ${SW_OTEL_RECEIVER:-}
selector: ${SW_OTEL_RECEIVER:default}
default:
enabledHandlers: ${SW_OTEL_RECEIVER_ENABLED_HANDLERS:"oc"}
enabledOcRules: ${SW_OTEL_RECEIVER_ENABLED_OC_RULES:"istio-controlplane"}
enabledOcRules: ${SW_OTEL_RECEIVER_ENABLED_OC_RULES:"istio-controlplane,k8s-cluster,k8s-node,k8s-service,oap,vm"}

receiver-zipkin:
selector: ${SW_RECEIVER_ZIPKIN:-}
Expand Down
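
The `application.yml` values above follow a `${ENV_NAME:default}` convention: the environment variable wins when it is set, otherwise the text after the colon is used. The sketch below illustrates that convention only. `PlaceholderSketch` and `resolve` are hypothetical names, and this is not SkyWalking's actual configuration loader.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical resolver for ${ENV_NAME:default} placeholders; not SkyWalking's loader. */
class PlaceholderSketch {
    // Group 1 is the variable name, group 2 the default (everything up to the closing brace).
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{([A-Za-z0-9_]+):([^}]*)\\}");

    static String resolve(String raw, Map<String, String> env) {
        Matcher matcher = PLACEHOLDER.matcher(raw);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            // Prefer the environment value, fall back to the inline default.
            String value = env.getOrDefault(matcher.group(1), matcher.group(2));
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String selector = "${SW_RECEIVER_ZABBIX:default}";
        // No override: the new default from this PR applies.
        System.out.println(resolve(selector, Map.of()));
        // Override with "-", the value this diff replaces as the default.
        System.out.println(resolve(selector, Map.of("SW_RECEIVER_ZABBIX", "-")));
    }
}
```

Assuming `-` still means "no provider selected", as the previous defaults in this diff suggest, setting `SW_RECEIVER_ZABBIX=-` or `SW_OTEL_RECEIVER=-` should restore the earlier behaviour of leaving those receivers off.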