Skip to content

Commit

Permalink
Merge pull request #823 from zqr10159/greptime
Browse files Browse the repository at this point in the history
[warehouse]feature
  • Loading branch information
zqr10159 authored Mar 31, 2023
2 parents 1f22244 + 7f30163 commit 3235965
Showing 1 changed file with 72 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.usthe.common.util.CommonConstants;
import com.usthe.warehouse.config.WarehouseProperties;
import io.greptime.models.*;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
Expand All @@ -33,6 +34,7 @@

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -66,10 +68,9 @@ public class HistoryGrepTimeDbDataStorage extends AbstractHistoryDataStorage {
private static final String QUERY_HISTORY_SQL
= "SELECT ts, instance, \"%s\" FROM %s WHERE ts >= now() - INTERVAL %s order by ts desc;";
private static final String QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL
= "SELECT FIRST_VALUE(%s), AVG(%s), MIN_VALUE(%s), MAX_VALUE(%s) FROM %s GROUP BY ([now() - %s, now()), 4h) WITHOUT NULL ANY";
= "SELECT first(%s), avg(%s), min(%s), max(%s) FROM %s WHERE instance = %s AND ts >= now - %s INTERVAL '4' HOUR";
private static final String TABLE_NOT_EXIST = "not exist";
private static final String DATABASE_NOT_EXIST = "not exist";
private static final String HERTZBEAT_DB_NAME = "hertzbeat";
private GreptimeDB greptimeDb;

public HistoryGrepTimeDbDataStorage(WarehouseProperties properties) {
Expand Down Expand Up @@ -117,8 +118,8 @@ private boolean createDatabase() {
List<Row> rowsList = rows.collect();
for (Row row : rowsList) {
for (io.greptime.models.Value value : row.values()) {
if (value.value().toString().equals(HERTZBEAT_DB_NAME)) {
log.info("Exist Database {}",HERTZBEAT_DB_NAME);
if (value.value().toString().equals(STORAGE_DATABASE)) {
log.info("Exist Database {}",STORAGE_DATABASE);
isDatabaseExist = true;
break;
}
Expand All @@ -129,12 +130,12 @@ private boolean createDatabase() {
if (!isDatabaseExist) {
QueryRequest createDatabase = QueryRequest.newBuilder()
.exprType(SelectExprType.Sql)
.ql("CREATE DATABASE " + HERTZBEAT_DB_NAME + ";")
.ql("CREATE DATABASE " + STORAGE_DATABASE + ";")
.build();
try {
CompletableFuture<Result<QueryOk, Err>> createFuture = greptimeDb.query(createDatabase);
isDatabaseExist = createFuture.get().isOk();
log.info("Database {} does not exist,and has been created",HERTZBEAT_DB_NAME);
log.info("Database {} does not exist,and has been created",STORAGE_DATABASE);
} catch (InterruptedException | ExecutionException e) {
log.error("Error creating database");
}
Expand All @@ -151,10 +152,10 @@ void saveData(CollectRep.MetricsData metricsData) {
log.info("[warehouse greptime] flush metrics data {} is null, ignore.", metricsData.getId());
return;
}

String monitorId = String.valueOf(metricsData.getId());
//表名添加monitorId区分
String table = metricsData.getApp() + "_" + metricsData.getMetrics()+ "_" +monitorId;
//TODO bug:选择STORAGE_DATABASE不起作用,还是默认存在public里
TableSchema.Builder tableSchemaBuilder = TableSchema.newBuilder(TableName.with(STORAGE_DATABASE, table));

List<SemanticType> semanticTypes = new LinkedList<>(Arrays.asList(SemanticType.Tag, SemanticType.Tag, SemanticType.Timestamp));
Expand Down Expand Up @@ -224,11 +225,7 @@ public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app,
"\t----------Can Not Use Metric History Now----------\n");
return instanceValuesMap;
}

String[] numberAndTime = history.split("(?<=\\D)(?=\\d)|(?<=\\d)(?=\\D)");
if (Objects.equals(numberAndTime[1], "h")){
history = "'"+numberAndTime[0]+"'" +" HOUR";
}
history = getHistory(history);
String table = app + "_" + metrics + "_" + monitorId;
String selectSql = String.format(QUERY_HISTORY_SQL, metric, table, history);
log.info("selectSql: {}", selectSql);
Expand All @@ -243,6 +240,9 @@ public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app,
} catch (FlightRuntimeException e) {
String msg = e.getMessage();
if (msg != null && !msg.contains(TABLE_NOT_EXIST)) {
List<Value> valueList = instanceValuesMap.computeIfAbsent(metric, k -> new LinkedList<>());
valueList.add(new Value(null, System.currentTimeMillis()));
log.info("TABLE_NOT_EXIST: {}",valueList);
log.warn(msg);
}
} catch (Exception e) {
Expand All @@ -260,27 +260,80 @@ public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app,
valueList.add(new Value(strValue, (long) map.get("ts")));
}
}

log.info("instanceValuesMap: {}",instanceValuesMap);
return instanceValuesMap;
}






@Override
public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, String app, String metrics,
String metric, String instance, String history) {
Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
if (!isServerAvailable()) {
log.error("\n\t---------------GrepTime Init Failed---------------\n" +
"\t--------------Please Config IotDb--------------\n" +
"\t--------------Please Config GrepTime--------------\n" +
"\t----------Can Not Use Metric History Now----------\n");
return instanceValuesMap;
}
history = getHistory(history);
String table = app + "_" + metrics + "_" + monitorId;
String selectSql = String.format(QUERY_HISTORY_SQL, metric, table, history);
log.info("selectSql: {}", selectSql);
QueryRequest request = QueryRequest.newBuilder()
.exprType(SelectExprType.Sql)
.ql(selectSql)
.build();
Result<QueryOk, Err> result = null;
try {
CompletableFuture<Result<QueryOk, Err>> future = greptimeDb.query(request);
result = future.get();
} catch (FlightRuntimeException e) {
String msg = e.getMessage();
if (msg != null && !msg.contains(TABLE_NOT_EXIST)) {
log.warn(msg);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
List<Value> valueList;
if (result != null && result.isOk()) {
QueryOk queryOk = result.getOk();
SelectRows rows = queryOk.getRows();
List<Map<String, Object>> maps = rows.collectToMaps();
for (Map<String, Object> map : maps) {
String strValue = new BigDecimal(map.get(metric).toString()).setScale(4, RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
valueList = instanceValuesMap.computeIfAbsent(metric, k -> new LinkedList<>());
valueList.add(new Value(strValue, (long) map.get("ts")));
}
}
return instanceValuesMap;
}
private static String getHistory(String history) {
String[] parts = history.split("(?<=\\D)(?=\\d)|(?<=\\d)(?=\\D)");
String number = parts[0];
String time = parts[1];
switch (time) {
case "h":
time = "HOUR";
break;
case "d":
time = "DAY";
break;
case "w":
time = "WEEK";
break;
case "s":
time = "SECOND";
break;
case "m":
time = "MINUTE";
break;
default:
number = "6";
time = "HOUR";
break;
}
return String.format("'%s' %s", number, time);
}

@Override
public void destroy() {
Expand Down

0 comments on commit 3235965

Please sign in to comment.