Skip to content

Commit

Permalink
Allow record updates via envoy on matching IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
tjerman authored and Fajfa committed Aug 22, 2024
1 parent 65b8def commit f17a9da
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 15 deletions.
4 changes: 4 additions & 0 deletions server/compose/envoy/record_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type (
Mapping envoyx.DatasourceMapping
Provider envoyx.Provider

CheckExisting func(ctx context.Context, ref ...[]string) ([]uint64, error)

currentIndex int

// Reusable buffer for reading records
Expand All @@ -25,6 +27,8 @@ type (
// Index to map from ref to ID
// @todo we might need to flush these to the disc in case a huge dataset is passed in
refToID map[string]uint64
// @todo might be worth putting both into one map; not sure how much space we'd save up
existingIDs map[uint64]bool
}

// iteratorProvider is a wrapper around the dal.Iterator to conform to the
Expand Down
5 changes: 3 additions & 2 deletions server/compose/envoy/store_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,9 @@ func (d StoreDecoder) decodeRecordDatasource(ctx context.Context, s store.Storer
}

ou := &RecordDatasource{
Provider: &iteratorProvider{iter: iter},
refToID: make(map[string]uint64),
Provider: &iteratorProvider{iter: iter},
refToID: make(map[string]uint64),
existingIDs: make(map[uint64]bool),
// @todo consider providing defaults from the outside
Mapping: envoyx.DatasourceMapping{
KeyField: []string{"id"},
Expand Down
7 changes: 6 additions & 1 deletion server/compose/envoy/store_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,14 @@ func (e StoreEncoder) validateRecord(*types.Record) (err error) {
}

func (e StoreEncoder) prepare(ctx context.Context, p envoyx.EncodeParams, s store.Storer, rt string, nn envoyx.NodeSet) (err error) {
dl, err := e.grabDal(p)
if err != nil {
return
}

switch rt {
case ComposeRecordDatasourceAuxType:
return e.prepareRecordDatasource(ctx, p, s, nn)
return e.prepareRecordDatasource(ctx, p, s, dl, nn)
}

return
Expand Down
64 changes: 53 additions & 11 deletions server/compose/envoy/store_encode_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
rvFormatter = values.Formatter()
)

func (e StoreEncoder) prepareRecordDatasource(ctx context.Context, p envoyx.EncodeParams, s store.Storer, nn envoyx.NodeSet) (err error) {
func (e StoreEncoder) prepareRecordDatasource(ctx context.Context, p envoyx.EncodeParams, s store.Storer, dl dal.FullService, nn envoyx.NodeSet) (err error) {
// @todo match existing records; for now use just the ID like V1

for _, n := range nn {
Expand All @@ -41,7 +41,7 @@ func (e StoreEncoder) prepareRecordDatasource(ctx context.Context, p envoyx.Enco
panic("unexpected datasource type: node expecting type of RecordDatasource")
}

err = e.prepareRecords(ctx, p, s, ds)
err = e.prepareRecords(ctx, p, s, dl, ds, nn)
if err != nil {
return
}
Expand All @@ -54,22 +54,45 @@ func (e StoreEncoder) prepareRecordDatasource(ctx context.Context, p envoyx.Enco
return
}

func (e StoreEncoder) prepareRecords(ctx context.Context, p envoyx.EncodeParams, s store.Storer, ds *RecordDatasource) (err error) {
func (e StoreEncoder) prepareRecords(ctx context.Context, p envoyx.EncodeParams, s store.Storer, dl dal.FullService, ds *RecordDatasource, nn envoyx.NodeSet) (err error) {
var (
aux = make(map[string]string)
more bool
ident []string
)

ds.refToID = make(map[string]uint64)
ds.existingIDs = make(map[uint64]bool)

// Just so we don't need to do a branch later down the line
cex := ds.CheckExisting
if cex == nil || len(ds.Mapping.KeyField) == 0 {
cex = func(ctx context.Context, ref ...[]string) ([]uint64, error) {
return make([]uint64, len(ref)), nil
}
}

for {
ident, more, err = ds.Next(ctx, aux)
if err != nil || !more {
return
}

ds.AddRef(id.Next(), ident...)
// @todo we'll need to batch these up
existing, err := cex(ctx, ident)
if err != nil {
return err
}

if existing[0] != 0 {
ds.existingIDs[existing[0]] = true
}

if existing[0] == 0 {
existing[0] = id.Next()
}

ds.AddRef(existing[0], ident...)

// Construct a simple record for basic validation/preprocessing
rec := types.Record{}
Expand Down Expand Up @@ -143,7 +166,8 @@ func (e StoreEncoder) encodeRecordDatasource(ctx context.Context, p envoyx.Encod

var (
rec types.Record
records types.RecordSet
creates types.RecordSet
updates types.RecordSet
rve *types.RecordValueErrorSet
)
for {
Expand Down Expand Up @@ -185,15 +209,29 @@ func (e StoreEncoder) encodeRecordDatasource(ctx context.Context, p envoyx.Encod
}

ax := rec
records = append(records, &ax)

if len(records) > recordBatchMaxChunk {
err = dalutils.ComposeRecordCreate(ctx, dl, mod, records...)
if ds.existingIDs[rec.ID] {
updates = append(updates, &ax)
} else {
creates = append(creates, &ax)
}

if len(creates) > recordBatchMaxChunk {
err = dalutils.ComposeRecordCreate(ctx, dl, mod, creates...)
if err != nil {
return
}

creates = make(types.RecordSet, 0, recordBatchMaxChunk/2)
}

if len(updates) > recordBatchMaxChunk {
err = dalutils.ComposeRecordUpdate(ctx, dl, mod, updates...)
if err != nil {
return
}

records = make(types.RecordSet, 0, recordBatchMaxChunk/2)
updates = make(types.RecordSet, 0, recordBatchMaxChunk/2)
}
return
}()
Expand All @@ -213,8 +251,12 @@ func (e StoreEncoder) encodeRecordDatasource(ctx context.Context, p envoyx.Encod
}
}

if len(records) > 0 {
err = dalutils.ComposeRecordCreate(ctx, dl, mod, records...)
if len(creates) > 0 {
err = dalutils.ComposeRecordCreate(ctx, dl, mod, creates...)
}

if len(updates) > 0 {
err = dalutils.ComposeRecordUpdate(ctx, dl, mod, updates...)
}
return
}
Expand Down
67 changes: 67 additions & 0 deletions server/compose/rest/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/cortezaproject/corteza/server/compose/dalutils"
composeEnvoy "github.com/cortezaproject/corteza/server/compose/envoy"
"github.com/cortezaproject/corteza/server/compose/rest/request"
"github.com/cortezaproject/corteza/server/compose/service"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/cortezaproject/corteza/server/pkg/filter"
"github.com/cortezaproject/corteza/server/pkg/revisions"
"github.com/cortezaproject/corteza/server/store"
"github.com/spf13/cast"
)

type (
Expand Down Expand Up @@ -530,17 +532,80 @@ func (ctrl *Record) ImportRun(ctx context.Context, r *request.RecordImportRun) (
Placeholder: true,
}

keyField := []string{}

// Check if we're mapping the ID field even
for _, v := range importSession.Fields {
auxf := strings.ToLower(v)

if auxf == "id" || auxf == "recordid" || auxf == "record_id" {
keyField = []string{importSession.Key}
}
}

node = &envoyx.Node{
Datasource: &composeEnvoy.RecordDatasource{
Mapping: envoyx.DatasourceMapping{
SourceIdent: importSession.Name,
References: map[string]string{},
Scope: map[string]string{},
Defaultable: false,
KeyField: keyField,
Mapping: envoyx.FieldMapping{
Map: fieldMapping,
},
},

CheckExisting: func(ctx context.Context, idents ...[]string) (out []uint64, err error) {
qp := make([]string, 0, len(idents))
for _, ident := range idents {
if len(ident) != 1 {
continue
}

rid := cast.ToUint64(ident[0])
if rid == 0 {
continue
}

qp = append(qp, fmt.Sprintf("recordID='%d'", rid))
}

bong, _, err := dalutils.ComposeRecordsList(ctx, dal.Service(), mod, types.RecordFilter{
ModuleID: mod.ID,
NamespaceID: mod.NamespaceID,
Query: strings.Join(qp, " OR "),
})
if err != nil {
return
}

for _, ident := range idents {
if len(ident) != 1 {
out = append(out, 0)
continue
}

rid := cast.ToUint64(ident[0])
if rid == 0 {
out = append(out, 0)
continue
}

// Find the correct one in the fetched slice
got := uint64(0)
for _, r := range bong {
if r.ID == rid {
got = r.ID
break
}
}

out = append(out, got)
}

return
},
},
ResourceType: composeEnvoy.ComposeRecordDatasourceAuxType,
Identifiers: envoyx.MakeIdentifiers(importSession.Name),
Expand Down Expand Up @@ -573,6 +638,8 @@ func (ctrl *Record) ImportRun(ctx context.Context, r *request.RecordImportRun) (
return
}

// panic("AAAAAA")

{
// @todo this is temporary because the service's logic is a bit flawed for this case
err = storeEncoder.Prepare(ctx, encodeParams, composeEnvoy.ComposeRecordDatasourceAuxType, envoyx.NodeSet{node})
Expand Down
2 changes: 1 addition & 1 deletion server/compose/service/import_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (svc *importSession) Create(ctx context.Context, f io.ReadSeeker, name, con

// @todo improve this bit
k := prepKey(f)
if k == "id" || k == "recordid" {
if k == "id" || k == "recordid" || k == "record_id" {
sh.Key = f
}
}
Expand Down

0 comments on commit f17a9da

Please sign in to comment.