Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for stateful array with grouping using hashmap with buckets #67

Merged
merged 10 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 164 additions & 27 deletions src/sql-newcodegen.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ const { unique, union } = sets
const KEY_SIZE = 256
const HASH_SIZE = 256

const BUCKET_SIZE = 256
const DATA_SIZE = KEY_SIZE * BUCKET_SIZE

const HASH_MASK = HASH_SIZE - 1

let filters
Expand Down Expand Up @@ -36,6 +39,7 @@ let initRequired = {
"min": true,
"max": true,
"count": true,
"array": true,
}


Expand Down Expand Up @@ -110,6 +114,8 @@ let cgen = {
plus: (lhs, rhs) => cgen.binary(lhs, rhs, "+"),
minus: (lhs, rhs) => cgen.binary(lhs, rhs, "-"),

mul: (lhs, rhs) => cgen.binary(lhs, rhs, "*"),

and: (lhs, rhs) => cgen.binary(lhs, rhs, "&&"),
equal: (lhs, rhs) => cgen.binary(lhs, rhs, "=="),
notEqual: (lhs, rhs) => cgen.binary(lhs, rhs, "!="),
Expand All @@ -129,17 +135,18 @@ let cgen = {
comment: (buf) => (s) => buf.push("// " + s),
stmt: (buf) => (expr) => buf.push(expr + ";"),

declareVar: (buf) => (type, name, init) => buf.push(type + " " + name + (init ? ` = ${init};` : ";")),
declareArr: (buf) => (type, name, len, init) => buf.push(`${type} ${name}[${len}]` + (init ? ` = ${init};` : ";")),
declarePtr: (buf) => (type, name, init) => buf.push(`${type} *${name}` + (init ? ` = ${init};` : ";")),
declarePtrPtr: (buf) => (type, name, init) => buf.push(`${type} **${name}` + (init ? ` = ${init};` : ";")),
declareVar: (buf) => (type, name, init, constant = false) => buf.push((constant ? "const " : "") + type + " " + name + (init ? ` = ${init};` : ";")),
declareArr: (buf) => (type, name, len, init, constant = false) => buf.push((constant ? "const " : "") + `${type} ${name}[${len}]` + (init ? ` = ${init};` : ";")),
declarePtr: (buf) => (type, name, init, constant = false) => buf.push((constant ? "const " : "") + `${type} *${name}` + (init ? ` = ${init};` : ";")),
declarePtrPtr: (buf) => (type, name, init, constant = false) => buf.push((constant ? "const " : "") + `${type} **${name}` + (init ? ` = ${init};` : ";")),

declareInt: (buf) => (name, init) => cgen.declareVar(buf)("int", name, init),
declareULong: (buf) => (name, init) => cgen.declareVar(buf)("unsigned long", name, init),
declareCharArr: (buf) => (name, len, init) => cgen.declareArr(buf)("char", name, len, init),
declareIntPtr: (buf) => (name, len, init) => cgen.declarePtr(buf)("int", name, len, init),
declareCharPtr: (buf) => (name, len, init) => cgen.declarePtr(buf)("char", name, len, init),
declareCharPtrPtr: (buf) => (name, len, init) => cgen.declarePtrPtr(buf)("char", name, len, init),
declareIntPtr: (buf) => (name, init) => cgen.declarePtr(buf)("int", name, init),
declareCharPtr: (buf) => (name, init) => cgen.declarePtr(buf)("char", name, init),
declareConstCharPtr: (buf) => (name, init) => cgen.declarePtr(buf)("char", name, init, true),
declareCharPtrPtr: (buf) => (name, init) => cgen.declarePtrPtr(buf)("char", name, init),

printErr: (buf) => (fmt, ...args) => buf.push(cgen.call("fprintf", "stderr", fmt, ...args) + ";"),

Expand Down Expand Up @@ -289,7 +296,7 @@ let codegen = (q, buf) => {
return String(q.op)
} else if (typeof q.op == "string") {
let name = getNewName("tmp_str")
cgen.declareCharArr(buf)(name, q.op.length + 1, '"' + q.op + '"')
cgen.declareConstCharPtr(buf)(name, '"' + q.op + '"')
return { str: name, len: q.op.length }
} else {
throw new Error("constant not supported: " + pretty(q))
Expand Down Expand Up @@ -402,6 +409,7 @@ let hash = (buf, key, schema) => {
// Emit the code that finds the key in the hashmap.
// Linear probing is used for resolving collisions.
// Comparison of keys is based on different key types.
// The actual storage of the values / data does not affect the lookup
let hashLookUp = (buf, sym, key) => {
let { keySchema } = hashMapEnv[sym]
let hashed = hash(buf, key, keySchema)
Expand Down Expand Up @@ -469,14 +477,18 @@ let hashLookUpOrUpdate = (buf, sym, key, update) => {
}

let lhs
if (typing.isString(valSchema)) {
if (typing.isObject(valSchema)) {
lhs = `${sym}_bucket_counts[${keyPos}]`
} else if (typing.isString(valSchema)) {
lhs = { str: `${sym}_values_str[${keyPos}]`, len: `${sym}_values_len[${keyPos}]` }
} else {
lhs = `${sym}_values[${keyPos}]`
}

update(lhs)
})

return [pos, keyPos]
}

// Emit the code that performs a lookup of the key in the hashmap, then
Expand Down Expand Up @@ -506,13 +518,47 @@ let hashUpdate = (buf, sym, key, update) => {
})

let lhs
if (typing.isString(valSchema)) {
if (typing.isObject(valSchema)) {
lhs = `${sym}_bucket_counts[${keyPos}]`
} else if (typing.isString(valSchema)) {
lhs = { str: `${sym}_values_str[${keyPos}]`, len: `${sym}_values_len[${keyPos}]` }
} else {
lhs = `${sym}_values[${keyPos}]`
}

update(lhs)

return [pos, keyPos]
}

// Emit code that appends `value` to the bucket of `key` in the grouped
// hashmap `sym` (stateful "array" op). The value is stored at the next
// free slot of the map's global data buffer, and that slot's index is
// recorded in the key's bucket.
//
// buf   - output buffer the generated C statements are pushed into
// sym   - symbol naming the hashmap (prefix of all its C identifiers)
// key   - key expression (looked up / inserted via hashLookUp)
// value - value expression; for string values, an object { str, len }
//
// Throws if `sym` was not registered with an object (array) valSchema.
// The schema is validated up front so nothing is emitted into `buf`
// when the map is not array-valued.
let hashBufferInsert = (buf, sym, key, value) => {
  let { valSchema } = hashMapEnv[sym]

  // fail before emitting any code: "array" requires an object valSchema
  if (!typing.isObject(valSchema)) {
    throw new Error("array type expected")
  }

  // emits the probing code; keyPos is the slot of `key` in the key arrays
  let [, keyPos] = hashLookUp(buf, sym, key)

  // claim the next slot in the global data buffer
  let dataPos = getNewName("data_pos")
  cgen.declareInt(buf)(dataPos, `${sym}_data_count`)
  cgen.stmt(buf)(cgen.inc(`${sym}_data_count`))

  // claim the next slot in this key's bucket
  let bucketPos = getNewName("bucket_pos")
  cgen.declareInt(buf)(bucketPos, `${sym}_bucket_counts[${keyPos}]`)
  cgen.stmt(buf)(cgen.assign(`${sym}_bucket_counts[${keyPos}]`, cgen.plus(bucketPos, "1")))

  // record the data index: buckets[keyPos * BUCKET_SIZE + bucketPos] = dataPos
  let idx = cgen.plus(cgen.mul(keyPos, BUCKET_SIZE), bucketPos)
  cgen.stmt(buf)(cgen.assign(`${sym}_buckets[${idx}]`, dataPos))

  // store the actual value in the data buffer
  if (typing.isString(valSchema.objValue)) {
    cgen.stmt(buf)(cgen.assign(`${sym}_data_str[${dataPos}]`, value.str))
    cgen.stmt(buf)(cgen.assign(`${sym}_data_len[${dataPos}]`, value.len))
  } else {
    cgen.stmt(buf)(cgen.assign(`${sym}_data[${dataPos}]`, value))
  }
}

// Emit code that initializes a hashmap.
Expand Down Expand Up @@ -544,47 +590,122 @@ let hashMapInit = (buf, sym, keySchema, valSchema) => {

cgen.comment(buf)(`values of ${sym}`)

if (typing.isString(valSchema)) {
if (typing.isObject(valSchema)) {
// stateful "array" op
if (typing.isString(valSchema.objValue)) {
// arrays for the actual data will have size KEY_SIZE * BUCKET_SIZE
cgen.declareCharPtrPtr(buf)(`${sym}_data_str`, cgen.cast("char **", cgen.malloc("char *", DATA_SIZE)))
cgen.declareIntPtr(buf)(`${sym}_data_len`, cgen.cast("int *", cgen.malloc("int", DATA_SIZE)))
} else {
let cType = convertToCType(valSchema.objValue)
cgen.declarePtr(buf)(cType, `${sym}_data`, cgen.cast(`${cType} *`, cgen.malloc(cType, DATA_SIZE)))
}
cgen.declareInt(buf)(`${sym}_data_count`, "0")

cgen.declareIntPtr(buf)(`${sym}_buckets`, cgen.cast("int *", cgen.malloc("int", DATA_SIZE)))
cgen.declareIntPtr(buf)(`${sym}_bucket_counts`, cgen.cast("int *", cgen.malloc("int", KEY_SIZE)))
// throw new Error("hashMap value object not implemented")
} else if (typing.isString(valSchema)) {
cgen.declareCharPtrPtr(buf)(`${sym}_values_str`, cgen.cast("char **", cgen.malloc("char *", KEY_SIZE)))
cgen.declareIntPtr(buf)(`${sym}_values_len`, cgen.cast("int *", cgen.malloc("int", KEY_SIZE)))
} else if (typing.isObject(valSchema)) {
if (!typing.isInteger(valSchema.objKey)) {
throw new Error("hashMap value object does not support non-integer keys")
}
throw new Error("hashMap value object not implemented")
} else {
// let convertToCType report "type not supported" errors
let cType = convertToCType(valSchema)
cgen.declarePtr(buf)(cType, `${sym}_values`, cgen.cast(`${cType} *`, cgen.malloc(cType, KEY_SIZE)))
}

hashMapEnv[sym] = { keySchema, valSchema }
}

// Emit declarations for hashmap `sym1` that alias the backing storage
// of an already-initialized hashmap `sym2`. No data is copied: both
// symbols end up referring to the same key arrays, hash table, and
// value / bucket buffers. Also registers `sym1` in hashMapEnv.
let hashMapShallowCopy = (buf, sym1, sym2, keySchema, valSchema) => {
  const comment = cgen.comment(buf)
  const alias = (name) => `${sym2}_${name}`

  comment(`init hashmap for ${sym1}`)
  // keys
  comment(`keys of ${sym1}`)

  if (typing.isString(keySchema)) {
    cgen.declareCharPtrPtr(buf)(`${sym1}_keys_str`, alias("keys_str"))
    cgen.declareIntPtr(buf)(`${sym1}_keys_len`, alias("keys_len"))
  } else {
    const keyCType = convertToCType(keySchema)
    cgen.declarePtr(buf)(keyCType, `${sym1}_keys`, alias("keys"))
  }

  comment(`key count for ${sym1}`)
  cgen.declareInt(buf)(`${sym1}_key_count`, alias("key_count"))

  // htable
  comment(`hash table for ${sym1}`)
  cgen.declareIntPtr(buf)(`${sym1}_htable`, alias("htable"))

  comment(`values of ${sym1}`)

  if (typing.isObject(valSchema)) {
    // stateful "array" op: alias the data buffers, buckets and counts
    if (typing.isString(valSchema.objValue)) {
      cgen.declareCharPtrPtr(buf)(`${sym1}_data_str`, alias("data_str"))
      cgen.declareIntPtr(buf)(`${sym1}_data_len`, alias("data_len"))
    } else {
      const dataCType = convertToCType(valSchema.objValue)
      cgen.declarePtr(buf)(dataCType, `${sym1}_data`, alias("data"))
    }
    cgen.declareInt(buf)(`${sym1}_data_count`, alias("data_count"))

    cgen.declareIntPtr(buf)(`${sym1}_buckets`, alias("buckets"))
    cgen.declareIntPtr(buf)(`${sym1}_bucket_counts`, alias("bucket_counts"))
  } else if (typing.isString(valSchema)) {
    cgen.declareCharPtrPtr(buf)(`${sym1}_values_str`, alias("values_str"))
    cgen.declareIntPtr(buf)(`${sym1}_values_len`, alias("values_len"))
  } else {
    // let convertToCType report "type not supported" errors
    const valCType = convertToCType(valSchema)
    cgen.declarePtr(buf)(valCType, `${sym1}_values`, alias("values"))
  }

  hashMapEnv[sym1] = { keySchema, valSchema }
}

// Emit code that prints the keys and values in a hashmap.
// Scans every hash-table slot, skipping empty slots (-1), and prints
// one "key: value" line per occupied slot. Grouped ("array") values —
// an object valSchema — print as a bracketed list "[v1, v2, ...]";
// string and scalar values print directly.
let hashMapPrint = (buf, sym) => {
  let { keySchema, valSchema } = hashMapEnv[sym]
  buf.push(`for (int i = 0; i < ${HASH_SIZE}; i++) {`)
  buf.push(`int key_pos = ${sym}_htable[i];`)
  buf.push(`if (key_pos == -1) {`)
  buf.push(`continue;`)
  buf.push(`}`)
  buf.push(`// print key`)

  if (typing.isString(keySchema)) {
    buf.push(`print(${sym}_keys_str[key_pos], ${sym}_keys_len[key_pos]);`)
  } else {
    buf.push(`printf("%${getFormatSpecifier(keySchema)}", ${sym}_keys[key_pos]);`)
  }

  buf.push(`print(": ", 2);`)

  buf.push(`// print value`)
  if (typing.isObject(valSchema)) {
    buf.push(`print("[", 1);`)
    buf.push(`int bucket_count = ${sym}_bucket_counts[key_pos];`)
    buf.push(`for (int j = 0; j < bucket_count; j++) {`)
    // use BUCKET_SIZE rather than a hard-coded 256 so the bucket stride
    // stays in sync with the allocation in hashMapInit / hashBufferInsert
    buf.push(`int data_pos = ${sym}_buckets[key_pos * ${BUCKET_SIZE} + j];`)

    if (typing.isString(valSchema.objValue)) {
      buf.push(`print(${sym}_data_str[data_pos], ${sym}_data_len[data_pos]);`)
    } else {
      buf.push(`printf("%${getFormatSpecifier(valSchema.objValue)}", ${sym}_data[data_pos]);`)
    }

    buf.push(`if (j != bucket_count - 1) {`)
    buf.push(`print(", ", 2);`)
    buf.push(`}`)
    buf.push(`}`)
    buf.push(`print("]", 1);`)
  } else if (typing.isString(valSchema)) {
    buf.push(`print(${sym}_values_str[key_pos], ${sym}_values_len[key_pos]);`)
  } else {
    buf.push(`printf("%${getFormatSpecifier(valSchema)}", ${sym}_values[key_pos]);`)
  }
  buf.push(`print("\\n", 1);`)
  buf.push(`}`)
}

Expand All @@ -602,6 +723,8 @@ let emitStmInit = (q, sym) => {
init = `INT_MAX`
} else if (q.op == "max") {
init = `INT_MIN`
} else if (q.op == "array") {
init = `0`
} else {
throw new Error("stateful op not supported: " + pretty(q))
}
Expand Down Expand Up @@ -670,7 +793,9 @@ let emitStmUpdate = (q, sym) => {
update = (lhs) => cgen.stmt(buf)(cgen.assign(lhs, e1))
}
} else if (q.op == "array") {
throw new Error("stateful op not implmeneted: " + pretty(q))
let key = mksetVarEnv[q.fre[0]].val
hashBufferInsert(buf, sym, key, e1)
return buf
} else {
throw new Error("stateful op not supported: " + pretty(q))
}
Expand Down Expand Up @@ -870,6 +995,8 @@ let emitCode = (q, ir) => {
prolog.push(`#include "rhyme-sql.h"`)
prolog.push("int main() {")

let trivialUpdate = {}

// Collect hashmaps for groupby
for (let i in assignments) {
let sym = tmpSym(i)
Expand All @@ -879,6 +1006,10 @@ let emitCode = (q, ir) => {
if (q.key == "update") {
let keySchema = q.arg[3].arg[0].arg[0].schema
hashMapEnv[sym] = { keySchema: keySchema.type, valSchema: q.schema.type.objValue }

if (q.arg[2].fre.length == 1 && q.arg[1].op == q.arg[2].fre[0]) {
trivialUpdate[sym] = (q.arg[2].key == "pure" ? tmpSym(q.arg[2].arg[0].op) : tmpSym(q.arg[2].op))
}
}
}

Expand Down Expand Up @@ -949,7 +1080,13 @@ let emitCode = (q, ir) => {
let keySchema = mksetVarEnv[q.fre[0]].schema
let buf = []
hashMapInit(buf, sym, keySchema.type, q.schema.type)
assign(buf, sym, [], []);
assign(buf, sym, [], [])
} else if (q.key == "update" && trivialUpdate[sym] !== undefined) {
let keySchema = mksetVarEnv[q.arg[1].op].schema
let buf = []
hashMapShallowCopy(buf, sym, trivialUpdate[sym], keySchema.type, q.schema.type.objValue)
assign(buf, sym, [], [trivialUpdate[sym]])
continue
}

// emit init
Expand All @@ -961,7 +1098,7 @@ let emitCode = (q, ir) => {
let fv = union(q.fre, q.bnd)
let deps = [...fv, ...q.tmps.map(tmpSym)] // XXX rhs dims only?

assign(emitStmUpdate(q, sym, q.fre), sym, q.fre, deps)
assign(emitStmUpdate(q, sym), sym, q.fre, deps)
}

let res = codegen(q, [], {})
Expand Down
33 changes: 24 additions & 9 deletions test/cgen/se-cgen-sql-new.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -520,17 +520,32 @@ let regionData = [
{ region: "Europe", country: "UK" },
]

// test("groupByArray", async () => {
// let country = rh`loadCSV "./cgen-sql/country.csv" ${countrySchema}`
// let region = rh`loadCSV "./cgen-sql/region.csv" ${regionSchema}`
test("groupByArray", async () => {
let region = rh`loadCSV "./cgen-sql/region.csv" ${regionSchema}`

let query = rh`array ${region}.*O.country | group ${region}.*O.region`

let func = await compile(query, { backend: "c-sql-new", outDir, outFile: "groupByArray.c", schema: types.never })
let res = await func()

expect(res).toBe(`Asia: [Japan, China]
Europe: [France, UK]
`)
})

// let q1 = rh`${region}.*O.region | group ${region}.*O.country`
// let query = rh`array ${country}.*.city | group ${q1}.(${country}.*.country)`
test("hashJoinArray", async () => {
let country = rh`loadCSV "./cgen-sql/country.csv" ${countrySchema}`
let region = rh`loadCSV "./cgen-sql/region.csv" ${regionSchema}`

// let func = await compile(query, { backend: "c-sql-new", outDir, outFile: "groupByArrayJS.c", schema: types.never })
// let res = await func()
let q1 = rh`${region}.*O.region | group ${region}.*O.country`
let query = rh`array ${country}.*.population | group ${q1}.(${country}.*.country)`

// console.log(res)
// })
let func = await compile(query, { backend: "c-sql-new", outDir, outFile: "hashJoinArray.c", schema: types.never })
let res = await func()

expect(res).toBe(`Asia: [30, 20]
Europe: [10, 10]
`)
})

/**/