diff --git a/README.md b/README.md index 1d94ec9..53b8217 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,7 @@ situations. - Golang 1.14 ## Recent Major Feature Additions/Changes -- Added `Transform.CurrentRawRecord()` for caller of omniparser to access the raw ingested record. +- Added `Transform.RawRecord()` for callers of omniparser to access the raw ingested record. - Deprecated `custom_parse` in favor of `custom_func` (`custom_parse` is still usable for back-compatibility, it is just removed from all public docs and samples). - Added `NonValidatingReader` EDI segment reader. diff --git a/doc/gettingstarted.md b/doc/gettingstarted.md index aab50b8..39c1f8b 100644 --- a/doc/gettingstarted.md +++ b/doc/gettingstarted.md @@ -715,7 +715,12 @@ for { break } if err != nil { ... } - // output contains a []byte of the ingested and transformed record. + // output contains a []byte of the ingested and transformed record. + + // Also transform.RawRecord() gives you access to the raw record. + raw, err := transform.RawRecord() + if err != nil { ... } + fmt.Println(raw.Checksum()) } ``` diff --git a/doc/programmability.md b/doc/programmability.md index f170b24..29d4d4f 100644 --- a/doc/programmability.md +++ b/doc/programmability.md @@ -35,11 +35,11 @@ for { } if err != nil { ... } // output contains a []byte of the ingested and transformed record. - - raw, err := transform.CurrentRawRecord() - if err != nil { ... } - rawRecord := raw.(*omniv21.RawRecord) // assuming the schema is of `omni.2.1` version. - fmt.Println(rawRecord.UUIDv3()) // rawRecord.UUIDv3() returns a stable hash of the current raw record. + + // Also transform.RawRecord() gives you access to the raw record.
+ raw, err := transform.RawRecord() + if err != nil { ... } + fmt.Println(raw.Checksum()) } ``` Note this out-of-box omniparser setup contains only the `omni.2.1` schema handler, meaning only schemas diff --git a/extensions/omniv21/ingester.go b/extensions/omniv21/ingester.go index 3b13c22..0e0f9c6 100644 --- a/extensions/omniv21/ingester.go +++ b/extensions/omniv21/ingester.go @@ -9,19 +9,21 @@ import ( "github.com/jf-tech/omniparser/extensions/omniv21/fileformat" "github.com/jf-tech/omniparser/extensions/omniv21/transform" "github.com/jf-tech/omniparser/idr" + "github.com/jf-tech/omniparser/schemahandler" "github.com/jf-tech/omniparser/transformctx" ) -// RawRecord contains the raw data ingested in from the input stream in the form of an IDR tree. -// Note callers outside this package should absolutely make **NO** modifications to the content of -// RawRecord. Treat it like read-only. -type RawRecord struct { - Node *idr.Node +type rawRecord struct { + node *idr.Node } -// UUIDv3 returns a stable MD5(v3) hash of the RawRecord. -func (rr *RawRecord) UUIDv3() string { - hash, _ := customfuncs.UUIDv3(nil, idr.JSONify2(rr.Node)) +func (rr *rawRecord) Raw() interface{} { + return rr.node +} + +// Checksum returns a stable MD5(v3) hash of the rawRecord. +func (rr *rawRecord) Checksum() string { + hash, _ := customfuncs.UUIDv3(nil, idr.JSONify2(rr.node)) return hash } @@ -31,19 +33,19 @@ type ingester struct { customParseFuncs transform.CustomParseFuncs // Deprecated. ctx *transformctx.Ctx reader fileformat.FormatReader - rawRecord RawRecord + rawRecord rawRecord } // Read ingests a raw record from the input stream, transforms it according the given schema and return // the raw record, transformed JSON bytes.
-func (g *ingester) Read() (interface{}, []byte, error) { - if g.rawRecord.Node != nil { - g.reader.Release(g.rawRecord.Node) - g.rawRecord.Node = nil +func (g *ingester) Read() (schemahandler.RawRecord, []byte, error) { + if g.rawRecord.node != nil { + g.reader.Release(g.rawRecord.node) + g.rawRecord.node = nil } n, err := g.reader.Read() if n != nil { - g.rawRecord.Node = n + g.rawRecord.node = n } if err != nil { // Read() supposed to have already done CtxAwareErr error wrapping. So directly return. diff --git a/extensions/omniv21/ingester_test.go b/extensions/omniv21/ingester_test.go index ccf365d..273e23b 100644 --- a/extensions/omniv21/ingester_test.go +++ b/extensions/omniv21/ingester_test.go @@ -91,7 +91,8 @@ func TestIngester_Read_Success(t *testing.T) { } raw, b, err := g.Read() assert.NoError(t, err) - assert.Equal(t, "41665284-dab9-300d-b647-7ace9cb514b4", raw.(*RawRecord).UUIDv3()) + assert.Equal(t, "41665284-dab9-300d-b647-7ace9cb514b4", raw.Checksum()) + assert.Equal(t, "{}", idr.JSONify2(raw.Raw().(*idr.Node))) assert.Equal(t, "123", string(b)) assert.Equal(t, 0, g.reader.(*testReader).releaseCalled) raw, b, err = g.Read() diff --git a/extensions/omniv21/samples/testCommon.go b/extensions/omniv21/samples/testCommon.go index f8abfe2..139de73 100644 --- a/extensions/omniv21/samples/testCommon.go +++ b/extensions/omniv21/samples/testCommon.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/jf-tech/omniparser" - "github.com/jf-tech/omniparser/extensions/omniv21" "github.com/jf-tech/omniparser/idr" "github.com/jf-tech/omniparser/transformctx" ) @@ -49,12 +48,11 @@ func SampleTestCommon(t *testing.T, schemaFile, inputFile string) string { err = json.Unmarshal(recordBytes, &transformed) assert.NoError(t, err) - raw, err := transform.CurrentRawRecord() + raw, err := transform.RawRecord() assert.NoError(t, err) - rawRecord := raw.(*omniv21.RawRecord) records = append(records, record{ - RawRecord: idr.JSONify2(rawRecord.Node), - 
RawRecordHash: rawRecord.UUIDv3(), + RawRecord: idr.JSONify2(raw.Raw().(*idr.Node)), + RawRecordHash: raw.Checksum(), TransformedRecord: transformed, }) } diff --git a/schemahandler/schemaHandler.go b/schemahandler/schemaHandler.go index 897b800..bdb43de 100644 --- a/schemahandler/schemaHandler.go +++ b/schemahandler/schemaHandler.go @@ -37,6 +37,14 @@ type SchemaHandler interface { NewIngester(ctx *transformctx.Ctx, input io.Reader) (Ingester, error) } +// RawRecord represents a raw record ingested from the input. +type RawRecord interface { + // Raw returns the actual raw record, whose concrete type is specific to each schema handler version. + Raw() interface{} + // Checksum returns a UUIDv3 (MD5) stable hash of the raw record. + Checksum() string +} + // Ingester is an interface of ingestion and transformation for a given input stream. type Ingester interface { // Read is called repeatedly during the processing of an input stream. Each call it should return @@ -46,7 +54,7 @@ type Ingester interface { // one record at a time, OR, processes and returns one record for each call. However, the overall // design principle of omniparser is to have streaming processing capability so memory won't be a // constraint when dealing with large input file. All built-in ingesters are implemented this way. - Read() (interface{}, []byte, error) + Read() (RawRecord, []byte, error) // IsContinuableError is called to determine if the error returned by Read is fatal or not. After Read // is called, the result record or error will be returned to caller. After caller consumes record or diff --git a/transform.go b/transform.go index 96c5443..f5c32b4 100644 --- a/transform.go +++ b/transform.go @@ -21,16 +21,14 @@ type Transform interface { // return the same error. // Note if returned error isn't nil, then returned []byte will be nil. Read() ([]byte, error) - // CurrentRawRecord returns the current raw record ingested from the input stream.
If - // the last Read call failed, or Read hasn't been called yet, it will return an error. - // Each schema handler and extension has its own definition of what a raw record is - // so please check their corresponding doc. - CurrentRawRecord() (interface{}, error) + // RawRecord returns the current raw record ingested from the input stream. If the last + // Read call failed, or Read hasn't been called yet, it will return an error. + RawRecord() (schemahandler.RawRecord, error) } type transform struct { ingester schemahandler.Ingester - lastRawRecord interface{} + lastRawRecord schemahandler.RawRecord lastErr error } @@ -70,9 +68,9 @@ func (o *transform) Read() ([]byte, error) { return transformed, err } -// CurrentRawRecord returns the current raw record ingested from the input stream. If -// the last Read call failed, or Read hasn't been called yet, it will return an error. -func (o *transform) CurrentRawRecord() (interface{}, error) { +// RawRecord returns the current raw record ingested from the input stream. If the last +// Read call failed, or Read hasn't been called yet, it will return an error. 
+func (o *transform) RawRecord() (schemahandler.RawRecord, error) { if o.lastErr != nil { return nil, o.lastErr } diff --git a/transform_test.go b/transform_test.go index e47ca62..482beab 100644 --- a/transform_test.go +++ b/transform_test.go @@ -9,26 +9,41 @@ import ( "github.com/stretchr/testify/assert" "github.com/jf-tech/omniparser/errs" + "github.com/jf-tech/omniparser/schemahandler" ) type testReadCall struct { - record []byte + result []byte err error } +func (trc testReadCall) Checksum() string { + if trc.err != nil { + panic("Checksum() called when err != nil") + } + return fmt.Sprintf("checksum of raw record of '%s'", string(trc.result)) +} + +func (trc testReadCall) Raw() interface{} { + if trc.err != nil { + panic("Raw() called when err != nil") + } + return fmt.Sprintf("raw record of '%s'", string(trc.result)) +} + type testIngester struct { readCalled int readCalls []testReadCall continuableErrs map[error]bool } -func (g *testIngester) Read() (interface{}, []byte, error) { +func (g *testIngester) Read() (schemahandler.RawRecord, []byte, error) { if g.readCalled >= len(g.readCalls) { panic(fmt.Sprintf("Read() called %d time(s), but not enough mock entries setup", g.readCalled)) } r := g.readCalls[g.readCalled] g.readCalled++ - return fmt.Sprintf("raw record %d", g.readCalled-1), r.record, r.err + return r, r.result, r.err } func (g *testIngester) IsContinuableError(err error) bool { @@ -45,9 +60,9 @@ func TestTransform_Read_EndWithEOF(t *testing.T) { tfm := &transform{ ingester: &testIngester{ readCalls: []testReadCall{ - {record: []byte("1st good read")}, + {result: []byte("1st good read")}, {err: continuableErr1}, - {record: []byte("2nd good read")}, + {result: []byte("2nd good read")}, {err: io.EOF}, }, continuableErrs: map[error]bool{continuableErr1: true}, @@ -56,16 +71,17 @@ func TestTransform_Read_EndWithEOF(t *testing.T) { record, err := tfm.Read() assert.NoError(t, err) assert.Equal(t, "1st good read", string(record)) - raw, err := 
tfm.CurrentRawRecord() + raw, err := tfm.RawRecord() assert.NoError(t, err) - assert.Equal(t, "raw record 0", raw.(string)) + assert.Equal(t, "raw record of '1st good read'", raw.Raw()) + assert.Equal(t, "checksum of raw record of '1st good read'", raw.Checksum()) record, err = tfm.Read() assert.Error(t, err) assert.True(t, errs.IsErrTransformFailed(err)) assert.Equal(t, continuableErr1.Error(), err.Error()) assert.Nil(t, record) - raw, err = tfm.CurrentRawRecord() + raw, err = tfm.RawRecord() assert.Error(t, err) assert.True(t, errs.IsErrTransformFailed(err)) assert.Nil(t, raw) @@ -73,15 +89,16 @@ func TestTransform_Read_EndWithEOF(t *testing.T) { record, err = tfm.Read() assert.NoError(t, err) assert.Equal(t, "2nd good read", string(record)) - raw, err = tfm.CurrentRawRecord() + raw, err = tfm.RawRecord() assert.NoError(t, err) - assert.Equal(t, "raw record 2", raw.(string)) + assert.Equal(t, "raw record of '2nd good read'", raw.Raw()) + assert.Equal(t, "checksum of raw record of '2nd good read'", raw.Checksum()) record, err = tfm.Read() assert.Error(t, err) assert.Equal(t, io.EOF, err) assert.Nil(t, record) - raw, err = tfm.CurrentRawRecord() + raw, err = tfm.RawRecord() assert.Error(t, err) assert.Equal(t, io.EOF, err) assert.Nil(t, raw) @@ -91,7 +108,7 @@ func TestTransform_Read_EndWithEOF(t *testing.T) { assert.Error(t, err) assert.Equal(t, io.EOF, err) assert.Nil(t, record) - raw, err = tfm.CurrentRawRecord() + raw, err = tfm.RawRecord() assert.Error(t, err) assert.Equal(t, io.EOF, err) assert.Nil(t, raw) @@ -101,7 +118,7 @@ func TestTransform_Read_EndWithNonContinuableError(t *testing.T) { tfm := &transform{ ingester: &testIngester{ readCalls: []testReadCall{ - {record: []byte("1st good read")}, + {result: []byte("1st good read")}, {err: errors.New("fatal error")}, }, }, @@ -109,16 +126,17 @@ func TestTransform_Read_EndWithNonContinuableError(t *testing.T) { record, err := tfm.Read() assert.NoError(t, err) assert.Equal(t, "1st good read", string(record)) - 
raw, err := tfm.CurrentRawRecord() + raw, err := tfm.RawRecord() assert.NoError(t, err) - assert.Equal(t, "raw record 0", raw.(string)) + assert.Equal(t, "raw record of '1st good read'", raw.Raw()) + assert.Equal(t, "checksum of raw record of '1st good read'", raw.Checksum()) record, err = tfm.Read() assert.Error(t, err) assert.False(t, errs.IsErrTransformFailed(err)) assert.Equal(t, "fatal error", err.Error()) assert.Nil(t, record) - raw, err = tfm.CurrentRawRecord() + raw, err = tfm.RawRecord() assert.Error(t, err) assert.False(t, errs.IsErrTransformFailed(err)) assert.Equal(t, "fatal error", err.Error()) @@ -129,15 +147,15 @@ func TestTransform_Read_EndWithNonContinuableError(t *testing.T) { assert.Error(t, err) assert.Equal(t, "fatal error", err.Error()) assert.Nil(t, record) - raw, err = tfm.CurrentRawRecord() + raw, err = tfm.RawRecord() assert.Error(t, err) assert.Equal(t, "fatal error", err.Error()) assert.Nil(t, raw) } -func TestTransform_CurrentRawRecord_CalledBeforeRead(t *testing.T) { +func TestTransform_RawRecord_CalledBeforeRead(t *testing.T) { tfm := &transform{ingester: &testIngester{readCalls: []testReadCall{}}} - raw, err := tfm.CurrentRawRecord() + raw, err := tfm.RawRecord() assert.Error(t, err) assert.Equal(t, "must call Read first", err.Error()) assert.Nil(t, raw)