Skip to content

Commit

Permalink
fix chunk writer to store tags
Browse files Browse the repository at this point in the history
  • Loading branch information
oalam committed Oct 19, 2021
1 parent 70ba030 commit 1686d9b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 29 deletions.
9 changes: 0 additions & 9 deletions historian-spark/docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ docker tag hurence/historian-spark:latest hurence/historian-spark:1.3.8
Deploy the image to Docker hub
------------------------------

tag the image as latest

verify image build :

```shell script
docker images
docker tag <IMAGE_ID> latest
```

then login and push the latest image

```shell script
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import com.hurence.timeseries.model.Definitions._
import scala.collection.JavaConverters._

/**
* val options.config = Map(
* "zkhost" -> options.zkHosts,
* "collection" -> options.collectionName
* )
*
*/
* val options.config = Map(
* "zkhost" -> options.zkHosts,
* "collection" -> options.collectionName
* )
*
*/
class SolrChunksWriter extends Writer[Chunk] {


Expand All @@ -32,22 +32,15 @@ class SolrChunksWriter extends Writer[Chunk] {
else
options.config

var someTags : Boolean = true
val tagCols : List[Column] = if (options.config.contains(TAG_NAMES)) {
options.config(TAG_NAMES).split(",").toList
.map(tag => col(FIELD_TAGS)(tag).as(tag))
} else {
// No tags specified
someTags = false
List[Column]()
}
// build column names with tags
val mainCols = FIELDS.asScala.toList.map(name => col(name).as(getColumnFromField(name)))
val keysDF = ds.select(explode(map_keys(col(FIELD_TAGS)))).distinct()
val keys = keysDF.collect().map(f=>f.get(0))
val tagCols = keys.map(f=> col(FIELD_TAGS).getItem(f).as(f.toString)).toList

val mainCols = FIELDS.asScala.toList
.map(name => col(name).as(getColumnFromField(name)))

// todo manage dateFormatbucket and date interval
// write the dataset to SolR
ds
.select(mainCols ::: tagCols: _*)
.select(mainCols ::: tagCols:_*)
.withColumn(SOLR_COLUMN_VALUE, base64(col(SOLR_COLUMN_VALUE)))
.write
.format("solr")
Expand Down

0 comments on commit 1686d9b

Please sign in to comment.