Skip to content

Commit

Permalink
Fix flight compile error
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 11, 2021
1 parent f3a6769 commit f2cfa38
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions ballista/rust/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Client API for sending requests to executors.

use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::{collections::HashMap, pin::Pin};
use std::{
convert::{TryFrom, TryInto},
Expand Down Expand Up @@ -135,24 +135,28 @@ impl BallistaClient {
}

struct FlightDataStream {
stream: Streaming<FlightData>,
stream: Mutex<Streaming<FlightData>>,
schema: SchemaRef,
}

impl FlightDataStream {
pub fn new(stream: Streaming<FlightData>, schema: SchemaRef) -> Self {
Self { stream, schema }
Self {
stream: Mutex::new(stream),
schema,
}
}
}

impl Stream for FlightDataStream {
type Item = ArrowResult<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx).map(|x| match x {
let mut stream = self.stream.lock().expect("mutex is bad");
stream.poll_next_unpin(cx).map(|x| match x {
Some(flight_data_chunk_result) => {
let converted_chunk = flight_data_chunk_result
.map_err(|e| ArrowError::from_external_error(Box::new(e)))
Expand Down

0 comments on commit f2cfa38

Please sign in to comment.