Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

send more info in doc to transformers #90

Merged
merged 3 commits into from
Jul 11, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions pkg/adaptor/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,25 @@ func (t *Transformer) transformOne(msg *message.Msg) (*message.Msg, error) {
)

// short circuit for deletes and commands
if msg.Op == message.Delete || msg.Op == message.Command {
if msg.Op == message.Command {
return msg, nil
}

now := time.Now().Nanosecond()
fullDoc := map[string]interface{}{
"data": msg.Data,
"ts": msg.Timestamp,
"op": msg.Op.String(),
}
if msg.IsMap() {
if doc, err = mejson.Marshal(msg.Data); err != nil {
t.pipe.Err <- t.transformerError(ERROR, err, msg)
return msg, nil
}
} else {
doc = msg.Data
fullDoc["data"] = doc
}

if value, err = t.vm.ToValue(doc); err != nil {
if value, err = t.vm.ToValue(fullDoc); err != nil {
t.pipe.Err <- t.transformerError(ERROR, err, msg)
return msg, nil
}
Expand All @@ -141,16 +145,36 @@ func (t *Transformer) transformOne(msg *message.Msg) (*message.Msg, error) {

afterVM := time.Now().Nanosecond()

switch r := result.(type) {
fullDoc, ok := result.(map[string]interface{})
if !ok {
t.pipe.Err <- t.transformerError(ERROR, fmt.Errorf("returned doc was not a map[string]interface{}"), msg)
return msg, fmt.Errorf("returned doc was not a map[string]interface{}")
}

msg.Op = message.OpTypeFromString(fullDoc["op"].(string))
msg.Timestamp = fullDoc["ts"].(int64)
switch data := fullDoc["data"].(type) {
case otto.Value:
exported, err := data.Export()
if err != nil {
t.pipe.Err <- t.transformerError(ERROR, err, msg)
return msg, nil
}
d, err := mejson.Unmarshal(exported.(map[string]interface{}))
if err != nil {
t.pipe.Err <- t.transformerError(ERROR, err, msg)
return msg, nil
}
msg.Data = map[string]interface{}(d)
case map[string]interface{}:
doc, err := mejson.Unmarshal(r)
d, err := mejson.Unmarshal(data)
if err != nil {
t.pipe.Err <- t.transformerError(ERROR, err, msg)
return msg, nil
}
msg.Data = map[string]interface{}(doc)
msg.Data = map[string]interface{}(d)
default:
msg.Data = r
msg.Data = data
}

if t.debug {
Expand Down
20 changes: 10 additions & 10 deletions pkg/adaptor/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ func TestTransformOne(t *testing.T) {
},
{
// delete the 'name' property
"module.exports=function(doc) { return _.omit(doc, ['name']) }",
"module.exports=function(doc) { doc['data'] = _.omit(doc['data'], ['name']); return doc }",
message.NewMsg(message.Insert, map[string]interface{}{"id": "id2", "name": "nick"}),
message.NewMsg(message.Insert, map[string]interface{}{"id": "id2"}),
false,
},
{
// delete's and commands should pass through, and the transformer fn shouldn't run
"module.exports=function(doc) { return _.omit(doc, ['name']) }",
message.NewMsg(message.Delete, map[string]interface{}{"id": "id2", "name": "nick"}),
// delete's should be processed the same
"module.exports=function(doc) { doc['data'] = _.omit(doc['data'], ['name']); return doc }",
message.NewMsg(message.Delete, map[string]interface{}{"id": "id2", "name": "nick"}),
message.NewMsg(message.Delete, map[string]interface{}{"id": "id2"}),
false,
},
{
// delete's and commands should pass through, and the transformer fn shouldn't run
"module.exports=function(doc) { return _.omit(doc, ['name']) }",
"module.exports=function(doc) { return _.omit(doc['data'], ['name']) }",
message.NewMsg(message.Command, map[string]interface{}{"id": "id2", "name": "nick"}),
message.NewMsg(message.Command, map[string]interface{}{"id": "id2", "name": "nick"}),
false,
Expand All @@ -62,17 +62,17 @@ func TestTransformOne(t *testing.T) {
},
{
// we should be able to change the bson
"module.exports=function(doc) { doc['id']['$oid'] = '54a4420502a14b9641000001'; return doc }",
"module.exports=function(doc) { doc['data']['id']['$oid'] = '54a4420502a14b9641000001'; return doc }",
message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}),
message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID2, "name": "nick"}),
false,
},
{
// we should be able to change the bson
"module.exports=function(doc) { return doc['name'] }",
// this throws an error
"module.exports=function(doc) { return doc['data']['name'] }",
message.NewMsg(message.Insert, map[string]interface{}{"id": bsonID1, "name": "nick"}),
message.NewMsg(message.Insert, "nick"),
false,
true,
},
}
for _, v := range data {
Expand All @@ -88,7 +88,7 @@ func TestTransformOne(t *testing.T) {
t.Errorf("error expected %t but actually got %v", v.err, err)
continue
}
if !reflect.DeepEqual(msg.Data, v.out.Data) || err != nil {
if (!reflect.DeepEqual(msg.Data, v.out.Data) || err != nil) && !v.err {
t.Errorf("expected:\n(%T) %+v\ngot:\n(%T) %+v with error (%v)\n", v.out.Data, v.out.Data, msg.Data, msg.Data, err)
}
}
Expand Down