Skip to content

Commit

Permalink
Add optional bind callback to scan
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Hulette committed Jan 26, 2018
1 parent 5bdf17f commit 1910962
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
4 changes: 2 additions & 2 deletions js/src/Arrow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import * as predicate_ from './predicate';
import { Vector } from './vector';
import { RecordBatch } from './recordbatch';
import { Schema, Field, Type } from './type';
import { Table, DataFrame, NextFunc, CountByResult } from './table';
import { Table, DataFrame, NextFunc, BindFunc, CountByResult } from './table';
import { read, readAsync } from './ipc/reader/arrow';

export import View = vector_.View;
Expand All @@ -36,7 +36,7 @@ export import TimeBitWidth = type_.TimeBitWidth;
export import TypedArrayConstructor = type_.TypedArrayConstructor;

export { read, readAsync };
export { Table, DataFrame, NextFunc, CountByResult };
export { Table, DataFrame, NextFunc, BindFunc, CountByResult };
export { Field, Schema, RecordBatch, Vector, Type };

export namespace util {
Expand Down
11 changes: 7 additions & 4 deletions js/src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import { isPromise, isAsyncIterable } from './util/compat';
import { Vector, DictionaryVector, IntVector, StructVector } from './vector';
import { ChunkedView } from './vector/chunked';

export type NextFunc = (idx: number, cols: RecordBatch) => void;
export type NextFunc = (idx: number, batch: RecordBatch) => void;
export type BindFunc = (batch: RecordBatch) => void;

export interface DataFrame {
filter(predicate: Predicate): DataFrame;
scan(next: NextFunc): void;
scan(next: NextFunc, bind?: BindFunc): void;
count(): number;
countBy(col: (Col|string)): CountByResult;
}
Expand Down Expand Up @@ -128,11 +129,12 @@ export class Table implements DataFrame {
public filter(predicate: Predicate): DataFrame {
return new FilteredDataFrame(this.batches, predicate);
}
public scan(next: NextFunc) {
public scan(next: NextFunc, bind?: BindFunc) {
const batches = this.batches, numBatches = batches.length;
for (let batchIndex = -1; ++batchIndex < numBatches;) {
// load batches
const batch = batches[batchIndex];
if (bind) { bind(batch); }
// yield all indices
for (let index = -1, numRows = batch.length; ++index < numRows;) {
next(index, batch);
Expand Down Expand Up @@ -189,7 +191,7 @@ class FilteredDataFrame implements DataFrame {
this.batches = batches;
this.predicate = predicate;
}
public scan(next: NextFunc) {
public scan(next: NextFunc, bind?: BindFunc) {
// inlined version of this:
// this.parent.scan((idx, columns) => {
// if (this.predicate(idx, columns)) next(idx, columns);
Expand All @@ -199,6 +201,7 @@ class FilteredDataFrame implements DataFrame {
for (let batchIndex = -1; ++batchIndex < numBatches;) {
// load batches
const batch = batches[batchIndex];
if (bind) { bind(batch); }
const predicate = this.predicate.bind(batch);
// yield all indices
for (let index = -1, numRows = batch.length; ++index < numRows;) {
Expand Down

0 comments on commit 1910962

Please sign in to comment.