Skip to content

Commit

Permalink
Merge pull request #312 from bmeg/feature/pivot
Browse files Browse the repository at this point in the history
Pivot Operation
  • Loading branch information
kellrott authored Aug 5, 2024
2 parents aeb30de + 40967a7 commit b172cf7
Show file tree
Hide file tree
Showing 16 changed files with 990 additions and 735 deletions.
6 changes: 6 additions & 0 deletions conformance/graphs/fhir.edges
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"from":"patient_a", "to":"observation_a1", "label":"patient_observation"}
{"from":"patient_a", "to":"observation_a2", "label":"patient_observation"}
{"from":"patient_a", "to":"observation_a3", "label":"patient_observation"}
{"from":"patient_b", "to":"observation_b1", "label":"patient_observation"}
{"from":"patient_b", "to":"observation_b2", "label":"patient_observation"}
{"from":"patient_b", "to":"observation_b3", "label":"patient_observation"}
8 changes: 8 additions & 0 deletions conformance/graphs/fhir.vertices
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"gid":"patient_a", "label":"Patient", "data":{"name":"Alice"}}
{"gid":"patient_b", "label":"Patient", "data":{"name":"Bob"}}
{"gid":"observation_a1", "label":"Observation", "data":{"key":"age", "value":36}}
{"gid":"observation_a2", "label":"Observation", "data":{"key":"sex", "value":"Female"}}
{"gid":"observation_a3", "label":"Observation", "data":{"key":"blood_pressure", "value":"111/78"}}
{"gid":"observation_b1", "label":"Observation", "data":{"key":"age", "value":42}}
{"gid":"observation_b2", "label":"Observation", "data":{"key":"sex", "value":"Male"}}
{"gid":"observation_b3", "label":"Observation", "data":{"key":"blood_pressure", "value":"120/80"}}
2 changes: 1 addition & 1 deletion conformance/tests/ot_aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def test_traversal_gid_aggregation(man):
return errors

if planet_agg_map[row["key"]] != row["value"]:
errors.append("Incorrect bucket count returned: %s" % res)
errors.append("Incorrect bucket count returned: %s" % row)

if count != 2:
errors.append(
Expand Down
14 changes: 14 additions & 0 deletions conformance/tests/ot_pivot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from __future__ import absolute_import

import gripql


def test_pivot(man):
errors = []
G = man.setGraph("fhir")

for row in G.query().V().hasLabel("Patient").as_("a").out("patient_observation").pivot("$a._gid", "$.key", "$.value" ):
print(row)

return errors

69 changes: 69 additions & 0 deletions engine/core/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math"
"reflect"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/gdbi/tpath"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/kvi"
"github.com/bmeg/grip/log"
"github.com/bmeg/grip/util/copy"
"github.com/influxdata/tdigest"
Expand Down Expand Up @@ -433,6 +435,73 @@ func (r *Render) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe,

////////////////////////////////////////////////////////////////////////////////

// Render takes current state and renders into requested structure
type Pivot struct {
Stmt *gripql.PivotStep
}

// Process runs the pivot processor
func (r *Pivot) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe, out gdbi.OutPipe) context.Context {
go func() {
defer close(out)
kv := man.GetTempKV()
kv.BulkWrite(func(bl kvi.KVBulkWrite) error {
for t := range in {
if t.IsSignal() {
out <- t
continue
}
//fmt.Printf("Checking %#v\n", t.GetCurrent())
id := gdbi.TravelerPathLookup(t, r.Stmt.Id)
if idStr, ok := id.(string); ok {
field := gdbi.TravelerPathLookup(t, r.Stmt.Field)
if fieldStr, ok := field.(string); ok {
value := gdbi.TravelerPathLookup(t, r.Stmt.Value)
if v, err := json.Marshal(value); err == nil {
key := bytes.Join([][]byte{[]byte(idStr), []byte(fieldStr)}, []byte{0})
bl.Set(key, v)
}
}
}
}
return nil
})
kv.View(func(it kvi.KVIterator) error {
it.Seek([]byte{0})
lastKey := ""
curDict := map[string]any{}
for it.Seek([]byte{0}); it.Valid(); it.Next() {
tmp := bytes.Split(it.Key(), []byte{0})
curKey := string(tmp[0])
curField := string(tmp[1])
if lastKey == "" {
lastKey = curKey
}
var curData any
value, _ := it.Value()
json.Unmarshal(value, &curData)
if lastKey != curKey {
curDict["_id"] = curKey
out <- &gdbi.BaseTraveler{Render: curDict}
curDict = map[string]any{}
curDict[curField] = curData
lastKey = curKey
} else {
curDict[curField] = curData
}
}
if lastKey != "" {
curDict["_id"] = lastKey
out <- &gdbi.BaseTraveler{Render: curDict}
}
return nil
})
}()
return ctx
}

////////////////////////////////////////////////////////////////////////////////

// Path tells system to return path data
type Path struct {
Template interface{} //this isn't really used yet.
Expand Down
4 changes: 4 additions & 0 deletions engine/core/statement_compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ func (sc *DefaultStmtCompiler) Render(stmt *gripql.GraphStatement_Render, ps *gd
return &Render{stmt.Render.AsInterface()}, nil
}

func (sc *DefaultStmtCompiler) Pivot(stmt *gripql.GraphStatement_Pivot, ps *gdbi.State) (gdbi.Processor, error) {
return &Pivot{stmt.Pivot}, nil
}

func (sc *DefaultStmtCompiler) Path(stmt *gripql.GraphStatement_Path, ps *gdbi.State) (gdbi.Processor, error) {
return &Path{stmt.Path.AsSlice()}, nil
}
Expand Down
2 changes: 2 additions & 0 deletions gdbi/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type StatementCompiler interface {
Select(gs *gripql.GraphStatement_Select, ps *State) (Processor, error)

Render(gs *gripql.GraphStatement_Render, ps *State) (Processor, error)
Pivot(gs *gripql.GraphStatement_Pivot, ps *State) (Processor, error)

Path(gs *gripql.GraphStatement_Path, ps *State) (Processor, error)
Unwind(gs *gripql.GraphStatement_Unwind, ps *State) (Processor, error)
Fields(gs *gripql.GraphStatement_Fields, ps *State) (Processor, error)
Expand Down
8 changes: 8 additions & 0 deletions gdbi/statement_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ func StatementProcessor(
ps.LastType = RenderData
return out, err

case *gripql.GraphStatement_Pivot:
if ps.LastType != VertexData && ps.LastType != EdgeData {
return nil, fmt.Errorf(`"pivot" statement is only valid for edge or vertex types not: %s`, ps.LastType.String())
}
out, err := sc.Pivot(stmt, ps)
ps.LastType = RenderData
return out, err

case *gripql.GraphStatement_Path:
if ps.LastType != VertexData && ps.LastType != EdgeData {
return nil, fmt.Errorf(`"path" statement is only valid for edge or vertex types not: %s`, ps.LastType.String())
Expand Down
Loading

0 comments on commit b172cf7

Please sign in to comment.