Skip to content

Commit

Permalink
chore: merge main 3ae0299
Browse files Browse the repository at this point in the history
  • Loading branch information
appletreeisyellow committed Apr 22, 2024
2 parents 555c388 + 3ae0299 commit a689b9d
Show file tree
Hide file tree
Showing 38 changed files with 417 additions and 491 deletions.
16 changes: 8 additions & 8 deletions datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use arrow_schema::Field;

use crate::error::_schema_err;
use crate::utils::{parse_identifiers_normalized, quote_identifier};
use crate::{DFSchema, DataFusionError, OwnedTableReference, Result, SchemaError};
use crate::{DFSchema, DataFusionError, Result, SchemaError, TableReference};
use std::collections::HashSet;
use std::convert::Infallible;
use std::fmt;
Expand All @@ -32,7 +32,7 @@ use std::sync::Arc;
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Column {
/// relation/table reference.
pub relation: Option<OwnedTableReference>,
pub relation: Option<TableReference>,
/// field/column name.
pub name: String,
}
Expand All @@ -45,7 +45,7 @@ impl Column {
///
/// [`TableReference::parse_str`]: crate::TableReference::parse_str
pub fn new(
relation: Option<impl Into<OwnedTableReference>>,
relation: Option<impl Into<TableReference>>,
name: impl Into<String>,
) -> Self {
Self {
Expand Down Expand Up @@ -74,20 +74,20 @@ impl Column {
let (relation, name) = match idents.len() {
1 => (None, idents.remove(0)),
2 => (
Some(OwnedTableReference::Bare {
Some(TableReference::Bare {
table: idents.remove(0).into(),
}),
idents.remove(0),
),
3 => (
Some(OwnedTableReference::Partial {
Some(TableReference::Partial {
schema: idents.remove(0).into(),
table: idents.remove(0).into(),
}),
idents.remove(0),
),
4 => (
Some(OwnedTableReference::Full {
Some(TableReference::Full {
catalog: idents.remove(0).into(),
schema: idents.remove(0).into(),
table: idents.remove(0).into(),
Expand Down Expand Up @@ -340,8 +340,8 @@ impl From<String> for Column {
}

/// Create a column from an optional qualifier and the name of a field
impl From<(Option<&OwnedTableReference>, &Field)> for Column {
fn from((relation, field): (Option<&OwnedTableReference>, &Field)) -> Self {
impl From<(Option<&TableReference>, &Field)> for Column {
fn from((relation, field): (Option<&TableReference>, &Field)) -> Self {
Self::new(relation.cloned(), field.name())
}
}
Expand Down
47 changes: 19 additions & 28 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::sync::Arc;
use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
use crate::{
field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
OwnedTableReference, SchemaError, TableReference,
SchemaError, TableReference,
};

use arrow::compute::can_cast_types;
Expand Down Expand Up @@ -111,7 +111,7 @@ pub struct DFSchema {
inner: SchemaRef,
/// Optional qualifiers for each column in this schema. In the same order as
/// the `self.inner.fields()`
field_qualifiers: Vec<Option<OwnedTableReference>>,
field_qualifiers: Vec<Option<TableReference>>,
/// Stores functional dependencies in the schema.
functional_dependencies: FunctionalDependencies,
}
Expand All @@ -128,10 +128,10 @@ impl DFSchema {

/// Create a `DFSchema` from a list of fields, each paired with its own optional qualifier, plus schema metadata
pub fn new_with_metadata(
qualified_fields: Vec<(Option<OwnedTableReference>, Arc<Field>)>,
qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
metadata: HashMap<String, String>,
) -> Result<Self> {
let (qualifiers, fields): (Vec<Option<OwnedTableReference>>, Vec<Arc<Field>>) =
let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
qualified_fields.into_iter().unzip();

let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
Expand Down Expand Up @@ -170,10 +170,9 @@ impl DFSchema {
schema: &Schema,
) -> Result<Self> {
let qualifier = qualifier.into();
let owned_qualifier = qualifier.to_owned_reference();
let schema = DFSchema {
inner: schema.clone().into(),
field_qualifiers: vec![Some(owned_qualifier); schema.fields.len()],
field_qualifiers: vec![Some(qualifier); schema.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
};
schema.check_names()?;
Expand All @@ -182,16 +181,12 @@ impl DFSchema {

/// Create a `DFSchema` from an Arrow schema where each field is given its own optional qualifier
pub fn from_field_specific_qualified_schema(
qualifiers: Vec<Option<impl Into<TableReference>>>,
qualifiers: Vec<Option<TableReference>>,
schema: &SchemaRef,
) -> Result<Self> {
let owned_qualifiers = qualifiers
.into_iter()
.map(|qualifier| qualifier.map(|q| q.into().to_owned_reference()))
.collect();
let dfschema = Self {
inner: schema.clone(),
field_qualifiers: owned_qualifiers,
field_qualifiers: qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
};
dfschema.check_names()?;
Expand All @@ -216,7 +211,7 @@ impl DFSchema {
for (qualifier, name) in qualified_names {
if unqualified_names.contains(name) {
return _schema_err!(SchemaError::AmbiguousReference {
field: Column::new(Some(qualifier.to_owned_reference()), name)
field: Column::new(Some(qualifier.clone()), name)
});
}
}
Expand Down Expand Up @@ -270,7 +265,7 @@ impl DFSchema {
return;
}

let self_fields: HashSet<(Option<&OwnedTableReference>, &FieldRef)> =
let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
self.iter().collect();
let self_unqualified_names: HashSet<&str> = self
.inner
Expand Down Expand Up @@ -316,7 +311,7 @@ impl DFSchema {

/// Returns the optional qualifier and an immutable reference to the `Field`
/// located at offset `i` within the internal `fields` vector
pub fn qualified_field(&self, i: usize) -> (Option<&OwnedTableReference>, &Field) {
pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
(self.field_qualifiers[i].as_ref(), self.field(i))
}

Expand Down Expand Up @@ -383,13 +378,11 @@ impl DFSchema {
&self,
qualifier: Option<&TableReference>,
name: &str,
) -> Result<(Option<&OwnedTableReference>, &Field)> {
) -> Result<(Option<&TableReference>, &Field)> {
if let Some(qualifier) = qualifier {
let idx = self
.index_of_column_by_name(Some(qualifier), name)?
.ok_or_else(|| {
field_not_found(Some(qualifier.to_string()), name, self)
})?;
.ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
} else {
self.qualified_field_with_unqualified_name(name)
Expand Down Expand Up @@ -428,7 +421,7 @@ impl DFSchema {
pub fn qualified_fields_with_unqualified_name(
&self,
name: &str,
) -> Vec<(Option<&OwnedTableReference>, &Field)> {
) -> Vec<(Option<&TableReference>, &Field)> {
self.iter()
.filter(|(_, field)| field.name() == name)
.map(|(qualifier, field)| (qualifier, field.as_ref()))
Expand Down Expand Up @@ -456,7 +449,7 @@ impl DFSchema {
pub fn qualified_field_with_unqualified_name(
&self,
name: &str,
) -> Result<(Option<&OwnedTableReference>, &Field)> {
) -> Result<(Option<&TableReference>, &Field)> {
let matches = self.qualified_fields_with_unqualified_name(name);
match matches.len() {
0 => Err(unqualified_field_not_found(name, self)),
Expand Down Expand Up @@ -527,7 +520,7 @@ impl DFSchema {
) -> Result<&Field> {
let idx = self
.index_of_column_by_name(Some(qualifier), name)?
.ok_or_else(|| field_not_found(Some(qualifier.to_string()), name, self))?;
.ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;

Ok(self.field(idx))
}
Expand All @@ -544,7 +537,7 @@ impl DFSchema {
pub fn qualified_field_from_column(
&self,
column: &Column,
) -> Result<(Option<&OwnedTableReference>, &Field)> {
) -> Result<(Option<&TableReference>, &Field)> {
self.qualified_field_with_name(column.relation.as_ref(), &column.name)
}

Expand Down Expand Up @@ -750,7 +743,7 @@ impl DFSchema {
}

/// Replace the qualifier of every field in the schema with the given value
pub fn replace_qualifier(self, qualifier: impl Into<OwnedTableReference>) -> Self {
pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
let qualifier = qualifier.into();
DFSchema {
field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
Expand All @@ -777,9 +770,7 @@ impl DFSchema {
}

/// Iterate over the qualifiers and fields in the DFSchema
pub fn iter(
&self,
) -> impl Iterator<Item = (Option<&OwnedTableReference>, &FieldRef)> {
pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
self.field_qualifiers
.iter()
.zip(self.inner.fields().iter())
Expand Down Expand Up @@ -1065,7 +1056,7 @@ mod tests {
#[test]
fn test_from_field_specific_qualified_schema() -> Result<()> {
let schema = DFSchema::from_field_specific_qualified_schema(
vec![Some("t1"), None],
vec![Some("t1".into()), None],
&Arc::new(Schema::new(vec![
Field::new("c0", DataType::Boolean, true),
Field::new("c1", DataType::Boolean, true),
Expand Down
6 changes: 3 additions & 3 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::result;
use std::sync::Arc;

use crate::utils::quote_identifier;
use crate::{Column, DFSchema, OwnedTableReference};
use crate::{Column, DFSchema, TableReference};
#[cfg(feature = "avro")]
use apache_avro::Error as AvroError;
use arrow::error::ArrowError;
Expand Down Expand Up @@ -141,7 +141,7 @@ pub enum SchemaError {
AmbiguousReference { field: Column },
/// Schema contains duplicate qualified field name
DuplicateQualifiedField {
qualifier: Box<OwnedTableReference>,
qualifier: Box<TableReference>,
name: String,
},
/// Schema contains duplicate unqualified field name
Expand Down Expand Up @@ -606,7 +606,7 @@ pub use plan_err as _plan_err;
pub use schema_err as _schema_err;

/// Create a "field not found" DataFusion::SchemaError
pub fn field_not_found<R: Into<OwnedTableReference>>(
pub fn field_not_found<R: Into<TableReference>>(
qualifier: Option<R>,
name: &str,
schema: &DFSchema,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ pub use functional_dependencies::{
pub use join_type::{JoinConstraint, JoinSide, JoinType};
pub use param_value::ParamValues;
pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::{OwnedSchemaReference, SchemaReference};
pub use schema_reference::SchemaReference;
pub use stats::{ColumnStatistics, Statistics};
pub use table_reference::{OwnedTableReference, ResolvedTableReference, TableReference};
pub use table_reference::{ResolvedTableReference, TableReference};
pub use unnest::UnnestOptions;
pub use utils::project_schema;

Expand Down
8 changes: 0 additions & 8 deletions datafusion/common/src/schema_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ impl SchemaReference {
}
}

pub type OwnedSchemaReference = SchemaReference;

impl std::fmt::Display for SchemaReference {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand All @@ -43,9 +41,3 @@ impl std::fmt::Display for SchemaReference {
}
}
}

impl<'a> From<&'a OwnedSchemaReference> for SchemaReference {
fn from(value: &'a OwnedSchemaReference) -> Self {
value.clone()
}
}
37 changes: 6 additions & 31 deletions datafusion/common/src/table_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,6 @@ pub enum TableReference {
},
}

/// This is a [`TableReference`] that has 'static lifetime (aka it
/// owns the underlying string)
///
/// To convert a [`TableReference`] to an [`OwnedTableReference`], use
///
/// ```
/// # use datafusion_common::{OwnedTableReference, TableReference};
/// let table_reference = TableReference::from("mytable");
/// let owned_reference = table_reference.to_owned_reference();
/// ```
pub type OwnedTableReference = TableReference;

impl std::fmt::Display for TableReference {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down Expand Up @@ -244,12 +232,6 @@ impl TableReference {
}
}

/// Converts directly into an [`OwnedTableReference`] by cloning
/// the underlying data.
pub fn to_owned_reference(&self) -> OwnedTableReference {
self.clone()
}

/// Forms a string where the identifiers are quoted
///
/// # Example
Expand Down Expand Up @@ -322,19 +304,6 @@ impl TableReference {
}
}

/// Parse a `String` into a OwnedTableReference as a multipart SQL identifier.
impl From<String> for OwnedTableReference {
fn from(s: String) -> Self {
TableReference::parse_str(&s).to_owned_reference()
}
}

impl<'a> From<&'a OwnedTableReference> for TableReference {
fn from(value: &'a OwnedTableReference) -> Self {
value.clone()
}
}

/// Parse a string into a TableReference, normalizing where appropriate
///
/// See full details on [`TableReference::parse_str`]
Expand All @@ -350,6 +319,12 @@ impl<'a> From<&'a String> for TableReference {
}
}

/// Parse an owned `String` into a `TableReference`.
///
/// Delegates to [`TableReference::parse_str`] on the borrowed contents of `s`.
impl From<String> for TableReference {
fn from(s: String) -> Self {
Self::parse_str(&s)
}
}

impl From<ResolvedTableReference> for TableReference {
fn from(resolved: ResolvedTableReference) -> Self {
Self::Full {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::datasource::TableProvider;
use crate::execution::context::SessionState;

use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{Constraints, DFSchema, DataFusionError, OwnedTableReference};
use datafusion_common::{Constraints, DFSchema, DataFusionError, TableReference};
use datafusion_expr::CreateExternalTable;

use async_trait::async_trait;
Expand Down Expand Up @@ -129,7 +129,7 @@ impl ListingSchemaProvider {
if !self.table_exist(table_name) {
let table_url = format!("{}/{}", self.authority, table_path);

let name = OwnedTableReference::bare(table_name);
let name = TableReference::bare(table_name);
let provider = self
.factory
.create(
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ mod tests {
use crate::execution::context::SessionContext;

use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{Constraints, DFSchema, OwnedTableReference};
use datafusion_common::{Constraints, DFSchema, TableReference};

#[tokio::test]
async fn test_create_using_non_std_file_ext() {
Expand All @@ -184,7 +184,7 @@ mod tests {
let factory = ListingTableFactory::new();
let context = SessionContext::new();
let state = context.state();
let name = OwnedTableReference::bare("foo");
let name = TableReference::bare("foo");
let cmd = CreateExternalTable {
name,
location: csv_file.path().to_str().unwrap().to_string(),
Expand Down Expand Up @@ -222,7 +222,7 @@ mod tests {
let factory = ListingTableFactory::new();
let context = SessionContext::new();
let state = context.state();
let name = OwnedTableReference::bare("foo");
let name = TableReference::bare("foo");

let mut options = HashMap::new();
options.insert("format.schema_infer_max_rec".to_owned(), "1000".to_owned());
Expand Down
Loading

0 comments on commit a689b9d

Please sign in to comment.