Skip to content

Commit

Permalink
substitute parking_lot::Mutex for std::sync::Mutex (apache#1720)
Browse files Browse the repository at this point in the history
* Substitute parking_lot::Mutex for std::sync::Mutex

* enable parking_lot feature in tokio
  • Loading branch information
xudong963 authored Feb 2, 2022
1 parent b9a8f15 commit 46879f1
Show file tree
Hide file tree
Showing 39 changed files with 154 additions and 151 deletions.
2 changes: 1 addition & 1 deletion ballista-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client", version = "0.6.0"}
prost = "0.9"
tonic = "0.6"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
futures = "0.3"
num_cpus = "1.13.0"
1 change: 1 addition & 0 deletions ballista/rust/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ log = "0.4"
tokio = "1.0"
tempfile = "3"
sqlparser = "0.13"
parking_lot = "0.11"

datafusion = { path = "../../../datafusion", version = "6.0.0" }

Expand Down
19 changes: 10 additions & 9 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

//! Distributed execution context.

use parking_lot::Mutex;
use sqlparser::ast::Statement;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use ballista_core::config::BallistaConfig;
use ballista_core::utils::create_df_ctx_with_ballista_query_planner;
Expand Down Expand Up @@ -142,7 +143,7 @@ impl BallistaContext {

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock().unwrap();
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
Expand All @@ -162,7 +163,7 @@ impl BallistaContext {

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock().unwrap();
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
Expand All @@ -186,7 +187,7 @@ impl BallistaContext {

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock().unwrap();
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
Expand All @@ -203,7 +204,7 @@ impl BallistaContext {
name: &str,
table: Arc<dyn TableProvider>,
) -> Result<()> {
let mut state = self.state.lock().unwrap();
let mut state = self.state.lock();
state.tables.insert(name.to_owned(), table);
Ok(())
}
Expand Down Expand Up @@ -280,7 +281,7 @@ impl BallistaContext {
/// might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let mut ctx = {
let state = self.state.lock().unwrap();
let state = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&state.scheduler_host,
state.scheduler_port,
Expand All @@ -291,7 +292,7 @@ impl BallistaContext {
let is_show = self.is_show_statement(sql).await?;
// the show tables、 show columns sql can not run at scheduler because the tables is store at client
if is_show {
let state = self.state.lock().unwrap();
let state = self.state.lock();
ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_information_schema(
state.config.default_with_information_schema(),
Expand All @@ -301,7 +302,7 @@ impl BallistaContext {

// register tables with DataFusion context
{
let state = self.state.lock().unwrap();
let state = self.state.lock();
for (name, prov) in &state.tables {
ctx.register_table(
TableReference::Bare { table: name },
Expand Down Expand Up @@ -483,7 +484,7 @@ mod tests {
.unwrap();

{
let mut guard = context.state.lock().unwrap();
let mut guard = context.state.lock();
let csv_table = guard.tables.get("single_nan");

if let Some(table_provide) = csv_table {
Expand Down
2 changes: 2 additions & 0 deletions ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ parse_arg = "0.1.3"
arrow-flight = { version = "8.0.0" }
datafusion = { path = "../../../datafusion", version = "6.0.0" }

parking_lot = "0.11"

[dev-dependencies]
tempfile = "3"

Expand Down
5 changes: 3 additions & 2 deletions ballista/rust/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

//! Client API for sending requests to executors.

use std::sync::{Arc, Mutex};
use parking_lot::Mutex;
use std::sync::Arc;
use std::{collections::HashMap, pin::Pin};
use std::{
convert::{TryFrom, TryInto},
Expand Down Expand Up @@ -154,7 +155,7 @@ impl Stream for FlightDataStream {
self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut stream = self.stream.lock().expect("mutex is bad");
let mut stream = self.stream.lock();
stream.poll_next_unpin(cx).map(|x| match x {
Some(flight_data_chunk_result) => {
let converted_chunk = flight_data_chunk_result
Expand Down
3 changes: 2 additions & 1 deletion ballista/rust/core/src/execution_plans/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
//! will use the ShuffleReaderExec to read these results.

use parking_lot::Mutex;
use std::fs::File;
use std::iter::Iterator;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Instant;
use std::{any::Any, pin::Pin};

Expand Down
3 changes: 2 additions & 1 deletion ballista/rust/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ futures = "0.3"
log = "0.4"
snmalloc-rs = {version = "0.2", features= ["cache-friendly"], optional = true}
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
hyper = "0.14.4"
parking_lot = "0.11"

[dev-dependencies]

Expand Down
1 change: 1 addition & 0 deletions ballista/rust/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ tokio-stream = { version = "0.1", features = ["net"], optional = true }
tonic = "0.6"
tower = { version = "0.4" }
warp = "0.3"
parking_lot = "0.11"

[dev-dependencies]
ballista-core = { path = "../core", version = "0.6.0" }
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ snmalloc = ["snmalloc-rs"]
datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client" }
structopt = { version = "0.3", default-features = false }
tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] }
tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] }
futures = "0.3"
env_logger = "0.9"
mimalloc = { version = "0.1", optional = true, default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/bin/nyctaxi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async fn datafusion_sql_benchmarks(
}

async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()> {
let runtime = ctx.state.lock().unwrap().runtime_env.clone();
let runtime = ctx.state.lock().runtime_env.clone();
let plan = ctx.create_logical_plan(sql)?;
let plan = ctx.optimize(&plan)?;
if debug {
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
.with_target_partitions(opt.partitions)
.with_batch_size(opt.batch_size);
let mut ctx = ExecutionContext::with_config(config);
let runtime = ctx.state.lock().unwrap().runtime_env.clone();
let runtime = ctx.state.lock().runtime_env.clone();

// register tables
for table in TABLES {
Expand Down Expand Up @@ -547,7 +547,7 @@ async fn execute_query(
displayable(physical_plan.as_ref()).indent()
);
}
let runtime = ctx.state.lock().unwrap().runtime_env.clone();
let runtime = ctx.state.lock().runtime_env.clone();
let result = collect(physical_plan.clone(), runtime).await?;
if debug {
println!(
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rust-version = "1.58"
[dependencies]
clap = { version = "3", features = ["derive", "cargo"] }
rustyline = "9.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
datafusion = { path = "../datafusion", version = "6.0.0" }
arrow = { version = "8.0.0" }
ballista = { path = "../ballista/rust/client", version = "0.6.0" }
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ arrow-flight = { version = "8.0.0" }
datafusion = { path = "../datafusion" }
prost = "0.9"
tonic = "0.6"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
futures = "0.3"
num_cpus = "1.13.0"
3 changes: 2 additions & 1 deletion datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ chrono = { version = "0.4", default-features = false }
async-trait = "0.1.41"
futures = "0.3"
pin-project-lite= "^0.2.7"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
tokio-stream = "0.1"
log = "^0.4"
md-5 = { version = "^0.10.0", optional = true }
Expand All @@ -78,6 +78,7 @@ avro-rs = { version = "0.13", features = ["snappy"], optional = true }
num-traits = { version = "0.2", optional = true }
pyo3 = { version = "0.15", optional = true }
tempfile = "3"
parking_lot = "0.11"

[dev-dependencies]
criterion = "0.3"
Expand Down
5 changes: 3 additions & 2 deletions datafusion/benches/aggregate_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ use crate::criterion::Criterion;
use data_utils::create_table_provider;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use std::sync::{Arc, Mutex};
use parking_lot::Mutex;
use std::sync::Arc;
use tokio::runtime::Runtime;

fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
let rt = Runtime::new().unwrap();
let df = rt.block_on(ctx.lock().unwrap().sql(sql)).unwrap();
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}

Expand Down
5 changes: 3 additions & 2 deletions datafusion/benches/math_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
extern crate criterion;
use criterion::Criterion;

use std::sync::{Arc, Mutex};
use parking_lot::Mutex;
use std::sync::Arc;

use tokio::runtime::Runtime;

Expand All @@ -40,7 +41,7 @@ fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
let rt = Runtime::new().unwrap();

// execute the query
let df = rt.block_on(ctx.lock().unwrap().sql(sql)).unwrap();
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
rt.block_on(df.collect()).unwrap();
}

Expand Down
13 changes: 7 additions & 6 deletions datafusion/benches/sort_limit_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable};
use datafusion::datasource::object_store::local::LocalFileSystem;

use std::sync::{Arc, Mutex};
use parking_lot::Mutex;
use std::sync::Arc;

extern crate arrow;
extern crate datafusion;
Expand All @@ -38,7 +39,7 @@ fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
let rt = Runtime::new().unwrap();

// execute the query
let df = rt.block_on(ctx.lock().unwrap().sql(sql)).unwrap();
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
rt.block_on(df.collect()).unwrap();
}

Expand Down Expand Up @@ -81,18 +82,18 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
rt.block_on(async {
// create local execution context
let mut ctx = ExecutionContext::new();
ctx.state.lock().unwrap().config.target_partitions = 1;
let runtime = ctx.state.lock().unwrap().runtime_env.clone();
ctx.state.lock().config.target_partitions = 1;
let runtime = ctx.state.lock().runtime_env.clone();

let mem_table = MemTable::load(Arc::new(csv), Some(partitions), runtime)
.await
.unwrap();
ctx.register_table("aggregate_test_100", Arc::new(mem_table))
.unwrap();
ctx_holder.lock().unwrap().push(Arc::new(Mutex::new(ctx)))
ctx_holder.lock().push(Arc::new(Mutex::new(ctx)))
});

let ctx = ctx_holder.lock().unwrap().get(0).unwrap().clone();
let ctx = ctx_holder.lock().get(0).unwrap().clone();
ctx
}

Expand Down
5 changes: 3 additions & 2 deletions datafusion/benches/window_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ use crate::criterion::Criterion;
use data_utils::create_table_provider;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use std::sync::{Arc, Mutex};
use parking_lot::Mutex;
use std::sync::Arc;
use tokio::runtime::Runtime;

fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
let rt = Runtime::new().unwrap();
let df = rt.block_on(ctx.lock().unwrap().sql(sql)).unwrap();
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}

Expand Down
15 changes: 8 additions & 7 deletions datafusion/src/catalog/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
//! representing collections of named schemas.

use crate::catalog::schema::SchemaProvider;
use parking_lot::RwLock;
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::sync::Arc;

/// Represent a list of named catalogs
pub trait CatalogList: Sync + Send {
Expand Down Expand Up @@ -75,17 +76,17 @@ impl CatalogList for MemoryCatalogList {
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
let mut catalogs = self.catalogs.write().unwrap();
let mut catalogs = self.catalogs.write();
catalogs.insert(name, catalog)
}

fn catalog_names(&self) -> Vec<String> {
let catalogs = self.catalogs.read().unwrap();
let catalogs = self.catalogs.read();
catalogs.keys().map(|s| s.to_string()).collect()
}

fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
let catalogs = self.catalogs.read().unwrap();
let catalogs = self.catalogs.read();
catalogs.get(name).cloned()
}
}
Expand Down Expand Up @@ -129,7 +130,7 @@ impl MemoryCatalogProvider {
name: impl Into<String>,
schema: Arc<dyn SchemaProvider>,
) -> Option<Arc<dyn SchemaProvider>> {
let mut schemas = self.schemas.write().unwrap();
let mut schemas = self.schemas.write();
schemas.insert(name.into(), schema)
}
}
Expand All @@ -140,12 +141,12 @@ impl CatalogProvider for MemoryCatalogProvider {
}

fn schema_names(&self) -> Vec<String> {
let schemas = self.schemas.read().unwrap();
let schemas = self.schemas.read();
schemas.keys().cloned().collect()
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
let schemas = self.schemas.read().unwrap();
let schemas = self.schemas.read();
schemas.get(name).cloned()
}
}
Loading

0 comments on commit 46879f1

Please sign in to comment.