Skip to content

Commit

Permalink
[Feature][RestAPI] overview support tag filter
Browse files Browse the repository at this point in the history
  • Loading branch information
liunaijie committed Jul 11, 2024
1 parent 05717ef commit 9761ee9
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 104 deletions.
15 changes: 10 additions & 5 deletions docs/en/seatunnel-engine/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ network:
### Returns an overview over the Zeta engine cluster.

<details>
<summary><code>GET</code> <code><b>/hazelcast/rest/maps/overview</b></code> <code>(Returns an overview over the Zeta engine cluster.)</code></summary>
<summary><code>GET</code> <code><b>/hazelcast/rest/maps/overview?tag1=value1&tag2=value2</b></code> <code>(Returns an overview over the Zeta engine cluster.)</code></summary>

#### Parameters

> | name | type | data type | description |
> |----------|----------|-----------|------------------------------------------------------------------------------------------------------|
> | tag_name | optional | string | the tags filter, you can add tag filter to get those matched worker count, and slot on those workers |

#### Responses

```json
Expand All @@ -50,16 +54,17 @@ network:
"gitCommitAbbrev":"DeadD0d0",
"totalSlot":"0",
"unassignedSlot":"0",
"works":"1",
"runningJobs":"0",
"finishedJobs":"0",
"failedJobs":"0",
"cancelledJobs":"0",
"works":"1"
"cancelledJobs":"0"
}
```

If you use `dynamic-slot`, the `totalSlot` and `unassignedSlot` always be `0`.
If you set it to fix slot number, it will return the correct total and unassigned slot number
**Notes:**
- If you use `dynamic-slot`, the `totalSlot` and `unassignedSlot` always be `0`. when you set it to fix slot number, it will return the correct total and unassigned slot number
- If the url has tag filter, the `works`, `totalSlot` and `unassignedSlot` will return the result on the matched worker. but the job related metric will always return the cluster level information.

</details>

Expand Down
85 changes: 45 additions & 40 deletions docs/zh/seatunnel-engine/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@ network:
### 返回Zeta集群的概览

<details>
<summary><code>GET</code> <code><b>/hazelcast/rest/maps/overview</b></code> <code>(Returns an overview over the Zeta engine cluster.)</code></summary>
<summary><code>GET</code> <code><b>/hazelcast/rest/maps/overview?tag1=value1&tag2=value2</b></code> <code>(Returns an overview over the Zeta engine cluster.)</code></summary>

#### 参数

> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
> |--------|------|------|--------------------------|
> | tag键值对 | 否 | 字符串 | 一组标签值, 通过该标签值过滤满足条件的节点信息 |

#### 响应

```json
Expand All @@ -49,16 +53,17 @@ network:
"gitCommitAbbrev":"DeadD0d0",
"totalSlot":"0",
"unassignedSlot":"0",
"works":"1",
"runningJobs":"0",
"finishedJobs":"0",
"failedJobs":"0",
"cancelledJobs":"0",
"works":"1"
"cancelledJobs":"0"
}
```

当你使用`dynamic-slot`时, 返回结果中的`totalSlot`和`unassignedSlot`将始终为0.
当你设置为固定的slot值时, 将正确返回集群中总共的slot数量以及未分配的slot数量.
**注意:**
- 当你使用`dynamic-slot`时, 返回结果中的`totalSlot`和`unassignedSlot`将始终为0. 设置为固定的slot值后, 将正确返回集群中总共的slot数量以及未分配的slot数量.
- 当添加标签过滤后, `works`, `totalSlot`, `unassignedSlot`将返回满足条件的节点的相关指标. 注意`runningJobs`等job相关指标为集群级别结果, 无法根据标签进行过滤.

</details>

Expand Down Expand Up @@ -110,9 +115,9 @@ network:

#### 参数

