Skip to content

Commit

Permalink
Merge pull request #821 from zqr10159/greptime
Browse files Browse the repository at this point in the history
[warehouse]feature: 
1.add GrepTimeDb `createDatabase`
2.Check if the database exists; if it does not exist, create the database.
  • Loading branch information
zqr10159 authored Mar 30, 2023
2 parents fd4812e + 3c0e487 commit 1f22244
Showing 1 changed file with 65 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.usthe.warehouse.config.WarehouseProperties;
import io.greptime.models.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand Down Expand Up @@ -66,7 +67,9 @@ public class HistoryGrepTimeDbDataStorage extends AbstractHistoryDataStorage {
= "SELECT ts, instance, \"%s\" FROM %s WHERE ts >= now() - INTERVAL %s order by ts desc;";
private static final String QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL
= "SELECT FIRST_VALUE(%s), AVG(%s), MIN_VALUE(%s), MAX_VALUE(%s) FROM %s GROUP BY ([now() - %s, now()), 4h) WITHOUT NULL ANY";

private static final String TABLE_NOT_EXIST = "not exist";
private static final String DATABASE_NOT_EXIST = "not exist";
private static final String HERTZBEAT_DB_NAME = "hertzbeat";
private GreptimeDB greptimeDb;

public HistoryGrepTimeDbDataStorage(WarehouseProperties properties) {
Expand All @@ -87,9 +90,56 @@ private boolean initDbSession(WarehouseProperties.StoreProperties.GreptimeProper
}
return createDatabase();
}
// 检查数据库是否存在;如果不存在,则创建该数据库
private boolean createDatabase() {
// todo auto create database hertzbeat
return true;
// 查询现有数据库
QueryRequest showDatabases = QueryRequest.newBuilder()
.exprType(SelectExprType.Sql)
.ql("SHOW DATABASES;")
.build();
Result<QueryOk, Err> result = null;
try {
CompletableFuture<Result<QueryOk, Err>> future = greptimeDb.query(showDatabases);
result = future.get();
} catch (FlightRuntimeException e) {
String msg = e.getMessage();
if (msg != null && !msg.contains(DATABASE_NOT_EXIST)) {
log.warn(msg);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
// 检查现有数据库是否包括“hertzbeat”
boolean isDatabaseExist = false;
if (result != null && result.isOk()) {
QueryOk queryOk = result.getOk();
SelectRows rows = queryOk.getRows();
List<Row> rowsList = rows.collect();
for (Row row : rowsList) {
for (io.greptime.models.Value value : row.values()) {
if (value.value().toString().equals(HERTZBEAT_DB_NAME)) {
log.info("Exist Database {}",HERTZBEAT_DB_NAME);
isDatabaseExist = true;
break;
}
}
}
}
// 如果“hertzbeat”数据库不存在,则创建该数据库
if (!isDatabaseExist) {
QueryRequest createDatabase = QueryRequest.newBuilder()
.exprType(SelectExprType.Sql)
.ql("CREATE DATABASE " + HERTZBEAT_DB_NAME + ";")
.build();
try {
CompletableFuture<Result<QueryOk, Err>> createFuture = greptimeDb.query(createDatabase);
isDatabaseExist = createFuture.get().isOk();
log.info("Database {} does not exist,and has been created",HERTZBEAT_DB_NAME);
} catch (InterruptedException | ExecutionException e) {
log.error("Error creating database");
}
}
return isDatabaseExist;
}

@Override
Expand All @@ -113,7 +163,6 @@ void saveData(CollectRep.MetricsData metricsData) {

List<CollectRep.Field> fieldsList = metricsData.getFieldsList();
for (CollectRep.Field field : fieldsList) {
log.info("Field " + field.getName());
semanticTypes.add(SemanticType.Field);
columnNames.add(field.getName());
// handle field type
Expand Down Expand Up @@ -183,33 +232,33 @@ public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app,
String table = app + "_" + metrics + "_" + monitorId;
String selectSql = String.format(QUERY_HISTORY_SQL, metric, table, history);
log.info("selectSql: {}", selectSql);
QueryRequest request = QueryRequest.newBuilder() //
.exprType(SelectExprType.Sql) //
.ql(selectSql) //
QueryRequest request = QueryRequest.newBuilder()
.exprType(SelectExprType.Sql)
.ql(selectSql)
.build();
Result<QueryOk, Err> result = null;
try {
CompletableFuture<Result<QueryOk, Err>> future = greptimeDb.query(request);
result = future.get();
} catch (InterruptedException | ExecutionException e) {
log.error(e.getMessage());
} catch (FlightRuntimeException e) {
String msg = e.getMessage();
if (msg != null && !msg.contains(TABLE_NOT_EXIST)) {
log.warn(msg);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}

if (result.isOk()) {
if (result != null && result.isOk()) {
QueryOk queryOk = result.getOk();
SelectRows rows = queryOk.getRows();
List<Map<String, Object>> maps = rows.collectToMaps();
List<Value> valueList;
for (Map<String, Object> map : maps) {
log.info("Query row: {}", map);
String strValue = new BigDecimal(map.get(metric).toString()).setScale(4, RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
valueList = instanceValuesMap.computeIfAbsent(metric, k -> new LinkedList<>());
valueList.add(new Value(strValue, (long)map.get("ts")));


valueList.add(new Value(strValue, (long) map.get("ts")));
}
} else {
log.error("Failed to query: {}", result.getErr());
}

return instanceValuesMap;
Expand Down

0 comments on commit 1f22244

Please sign in to comment.