Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix issue688: Make maxParamByteSize configurable in ParamFlowRequestDataWriter of cluster client module #823

Merged
merged 9 commits into from
Jun 13, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
*/
package com.alibaba.csp.sentinel.cluster.client.codec.data;

import java.util.Collection;

import com.alibaba.csp.sentinel.cluster.ClusterConstants;
import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;
import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.AssertUtil;

import io.netty.buffer.ByteBuf;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
* @author jialiang.linjl
* @author Eric Zhao
Expand All @@ -50,41 +51,67 @@ public void writeTo(ParamFlowRequestData entity, ByteBuf target) {

Collection<Object> params = entity.getParams();

// Write parameter amount.
int amount = calculateParamAmount(params);
target.writeInt(amount);
params = resolveValidParams(params);
target.writeInt(params.size());

// Serialize parameters with type flag.
for (Object param : entity.getParams()) {
for (Object param : params) {
encodeValue(param, target);
}
}

/**
* Get valid parameters in provided parameter list
*
* @param params
* @return
*/
public List<Object> resolveValidParams(Collection<Object> params) {
List<Object> validParams = new ArrayList<>();
int size = 0;
for (Object param : params) {
int s = calculateParamTransportSize(param);
if (s <= 0) {
RecordLog.warn("[ParamFlowRequestDataWriter] WARN: Non-primitive type detected in params of "
+ "cluster parameter flow control, which is not supported: " + param);
continue;
}
if (size + s > maxParamByteSize) {
RecordLog.warn("[ParamFlowRequestDataWriter] WARN: params size is too big." +
" the configure value is : " + maxParamByteSize + ", the params size is: " + params.size());
break;
}
size += s;
validParams.add(param);
}
return validParams;
}

private void encodeValue(Object param, ByteBuf target) {
// Handle primitive type.
if (param instanceof Integer || int.class.isInstance(param)) {
target.writeByte(ClusterConstants.PARAM_TYPE_INTEGER);
target.writeInt((Integer)param);
target.writeInt((Integer) param);
} else if (param instanceof String) {
encodeString((String)param, target);
encodeString((String) param, target);
} else if (boolean.class.isInstance(param) || param instanceof Boolean) {
target.writeByte(ClusterConstants.PARAM_TYPE_BOOLEAN);
target.writeBoolean((Boolean)param);
target.writeBoolean((Boolean) param);
} else if (long.class.isInstance(param) || param instanceof Long) {
target.writeByte(ClusterConstants.PARAM_TYPE_LONG);
target.writeLong((Long)param);
target.writeLong((Long) param);
} else if (double.class.isInstance(param) || param instanceof Double) {
target.writeByte(ClusterConstants.PARAM_TYPE_DOUBLE);
target.writeDouble((Double)param);
target.writeDouble((Double) param);
} else if (float.class.isInstance(param) || param instanceof Float) {
target.writeByte(ClusterConstants.PARAM_TYPE_FLOAT);
target.writeFloat((Float)param);
target.writeFloat((Float) param);
} else if (byte.class.isInstance(param) || param instanceof Byte) {
target.writeByte(ClusterConstants.PARAM_TYPE_BYTE);
target.writeByte((Byte)param);
target.writeByte((Byte) param);
} else if (short.class.isInstance(param) || param instanceof Short) {
target.writeByte(ClusterConstants.PARAM_TYPE_SHORT);
target.writeShort((Short)param);
target.writeShort((Short) param);
} else {
// Unexpected type, drop.
}
Expand All @@ -97,30 +124,6 @@ private void encodeString(String param, ByteBuf target) {
target.writeBytes(tmpChars);
}

/**
* Calculate amount of valid parameters in provided parameter list.
*
* @param params non-empty parameter list
* @return amount of valid parameters
*/
int calculateParamAmount(/*@NonEmpty*/ Collection<Object> params) {
int size = 0;
int length = 0;
for (Object param : params) {
int s = calculateParamTransportSize(param);
if (s <= 0) {
RecordLog.warn("[ParamFlowRequestDataWriter] WARN: Non-primitive type detected in params of "
+ "cluster parameter flow control, which is not supported: " + param);
continue;
}
if (size + s > maxParamByteSize) {
break;
}
size += s;
length++;
}
return length;
}

int calculateParamTransportSize(Object value) {
if (value == null) {
Expand All @@ -132,7 +135,7 @@ int calculateParamTransportSize(Object value) {
return 5;
} else if (value instanceof String) {
// Layout for string: |type flag(1)|length(4)|string content|
String tmpValue = (String)value;
String tmpValue = (String) value;
byte[] tmpChars = tmpValue.getBytes();
return 1 + 4 + tmpChars.length;
} else if (boolean.class.isInstance(value) || value instanceof Boolean) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.config;

import com.alibaba.csp.sentinel.config.SentinelConfig;
import com.alibaba.csp.sentinel.log.RecordLog;

/**
* <p>
* this class dedicated to reading startup configurations of cluster client
* </p>
*
* @author lianglin
* @since 1.7.0
*/
public class ClusterClientStartUpConfig {

private static final String MAX_PARAM_BYTE_SIZE = "csp.sentinel.cluster.max.param.byte.size";

/**
* Get the max bytes params can be serialized
*
* @return the max bytes, may be null
*/
public static Integer getMaxParamByteSize() {
String maxParamByteSize = SentinelConfig.getConfig(MAX_PARAM_BYTE_SIZE);
try {
return maxParamByteSize == null ? null : Integer.valueOf(maxParamByteSize);
} catch (Exception ex) {
RecordLog.warn("[ClusterClientStartUpConfig] Failed to parse maxParamByteSize: " + maxParamByteSize);
return null;
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.csp.sentinel.cluster.client.codec.data.PingResponseDataDecoder;
import com.alibaba.csp.sentinel.cluster.client.codec.registry.RequestDataWriterRegistry;
import com.alibaba.csp.sentinel.cluster.client.codec.registry.ResponseDataDecodeRegistry;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientStartUpConfig;
import com.alibaba.csp.sentinel.init.InitFunc;
import com.alibaba.csp.sentinel.init.InitOrder;

Expand All @@ -42,7 +43,12 @@ public void init() throws Exception {
private void initDefaultEntityWriters() {
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PING, new PingRequestDataWriter());
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_FLOW, new FlowRequestDataWriter());
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter());
Integer maxParamByteSize = ClusterClientStartUpConfig.getMaxParamByteSize();
if (maxParamByteSize == null) {
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter());
} else {
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter(maxParamByteSize));
}
}

private void initDefaultEntityDecoders() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.alibaba.csp.sentinel.cluster.client.codec.data;

import java.util.ArrayList;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.*;

/**
Expand All @@ -27,35 +28,33 @@ public void testCalculateParamTransportSize() {
}

@Test
public void testCalculateParamAmountExceedsMaxSize() {
final int maxSize = 10;
public void testResolveValidParams() {

final int maxSize = 15;
ParamFlowRequestDataWriter writer = new ParamFlowRequestDataWriter(maxSize);
assertEquals(1, writer.calculateParamAmount(new ArrayList<Object>() {{

ArrayList<Object> params = new ArrayList<Object>() {{
add(1);
}}));
assertEquals(2, writer.calculateParamAmount(new ArrayList<Object>() {{
add(1); add(64);
}}));
assertEquals(2, writer.calculateParamAmount(new ArrayList<Object>() {{
add(1); add(64); add(3);
}}));
}
add(64);
add(3);
}};

@Test
public void testCalculateParamAmount() {
ParamFlowRequestDataWriter writer = new ParamFlowRequestDataWriter();
assertEquals(6, writer.calculateParamAmount(new ArrayList<Object>() {{
add(1); add(1d); add(1f); add((byte) 1); add("123"); add(true);
}}));
// POJO (non-primitive type) should not be regarded as a valid parameter.
assertEquals(0, writer.calculateParamAmount(new ArrayList<Object>() {{
List<Object> validParams = writer.resolveValidParams(params);
assertTrue(validParams.contains(1) && validParams.contains(64) && validParams.contains(3));

//when over maxSize, the exceed number should not be contained
params.add(5);
assertFalse(writer.resolveValidParams(params).contains(5));


//POJO (non-primitive type) should not be regarded as a valid parameter
assertTrue(writer.resolveValidParams(new ArrayList<Object>() {{
add(new SomePojo());
}}));
assertEquals(1, writer.calculateParamAmount(new ArrayList<Object>() {{
add(new Object()); add(1);
}}));
}}).size() == 0);

}


private static class SomePojo {
private String param1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
*/
package com.alibaba.csp.sentinel.config;

import com.alibaba.csp.sentinel.log.LogBase;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.AppNameUtil;
import com.alibaba.csp.sentinel.util.AssertUtil;

import java.io.File;
import java.io.FileInputStream;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

import com.alibaba.csp.sentinel.log.LogBase;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.AppNameUtil;
import com.alibaba.csp.sentinel.util.AssertUtil;

/**
* The universal local config center of Sentinel. The config is retrieved from command line arguments
* and {@code ${user.home}/logs/csp/${appName}.properties} file by default.
Expand All @@ -53,6 +53,7 @@ public class SentinelConfig {
public static final String COLD_FACTOR = "csp.sentinel.flow.cold.factor";
public static final String STATISTIC_MAX_RT = "csp.sentinel.statistic.max.rt";


static final String DEFAULT_CHARSET = "UTF-8";
static final long DEFAULT_SINGLE_METRIC_FILE_SIZE = 1024 * 1024 * 50;
static final int DEFAULT_TOTAL_METRIC_FILE_COUNT = 6;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
*/
package com.alibaba.csp.sentinel.node;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.node.metric.MetricNode;
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
import com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric;
import com.alibaba.csp.sentinel.slots.statistic.metric.Metric;
import com.alibaba.csp.sentinel.util.TimeUtil;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* <p>The statistic node keep three kinds of real-time statistics metrics:</p>
Expand Down Expand Up @@ -104,7 +104,7 @@ public class StatisticNode implements Node {
/**
* The counter for thread count.
*/
private AtomicInteger curThreadNum = new AtomicInteger(0);
private LongAdder curThreadNum = new LongAdder();

/**
* The last timestamp when metrics were fetched.
Expand Down Expand Up @@ -233,7 +233,7 @@ public double minRt() {

@Override
public int curThreadNum() {
return curThreadNum.get();
return (int)curThreadNum.sum();
}

@Override
Expand Down Expand Up @@ -265,12 +265,12 @@ public void increaseExceptionQps(int count) {

@Override
public void increaseThreadNum() {
curThreadNum.incrementAndGet();
curThreadNum.increment();
}

@Override
public void decreaseThreadNum() {
curThreadNum.decrementAndGet();
curThreadNum.decrement();
}

@Override
Expand Down
Loading