From 1d60aa1436e81d5d67b3b16a8f6f2c5df7e57189 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Thu, 11 Jan 2018 16:07:55 -0500 Subject: [PATCH] Moved DataFrame ops to Table. DataFrame is now an interface --- js/perf/index.js | 19 ++- js/src/Arrow.externs.ts | 12 +- js/src/Arrow.ts | 8 +- js/src/bin/arrow2csv.ts | 2 +- js/src/dataframe/dataframe.ts | 179 ---------------------- js/src/{dataframe => vector}/predicate.ts | 0 js/src/vector/table.ts | 164 +++++++++++++++++--- 7 files changed, 158 insertions(+), 226 deletions(-) delete mode 100644 js/src/dataframe/dataframe.ts rename js/src/{dataframe => vector}/predicate.ts (100%) diff --git a/js/perf/index.js b/js/perf/index.js index 5ab3e76b24d7a..9527a8e842c5a 100644 --- a/js/perf/index.js +++ b/js/perf/index.js @@ -124,17 +124,16 @@ function createGetByIndexTest(vector) { } function createDataFrameDirectCountTest(table, column, test, value) { - let df = DataFrame.from(table); let colidx = table.columns.findIndex((c)=>c.name === column); if (test == 'gteq') { op = function () { sum = 0; - for (let batch = -1; ++batch < df.lengths.length;) { - const length = df.lengths[batch]; + for (let batch = -1; ++batch < table.lengths.length;) { + const length = table.lengths[batch]; // load batches - const columns = df.batches[batch]; + const columns = table.batches[batch]; // yield all indices for (let idx = -1; ++idx < length;) { @@ -145,11 +144,11 @@ function createDataFrameDirectCountTest(table, column, test, value) { } else if (test == 'eq') { op = function() { sum = 0; - for (let batch = -1; ++batch < df.lengths.length;) { - const length = df.lengths[batch]; + for (let batch = -1; ++batch < table.lengths.length;) { + const length = table.lengths[batch]; // load batches - const columns = df.batches[batch] + const columns = table.batches[batch] // yield all indices for (let idx = -1; ++idx < length;) { @@ -169,13 +168,13 @@ function createDataFrameDirectCountTest(table, column, test, value) { } function createDataFrameFilterCountTest(table, column, test, value) { - let df = DataFrame.from(table); let colidx = table.columns.findIndex((c)=>c.name === column); + let df; if (test == 'gteq') { - df = df.filter(col(column).gteq(value)); + df = table.filter(col(column).gteq(value)); } else if (test == 'eq') { - df = df.filter(col(column).eq(value)); + df = table.filter(col(column).eq(value)); } else { throw new Error(`Unrecognized test "${test}"`); } diff --git a/js/src/Arrow.externs.ts b/js/src/Arrow.externs.ts index d3bfdbbf8e123..0685d262cc186 100644 --- a/js/src/Arrow.externs.ts +++ b/js/src/Arrow.externs.ts @@ -50,6 +50,10 @@ Table.prototype.key; Table.prototype.select; /** @type {?} */ Table.prototype.toString; +/** @type {?} */ +Table.prototype.lengths; +/** @type {?} */ +Table.prototype.batches; let Vector = function() {}; /** @type {?} */ @@ -83,14 +87,6 @@ DictionaryVector.prototype.getKey; /** @type {?} */ DictionaryVector.prototype.getValue; -let DataFrame = function () {}; -/** @type {?} */ -DataFrame.prototype.lengths; -/** @type {?} */ -DataFrame.prototype.columns; -/** @type {?} */ -DataFrame.prototype.batches; - let Col = function() {}; /** @type {?} */ Col.prototype.gteq; diff --git a/js/src/Arrow.ts b/js/src/Arrow.ts index ce7235b8b13d4..d80cfed4864f8 100644 --- a/js/src/Arrow.ts +++ b/js/src/Arrow.ts @@ -45,8 +45,7 @@ import { TimestampVector, } from './vector/numeric'; -import { DataFrame } from './dataframe/dataframe'; -import { lit, col } from './dataframe/predicate'; +import { lit, col } from './vector/predicate'; // closure compiler always erases static method names: // https://github.com/google/closure-compiler/issues/1776 @@ -54,7 +53,6 @@ import { lit, col } from './dataframe/predicate'; Table['from'] = Table.from; Table['fromAsync'] = Table.fromAsync; BoolVector['pack'] = BoolVector.pack; -DataFrame['from'] = DataFrame.from; export { read, readAsync }; export { Table, Vector, StructRow }; @@ -88,8 +86,7 @@ export { FixedSizeListVector, }; -export { DataFrame } from './dataframe/dataframe'; -export { lit, col } from './dataframe/predicate'; +export { lit, col } from './vector/predicate'; /* These exports are needed for the closure umd targets */ @@ -103,7 +100,6 @@ try { Arrow['readAsync'] = readAsync; Arrow['Table'] = Table; Arrow['Vector'] = Vector; - Arrow['DataFrame'] = DataFrame; Arrow['StructRow'] = StructRow; Arrow['BoolVector'] = BoolVector; Arrow['ListVector'] = ListVector; diff --git a/js/src/bin/arrow2csv.ts b/js/src/bin/arrow2csv.ts index 01ef0b848ce75..117d417f1b4fa 100644 --- a/js/src/bin/arrow2csv.ts +++ b/js/src/bin/arrow2csv.ts @@ -97,7 +97,7 @@ files.forEach((source) => { printTable(table); }); -function printTable(table: Arrow.Table) { +function printTable(table: Arrow.Table) { let header = [...table.columns.map((_, i) => table.key(i))].map(stringify); let maxColumnWidths = header.map(x => x.length); // Pass one to convert to strings and count max column widths diff --git a/js/src/dataframe/dataframe.ts b/js/src/dataframe/dataframe.ts deleted file mode 100644 index c8db286f28eb4..0000000000000 --- a/js/src/dataframe/dataframe.ts +++ /dev/null @@ -1,179 +0,0 @@ -import { Vector } from "../vector/vector"; -import { StructVector, StructRow } from "../vector/struct"; -import { VirtualVector } from "../vector/virtual"; - -import { Predicate } from "./predicate" - -export type NextFunc = (idx: number, cols: Vector[]) => void; - -export class DataFrameRow extends StructRow { - constructor (batches: Vector[], idx: number) { - super(new StructVector({columns: batches}), idx); - } -} - -export interface DataFrameOps { - readonly batches: Vector[][]; - readonly lengths: Uint32Array; - filter(predicate: Predicate): DataFrameOps; - scan(next: NextFunc): void; - count(): number; -} - -export class DataFrame extends Vector implements DataFrameOps { - readonly lengths: Uint32Array; - constructor(readonly batches: Vector[][]) { - super(); - // for each batch - this.lengths = new Uint32Array(batches.map((batch)=>{ - // verify that every vector has the same length, and return that - // length - // throw an error if the lengths don't match - return batch.reduce((length, col) => { - if (col.length !== length) - throw new Error("Attempted to create a DataFrame with un-aligned vectors"); - return length; - }, batch[0].length); - })); - } - - get(idx: number): DataFrameRow|null { - let batch = 0; - while (idx > this.lengths[batch] && batch < this.lengths.length) - idx -= this.lengths[batch++]; - - if (batch === this.lengths.length) return null; - - else return new DataFrameRow(this.batches[batch], idx); - } - - filter(predicate: Predicate): DataFrameOps { - return new FilteredDataFrame(this, predicate); - } - - scan(next: NextFunc) { - for (let batch = -1; ++batch < this.lengths.length;) { - const length = this.lengths[batch]; - - // load batches - const columns = this.batches[batch]; - - // yield all indices - for (let idx = -1; ++idx < length;) { - next(idx, columns) - } - } - } - - count(): number { - return this.lengths.reduce((acc, val) => acc + val); - } - - *[Symbol.iterator]() { - for (let batch = -1; ++batch < this.lengths.length;) { - const length = this.lengths[batch]; - - // load batches - const columns = this.batches[batch]; - - // yield all indices - for (let idx = -1; ++idx < length;) { - yield new DataFrameRow(columns, idx); - } - } - } - - static from(table: Vector): DataFrame { - if (table instanceof StructVector) { - const columns = table.columns; - if (isAligned(columns)) { - // StructVector of aligned VirtualVectors - // break up VirtualVectors into batches - const batches = columns[0].vectors.map((_,i) => { - return columns.map((vec: VirtualVector) => { - return vec.vectors[i]; - }); - }); - return new DataFrame(batches); - } else { - return new DataFrame([columns]); - } - } else if (table instanceof VirtualVector && - table.vectors.every((vec) => vec instanceof StructVector)) { - return new DataFrame(table.vectors.map((vec) => { - return (vec as StructVector).columns; - })); - } else { - return new DataFrame([[table]]); - } - } -} - -class FilteredDataFrame implements DataFrameOps { - readonly lengths: Uint32Array; - readonly batches: Vector[][]; - constructor (readonly parent: DataFrameOps, private predicate: Predicate) { - this.batches = parent.batches; - this.lengths = parent.lengths; - } - - scan(next: NextFunc) { - // inlined version of this: - // this.parent.scan((idx, columns) => { - // if (this.predicate(idx, columns)) next(idx, columns); - // }); - for (let batch = -1; ++batch < this.lengths.length;) { - const length = this.lengths[batch]; - - // load batches - const columns = this.batches[batch]; - const predicate = this.predicate.bind(columns); - - // yield all indices - for (let idx = -1; ++idx < length;) { - if (predicate(idx, columns)) next(idx, columns); - } - } - } - - count(): number { - // inlined version of this: - // let sum = 0; - // this.parent.scan((idx, columns) => { - // if (this.predicate(idx, columns)) ++sum; - // }); - // return sum; - let sum = 0; - for (let batch = -1; ++batch < this.lengths.length;) { - const length = this.lengths[batch]; - - // load batches - const columns = this.batches[batch]; - const predicate = this.predicate.bind(columns); - - // yield all indices - for (let idx = -1; ++idx < length;) { - if (predicate(idx, columns)) ++sum; - } - } - return sum; - } - - filter(predicate: Predicate): DataFrameOps { - return new FilteredDataFrame( - this.parent, - this.predicate.and(predicate) - ); - } -} - -function isAligned(columns: Vector[]): columns is VirtualVector[] { - if (columns.every((col) => col instanceof VirtualVector)) { - const virtuals = columns as VirtualVector[] - - return virtuals.slice(1).every((col) => { - return col.aligned(virtuals[0]); - }); - } - return false; -} diff --git a/js/src/dataframe/predicate.ts b/js/src/vector/predicate.ts similarity index 100% rename from js/src/dataframe/predicate.ts rename to js/src/vector/predicate.ts diff --git a/js/src/vector/table.ts b/js/src/vector/table.ts index ca2b66a22da80..e81fe16a94ae8 100644 --- a/js/src/vector/table.ts +++ b/js/src/vector/table.ts @@ -18,44 +18,164 @@ import { Vector } from './vector'; import { StructVector, StructRow } from './struct'; import { read, readAsync } from '../reader/arrow'; +import { Predicate } from './predicate'; -function concatVectors(tableVectors: Vector[], batchVectors: Vector[]) { - return tableVectors.length === 0 ? batchVectors : batchVectors.map((vec, i, _vs, col = tableVectors[i]) => - vec && col && col.concat(vec) || col || vec - ) as Vector[]; +export type NextFunc = (idx: number, cols: Vector[]) => void; + +export class DataFrameRow extends StructRow { + constructor (batch: Vector[], idx: number) { + super(new StructVector({columns: batch}), idx); + } + toString() { + return this.toArray().map((x) => JSON.stringify(x)).join(', '); + } } -export class Table extends StructVector { +export interface DataFrame { + readonly batches: Vector[][]; + readonly lengths: Uint32Array; + filter(predicate: Predicate): DataFrame; + scan(next: NextFunc): void; + count(): number; +} + +function columnsFromBatches(batches: Vector[][]) { + const remaining = batches.slice(1); + return batches[0].map((vec, colidx) => + vec.concat(...remaining.map((batch) => batch[colidx])) + ); +} + +export class Table extends StructVector implements DataFrame { static from(sources?: Iterable | object | string) { - let columns: Vector[] = []; + let batches: Vector[][] = [[]]; if (sources) { - for (let vectors of read(sources)) { - columns = concatVectors(columns, vectors); - } + batches = Array.from(read(sources)); } - return new Table({ columns }); + return new Table({ batches }); } static async fromAsync(sources?: AsyncIterable) { - let columns: Vector[] = []; + let batches: Vector[][] = [[]]; if (sources) { - for await (let vectors of readAsync(sources)) { - columns = columns = concatVectors(columns, vectors); + batches = []; + for await (let batch of readAsync(sources)) { + batches.push(batch); } } - return new Table({ columns }); + return new Table({ batches }); } + + // VirtualVector of each column, spanning batches + readonly columns: Vector[]; + + // List of batches, where each batch is a list of Vectors + readonly batches: Vector[][]; + readonly lengths: Uint32Array; readonly length: number; - constructor(argv: { columns: Vector[] }) { - super(argv); - this.length = Math.max(...this.columns.map((col) => col.length)) | 0; + constructor(argv: { batches: Vector[][] }) { + super({columns: columnsFromBatches(argv.batches)}); + this.batches = argv.batches; + this.lengths = new Uint32Array(this.batches.map((batch) => batch[0].length)); + + this.length = this.lengths.reduce((acc, length) => acc + length); + } + get(idx: number): DataFrameRow { + let batch = 0; + while (idx > this.lengths[batch] && batch < this.lengths.length) + idx -= this.lengths[batch++]; + + if (batch === this.lengths.length) throw new Error("Overflow") + + else return new DataFrameRow(this.batches[batch], idx); + } + filter(predicate: Predicate): DataFrame { + return new FilteredDataFrame(this, predicate); + } + scan(next: NextFunc) { + for (let batch = -1; ++batch < this.lengths.length;) { + const length = this.lengths[batch]; + + // load batches + const columns = this.batches[batch]; + + // yield all indices + for (let idx = -1; ++idx < length;) { + next(idx, columns) + } + } } - get(index: number): TableRow { - return new TableRow(this, index); + count(): number { + return this.lengths.reduce((acc, val) => acc + val); + } + *[Symbol.iterator]() { + for (let batch = -1; ++batch < this.lengths.length;) { + const length = this.lengths[batch]; + + // load batches + const columns = this.batches[batch]; + + // yield all indices + for (let idx = -1; ++idx < length;) { + yield new DataFrameRow(columns, idx); + } + } } } -export class TableRow extends StructRow { - toString() { - return this.toArray().map((x) => JSON.stringify(x)).join(', '); +class FilteredDataFrame implements DataFrame { + readonly lengths: Uint32Array; + readonly batches: Vector[][]; + constructor (readonly parent: DataFrame, private predicate: Predicate) { + this.batches = parent.batches; + this.lengths = parent.lengths; + } + + scan(next: NextFunc) { + // inlined version of this: + // this.parent.scan((idx, columns) => { + // if (this.predicate(idx, columns)) next(idx, columns); + // }); + for (let batch = -1; ++batch < this.lengths.length;) { + const length = this.lengths[batch]; + + // load batches + const columns = this.batches[batch]; + const predicate = this.predicate.bind(columns); + + // yield all indices + for (let idx = -1; ++idx < length;) { + if (predicate(idx, columns)) next(idx, columns); + } + } + } + + count(): number { + // inlined version of this: + // let sum = 0; + // this.parent.scan((idx, columns) => { + // if (this.predicate(idx, columns)) ++sum; + // }); + // return sum; + let sum = 0; + for (let batch = -1; ++batch < this.lengths.length;) { + const length = this.lengths[batch]; + + // load batches + const columns = this.batches[batch]; + const predicate = this.predicate.bind(columns); + + // yield all indices + for (let idx = -1; ++idx < length;) { + if (predicate(idx, columns)) ++sum; + } + } + return sum; + } + + filter(predicate: Predicate): DataFrame { + return new FilteredDataFrame( + this.parent, + this.predicate.and(predicate) + ); } }