> | name | type | data type | description |
> |-------|----------|-----------|-------------|
> | jobId | required | long | job id |
> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
> |-------|------|------|--------|
> | jobId | 是 | long | job id |

#### 响应

Expand Down Expand Up @@ -167,9 +172,9 @@ network:

#### 参数

> | name | type | data type | description |
> |-------|----------|-----------|-------------|
> | jobId | required | long | job id |
> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
> |-------|------|------|--------|
> | jobId | 是 | long | job id |

#### 响应

Expand Down Expand Up @@ -222,9 +227,9 @@ network:

#### 参数

> | name | type | data type | description |
> |-------|----------|-----------|------------------------------------------------------------------|
> | state | optional | string | finished job status. `FINISHED`,`CANCELED`,`FAILED`,`UNKNOWABLE` |
> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
> |-------|----------|--------|------------------------------------------------------------------|
> | state | optional | string | finished job status. `FINISHED`,`CANCELED`,`FAILED`,`UNKNOWABLE` |

#### 响应

Expand Down Expand Up @@ -319,11 +324,11 @@ network:

#### 参数

> | name | type | data type | description |
> |----------------------|----------|-----------|-----------------------------------|
> | jobId | optional | string | job id |
> | jobName | optional | string | job name |
> | isStartWithSavePoint | optional | string | if job is started with save point |
> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
> |----------------------|----------|--------|-----------------------------------|
> | jobId | optional | string | job id |
> | jobName | optional | string | job name |
> | isStartWithSavePoint | optional | string | if job is started with save point |

#### 请求体

