Skip to content

Commit

Permalink
Merge branch 'main' into bz2/bump
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Nov 17, 2022
2 parents 8300ca7 + 2239ce1 commit bfe4b3b
Show file tree
Hide file tree
Showing 119 changed files with 1,567 additions and 966 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docs/relational_table/relational-table-schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ In this doc, we will take HashAgg with extreme state (`max`, `min`) or value sta
[Code](https://github.com/risingwavelabs/risingwave/blob/7f9ad2240712aa0cfe3edffb4535d43b42f32cc5/src/frontend/src/optimizer/plan_node/logical_agg.rs#L144)

## Table id
For all relational table states, the keyspace must start with `table_id`. This is a globally unique id allocated in meta. Meta is responsible for traversing the Plan Tree and calculating the total number of Relational Tables needed. For example, the Hash Join Operator needs 2, one for the left table and one for the right table. The number of tables needed for Agg depends on the number of agg calls.
`table_id` is a globally unique id allocated in meta for each relational table object. Meta is responsible for traversing the Plan Tree and calculating the total number of Relational Tables needed. For example, the Hash Join Operator needs 2, one for the left table and one for the right table. The number of tables needed for Agg depends on the number of agg calls.

## Value State (Sum, Count)
Query example:
Expand Down
8 changes: 4 additions & 4 deletions docs/state-store-overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ Hummock consists of a manager service on the meta node, clients on worker nodes

The streaming state store has distinguished workload characteristics.

* Every streaming executor will only ***read and write its own portion of data***, which are multiple consecutive non-overlapping ranges of keys (we call it ***key space***).
* Every streaming executor will only ***read and write its own portion of data***.
* Data (generally) ***won’t be shared across nodes***, so every worker node will only read and write its own data. Therefore, every Hummock API, like `get` or `scan`, only guarantees that writes on one node can be immediately read from the same node. In some cases, if we want to read data written from other nodes, we will need to ***wait for the epoch***.
* Streaming data are ***committed in serial***. Based on the [barrier-based checkpoint algorithm](https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm), the states are persisted epoch by epoch. We can tailor the write path specifically for the epoch-based checkpoint workload.

This leads to the design of Hummock, the cloud-native KV-based streaming state store. We’ll explain concepts like “epoch”, “key space” and “barrier” in the following chapters.
This leads to the design of Hummock, the cloud-native KV-based streaming state store. We’ll explain concepts like “epoch” and “barrier” in the following chapters.

## The Hummock User API

Expand Down Expand Up @@ -119,8 +119,8 @@ For `scan`, we simply select by overlapping key range. For point get, we will fi
Hummock implements the following iterators:
- `BlockIterator`: iterates a block of an SSTable.
- `SSTableIterator`: iterates an SSTable.
- `ConcatIterator`: iterates SSTables with non-overlapping keyspaces.
- `MergeIterator`: iterates SSTables with overlapping keyspaces.
- `ConcatIterator`: iterates SSTables with non-overlapping key ranges.
- `MergeIterator`: iterates SSTables with overlapping key ranges.
- `UserIterator`: wraps internal iterators and outputs user key-value with epoch <= read epoch.

[iterators source code](https://github.com/risingwavelabs/risingwave/tree/main/src/storage/src/hummock/iterator)
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/nexmark/create_sources.slt.part
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
statement ok
CREATE SOURCE person (
CREATE MATERIALIZED SOURCE person (
"id" BIGINT,
"name" VARCHAR,
"email_address" VARCHAR,
Expand All @@ -17,7 +17,7 @@ CREATE SOURCE person (
) ROW FORMAT JSON;

statement ok
CREATE SOURCE auction (
CREATE MATERIALIZED SOURCE auction (
"id" BIGINT,
"item_name" VARCHAR,
"description" VARCHAR,
Expand Down
47 changes: 47 additions & 0 deletions proto/cdc_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
syntax = "proto3";

package cdc_service;

option optimize_for = SPEED;

// Notes: This proto needs to be self-contained
message Status {
enum Code {
UNSPECIFIED = 0;
OK = 1;
}
Code code = 1;
string message = 2;
}

message DbConnectorProperties {
string database_host = 1;
string database_port = 2;
string database_user = 3;
string database_password = 4;
string database_name = 5;
string table_name = 6;
string partition = 7;
string start_offset = 8;
bool include_schema_events = 9;
}

message CdcMessage {
string payload = 1;
string partition = 2;
string offset = 3;
}

message GetEventStreamRequest {
uint64 source_id = 1;
DbConnectorProperties properties = 2;
}

message GetEventStreamResponse {
uint64 source_id = 1;
repeated CdcMessage events = 2;
}

service CdcService {
rpc GetEventStream(GetEventStreamRequest) returns (stream GetEventStreamResponse);
}
3 changes: 3 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,12 @@ message UnpinSnapshotBeforeResponse {
common.Status status = 1;
}

// When `right_exclusive=false`, it represents [left, right], of which both boundary are open. When `right_exclusive=true`,
// it represents [left, right), of which right is exclusive.
message KeyRange {
bytes left = 1;
bytes right = 2;
bool right_exclusive = 3;
}

message TableOption {
Expand Down
3 changes: 3 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,9 @@ template:
# If `enable-tiered-cache` is true, hummock will use data directory as file cache.
enable-tiered-cache: false

# RPC endpoint for source connector node
connector-source-endpoint: "127.0.0.1:61261"

# Minio instances used by this compute node
provide-minio: "minio*"

Expand Down
133 changes: 0 additions & 133 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,136 +405,3 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
}
}
}

#[cfg(test)]
mod tests {
use std::collections::Bound::Unbounded;

use futures::StreamExt;
use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId, TableOption};
use risingwave_common::row::Row;
use risingwave_common::types::DataType;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::sort_util::OrderType;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::streaming_table::state_table::StateTable;
use risingwave_storage::table::Distribution;

use crate::executor::{Executor, RowSeqScanExecutor, ScanRange};

#[tokio::test]
async fn test_row_seq_scan() {
let state_store = MemoryStateStore::new();
let column_ids = vec![ColumnId::from(0), ColumnId::from(1), ColumnId::from(2)];
let column_descs = vec![
ColumnDesc::unnamed(column_ids[0], DataType::Int32),
ColumnDesc::unnamed(column_ids[1], DataType::Int32),
ColumnDesc::unnamed(column_ids[2], DataType::Int32),
];
let pk_indices = vec![0_usize, 1_usize];
let order_types = vec![OrderType::Ascending, OrderType::Descending];
let mut state = StateTable::new_without_distribution(
state_store.clone(),
TableId::from(0x42),
column_descs.clone(),
order_types.clone(),
pk_indices.clone(),
)
.await;
let column_ids_partial = vec![ColumnId::from(1), ColumnId::from(2)];
let value_indices: Vec<usize> = vec![0, 1, 2];
let table = StorageTable::new_partial(
state_store.clone(),
TableId::from(0x42),
column_descs.clone(),
column_ids_partial,
order_types.clone(),
pk_indices,
Distribution::fallback(),
TableOption::default(),
value_indices,
);
let epoch = EpochPair::new_test_epoch(1);
state.init_epoch(epoch);
epoch.inc();

state.insert(Row(vec![
Some(1_i32.into()),
Some(11_i32.into()),
Some(111_i32.into()),
]));
state.insert(Row(vec![
Some(2_i32.into()),
Some(22_i32.into()),
Some(222_i32.into()),
]));
state.insert(Row(vec![
Some(3_i32.into()),
Some(33_i32.into()),
Some(333_i32.into()),
]));

state.commit_for_test(epoch).await.unwrap();

let scan_range1 = ScanRange {
pk_prefix: Row(vec![Some(1_i32.into()), Some(11_i32.into())]),
next_col_bounds: (Unbounded, Unbounded),
};

let scan_range2 = ScanRange {
pk_prefix: Row(vec![Some(2_i32.into()), Some(22_i32.into())]),
next_col_bounds: (Unbounded, Unbounded),
};

let row_seq_scan_exec = RowSeqScanExecutor::new(
table.clone(),
vec![scan_range1, scan_range2],
epoch.curr,
1024,
"row_seq_scan_exec".to_string(),
None,
);

let point_get_row_seq_scan_exec = Box::new(row_seq_scan_exec);

let mut stream = point_get_row_seq_scan_exec.execute();
let chunk = stream.next().await.unwrap().unwrap();

assert_eq!(
chunk.row_at(0).0.to_owned_row(),
Row(vec![Some(11_i32.into()), Some(111_i32.into())])
);
assert_eq!(
chunk.row_at(1).0.to_owned_row(),
Row(vec![Some(22_i32.into()), Some(222_i32.into())])
);

let full_row_seq_scan_exec = RowSeqScanExecutor::new(
table,
vec![ScanRange::full()],
epoch.curr,
1024,
"row_seq_scan_exec".to_string(),
None,
);

let row_seq_scan_exec = Box::new(full_row_seq_scan_exec);

let mut stream = row_seq_scan_exec.execute();
let chunk = stream.next().await.unwrap().unwrap();

assert_eq!(
chunk.row_at(0).0.to_owned_row(),
Row(vec![Some(11_i32.into()), Some(111_i32.into())])
);
assert_eq!(
chunk.row_at(1).0.to_owned_row(),
Row(vec![Some(22_i32.into()), Some(222_i32.into())])
);
assert_eq!(
chunk.row_at(2).0.to_owned_row(),
Row(vec![Some(33_i32.into()), Some(333_i32.into())])
);
}
}
3 changes: 1 addition & 2 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
})
.await
{
// Prints the entire backtrace of error.
error!("Execution failed [{:?}]: {:?}", &task_id, &e);
error!("Execution failed [{:?}]: {}", &task_id, e);
let err_str = e.to_string();
*failure.lock() = Some(e);
if let Err(_e) = t_1
Expand Down
4 changes: 4 additions & 0 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ pub struct ComputeNodeOpts {
/// Left empty to disable file cache.
#[clap(long, default_value = "")]
pub file_cache_dir: String,

/// Endpoint of the connector node
#[clap(long, default_value = "127.0.0.1:60061")]
pub connector_source_endpoint: String,
}

use std::future::Future;
Expand Down
1 change: 1 addition & 0 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ pub async fn compute_node_serve(
let stream_env = StreamEnvironment::new(
source_mgr,
client_addr.clone(),
opts.connector_source_endpoint,
stream_config,
worker_id,
state_store,
Expand Down
2 changes: 1 addition & 1 deletion src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use risingwave_source::table_test_utils::create_table_source_desc_builder;
use risingwave_source::{TableSourceManager, TableSourceManagerRef};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::streaming_table::state_table::StateTable;
use risingwave_stream::common::table::state_table::StateTable;
use risingwave_stream::error::StreamResult;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::executor::state_table_handler::SourceStateTableHandler;
Expand Down
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ rand = "0.8"
rdkafka = { package = "madsim-rdkafka", version = "=0.2.8-alpha", features = ["cmake-build", "ssl-vendored", "gssapi"] }
risingwave_common = { path = "../common" }
risingwave_pb = { path = "../prost" }
risingwave_rpc_client = { path = "../rpc_client" }
risingwave_storage = { path = "../storage" }
serde = { version = "1", features = ["derive", "rc"] }
serde_derive = "1"
Expand Down
Loading

0 comments on commit bfe4b3b

Please sign in to comment.