diff --git a/Cargo.lock b/Cargo.lock index a2777cb1151d4..64469c1fc7763 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3865,9 +3865,9 @@ dependencies = [ [[package]] name = "pg_interval" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47354dbd658c57a5ee1cc97a79937345170234d4c817768de80ea6d2e9f5b98a" +checksum = "fe46640b465e284b048ef065cbed8ef17a622878d310c724578396b4cfd00df2" dependencies = [ "bytes", "chrono", @@ -6513,18 +6513,18 @@ checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" [[package]] name = "thiserror" -version = "1.0.32" +version = "1.0.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5f6586b7f764adc0231f4c79be7b920e766bb2f3e51b3661cdb263828f19994" +checksum = "8c1b05ca9d106ba7d2e31a9dab4a64e7be2cce415321966ea3132c49a656e252" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.32" +version = "1.0.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12bafc5b54507e0149cdf1b145a5d80ab80a90bcd9275df43d4fff68460f6c21" +checksum = "e8f2591983642de85c921015f3f070c665a197ed69e417af436115e3a1407487" dependencies = [ "proc-macro2", "quote", diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 197a2a9f9e0ee..bb23abf1b8da8 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -14,7 +14,7 @@ export RUST_TOOLCHAIN=$(cat ../rust-toolchain) # !!! CHANGE THIS WHEN YOU WANT TO BUMP CI IMAGE !!! # # AND ALSO docker-compose.yml # ###################################################### -export BUILD_ENV_VERSION=v20221020 +export BUILD_ENV_VERSION=v20221016 export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 5d71786c26545..30f91e4f5f543 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -42,12 +42,12 @@ services: retries: 5 rw-build-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20221020 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20221016 volumes: - ..:/risingwave regress-test-env: - image: public.ecr.aws/x5u3w5h6/rw-build-env:v20221020 + image: public.ecr.aws/x5u3w5h6/rw-build-env:v20221016 depends_on: db: condition: service_healthy diff --git a/rust-toolchain b/rust-toolchain index 185be365a4525..3a67502bb5cd9 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2022-07-29 +nightly-2022-10-16 diff --git a/src/batch/src/execution/grpc_exchange.rs b/src/batch/src/execution/grpc_exchange.rs index eb85a2f3253a3..8abed4c8fb11a 100644 --- a/src/batch/src/execution/grpc_exchange.rs +++ b/src/batch/src/execution/grpc_exchange.rs @@ -72,7 +72,7 @@ impl Debug for GrpcExchangeSource { } impl ExchangeSource for GrpcExchangeSource { - type TakeDataFuture<'a> = impl Future>>; + type TakeDataFuture<'a> = impl Future>> + 'a; fn take_data(&mut self) -> Self::TakeDataFuture<'_> { async { diff --git a/src/batch/src/execution/local_exchange.rs b/src/batch/src/execution/local_exchange.rs index 259f8550601a7..f098e6d219785 100644 --- a/src/batch/src/execution/local_exchange.rs +++ b/src/batch/src/execution/local_exchange.rs @@ -52,7 +52,7 @@ impl Debug for LocalExchangeSource { } impl ExchangeSource for LocalExchangeSource { - type TakeDataFuture<'a> = impl Future>>; + type TakeDataFuture<'a> = impl Future>> + 'a; fn take_data(&mut self) -> Self::TakeDataFuture<'_> { async { diff --git a/src/batch/src/executor/test_utils.rs b/src/batch/src/executor/test_utils.rs index f81073ecf9276..dcd1178443fbd 100644 --- a/src/batch/src/executor/test_utils.rs +++ b/src/batch/src/executor/test_utils.rs @@ -251,7 +251,7 @@ impl FakeExchangeSource { } impl ExchangeSource for FakeExchangeSource { - type TakeDataFuture<'a> = impl Future>>; + type TakeDataFuture<'a> = impl Future>> + 'a; fn take_data(&mut self) -> Self::TakeDataFuture<'_> { async { diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index 8bf9eb09a28b2..48a63bda8ef0d 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -15,7 +15,6 @@ #![expect(dead_code)] #![allow(clippy::derive_partial_eq_without_eq)] #![feature(trait_alias)] -#![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(exact_size_is_empty)] #![feature(type_alias_impl_trait)] @@ -26,6 +25,7 @@ #![feature(lint_reasons)] #![feature(binary_heap_into_iter_sorted)] #![recursion_limit = "256"] +#![feature(let_chains)] mod error; pub mod exchange_source; diff --git a/src/batch/src/task/broadcast_channel.rs b/src/batch/src/task/broadcast_channel.rs index 55f3d9ef0e865..4ff9c8506b124 100644 --- a/src/batch/src/task/broadcast_channel.rs +++ b/src/batch/src/task/broadcast_channel.rs @@ -42,7 +42,7 @@ impl Debug for BroadcastSender { } impl ChanSender for BroadcastSender { - type SendFuture<'a> = impl Future>; + type SendFuture<'a> = impl Future> + 'a; fn send(&mut self, chunk: Option) -> Self::SendFuture<'_> { async move { @@ -65,7 +65,7 @@ pub struct BroadcastReceiver { } impl ChanReceiver for BroadcastReceiver { - type RecvFuture<'a> = impl Future>>; + type RecvFuture<'a> = impl Future>> + 'a; fn recv(&mut self) -> Self::RecvFuture<'_> { async move { diff --git a/src/batch/src/task/env.rs b/src/batch/src/task/env.rs index 0f1ed6849f55a..7217a5724c400 100644 --- a/src/batch/src/task/env.rs +++ b/src/batch/src/task/env.rs @@ -106,7 +106,6 @@ impl BatchEnvironment { self.task_manager.clone() } - #[expect(clippy::explicit_auto_deref)] pub fn source_manager(&self) -> &TableSourceManager { &*self.source_manager } diff --git a/src/batch/src/task/fifo_channel.rs b/src/batch/src/task/fifo_channel.rs index a5cd02dd8fe67..03502295666ea 100644 --- a/src/batch/src/task/fifo_channel.rs +++ b/src/batch/src/task/fifo_channel.rs @@ -39,7 +39,7 @@ pub struct FifoReceiver { } impl ChanSender for FifoSender { - type SendFuture<'a> = impl Future>; + type SendFuture<'a> = impl Future> + 'a; fn send(&mut self, chunk: Option) -> Self::SendFuture<'_> { async { @@ -52,7 +52,7 @@ impl ChanSender for FifoSender { } impl ChanReceiver for FifoReceiver { - type RecvFuture<'a> = impl Future>>; + type RecvFuture<'a> = impl Future>> + 'a; fn recv(&mut self) -> Self::RecvFuture<'_> { async move { diff --git a/src/batch/src/task/hash_shuffle_channel.rs b/src/batch/src/task/hash_shuffle_channel.rs index 65b6d80d6a29b..26c4dd4bc9185 100644 --- a/src/batch/src/task/hash_shuffle_channel.rs +++ b/src/batch/src/task/hash_shuffle_channel.rs @@ -105,7 +105,7 @@ fn generate_new_data_chunks( } impl ChanSender for HashShuffleSender { - type SendFuture<'a> = impl Future>; + type SendFuture<'a> = impl Future> + 'a; fn send(&mut self, chunk: Option) -> Self::SendFuture<'_> { async move { @@ -150,7 +150,7 @@ impl HashShuffleSender { } impl ChanReceiver for HashShuffleReceiver { - type RecvFuture<'a> = impl Future>>; + type RecvFuture<'a> = impl Future>> + 'a; fn recv(&mut self) -> Self::RecvFuture<'_> { async move { diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index 809491436dfab..54ee5fec37b91 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -13,6 +13,7 @@ // limitations under the License. #![cfg_attr(coverage, feature(no_coverage))] +#![feature(let_chains)] use tikv_jemallocator::Jemalloc; diff --git a/src/common/src/error.rs b/src/common/src/error.rs index 7cd7b7f3a9169..6caa1d1ead310 100644 --- a/src/common/src/error.rs +++ b/src/common/src/error.rs @@ -227,7 +227,9 @@ impl Debug for RwError { "{}\n{}", self.inner, // Use inner error's backtrace by default, otherwise use the generated one in `From`. - self.inner.backtrace().unwrap_or(&*self.backtrace) + (&self.inner as &dyn std::error::Error) + .request_ref::() + .unwrap_or(&*self.backtrace) ) } } diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 1a568b755fb7a..501dff1b21f17 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -15,10 +15,8 @@ #![allow(rustdoc::private_intra_doc_links)] #![allow(clippy::derive_partial_eq_without_eq)] #![feature(trait_alias)] -#![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(is_sorted)] -#![feature(backtrace)] #![feature(fn_traits)] #![feature(type_alias_impl_trait)] #![feature(test)] @@ -28,6 +26,9 @@ #![feature(generators)] #![feature(map_try_insert)] #![feature(once_cell)] +#![feature(let_chains)] +#![feature(error_generic_member_access)] +#![feature(provide_any)] #[macro_use] pub mod error; diff --git a/src/common/src/types/chrono_wrapper.rs b/src/common/src/types/chrono_wrapper.rs index 8b15766251f97..e54b81328db6c 100644 --- a/src/common/src/types/chrono_wrapper.rs +++ b/src/common/src/types/chrono_wrapper.rs @@ -78,7 +78,7 @@ impl NaiveDateWrapper { pub fn with_days_value(days: i32) -> value_encoding::Result { Ok(NaiveDateWrapper::new( NaiveDate::from_num_days_from_ce_opt(days) - .ok_or(ValueEncodingError::InvalidNaiveDateEncoding(days))?, + .ok_or_else(|| ValueEncodingError::InvalidNaiveDateEncoding(days))?, )) } @@ -116,7 +116,7 @@ impl NaiveTimeWrapper { pub fn with_secs_nano_value(secs: u32, nano: u32) -> value_encoding::Result { Ok(NaiveTimeWrapper::new( NaiveTime::from_num_seconds_from_midnight_opt(secs, nano) - .ok_or(ValueEncodingError::InvalidNaiveTimeEncoding(secs, nano))?, + .ok_or_else(|| ValueEncodingError::InvalidNaiveTimeEncoding(secs, nano))?, )) } @@ -163,9 +163,8 @@ impl NaiveDateTimeWrapper { pub fn with_secs_nsecs_value(secs: i64, nsecs: u32) -> value_encoding::Result { Ok(NaiveDateTimeWrapper::new({ - NaiveDateTime::from_timestamp_opt(secs, nsecs).ok_or( - ValueEncodingError::InvalidNaiveDateTimeEncoding(secs, nsecs), - )? + NaiveDateTime::from_timestamp_opt(secs, nsecs) + .ok_or_else(|| ValueEncodingError::InvalidNaiveDateTimeEncoding(secs, nsecs))? })) } diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 1130af170039a..d2a75509f711a 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -14,8 +14,6 @@ #![feature(trait_alias)] #![feature(binary_heap_drain_sorted)] -#![feature(generic_associated_types)] -#![feature(let_else)] #![feature(generators)] #![feature(type_alias_impl_trait)] #![cfg_attr(coverage, feature(no_coverage))] diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 32f20d149c2e8..66bd82c285358 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -19,11 +19,11 @@ #![feature(stmt_expr_attributes)] #![feature(box_patterns)] #![feature(trait_alias)] -#![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(lint_reasons)] #![feature(once_cell)] #![feature(result_option_inspect)] +#![feature(let_chains)] pub mod aws_utils; pub mod error; diff --git a/src/ctl/src/cmd_impl/hummock/list_kv.rs b/src/ctl/src/cmd_impl/hummock/list_kv.rs index 049bcb991c400..a5db997d3f3cd 100644 --- a/src/ctl/src/cmd_impl/hummock/list_kv.rs +++ b/src/ctl/src/cmd_impl/hummock/list_kv.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound; + use bytes::{Buf, BufMut, BytesMut}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::next_key; @@ -30,9 +32,12 @@ pub async fn list_kv(epoch: u64, table_id: u32) -> anyhow::Result<()> { let mut buf = BytesMut::with_capacity(5); buf.put_u8(b't'); buf.put_u32(table_id); - let range = buf.to_vec()..next_key(buf.to_vec().as_slice()); + let range = ( + Bound::Included(buf.to_vec()), + Bound::Excluded(next_key(buf.to_vec().as_slice())), + ); hummock - .scan::<_, Vec>( + .scan( None, range, None, diff --git a/src/expr/src/lib.rs b/src/expr/src/lib.rs index a2c4397cbbadf..f8ab8196debb7 100644 --- a/src/expr/src/lib.rs +++ b/src/expr/src/lib.rs @@ -14,14 +14,12 @@ #![allow(rustdoc::private_intra_doc_links)] #![feature(trait_alias)] -#![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(binary_heap_into_iter_sorted)] #![feature(is_sorted)] -#![feature(backtrace)] #![feature(fn_traits)] #![feature(assert_matches)] -#![feature(let_else)] +#![feature(let_chains)] #![feature(lint_reasons)] #![feature(type_alias_impl_trait)] #![feature(generators)] diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index 84c7df00d7093..2b263247cd998 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(label_break_value)] +#![feature(let_chains)] #![allow(clippy::derive_partial_eq_without_eq)] //! Data-driven tests. diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 768f872de755a..b35100044dd85 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -18,12 +18,11 @@ #![feature(negative_impls)] #![feature(generators)] #![feature(proc_macro_hygiene, stmt_expr_attributes)] -#![feature(let_else)] #![feature(trait_alias)] #![feature(drain_filter)] #![feature(if_let_guard)] +#![feature(let_chains)] #![feature(assert_matches)] -#![feature(map_first_last)] #![feature(lint_reasons)] #![feature(box_patterns)] #![feature(once_cell)] diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index d4a43e6c3789a..69fa38404b372 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -366,7 +366,6 @@ impl FrontendEnv { } /// Get a reference to the frontend env's catalog writer. - #[expect(clippy::explicit_auto_deref)] pub fn catalog_writer(&self) -> &dyn CatalogWriter { &*self.catalog_writer } @@ -377,7 +376,6 @@ impl FrontendEnv { } /// Get a reference to the frontend env's user info writer. - #[expect(clippy::explicit_auto_deref)] pub fn user_info_writer(&self) -> &dyn UserInfoWriter { &*self.user_info_writer } @@ -395,7 +393,6 @@ impl FrontendEnv { self.worker_node_manager.clone() } - #[expect(clippy::explicit_auto_deref)] pub fn meta_client(&self) -> &dyn FrontendMetaClient { &*self.meta_client } diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 97ef0989155c2..43d0cfb750d8e 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -78,7 +78,7 @@ impl std::fmt::Debug for MetaError { write!(f, "{}", self.inner)?; writeln!(f)?; - if let Some(backtrace) = self.inner.backtrace() { + if let Some(backtrace) = (&self.inner as &dyn Error).request_ref::() { write!(f, " backtrace of inner error:\n{}", backtrace)?; } else { write!(f, " backtrace of `MetaError`:\n{}", self.backtrace)?; diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index fcbad1dbbcc29..285f2fd5273d0 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -12,24 +12,28 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(backtrace)] +#![expect(clippy::iter_kv_map, reason = "FIXME: fix later")] +#![expect( + clippy::or_fun_call, + reason = "https://github.com/rust-lang/rust-clippy/issues/8574" +)] #![allow(clippy::derive_partial_eq_without_eq)] #![feature(trait_alias)] -#![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(option_result_contains)] -#![feature(let_else)] #![feature(type_alias_impl_trait)] -#![feature(map_first_last)] #![feature(drain_filter)] #![feature(custom_test_frameworks)] #![feature(lint_reasons)] #![feature(map_try_insert)] #![feature(hash_drain_filter)] -#![feature(is_some_with)] +#![feature(is_some_and)] #![feature(btree_drain_filter)] #![feature(result_option_inspect)] #![feature(once_cell)] +#![feature(let_chains)] +#![feature(error_generic_member_access)] +#![feature(provide_any)] #![cfg_attr(coverage, feature(no_coverage))] #![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)] @@ -188,8 +192,8 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { let barrier_interval = Duration::from_millis(meta_config.streaming.barrier_interval_ms as u64); let max_idle_ms = opts.dangerous_max_idle_secs.unwrap_or(0) * 1000; - let in_flight_barrier_nums = meta_config.streaming.in_flight_barrier_nums as usize; - let checkpoint_frequency = meta_config.streaming.checkpoint_frequency as usize; + let in_flight_barrier_nums = meta_config.streaming.in_flight_barrier_nums; + let checkpoint_frequency = meta_config.streaming.checkpoint_frequency; tracing::info!("Meta server listening at {}", listen_addr); let add_info = AddressInfo { diff --git a/src/object_store/src/lib.rs b/src/object_store/src/lib.rs index 482547fb2532d..96d86a4e67da9 100644 --- a/src/object_store/src/lib.rs +++ b/src/object_store/src/lib.rs @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(backtrace)] -#![feature(generic_associated_types)] #![feature(trait_alias)] #![feature(type_alias_impl_trait)] #![feature(once_cell)] #![feature(lint_reasons)] +#![feature(let_chains)] +#![feature(error_generic_member_access)] +#![feature(provide_any)] pub mod object; diff --git a/src/object_store/src/object/error.rs b/src/object_store/src/object/error.rs index cac98cb396023..080a43a5d99dc 100644 --- a/src/object_store/src/object/error.rs +++ b/src/object_store/src/object/error.rs @@ -50,7 +50,7 @@ impl std::fmt::Debug for ObjectError { write!(f, "{}", self.inner)?; writeln!(f)?; - if let Some(backtrace) = self.inner.backtrace() { + if let Some(backtrace) = (&self.inner as &dyn Error).request_ref::() { write!(f, " backtrace of inner error:\n{}", backtrace)?; } else { write!(f, " backtrace of `ObjectError`:\n{}", self.backtrace)?; diff --git a/src/risedevtool/src/bin/risedev-config.rs b/src/risedevtool/src/bin/risedev-config.rs index 3df7ae05b7535..b691df37e837d 100644 --- a/src/risedevtool/src/bin/risedev-config.rs +++ b/src/risedevtool/src/bin/risedev-config.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(let_else)] #![allow(clippy::needless_question_mark)] use std::fs::OpenOptions; diff --git a/src/risedevtool/src/lib.rs b/src/risedevtool/src/lib.rs index ce8268fb6c8bb..6cd62695c9c6d 100644 --- a/src/risedevtool/src/lib.rs +++ b/src/risedevtool/src/lib.rs @@ -14,7 +14,7 @@ #![allow(clippy::derive_partial_eq_without_eq)] #![feature(exit_status_error)] -#![feature(let_else)] +#![feature(let_chains)] #![feature(lint_reasons)] mod config; diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 58937a7b8946b..c60d7763f141e 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -13,7 +13,6 @@ // limitations under the License. #![feature(trait_alias)] -#![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(result_option_inspect)] #![feature(type_alias_impl_trait)] diff --git a/src/source/src/lib.rs b/src/source/src/lib.rs index 17f4c27d864d0..d8b407794d712 100644 --- a/src/source/src/lib.rs +++ b/src/source/src/lib.rs @@ -15,7 +15,6 @@ #![allow(clippy::derive_partial_eq_without_eq)] #![allow(rustdoc::private_intra_doc_links)] #![feature(trait_alias)] -#![feature(generic_associated_types)] #![feature(binary_heap_drain_sorted)] #![feature(lint_reasons)] #![feature(result_option_inspect)] diff --git a/src/storage/compaction_test/src/lib.rs b/src/storage/compaction_test/src/lib.rs index e7f6cd572550e..c70b334b3483b 100644 --- a/src/storage/compaction_test/src/lib.rs +++ b/src/storage/compaction_test/src/lib.rs @@ -23,7 +23,6 @@ #![warn(clippy::no_effect_underscore_binding)] #![warn(clippy::await_holding_lock)] #![deny(rustdoc::broken_intra_doc_links)] -#![feature(let_else)] mod server; diff --git a/src/storage/compaction_test/src/server.rs b/src/storage/compaction_test/src/server.rs index 2d7ba8d07dc73..e6ce674a79ef8 100644 --- a/src/storage/compaction_test/src/server.rs +++ b/src/storage/compaction_test/src/server.rs @@ -14,6 +14,7 @@ use std::collections::{BTreeMap, HashSet}; use std::net::SocketAddr; +use std::ops::Bound; use std::sync::Arc; use std::time::Duration; @@ -385,9 +386,9 @@ async fn open_hummock_iters( let mut results = BTreeMap::new(); for &epoch in snapshots.iter() { let iter = hummock - .iter::<_, Vec>( + .iter( None, - .., + (Bound::Unbounded, Bound::Unbounded), ReadOptions { epoch, table_id: TableId { table_id }, diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index 2b15b7601d035..85e9879338b93 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(let_else)] - mod compactor_observer; mod rpc; mod server; diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index dd60b2bfcca97..2102038c9e5b9 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -16,6 +16,7 @@ mod tests { use std::collections::{BTreeSet, HashMap}; + use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; @@ -675,9 +676,9 @@ mod tests { // 7. scan kv to check key table_id let scan_result = storage - .scan::<_, Vec>( + .scan( None, - .., + (Bound::Unbounded, Bound::Unbounded), None, ReadOptions { epoch, @@ -844,9 +845,9 @@ mod tests { // 6. scan kv to check key table_id let scan_result = storage - .scan::<_, Vec>( + .scan( None, - .., + (Bound::Unbounded, Bound::Unbounded), None, ReadOptions { epoch, @@ -1016,7 +1017,10 @@ mod tests { let scan_result = storage .scan( Some(bloom_filter_key), - start_bound_key..end_bound_key, + ( + Bound::Included(start_bound_key), + Bound::Excluded(end_bound_key), + ), None, ReadOptions { epoch, diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index c797df3d2c5cd..ce3ed2634b600 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; @@ -134,7 +135,7 @@ async fn test_failpoints_state_store_read_upload() { let result = hummock_storage .iter( None, - ..=b"ee".to_vec(), + (Bound::Unbounded, Bound::Included(b"ee".to_vec())), ReadOptions { epoch: 2, table_id: Default::default(), @@ -193,7 +194,7 @@ async fn test_failpoints_state_store_read_upload() { let mut iters = hummock_storage .iter( None, - ..=b"ee".to_vec(), + (Bound::Unbounded, Bound::Included(b"ee".to_vec())), ReadOptions { epoch: 5, table_id: Default::default(), diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index 706b9a4703946..211a4beff8662 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; @@ -31,7 +32,7 @@ use crate::test_utils::get_test_notification_client; macro_rules! assert_count_range_scan { ($storage:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{ let mut it = $storage - .iter::<_, Vec>( + .iter( None, $range, ReadOptions { @@ -56,7 +57,7 @@ macro_rules! assert_count_range_scan { macro_rules! assert_count_backward_range_scan { ($storage:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{ let mut it = $storage - .backward_iter::<_, Vec>( + .backward_iter( $range, ReadOptions { epoch: $epoch, @@ -127,7 +128,12 @@ async fn test_snapshot_inner(enable_sync: bool, enable_commit: bool) { .unwrap(); } } - assert_count_range_scan!(hummock_storage, .., 2, epoch1); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 2, + epoch1 + ); let epoch2 = epoch1 + 1; hummock_storage @@ -161,8 +167,18 @@ async fn test_snapshot_inner(enable_sync: bool, enable_commit: bool) { .unwrap(); } } - assert_count_range_scan!(hummock_storage, .., 3, epoch2); - assert_count_range_scan!(hummock_storage, .., 2, epoch1); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 3, + epoch2 + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 2, + epoch1 + ); let epoch3 = epoch2 + 1; hummock_storage @@ -196,9 +212,24 @@ async fn test_snapshot_inner(enable_sync: bool, enable_commit: bool) { .unwrap(); } } - assert_count_range_scan!(hummock_storage, .., 0, epoch3); - assert_count_range_scan!(hummock_storage, .., 3, epoch2); - assert_count_range_scan!(hummock_storage, .., 2, epoch1); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 0, + epoch3 + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 3, + epoch2 + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 2, + epoch1 + ); } async fn test_snapshot_range_scan_inner(enable_sync: bool, enable_commit: bool) { @@ -259,12 +290,42 @@ async fn test_snapshot_range_scan_inner(enable_sync: bool, enable_commit: bool) }; } - assert_count_range_scan!(hummock_storage, key!(2)..=key!(3), 2, epoch); - assert_count_range_scan!(hummock_storage, key!(2)..key!(3), 1, epoch); - assert_count_range_scan!(hummock_storage, key!(2).., 3, epoch); - assert_count_range_scan!(hummock_storage, ..=key!(3), 3, epoch); - assert_count_range_scan!(hummock_storage, ..key!(3), 2, epoch); - assert_count_range_scan!(hummock_storage, .., 4, epoch); + assert_count_range_scan!( + hummock_storage, + (Bound::Included(key!(2)), Bound::Included(key!(3))), + 2, + epoch + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Included(key!(2)), Bound::Excluded(key!(3))), + 1, + epoch + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Included(key!(2)), Bound::Unbounded), + 3, + epoch + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Included(key!(3))), + 3, + epoch + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Excluded(key!(3))), + 2, + epoch + ); + assert_count_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 4, + epoch + ); } #[ignore] @@ -360,14 +421,54 @@ async fn test_snapshot_backward_range_scan_inner(enable_sync: bool, enable_commi }; } - assert_count_backward_range_scan!(hummock_storage, key!(3)..=key!(2), 2, epoch); - assert_count_backward_range_scan!(hummock_storage, key!(3)..key!(2), 1, epoch); - assert_count_backward_range_scan!(hummock_storage, key!(3)..key!(1), 2, epoch); - assert_count_backward_range_scan!(hummock_storage, key!(3)..=key!(1), 3, epoch); - assert_count_backward_range_scan!(hummock_storage, key!(3)..key!(0), 3, epoch); - assert_count_backward_range_scan!(hummock_storage, .., 6, epoch); - assert_count_backward_range_scan!(hummock_storage, .., 8, epoch + 1); - assert_count_backward_range_scan!(hummock_storage, key!(7)..key!(2), 5, epoch + 1); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Included(key!(3)), Bound::Included(key!(2))), + 2, + epoch + ); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Included(key!(3)), Bound::Excluded(key!(2))), + 1, + epoch + ); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Included(key!(3)), Bound::Excluded(key!(1))), + 2, + epoch + ); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Included(key!(3)), Bound::Included(key!(1))), + 3, + epoch + ); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Included(key!(3)), Bound::Excluded(key!(0))), + 3, + epoch + ); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 6, + epoch + ); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Unbounded, Bound::Unbounded), + 8, + epoch + 1 + ); + assert_count_backward_range_scan!( + hummock_storage, + (Bound::Included(key!(7)), Bound::Excluded(key!(2))), + 5, + epoch + 1 + ); } #[tokio::test] diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 13eea5cb8d6eb..c2db671e39c55 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; @@ -213,7 +214,7 @@ async fn test_basic() { let mut iter = hummock_storage .iter( None, - ..=b"ee".to_vec(), + (Bound::Unbounded, Bound::Included(b"ee".to_vec())), ReadOptions { epoch: epoch1, table_id: Default::default(), @@ -260,7 +261,7 @@ async fn test_basic() { let mut iter = hummock_storage .iter( None, - ..=b"ee".to_vec(), + (Bound::Unbounded, Bound::Included(b"ee".to_vec())), ReadOptions { epoch: epoch2, table_id: Default::default(), @@ -276,7 +277,7 @@ async fn test_basic() { let mut iter = hummock_storage .iter( None, - ..=b"ee".to_vec(), + (Bound::Unbounded, Bound::Included(b"ee".to_vec())), ReadOptions { epoch: epoch3, table_id: Default::default(), @@ -576,7 +577,7 @@ async fn test_reload_storage() { let mut iter = hummock_storage .iter( None, - ..=b"ee".to_vec(), + (Bound::Unbounded, Bound::Included(b"ee".to_vec())), ReadOptions { epoch: epoch1, table_id: Default::default(), @@ -623,7 +624,7 @@ async fn test_reload_storage() { let mut iter = hummock_storage .iter( None, - ..=b"ee".to_vec(), + (Bound::Unbounded, Bound::Included(b"ee".to_vec())), ReadOptions { epoch: epoch2, table_id: Default::default(), @@ -715,7 +716,10 @@ async fn test_write_anytime() { let mut iter = hummock_storage .iter( None, - "aa".as_bytes()..="cc".as_bytes(), + ( + Bound::Included(b"aa".to_vec()), + Bound::Included(b"cc".to_vec()), + ), ReadOptions { epoch, table_id: Default::default(), @@ -810,7 +814,10 @@ async fn test_write_anytime() { let mut iter = hummock_storage .iter( None, - "aa".as_bytes()..="cc".as_bytes(), + ( + Bound::Included(b"aa".to_vec()), + Bound::Included(b"cc".to_vec()), + ), ReadOptions { epoch, table_id: Default::default(), diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 8595b5e33415c..0773461bc6745 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -47,11 +47,12 @@ impl From for RwError { impl std::fmt::Debug for StorageError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use std::backtrace::Backtrace; use std::error::Error; write!(f, "{}", self)?; writeln!(f)?; - if let Some(backtrace) = self.backtrace() { + if let Some(backtrace) = (&self as &dyn Error).request_ref::() { // Since we forward all backtraces from source, `self.backtrace()` is the backtrace of // inner error. write!(f, " backtrace of inner error:\n{}", backtrace)?; diff --git a/src/storage/src/hummock/error.rs b/src/storage/src/hummock/error.rs index da5d8ccb89761..a7fe279db6210 100644 --- a/src/storage/src/hummock/error.rs +++ b/src/storage/src/hummock/error.rs @@ -160,7 +160,7 @@ impl std::fmt::Debug for HummockError { write!(f, "{}", self.inner)?; writeln!(f)?; - if let Some(backtrace) = self.inner.backtrace() { + if let Some(backtrace) = (&self.inner as &dyn Error).request_ref::() { write!(f, " backtrace of inner error:\n{}", backtrace)?; } else { write!( diff --git a/src/storage/src/hummock/iterator/backward_user.rs b/src/storage/src/hummock/iterator/backward_user.rs index f558e0331a5fc..a768e7a973b76 100644 --- a/src/storage/src/hummock/iterator/backward_user.rs +++ b/src/storage/src/hummock/iterator/backward_user.rs @@ -298,15 +298,13 @@ impl DirectedUserIteratorBuilder for BackwardUserIterator, - >, + iterator_iter: Vec>, key_range: (Bound>, Bound>), read_epoch: u64, min_epoch: u64, version: Option, ) -> DirectedUserIterator { - let iterator = UnorderedMergeIteratorInner::new(iterator_iter); + let iterator = UnorderedMergeIteratorInner::new(iterator_iter.into_iter()); DirectedUserIterator::Backward(BackwardUserIterator::with_epoch( iterator, key_range, read_epoch, min_epoch, version, )) diff --git a/src/storage/src/hummock/iterator/forward_user.rs b/src/storage/src/hummock/iterator/forward_user.rs index fbc112fb82f9a..1325ef1a96a25 100644 --- a/src/storage/src/hummock/iterator/forward_user.rs +++ b/src/storage/src/hummock/iterator/forward_user.rs @@ -235,13 +235,13 @@ impl DirectedUserIteratorBuilder for UserIterator { type SstableIteratorType = SstableIterator; fn create( - iterator_iter: impl IntoIterator>, + iterator_iter: Vec>, key_range: (Bound>, Bound>), read_epoch: u64, min_epoch: u64, version: Option, ) -> DirectedUserIterator { - let iterator = UnorderedMergeIteratorInner::new(iterator_iter); + let iterator = UnorderedMergeIteratorInner::new(iterator_iter.into_iter()); DirectedUserIterator::Forward(Self::new( iterator, key_range, read_epoch, min_epoch, version, )) diff --git a/src/storage/src/hummock/iterator/merge_inner.rs b/src/storage/src/hummock/iterator/merge_inner.rs index fb9862af210f2..654398ddcbdb0 100644 --- a/src/storage/src/hummock/iterator/merge_inner.rs +++ b/src/storage/src/hummock/iterator/merge_inner.rs @@ -17,6 +17,8 @@ use std::collections::binary_heap::PeekMut; use std::collections::{BinaryHeap, LinkedList}; use std::future::Future; +use futures::future::BoxFuture; +use futures::FutureExt; use risingwave_hummock_sdk::VersionedComparator; use crate::hummock::iterator::{DirectionEnum, HummockIterator, HummockIteratorDirection}; @@ -24,7 +26,7 @@ use crate::hummock::value::HummockValue; use crate::hummock::HummockResult; use crate::monitor::StoreLocalStatistic; -pub trait NodeExtraOrderInfo: Eq + Ord + Send + Sync {} +pub trait NodeExtraOrderInfo: Eq + Ord + Send + Sync + 'static {} /// For unordered merge iterator, no extra order info is needed. type UnorderedNodeExtra = (); @@ -146,16 +148,16 @@ impl MergeIteratorInner { pub type UnorderedMergeIteratorInner = MergeIteratorInner; -impl UnorderedMergeIteratorInner { - pub fn new(iterators: impl IntoIterator) -> Self { +impl UnorderedMergeIteratorInner { + pub fn new(iterators: impl IntoIterator + 'static) -> Self { Self::create(iterators) } - pub fn for_compactor(iterators: impl IntoIterator) -> Self { + pub fn for_compactor(iterators: impl IntoIterator + 'static) -> Self { Self::create(iterators) } - fn create(iterators: impl IntoIterator) -> Self { + fn create(iterators: impl IntoIterator + 'static) -> Self { Self { unused_iters: iterators .into_iter() @@ -188,6 +190,32 @@ where .drain_filter(|i| i.iter.is_valid()) .collect(); } + + // TODO(chi): workaround for Rust toolchain 2022-10-16 + + async fn rewind_inner(&mut self) -> HummockResult<()> { + self.reset_heap(); + futures::future::try_join_all( + self.unused_iters + .iter_mut() + .map(|x| x.iter.rewind().boxed()), + ) + .await?; + self.build_heap(); + Ok(()) + } + + async fn seek_inner(&mut self, key: &[u8]) -> HummockResult<()> { + self.reset_heap(); + futures::future::try_join_all( + self.unused_iters + .iter_mut() + .map(|x| x.iter.seek(key).boxed()), + ) + .await?; + self.build_heap(); + Ok(()) + } } /// The behaviour of `next` of order aware merge iterator is different from the normal one, so we @@ -199,87 +227,114 @@ trait MergeIteratorNext { fn next_inner(&mut self) -> Self::HummockResultFuture<'_>; } -impl MergeIteratorNext for OrderedMergeIteratorInner { - type HummockResultFuture<'a> = impl Future>; +// TODO(chi): workaround for Rust toolchain 2022-10-16, removed this later +fn unsafe_boxed_static_future(f: F) -> BoxFuture<'static, O> +where + F: Future + Send, +{ + let b = f.boxed(); + unsafe { std::mem::transmute::, BoxFuture<'static, O>>(b) } +} - fn next_inner(&mut self) -> Self::HummockResultFuture<'_> { - async { - let top_node = self.heap.pop().expect("no inner iter"); - let mut popped_nodes = vec![]; - - // Take all nodes with the same current key as the top_node out of the heap. - while let Some(next_node) = self.heap.peek_mut() { - match VersionedComparator::compare_key(top_node.iter.key(), next_node.iter.key()) { - Ordering::Equal => { - popped_nodes.push(PeekMut::pop(next_node)); - } - _ => break, +impl OrderedMergeIteratorInner { + async fn next_inner_inner(&mut self) -> HummockResult<()> { + let top_node = self.heap.pop().expect("no inner iter"); + let mut popped_nodes = vec![]; + + // Take all nodes with the same current key as the top_node out of the heap. + while let Some(next_node) = self.heap.peek_mut() { + match VersionedComparator::compare_key(top_node.iter.key(), next_node.iter.key()) { + Ordering::Equal => { + popped_nodes.push(PeekMut::pop(next_node)); } + _ => break, } + } - popped_nodes.push(top_node); - - // WARNING: within scope of BinaryHeap::PeekMut, we must carefully handle all places of - // return. Once the iterator enters an invalid state, we should remove it from heap - // before returning. - - // Put the popped nodes back to the heap if valid or unused_iters if invalid. - for mut node in popped_nodes { - match node.iter.next().await { - Ok(_) => {} - Err(e) => { - // If the iterator returns error, we should clear the heap, so that this - // iterator becomes invalid. - self.heap.clear(); - return Err(e); - } - } + popped_nodes.push(top_node); - if !node.iter.is_valid() { - self.unused_iters.push_back(node); - } else { - self.heap.push(node); - } - } + // WARNING: within scope of BinaryHeap::PeekMut, we must carefully handle all places of + // return. Once the iterator enters an invalid state, we should remove it from heap + // before returning. - Ok(()) - } - } -} - -impl MergeIteratorNext for UnorderedMergeIteratorInner { - type HummockResultFuture<'a> = impl Future>; + // Put the popped nodes back to the heap if valid or unused_iters if invalid. - fn next_inner(&mut self) -> Self::HummockResultFuture<'_> { - async { - let mut node = self.heap.peek_mut().expect("no inner iter"); + // TODO(chi): workaround for Rust toolchain 2022-10-16, removed boxed() later - // WARNING: within scope of BinaryHeap::PeekMut, we must carefully handle all places of - // return. Once the iterator enters an invalid state, we should remove it from heap - // before returning. + for mut node in popped_nodes { + let f = unsafe_boxed_static_future(node.iter.next()); - match node.iter.next().await { + match f.await { Ok(_) => {} Err(e) => { // If the iterator returns error, we should clear the heap, so that this // iterator becomes invalid. - PeekMut::pop(node); self.heap.clear(); return Err(e); } } if !node.iter.is_valid() { - // Put back to `unused_iters` - let node = PeekMut::pop(node); self.unused_iters.push_back(node); } else { - // This will update the heap top. - drop(node); + self.heap.push(node); } + } - Ok(()) + Ok(()) + } +} + +impl MergeIteratorNext for OrderedMergeIteratorInner { + type HummockResultFuture<'a> = impl Future> + 'a; + + fn next_inner(&mut self) -> Self::HummockResultFuture<'_> { + self.next_inner_inner() + } +} + +impl UnorderedMergeIteratorInner { + async fn next_inner_inner(&mut self) -> HummockResult<()> { + let mut node = self.heap.peek_mut().expect("no inner iter"); + + // WARNING: within scope of BinaryHeap::PeekMut, we must carefully handle all places of + // return. Once the iterator enters an invalid state, we should remove it from heap + // before returning. + + // TODO(chi): workaround for Rust toolchain 2022-10-16, removed boxed() later + + let f = unsafe_boxed_static_future(node.iter.next()); + + match f.await { + Ok(_) => {} + Err(e) => { + // If the iterator returns error, we should clear the heap, so that this + // iterator becomes invalid. + PeekMut::pop(node); + self.heap.clear(); + return Err(e); + } + } + + if !node.iter.is_valid() { + // Put back to `unused_iters` + let node = PeekMut::pop(node); + self.unused_iters.push_back(node); + } else { + // This will update the heap top. + drop(node); } + + Ok(()) + } +} + +impl MergeIteratorNext for UnorderedMergeIteratorInner { + type HummockResultFuture<'a> = impl Future> + 'a; + + fn next_inner(&mut self) -> Self::HummockResultFuture<'_> { + self.next_inner_inner() + // async move { unimplemented!() } } } @@ -311,23 +366,11 @@ where } fn rewind(&mut self) -> Self::RewindFuture<'_> { - async move { - self.reset_heap(); - futures::future::try_join_all(self.unused_iters.iter_mut().map(|x| x.iter.rewind())) - .await?; - self.build_heap(); - Ok(()) - } + self.rewind_inner() } fn seek<'a>(&'a mut self, key: &'a [u8]) -> Self::SeekFuture<'a> { - async move { - self.reset_heap(); - futures::future::try_join_all(self.unused_iters.iter_mut().map(|x| x.iter.seek(key))) - .await?; - self.build_heap(); - Ok(()) - } + self.seek_inner(key) } fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { diff --git a/src/storage/src/hummock/iterator/mod.rs b/src/storage/src/hummock/iterator/mod.rs index 64f0522c92fb6..dce7acd13b086 100644 --- a/src/storage/src/hummock/iterator/mod.rs +++ b/src/storage/src/hummock/iterator/mod.rs @@ -362,15 +362,13 @@ pub enum DirectedUserIterator { Backward(BackwardUserIterator), } -pub trait DirectedUserIteratorBuilder { +pub trait DirectedUserIteratorBuilder: 'static { type Direction: HummockIteratorDirection; type SstableIteratorType: SstableIteratorType; /// Initialize an `DirectedUserIterator`. /// The `key_range` should be from smaller key to larger key. fn create( - iterator_iter: impl IntoIterator< - Item = UserIteratorPayloadType, - >, + iterator_iter: Vec>, key_range: (Bound>, Bound>), read_epoch: u64, min_epoch: u64, diff --git a/src/storage/src/hummock/state_store.rs b/src/storage/src/hummock/state_store.rs index 38f4d6fba54db..df847e5cf7231 100644 --- a/src/storage/src/hummock/state_store.rs +++ b/src/storage/src/hummock/state_store.rs @@ -15,7 +15,7 @@ use std::cmp::Ordering; use std::future::Future; use std::ops::Bound::{Excluded, Included}; -use std::ops::RangeBounds; +use std::ops::{Bound, RangeBounds}; use std::sync::Arc; use bytes::Bytes; @@ -54,7 +54,7 @@ use crate::storage_value::StorageValue; use crate::store::*; use crate::{define_state_store_associated_type, StateStore, StateStoreIter}; -pub(crate) trait HummockIteratorType { +pub(crate) trait HummockIteratorType: 'static { type Direction: HummockIteratorDirection; type SstableIteratorType: SstableIteratorType; type UserIteratorBuilder: DirectedUserIteratorBuilder< @@ -90,9 +90,9 @@ impl HummockStorage { /// If `Ok(Some())` is returned, the key is found. If `Ok(None)` is returned, /// the key is not found. If `Err()` is returned, the searching for the key /// failed due to other non-EOF errors. - pub async fn get<'a>( - &'a self, - key: &'a [u8], + pub async fn get( + &self, + key: &[u8], check_bloom_filter: bool, read_options: ReadOptions, ) -> StorageResult> { @@ -258,15 +258,13 @@ impl HummockStorage { } #[allow(dead_code)] - async fn old_iter_inner( + async fn old_iter_inner( &self, prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), read_options: ReadOptions, ) -> StorageResult where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, T: HummockIteratorType, { let epoch = read_options.epoch; @@ -415,11 +413,6 @@ impl HummockStorage { .with_label_values(&["sub-iter"]) .observe(overlapped_iters.len() as f64); - let key_range = ( - key_range.start_bound().map(|b| b.as_ref().to_owned()), - key_range.end_bound().map(|b| b.as_ref().to_owned()), - ); - // The input of the user iterator is a `HummockIteratorUnion` of 4 different types. We use // the union because the underlying merge iterator let mut user_iterator = T::UserIteratorBuilder::create( @@ -434,6 +427,7 @@ impl HummockStorage { .rewind() .in_span(Span::enter_with_local_parent("rewind")) .await?; + local_stats.report(self.stats.as_ref()); Ok(HummockStateStoreIter::new( user_iterator, @@ -453,20 +447,16 @@ impl StateStore for HummockStorage { check_bloom_filter: bool, read_options: ReadOptions, ) -> Self::GetFuture<'_> { - async move { self.get(key, check_bloom_filter, read_options).await } + self.get(key, check_bloom_filter, read_options) } - fn scan( + fn scan( &self, prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), limit: Option, read_options: ReadOptions, - ) -> Self::ScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::ScanFuture<'_> { async move { self.iter(prefix_hint, key_range, read_options) .await? @@ -475,16 +465,12 @@ impl StateStore for HummockStorage { } } - fn backward_scan( + fn backward_scan( &self, - _key_range: R, + _key_range: (Bound>, Bound>), _limit: Option, _read_options: ReadOptions, - ) -> Self::BackwardScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::BackwardScanFuture<'_> { async move { unimplemented!() } } @@ -507,16 +493,12 @@ impl StateStore for HummockStorage { /// Returns an iterator that scan from the begin key to the end key /// The result is based on a snapshot corresponding to the given `epoch`. - fn iter( + fn iter( &self, prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), read_options: ReadOptions, - ) -> Self::IterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::IterFuture<'_> { if let Some(prefix_hint) = prefix_hint.as_ref() { let next_key = next_key(prefix_hint); @@ -535,8 +517,8 @@ impl StateStore for HummockStorage { // // 3. Include(pk) => prefix_hint <= start_bound < next_key(prefix_hint) Included(range_start) | Excluded(range_start) => { - assert!(range_start.as_ref() >= prefix_hint.as_slice()); - assert!(range_start.as_ref() < next_key.as_slice() || next_key.is_empty()); + assert!(range_start.as_slice() >= prefix_hint.as_slice()); + assert!(range_start.as_slice() < next_key.as_slice() || next_key.is_empty()); } _ => unreachable!(), @@ -544,8 +526,8 @@ impl StateStore for HummockStorage { match key_range.end_bound() { Included(range_end) => { - assert!(range_end.as_ref() >= prefix_hint.as_slice()); - assert!(range_end.as_ref() < next_key.as_slice() || next_key.is_empty()); + assert!(range_end.as_slice() >= prefix_hint.as_slice()); + assert!(range_end.as_slice() < next_key.as_slice() || next_key.is_empty()); } // 1. Excluded(end_bound_of_prefix(pk + col)) => prefix_hint < end_bound <= @@ -554,8 +536,8 @@ impl StateStore for HummockStorage { // 2. Excluded(pk + bound) => prefix_hint < end_bound <= // next_key(prefix_hint) Excluded(range_end) => { - assert!(range_end.as_ref() > prefix_hint.as_slice()); - assert!(range_end.as_ref() <= next_key.as_slice() || next_key.is_empty()); + assert!(range_end.as_slice() > prefix_hint.as_slice()); + assert!(range_end.as_slice() <= next_key.as_slice() || next_key.is_empty()); } std::ops::Bound::Unbounded => { @@ -573,27 +555,17 @@ impl StateStore for HummockStorage { retention_seconds: read_options.retention_seconds, }; - return self.storage_core.iter( - ( - key_range.start_bound().map(|b| b.as_ref().to_owned()), - key_range.end_bound().map(|b| b.as_ref().to_owned()), - ), - read_options.epoch, - read_options_v2, - ); + self.storage_core + .iter(key_range, read_options.epoch, read_options_v2) } /// Returns a backward iterator that scans from the end key to the begin key /// The result is based on a snapshot corresponding to the given `epoch`. - fn backward_iter( + fn backward_iter( &self, - _key_range: R, + _key_range: (Bound>, Bound>), _read_options: ReadOptions, - ) -> Self::BackwardIterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::BackwardIterFuture<'_> { async move { unimplemented!(); } @@ -679,7 +651,7 @@ impl StateStoreIter for HummockStateStoreIter { type Item = (Bytes, Bytes); type NextFuture<'a> = - impl Future>> + Send; + impl Future>> + Send + 'a; fn next(&mut self) -> Self::NextFuture<'_> { async move { diff --git a/src/storage/src/hummock/store/state_store.rs b/src/storage/src/hummock/store/state_store.rs index 2713e9877f43c..cdd3446a62d0d 100644 --- a/src/storage/src/hummock/store/state_store.rs +++ b/src/storage/src/hummock/store/state_store.rs @@ -612,7 +612,7 @@ pub struct HummockStorageIterator { impl StateStoreIter for HummockStorageIterator { type Item = (Bytes, Bytes); - type NextFuture<'a> = impl Future>> + Send; + type NextFuture<'a> = impl Future>> + Send + 'a; fn next(&mut self) -> Self::NextFuture<'_> { async { diff --git a/src/storage/src/keyspace.rs b/src/storage/src/keyspace.rs index e218eab4738ad..63e03eaab704d 100644 --- a/src/storage/src/keyspace.rs +++ b/src/storage/src/keyspace.rs @@ -169,7 +169,7 @@ impl Keyspace { } } -pub struct StripPrefixIterator> { +pub struct StripPrefixIterator + 'static> { iter: I, prefix_len: usize, } @@ -178,7 +178,7 @@ impl> StateStoreIter for StripPrefixIte type Item = (Bytes, Bytes); type NextFuture<'a> = - impl Future>> + Send; + impl Future>> + Send + 'a; fn next(&mut self) -> Self::NextFuture<'_> { async move { diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 6260e41279c7d..a684b5ec95d2e 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -13,7 +13,6 @@ // limitations under the License. #![feature(allocator_api)] -#![feature(backtrace)] #![feature(binary_heap_drain_sorted)] #![feature(bound_as_ref)] #![feature(bound_map)] @@ -21,11 +20,9 @@ #![feature(custom_test_frameworks)] #![feature(drain_filter)] #![feature(generators)] -#![feature(generic_associated_types)] #![feature(hash_drain_filter)] -#![feature(let_else)] +#![feature(let_chains)] #![feature(lint_reasons)] -#![feature(map_first_last)] #![feature(proc_macro_hygiene)] #![feature(result_option_inspect)] #![feature(stmt_expr_attributes)] @@ -42,6 +39,9 @@ #![feature(once_cell)] #![cfg_attr(coverage, feature(no_coverage))] #![recursion_limit = "256"] +#![feature(error_generic_member_access)] +#![feature(provide_any)] +#![expect(clippy::result_large_err, reason = "FIXME: HummockError is large")] pub mod hummock; pub mod keyspace; diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 23bd6f9820caa..6ffe6e0742952 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -212,7 +212,7 @@ impl StateStore for MemoryStateStore { read_options: ReadOptions, ) -> Self::GetFuture<'_> { async move { - let range_bounds = key.to_vec()..=key.to_vec(); + let range_bounds = (Bound::Included(key.to_vec()), Bound::Included(key.to_vec())); // We do not really care about vnodes here, so we just use the default value. let res = self.scan(None, range_bounds, Some(1), read_options).await?; @@ -224,17 +224,13 @@ impl StateStore for MemoryStateStore { } } - fn scan( + fn scan( &self, _prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), limit: Option, read_options: ReadOptions, - ) -> Self::ScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::ScanFuture<'_> { async move { let epoch = read_options.epoch; let mut data = vec![]; @@ -262,16 +258,12 @@ impl StateStore for MemoryStateStore { } } - fn backward_scan( + fn backward_scan( &self, - _key_range: R, + _key_range: (Bound>, Bound>), _limit: Option, _read_options: ReadOptions, - ) -> Self::BackwardScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::BackwardScanFuture<'_> { async move { unimplemented!() } } @@ -292,16 +284,12 @@ impl StateStore for MemoryStateStore { } } - fn iter( + fn iter( &self, _prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), read_options: ReadOptions, - ) -> Self::IterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::IterFuture<'_> { async move { Ok(MemoryStateStoreIter::new( batched_iter::Iter::new(self.inner.clone(), to_bytes_range(key_range)), @@ -310,15 +298,11 @@ impl StateStore for MemoryStateStore { } } - fn backward_iter( + fn backward_iter( &self, - _key_range: R, + _key_range: (Bound>, Bound>), _read_options: ReadOptions, - ) -> Self::BackwardIterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::BackwardIterFuture<'_> { async move { unimplemented!() } } @@ -366,7 +350,7 @@ impl MemoryStateStoreIter { impl StateStoreIter for MemoryStateStoreIter { type Item = (Bytes, Bytes); - type NextFuture<'a> = impl Future>> + Send; + type NextFuture<'a> = impl Future>> + Send + 'a; fn next(&mut self) -> Self::NextFuture<'_> { async move { @@ -423,7 +407,10 @@ mod tests { state_store .scan( None, - "a"..="b", + ( + Bound::Included(b"a".to_vec()), + Bound::Included(b"b".to_vec()), + ), None, ReadOptions { epoch: 0, @@ -442,7 +429,10 @@ mod tests { state_store .scan( None, - "a"..="b", + ( + Bound::Included(b"a".to_vec()), + Bound::Included(b"b".to_vec()), + ), Some(1), ReadOptions { epoch: 0, @@ -458,7 +448,10 @@ mod tests { state_store .scan( None, - "a"..="b", + ( + Bound::Included(b"a".to_vec()), + Bound::Included(b"b".to_vec()), + ), None, ReadOptions { epoch: 1, diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 55e995cfdf07f..ab571bb91cfa6 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::RangeBounds; +use std::ops::Bound; use std::sync::Arc; use async_stack_trace::StackTrace; @@ -120,17 +120,13 @@ where } } - fn scan( + fn scan( &self, prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), limit: Option, read_options: ReadOptions, - ) -> Self::ScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::ScanFuture<'_> { async move { let timer = self.stats.range_scan_duration.start_timer(); let result = self @@ -149,16 +145,12 @@ where } } - fn backward_scan( + fn backward_scan( &self, - key_range: R, + key_range: (Bound>, Bound>), limit: Option, read_options: ReadOptions, - ) -> Self::BackwardScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::BackwardScanFuture<'_> { async move { let timer = self.stats.range_backward_scan_duration.start_timer(); let result = self @@ -204,35 +196,21 @@ where } } - fn iter( + fn iter( &self, prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), read_options: ReadOptions, - ) -> Self::IterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { - async move { - self.monitored_iter(self.inner.iter(prefix_hint, key_range, read_options)) - .await - } + ) -> Self::IterFuture<'_> { + self.monitored_iter(self.inner.iter(prefix_hint, key_range, read_options)) } - fn backward_iter( + fn backward_iter( &self, - key_range: R, + key_range: (Bound>, Bound>), read_options: ReadOptions, - ) -> Self::BackwardIterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { - async move { - self.monitored_iter(self.inner.backward_iter(key_range, read_options)) - .await - } + ) -> Self::BackwardIterFuture<'_> { + self.monitored_iter(self.inner.backward_iter(key_range, read_options)) } fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Self::WaitEpochFuture<'_> { @@ -312,7 +290,7 @@ where type Item = (Bytes, Bytes); type NextFuture<'a> = - impl Future>> + Send; + impl Future>> + Send + 'a; fn next(&mut self) -> Self::NextFuture<'_> { async move { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index fd518e8b590a5..889c9d56e9688 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::future::Future; -use std::ops::RangeBounds; +use std::ops::Bound; use bytes::Bytes; use risingwave_hummock_sdk::HummockReadEpoch; @@ -43,32 +43,24 @@ impl StateStore for PanicStateStore { } } - fn scan( + fn scan( &self, _prefix_hint: Option>, - _key_range: R, + _key_range: (Bound>, Bound>), _limit: Option, _read_options: ReadOptions, - ) -> Self::ScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::ScanFuture<'_> { async move { panic!("should not scan from the state store!"); } } - fn backward_scan( + fn backward_scan( &self, - _key_range: R, + _key_range: (Bound>, Bound>), _limit: Option, _read_options: ReadOptions, - ) -> Self::BackwardScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::BackwardScanFuture<'_> { async move { panic!("should not backward scan from the state store!"); } @@ -84,30 +76,22 @@ impl StateStore for PanicStateStore { } } - fn iter( + fn iter( &self, _prefix_hint: Option>, - _key_range: R, + _key_range: (Bound>, Bound>), _read_options: ReadOptions, - ) -> Self::IterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::IterFuture<'_> { async move { panic!("should not create iter from the state store!"); } } - fn backward_iter( + fn backward_iter( &self, - _key_range: R, + _key_range: (Bound>, Bound>), _read_options: ReadOptions, - ) -> Self::BackwardIterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> Self::BackwardIterFuture<'_> { async move { panic!("should not create backward iter from the panic state store!"); } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index f377b2a8c8339..8ad2c5f76617f 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. use std::future::Future; -use std::ops::RangeBounds; +use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; @@ -32,13 +32,13 @@ pub struct SyncResult { pub uncommitted_ssts: Vec, } -pub trait GetFutureTrait<'a> = Future>> + Send; -pub trait ScanFutureTrait<'a, R, B> = Future>> + Send; -pub trait IterFutureTrait<'a, I: StateStoreIter, R, B> = - Future> + Send; -pub trait EmptyFutureTrait<'a> = Future> + Send; -pub trait SyncFutureTrait<'a> = Future> + Send; -pub trait IngestBatchFutureTrait<'a> = Future> + Send; +pub trait GetFutureTrait<'a> = Future>> + Send + 'a; +pub trait ScanFutureTrait<'a> = Future>> + Send + 'a; +pub trait IterFutureTrait<'a, I: StateStoreIter> = + Future> + Send + 'a; +pub trait EmptyFutureTrait<'a> = Future> + Send + 'a; +pub trait SyncFutureTrait<'a> = Future> + Send + 'a; +pub trait IngestBatchFutureTrait<'a> = Future> + Send + 'a; #[macro_export] macro_rules! define_state_store_associated_type { @@ -48,44 +48,26 @@ macro_rules! define_state_store_associated_type { type WaitEpochFuture<'a> = impl EmptyFutureTrait<'a>; type SyncFuture<'a> = impl SyncFutureTrait<'a>; - type BackwardIterFuture<'a, R, B> = impl IterFutureTrait<'a, Self::Iter, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type BackwardIterFuture<'a> = impl IterFutureTrait<'a, Self::Iter>; - type IterFuture<'a, R, B> = impl IterFutureTrait<'a, Self::Iter, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type IterFuture<'a> = impl IterFutureTrait<'a, Self::Iter>; - type BackwardScanFuture<'a, R, B> = impl ScanFutureTrait<'a, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type BackwardScanFuture<'a> = impl ScanFutureTrait<'a>; - type ScanFuture<'a, R, B> = impl ScanFutureTrait<'a, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type ScanFuture<'a> = impl ScanFutureTrait<'a>; type ClearSharedBufferFuture<'a> = impl EmptyFutureTrait<'a>; }; } pub trait StateStore: Send + Sync + 'static + Clone { - type Iter: StateStoreIter; + type Iter: StateStoreIter + 'static; type GetFuture<'a>: GetFutureTrait<'a>; - type ScanFuture<'a, R, B>: ScanFutureTrait<'a, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type ScanFuture<'a>: ScanFutureTrait<'a>; - type BackwardScanFuture<'a, R, B>: ScanFutureTrait<'a, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type BackwardScanFuture<'a>: ScanFutureTrait<'a>; type IngestBatchFuture<'a>: IngestBatchFutureTrait<'a>; @@ -93,15 +75,9 @@ pub trait StateStore: Send + Sync + 'static + Clone { type SyncFuture<'a>: SyncFutureTrait<'a>; - type IterFuture<'a, R, B>: IterFutureTrait<'a, Self::Iter, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type IterFuture<'a>: IterFutureTrait<'a, Self::Iter>; - type BackwardIterFuture<'a, R, B>: IterFutureTrait<'a, Self::Iter, R, B> - where - R: 'static + Send + RangeBounds, - B: 'static + Send + AsRef<[u8]>; + type BackwardIterFuture<'a>: IterFutureTrait<'a, Self::Iter>; type ClearSharedBufferFuture<'a>: EmptyFutureTrait<'a>; @@ -121,26 +97,20 @@ pub trait StateStore: Send + Sync + 'static + Clone { /// /// /// By default, this simply calls `StateStore::iter` to fetch elements. - fn scan( + fn scan( &self, prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), limit: Option, read_options: ReadOptions, - ) -> Self::ScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send; + ) -> Self::ScanFuture<'_>; - fn backward_scan( + fn backward_scan( &self, - key_range: R, + key_range: (Bound>, Bound>), limit: Option, read_options: ReadOptions, - ) -> Self::BackwardScanFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send; + ) -> Self::BackwardScanFuture<'_>; /// Ingests a batch of data into the state store. One write batch should never contain operation /// on the same key. e.g. Put(233, x) then Delete(233). @@ -162,27 +132,21 @@ pub trait StateStore: Send + Sync + 'static + Clone { /// `full_key_range` used for iter. (if the `prefix_hint` not None, it should be be included in /// `key_range`) The returned iterator will iterate data based on a snapshot corresponding to /// the given `epoch`. - fn iter( + fn iter( &self, prefix_hint: Option>, - key_range: R, + key_range: (Bound>, Bound>), read_options: ReadOptions, - ) -> Self::IterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send; + ) -> Self::IterFuture<'_>; /// Opens and returns a backward iterator for given `key_range`. /// The returned iterator will iterate data based on a snapshot corresponding to the given /// `epoch` - fn backward_iter( + fn backward_iter( &self, - key_range: R, + key_range: (Bound>, Bound>), read_options: ReadOptions, - ) -> Self::BackwardIterFuture<'_, R, B> - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send; + ) -> Self::BackwardIterFuture<'_>; /// Creates a `WriteBatch` associated with this state store. fn start_write_batch(&self, write_options: WriteOptions) -> WriteBatch<'_, Self> { diff --git a/src/storage/src/table/streaming_table/state_table.rs b/src/storage/src/table/streaming_table/state_table.rs index cb15f8eb6b67b..b674b68928206 100644 --- a/src/storage/src/table/streaming_table/state_table.rs +++ b/src/storage/src/table/streaming_table/state_table.rs @@ -16,8 +16,8 @@ use std::borrow::Cow; use std::cmp::Ordering; use std::collections::BTreeMap; use std::marker::PhantomData; +use std::ops::Bound; use std::ops::Bound::*; -use std::ops::{Bound, RangeBounds}; use std::sync::Arc; use async_stack_trace::StackTrace; @@ -1023,18 +1023,18 @@ struct StorageIterInner { deserializer: RowDeserializer, } -impl StorageIterInner { - async fn new( +impl StorageIterInner +where + S: 'static, + S::Iter: 'static, +{ + async fn new( keyspace: &Keyspace, prefix_hint: Option>, - raw_key_range: R, + raw_key_range: (Bound>, Bound>), read_options: ReadOptions, deserializer: RowDeserializer, - ) -> StorageResult - where - R: RangeBounds + Send, - B: AsRef<[u8]> + Send, - { + ) -> StorageResult { let iter = keyspace .iter_with_range(prefix_hint, raw_key_range, read_options) .await?; diff --git a/src/stream/src/error.rs b/src/stream/src/error.rs index c7c68dfe9b83c..51b9ecd9936d3 100644 --- a/src/stream/src/error.rs +++ b/src/stream/src/error.rs @@ -58,7 +58,7 @@ impl std::fmt::Debug for StreamError { write!(f, "{}", self.inner)?; writeln!(f)?; - if let Some(backtrace) = self.inner.backtrace() { + if let Some(backtrace) = (&self.inner as &dyn Error).request_ref::() { write!(f, " backtrace of inner error:\n{}", backtrace)?; } else { write!(f, " backtrace of `StreamError`:\n{}", self.backtrace)?; diff --git a/src/stream/src/executor/error.rs b/src/stream/src/executor/error.rs index 4a787a54f63a5..905108ab72551 100644 --- a/src/stream/src/executor/error.rs +++ b/src/stream/src/executor/error.rs @@ -101,7 +101,7 @@ impl std::fmt::Debug for StreamExecutorError { write!(f, "{}", self.inner)?; writeln!(f)?; - if let Some(backtrace) = self.inner.backtrace() { + if let Some(backtrace) = (&self.inner as &dyn Error).request_ref::() { write!(f, " backtrace of inner error:\n{}", backtrace)?; } else { write!( diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index 653229ad03e4a..9b90640bae202 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -14,16 +14,13 @@ #![allow(rustdoc::private_intra_doc_links)] #![allow(clippy::derive_partial_eq_without_eq)] -#![feature(backtrace)] #![feature(iterator_try_collect)] #![feature(trait_alias)] #![feature(type_alias_impl_trait)] -#![feature(generic_associated_types)] #![feature(more_qualified_paths)] #![feature(lint_reasons)] #![feature(binary_heap_drain_sorted)] -#![feature(map_first_last)] -#![feature(let_else)] +#![feature(let_chains)] #![feature(hash_drain_filter)] #![feature(drain_filter)] #![feature(generators)] @@ -36,6 +33,8 @@ #![feature(never_type)] #![feature(btreemap_alloc)] #![feature(once_cell)] +#![feature(error_generic_member_access)] +#![feature(provide_any)] #![feature(btree_drain_filter)] #[macro_use] diff --git a/src/stream/src/task/env.rs b/src/stream/src/task/env.rs index 5053de21220ca..cc1f95d65df7b 100644 --- a/src/stream/src/task/env.rs +++ b/src/stream/src/task/env.rs @@ -77,7 +77,6 @@ impl StreamEnvironment { &self.server_addr } - #[expect(clippy::explicit_auto_deref)] pub fn source_manager(&self) -> &TableSourceManager { &*self.source_manager } diff --git a/src/tests/sqlsmith/src/lib.rs b/src/tests/sqlsmith/src/lib.rs index dc62978bcf05d..54fb84deb480c 100644 --- a/src/tests/sqlsmith/src/lib.rs +++ b/src/tests/sqlsmith/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(once_cell)] +#![feature(let_chains)] use std::vec; diff --git a/src/tests/sqlsmith/tests/test_runner.rs b/src/tests/sqlsmith/tests/test_runner.rs index 2a3a3bf72e2ba..5daf51592e3a4 100644 --- a/src/tests/sqlsmith/tests/test_runner.rs +++ b/src/tests/sqlsmith/tests/test_runner.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(let_chains)] + #[cfg(feature = "enable_sqlsmith_unit_test")] mod frontend; diff --git a/src/utils/async_stack_trace/src/context.rs b/src/utils/async_stack_trace/src/context.rs index 252607aaa5100..00cb50c7b76d7 100644 --- a/src/utils/async_stack_trace/src/context.rs +++ b/src/utils/async_stack_trace/src/context.rs @@ -12,11 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// FIXME: This is a false-positive clippy test, remove this while bumping toolchain. -// https://github.com/tokio-rs/tokio/issues/4836 -// https://github.com/rust-lang/rust-clippy/issues/8493 -#![expect(clippy::declare_interior_mutable_const)] - use std::cell::RefCell; use std::fmt::{Debug, Write}; use std::sync::atomic::{AtomicU64, Ordering};