Expand All @@ -334,12 +339,12 @@ network:
},
"source": [
{
"plugin_name": "FakeSource",
"result_table_name": "fake",
"plugin_参数名称": "FakeSource",
"result_table_参数名称": "fake",
"row.num": 100,
"schema": {
"fields": {
"name": "string",
"参数名称": "string",
"age": "int",
"card": "int"
}
Expand All @@ -350,8 +355,8 @@ network:
],
"sink": [
{
"plugin_name": "Console",
"source_table_name": ["fake"]
"plugin_参数名称": "Console",
"source_table_参数名称": ["fake"]
}
]
}
Expand Down Expand Up @@ -412,30 +417,30 @@ network:
},
"source": [
{
"plugin_name": "MySQL-CDC",
"plugin_参数名称": "MySQL-CDC",
"schema" : {
"fields": {
"name": "string",
"参数名称": "string",
"age": "int"
}
},
"result_table_name": "fake",
"result_table_参数名称": "fake",
"parallelism": 1,
"hostname": "127.0.0.1",
"username": "seatunnel",
"host参数名称": "127.0.0.1",
"user参数名称": "seatunnel",
"password": "seatunnel_password",
"table-name": "inventory_vwyw0n"
"table-参数名称": "inventory_vwyw0n"
}
],
"transform": [
],
"sink": [
{
"plugin_name": "Clickhouse",
"plugin_参数名称": "Clickhouse",
"host": "localhost:8123",
"database": "default",
"table": "fake_all",
"username": "seatunnel",
"user参数名称": "seatunnel",
"password": "seatunnel_password"
}
]
Expand All @@ -452,29 +457,29 @@ network:
},
"source": [
{
"plugin_name": "MySQL-CDC",
"plugin_参数名称": "MySQL-CDC",
"schema": {
"fields": {
"name": "string",
"参数名称": "string",
"age": "int"
}
},
"result_table_name": "fake",
"result_table_参数名称": "fake",
"parallelism": 1,
"hostname": "127.0.0.1",
"username": "c2VhdHVubmVs",
"host参数名称": "127.0.0.1",
"user参数名称": "c2VhdHVubmVs",
"password": "c2VhdHVubmVsX3Bhc3N3b3Jk",
"table-name": "inventory_vwyw0n"
"table-参数名称": "inventory_vwyw0n"
}
],
"transform": [],
"sink": [
{
"plugin_name": "Clickhouse",
"plugin_参数名称": "Clickhouse",
"host": "localhost:8123",
"database": "default",
"table": "fake_all",
"username": "c2VhdHVubmVs",
"user参数名称": "c2VhdHVubmVs",
"password": "c2VhdHVubmVsX3Bhc3N3b3Jk"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.junit.jupiter.api.Test;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.config.Config;
import com.hazelcast.config.MemberAttributeConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -63,13 +65,23 @@ public class RestApiIT {
@BeforeEach
void beforeClass() throws Exception {
String testClusterName = TestUtils.getClusterName("RestApiIT");
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setSlotNum(20);
node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
SeaTunnelConfig node1Config = ConfigProvider.locateAndGetSeaTunnelConfig();
node1Config.getHazelcastConfig().setClusterName(testClusterName);
node1Config.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
node1Config.getEngineConfig().getSlotServiceConfig().setSlotNum(20);
MemberAttributeConfig node1Tags = new MemberAttributeConfig();
node1Tags.setAttribute("node", "node1");
node1Config.getHazelcastConfig().setMemberAttributeConfig(node1Tags);
node1 = SeaTunnelServerStarter.createHazelcastInstance(node1Config);

node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
MemberAttributeConfig node2Tags = new MemberAttributeConfig();
node2Tags.setAttribute("node", "node2");
Config node2hzconfig = node1Config.getHazelcastConfig().setMemberAttributeConfig(node2Tags);
SeaTunnelConfig node2Config = ConfigProvider.locateAndGetSeaTunnelConfig();
node2Config.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
node2Config.getEngineConfig().getSlotServiceConfig().setSlotNum(20);
node2Config.setHazelcastConfig(node2hzconfig);
node2 = SeaTunnelServerStarter.createHazelcastInstance(node2Config);

String filePath = TestUtils.getResource("stream_fakesource_to_file.conf");
JobConfig jobConfig = new JobConfig();
Expand All @@ -79,7 +91,7 @@ void beforeClass() throws Exception {
clientConfig.setClusterName(testClusterName);
engineClient = new SeaTunnelClient(clientConfig);
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
engineClient.createExecutionContext(filePath, jobConfig, node1Config);

clientJobProxy = jobExecutionEnv.execute();

Expand All @@ -94,7 +106,7 @@ void beforeClass() throws Exception {
JobConfig batchConf = new JobConfig();
batchConf.setName("fake_to_console");
ClientJobExecutionEnvironment batchJobExecutionEnv =
engineClient.createExecutionContext(batchFilePath, batchConf, seaTunnelConfig);
engineClient.createExecutionContext(batchFilePath, batchConf, node1Config);
batchJobProxy = batchJobExecutionEnv.execute();
Awaitility.await()
.atMost(5, TimeUnit.MINUTES)
Expand Down Expand Up @@ -240,6 +252,27 @@ public void testOverview() {
});
}

@Test
public void testOverviewFilterByTag() {
Arrays.asList(node2, node1)
.forEach(
instance -> {
given().get(
HOST
+ instance.getCluster()
.getLocalMember()
.getAddress()
.getPort()
+ RestConstant.OVERVIEW
+ "?node=node1")
.then()
.statusCode(200)
.body("projectVersion", notNullValue())
.body("totalSlot", equalTo("20"))
.body("workers", equalTo("1"));
});
}

@Test
public void testGetRunningThreads() {
Arrays.asList(node2, node1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import lombok.NonNull;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -280,7 +281,7 @@ private void restoreAllRunningJobFromMasterNodeSwitch() {
return;
}
// waiting have worker registered
while (getResourceManager().workerCount() == 0) {
while (getResourceManager().workerCount(Collections.emptyMap()) == 0) {
try {
logger.info("Waiting for worker registered");
Thread.sleep(1000);
Expand Down
Loading

0 comments on commit 9761ee9

Please sign in to comment.