Skip to content

Commit

Permalink
fix provider
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
  • Loading branch information
mch2 committed Jan 28, 2025
1 parent 5d0ee1d commit a83b97d
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions libs/datafusion/jni/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub async fn query(
ctx: SessionContext,
ticket: Bytes,
) -> datafusion::common::Result<DataFrame> {
let df = dataframe_for_index(&ctx, "theIndex".to_owned(), ticket).await?;
let df = dataframe_for_index(&ctx, "theIndex".to_owned(), ticket, "http://localhost:9400".to_owned()).await?;

df.sort(vec![col("score").sort(false, true)])
.map_err(|e| DataFusionError::Execution(format!("Failed to sort DataFrame: {}", e)))
Expand All @@ -33,9 +33,11 @@ pub async fn read_aggs(
ctx: SessionContext,
ticket: Bytes,
) -> datafusion::common::Result<DataFrame> {
let df = dataframe_for_index(&ctx, "theIndex".to_owned(), ticket).await?;
let df = dataframe_for_index(&ctx, "theIndex".to_owned(), ticket, "http://localhost:9400".to_owned()).await?;
df.filter(col("ord").is_not_null())?
.aggregate(vec![col("ord")], vec![sum(col("count")).alias("count")])
.aggregate(vec![col("ord")], vec![sum(col("count")).alias("count")])?
.sort(vec![col("count").sort(false, true)])? // Sort by count descending
.limit(0, Some(500)) // Get top 500 results
}

// inner join two tables together, returning a single DataFrame that can be consumed
Expand All @@ -49,10 +51,10 @@ pub async fn join(

let select_cols = vec![col(r#""docId""#), col(r#""shardId""#), col(&join_field)];

let left_df = dataframe_for_index(&ctx, "left".to_owned(), left).await?.select( select_cols.clone())?;
let left_df = dataframe_for_index(&ctx, "left".to_owned(), left, "http://localhost:9400".to_owned()).await?.select( select_cols.clone())?;

let alias = format!("right.{}", &join_field);
let right_df = dataframe_for_index(&ctx, "right".to_owned(), right).await?.select(
let right_df = dataframe_for_index(&ctx, "right".to_owned(), right, "http://localhost:9400".to_owned()).await?.select(
vec![col(r#""docId""#).alias("right_docId"), col(r#""shardId""#).alias("right_shardId"), col("instance_id").alias("right_instance_id")]
)?;

Expand Down

0 comments on commit a83b97d

Please sign in to comment.