Skip to content

Commit

Permalink
Polish code of transport command centers and heartbeat senders
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Zhao <sczyh16@gmail.com>
  • Loading branch information
sczyh30 committed Mar 14, 2020
1 parent 0536fb6 commit 22df09b
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import java.util.List;

/**
* @author leyou
* @author Carpenter Lee
* @author Jason Joo
*/
public class TransportConfig {

Expand Down Expand Up @@ -66,7 +67,6 @@ public static List<Tuple2<String, Integer>> getConsoleServerList() {
String config = SentinelConfig.getConfig(CONSOLE_SERVER);
List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
if (StringUtil.isBlank(config)) {
RecordLog.warn("Dashboard server address is not configured");
return list;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.alibaba.csp.sentinel.util.PidUtil;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.csp.sentinel.util.function.Tuple2;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
Expand All @@ -37,7 +38,7 @@

/**
* @author Eric Zhao
* @author leyou
* @author Carpenter Lee
*/
@SpiOrder(SpiOrder.LOWEST_PRECEDENCE - 100)
public class HttpHeartbeatSender implements HeartbeatSender {
Expand All @@ -46,26 +47,28 @@ public class HttpHeartbeatSender implements HeartbeatSender {

private static final int OK_STATUS = 200;


private final int timeoutMs = 3000;
private final RequestConfig requestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(timeoutMs)
.setConnectTimeout(timeoutMs)
.setSocketTimeout(timeoutMs)
.build();

private String consoleHost;
private int consolePort;
private final String consoleHost;
private final int consolePort;

public HttpHeartbeatSender() {
this.client = HttpClients.createDefault();
List<Tuple2<String, Integer>> dashboardList = TransportConfig.getConsoleServerList();
if (dashboardList == null || dashboardList.isEmpty()) {
RecordLog.info("[NettyHttpHeartbeatSender] No dashboard available");
RecordLog.info("[NettyHttpHeartbeatSender] No dashboard server available");
consoleHost = null;
consolePort = -1;
} else {
consoleHost = dashboardList.get(0).r1;
consolePort = dashboardList.get(0).r2;
RecordLog.info("[NettyHttpHeartbeatSender] Dashboard address parsed: <" + consoleHost + ':' + consolePort + ">");
RecordLog.info(
"[NettyHttpHeartbeatSender] Dashboard address parsed: <" + consoleHost + ':' + consolePort + ">");
}
}

Expand Down Expand Up @@ -96,38 +99,22 @@ public boolean sendHeartbeat() throws Exception {
return true;
} else if (clientErrorCode(statusCode) || serverErrorCode(statusCode)) {
RecordLog.warn("[HttpHeartbeatSender] Failed to send heartbeat to "
+ consoleHost + ":" + consolePort + ", http status code: {0}", statusCode);
+ consoleHost + ":" + consolePort + ", http status code: " + statusCode);
}

return false;


}

@Override
public long intervalMs() {
return 5000;
}

/**
* 4XX Client Error
*
* @param code
* @return
*/
private boolean clientErrorCode(int code) {
return code > 399 && code < 500;
}

/**
* 5XX Server Error
*
* @param code
* @return
*/
private boolean serverErrorCode(int code) {
return code > 499 && code < 600;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.alibaba.csp.sentinel.command.CommandResponse;
import com.alibaba.csp.sentinel.config.SentinelConfig;
import com.alibaba.csp.sentinel.log.CommandCenterLog;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.transport.command.SimpleHttpCommandCenter;
import com.alibaba.csp.sentinel.transport.command.exception.RequestException;
import com.alibaba.csp.sentinel.transport.util.HttpCommandUtils;
Expand All @@ -41,15 +40,17 @@
import java.util.HashMap;
import java.util.Map;


/***
/**
* The task handles incoming command request in HTTP protocol.
*
* @author youji.zj
* @author Eric Zhao
* @author Jason Joo
*/
public class HttpEventTask implements Runnable {
private static final String SERVER_ERROR_MESSAGE = "Command server error";

public static final String SERVER_ERROR_MESSAGE = "Command server error";
public static final String INVALID_COMMAND_MESSAGE = "Invalid command";

private final Socket socket;

Expand Down Expand Up @@ -92,7 +93,7 @@ public void run() {
// Validate the target command.
String commandName = HttpCommandUtils.getTarget(request);
if (StringUtil.isBlank(commandName)) {
writeResponse(printWriter, StatusCode.BAD_REQUEST, "Invalid command");
writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
return;
}

Expand Down Expand Up @@ -133,7 +134,7 @@ public void run() {
closeResource(socket);
}
}

private static String readLine(InputStream in) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(64);
int data;
Expand All @@ -153,52 +154,52 @@ private static String readLine(InputStream in) throws IOException {
}
return new String(arr, SentinelConfig.charset());
}

/**
* Try to process the body of POST request additionally.
*
*
* @param in
* @param request
* @throws RequestException
* @throws IOException
*/
protected static void processPostRequest(InputStream in, CommandRequest request)
throws RequestException, IOException {
throws RequestException, IOException {
Map<String, String> headerMap = parsePostHeaders(in);

if (headerMap == null) {
// illegal request
RecordLog.warn("Illegal request read");
CommandCenterLog.warn("Illegal request read: null headerMap");
throw new RequestException(StatusCode.BAD_REQUEST, "");
}
if (headerMap.containsKey("content-type") && !checkSupport(headerMap.get("content-type"))) {
// not support Content-type
RecordLog.warn("Not supported Content-Type: {}", headerMap.get("content-type"));
throw new RequestException(StatusCode.UNSUPPORTED_MEDIA_TYPE,
"Only form-encoded post request is supported");

if (headerMap.containsKey("content-type") && !checkContentTypeSupported(headerMap.get("content-type"))) {
// not supported Content-type
CommandCenterLog.warn("Request not supported: unsupported Content-Type: " + headerMap.get("content-type"));
throw new RequestException(StatusCode.UNSUPPORTED_MEDIA_TYPE,
"Only form-encoded post request is supported");
}

int bodyLength = 0;
try {
bodyLength = Integer.parseInt(headerMap.get("content-length"));
} catch (Exception e) {
}
if (bodyLength < 1) {
// illegal request without Content-length header
RecordLog.warn("No available Content-Length in headers");
CommandCenterLog.warn("Request not supported: no available Content-Length in headers");
throw new RequestException(StatusCode.LENGTH_REQUIRED, "No legal Content-Length");
}

parseParams(readBody(in, bodyLength), request);
}

/**
* Process header line in request
*
*
* @param in
* @return return headers in a Map, null for illegal request
* @throws IOException
* @throws IOException
*/
protected static Map<String, String> parsePostHeaders(InputStream in) throws IOException {
Map<String, String> headerMap = new HashMap<String, String>(4);
Expand All @@ -221,8 +222,8 @@ protected static Map<String, String> parsePostHeaders(InputStream in) throws IOE
}
}
}
private static boolean checkSupport(String contentType) {

private static boolean checkContentTypeSupported(String contentType) {
int idx = contentType.indexOf(";");
String type;
if (idx > 0) {
Expand All @@ -234,16 +235,15 @@ private static boolean checkSupport(String contentType) {
// But some library do add it. So we will be compatible with that but force to
// encoding specified in configuration as legacy processing will do.
if (!type.contains("application/x-www-form-urlencoded")) {
CommandCenterLog.warn("Content-Type not supported: " + contentType);
// Not supported request type
// Now simple-http only support form-encoded post request.
return false;
}
return true;
}
private static String readBody(InputStream in, int bodyLength)
throws IOException, RequestException {

private static String readBody(InputStream in, int bodyLength)
throws IOException, RequestException {
byte[] buf = new byte[bodyLength];
int pos = 0;
while (pos < bodyLength) {
Expand All @@ -259,23 +259,23 @@ private static String readBody(InputStream in, int bodyLength)
// Only allow partial
return new String(buf, 0, pos, SentinelConfig.charset());
}

/**
* Consume all the body submitted and parse params into {@link CommandRequest}
*
*
* @param queryString
* @param request
*/
protected static void parseParams(String queryString, CommandRequest request) {
if (queryString == null || queryString.length() < 1) {
return;
}

int offset = 0, pos = -1;

// check anchor
queryString = removeAnchor(queryString);

while (true) {
offset = pos + 1;
pos = queryString.indexOf('&', offset);
Expand Down Expand Up @@ -319,11 +319,11 @@ private <T> void handleResponse(CommandResponse<T> response, final PrintWriter p
writeResponse(printWriter, StatusCode.BAD_REQUEST, msg);
}
}

private void writeResponse(PrintWriter out, StatusCode statusCode, String message) {
out.print("HTTP/1.0 " + statusCode.toString() + "\r\n"
+ "Content-Length: " + (message == null ? 0 : message.getBytes().length) + "\r\n"
+ "Connection: close\r\n\r\n");
+ "Content-Length: " + (message == null ? 0 : message.getBytes().length) + "\r\n"
+ "Connection: close\r\n\r\n");
if (message != null) {
out.print(message);
}
Expand Down Expand Up @@ -354,34 +354,34 @@ protected static CommandRequest processQueryString(String line) {
parseParams(parameterStr, request);
return request;
}

/**
* Truncate query from "a=1&b=2#mark" to "a=1&b=2"
*
*
* @param str
* @return
*/
protected static String removeAnchor(String str) {
if (str == null || str.length() == 0) {
return str;
}

int anchor = str.indexOf('#');

if (anchor == 0) {
return "";
} else if (anchor > 0) {
return str.substring(0, anchor);
}

return str;
}

protected static void parseSingleParam(String single, CommandRequest request) {
if (single == null || single.length() < 3) {
return;
}

int index = single.indexOf('=');
if (index <= 0 || index >= single.length() - 1) {
// empty key/val or nothing found
Expand All @@ -395,7 +395,7 @@ protected static void parseSingleParam(String single, CommandRequest request) {
value = URLDecoder.decode(value, SentinelConfig.charset());
} catch (UnsupportedEncodingException e) {
}

request.addParam(key, value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
*/
package com.alibaba.csp.sentinel.transport.command.http;

/**
* @author Jason Joo
*/
public enum StatusCode {
/**
* 200 OK.
*/
OK(200, "OK"),
BAD_REQUEST(400, "Bad Request"),
REQUEST_TIMEOUT(408, "Request Timeout"),
Expand All @@ -27,7 +33,7 @@ public enum StatusCode {
private String desc;
private String representation;

private StatusCode(int code, String desc) {
StatusCode(int code, String desc) {
this.code = code;
this.desc = desc;
this.representation = code + " " + desc;
Expand Down
Loading

0 comments on commit 22df09b

Please sign in to comment.