Skip to content

Commit

Permalink
[DATA-928] Fetch tabular data in CLI via multiple requests (#2210)
Browse files Browse the repository at this point in the history
  • Loading branch information
agreenb authored Apr 12, 2023
1 parent de0b51c commit 10be1ad
Showing 1 changed file with 65 additions and 50 deletions.
115 changes: 65 additions & 50 deletions cli/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,69 +241,84 @@ func (c *AppClient) TabularData(dst string, filter *datapb.Filter) error {

var err error
var resp *datapb.TabularDataByFilterResponse
for count := 0; count < maxRetryCount; count++ {
resp, err = c.dataClient.TabularDataByFilter(context.Background(), &datapb.TabularDataByFilterRequest{
DataRequest: &datapb.DataRequest{
Filter: filter,
// TODO DATA-928: Get data in multiple requests. Each response can only return a few MBs of data.
},
CountOnly: false,
})
if err == nil {
break
}
}
if err != nil {
return err
}

mds := resp.GetMetadata()
for i, md := range mds {
mdJSONBytes, err := protojson.Marshal(md)
if err != nil {
return errors.Wrap(err, "error marshaling metadata")
}
//nolint:gosec
mdFile, err := os.Create(filepath.Join(dst, metadataDir, strconv.Itoa(i)+".json"))
if err != nil {
return errors.Wrapf(err, fmt.Sprintf("error creating metadata file for metadata index %d", i))
}
if _, err := mdFile.Write(mdJSONBytes); err != nil {
return errors.Wrapf(err, "error writing metadata file %s", mdFile.Name())
}
if err := mdFile.Close(); err != nil {
return errors.Wrapf(err, "error closing metadata file %s", mdFile.Name())
}
}

data := resp.GetData()
// TODO: [DATA-640] Support export in additional formats.
//nolint:gosec
dataFile, err := os.Create(filepath.Join(dst, dataDir, "data"+".ndjson"))
if err != nil {
return errors.Wrapf(err, "error creating data file")
}
w := bufio.NewWriter(dataFile)
for _, datum := range data {
// Write everything as json for now.
d := datum.GetData()
if d == nil {
continue

fmt.Fprintf(c.c.App.Writer, "Downloading..")
var last string
var metadataIdx int
for {
for count := 0; count < maxRetryCount; count++ {
resp, err = c.dataClient.TabularDataByFilter(context.Background(), &datapb.TabularDataByFilterRequest{
DataRequest: &datapb.DataRequest{
Filter: filter,
Limit: maxLimit,
Last: last,
},
CountOnly: false,
})
fmt.Fprintf(c.c.App.Writer, ".")
if err == nil {
break
}
}
m := d.AsMap()
m["TimeRequested"] = datum.GetTimeRequested()
m["TimeReceived"] = datum.GetTimeReceived()
m["MetadataIndex"] = datum.GetMetadataIndex()
j, err := json.Marshal(m)
if err != nil {
return errors.Wrap(err, "error marshaling json response")
return err
}
_, err = w.Write(append(j, []byte("\n")...))
if err != nil {
return errors.Wrapf(err, "error writing reading to file %s", dataFile.Name())

last = resp.GetLast()
mds := resp.GetMetadata()
if len(mds) == 0 {
break
}
for _, md := range mds {
mdJSONBytes, err := protojson.Marshal(md)
if err != nil {
return errors.Wrap(err, "error marshaling metadata")
}
//nolint:gosec
mdFile, err := os.Create(filepath.Join(dst, metadataDir, strconv.Itoa(metadataIdx)+".json"))
if err != nil {
return errors.Wrapf(err, fmt.Sprintf("error creating metadata file for metadata index %d", metadataIdx))
}
if _, err := mdFile.Write(mdJSONBytes); err != nil {
return errors.Wrapf(err, "error writing metadata file %s", mdFile.Name())
}
if err := mdFile.Close(); err != nil {
return errors.Wrapf(err, "error closing metadata file %s", mdFile.Name())
}

metadataIdx++
}

data := resp.GetData()
for _, datum := range data {
// Write everything as json for now.
d := datum.GetData()
if d == nil {
continue
}
m := d.AsMap()
m["TimeRequested"] = datum.GetTimeRequested()
m["TimeReceived"] = datum.GetTimeReceived()
m["MetadataIndex"] = datum.GetMetadataIndex()
j, err := json.Marshal(m)
if err != nil {
return errors.Wrap(err, "error marshaling json response")
}
_, err = w.Write(append(j, []byte("\n")...))
if err != nil {
return errors.Wrapf(err, "error writing reading to file %s", dataFile.Name())
}
}
}

fmt.Fprintf(c.c.App.Writer, "\n")
if err := w.Flush(); err != nil {
return errors.Wrapf(err, "error flushing writer for %s", dataFile.Name())
}
Expand Down

0 comments on commit 10be1ad

Please sign in to comment.