Skip to content

Commit

Permalink
GEOMESA-3301 Partitioned PostGIS - fix Avro export
Browse files Browse the repository at this point in the history
* Set correct sub-type hints in feature type user data
  • Loading branch information
elahrvivaz committed Sep 25, 2023
1 parent f1a535f commit b9e2900
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,43 @@ class PartitionedPostgisDialect(store: JDBCDataStore) extends PostGISDialect(sto
}
}

override def postCreateAttribute(
att: AttributeDescriptor,
tableName: String,
schemaName: String,
cx: Connection): Unit = {

def withCol(fn: ResultSet => Unit): Unit = {
val meta = cx.getMetaData
def escape(name: String): String = store.escapeNamePattern(meta, name)
WithClose(meta.getColumns(cx.getCatalog, escape(schemaName), escape(tableName), escape(att.getLocalName))) { cols =>
if (cols.next()) {
fn(cols)
} else {
logger.warn(s"Could not retrieve column metadata for attribute ${att.getLocalName}")
}
}
}

if (classOf[String].isAssignableFrom(att.getType.getBinding)) {
withCol { cols =>
val typeName = cols.getString("TYPE_NAME")
if ("json".equalsIgnoreCase(typeName) || "jsonb".equalsIgnoreCase(typeName)) {
att.getUserData.put(SimpleFeatureTypes.AttributeOptions.OptJson, "true")
}
}
} else if (classOf[java.util.List[_]].isAssignableFrom(att.getType.getBinding)) {
withCol { cols =>
val arrayType = super.getMapping(cols, cx)
if (arrayType.isArray) {
att.getUserData.put(SimpleFeatureTypes.AttributeConfigs.UserDataListType, arrayType.getComponentType.getName)
} else {
logger.warn(s"Found a list-type attribute but database type was not an array for ${att.getLocalName}")
}
}
}
}

override def postCreateFeatureType(
sft: SimpleFeatureType,
metadata: DatabaseMetaData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class PartitionedPostgisPsDialect(store: JDBCDataStore, delegate: PartitionedPos
override def encodeTableName(raw: String, sql: StringBuffer): Unit = delegate.encodeTableName(raw, sql)
override def encodePostCreateTable(tableName: String, sql: StringBuffer): Unit =
delegate.encodePostCreateTable(tableName, sql)
override def postCreateAttribute(att: AttributeDescriptor, tableName: String, schemaName: String, cx: Connection): Unit =
delegate.postCreateAttribute(att, tableName, schemaName, cx)
override def postCreateFeatureType(
featureType: SimpleFeatureType,
metadata: DatabaseMetaData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.locationtech.geomesa.gt.partition.postgis.dialect.procedures.{DropAge
import org.locationtech.geomesa.gt.partition.postgis.dialect.tables.{PartitionTablespacesTable, PrimaryKeyTable, SequenceTable, UserDataTable}
import org.locationtech.geomesa.gt.partition.postgis.dialect.{PartitionedPostgisDialect, PartitionedPostgisPsDialect, TableConfig, TypeInfo}
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
import org.locationtech.geomesa.utils.geotools.{FeatureUtils, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.geotools.{FeatureUtils, ObjectType, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.io.WithClose
import org.locationtech.geomesa.utils.text.WKTUtils
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
Expand Down Expand Up @@ -531,6 +531,31 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll
}
}

"set appropriate user data for list and json attributes" in {
val ds = DataStoreFinder.getDataStore(params.asJava)
ds must not(beNull)

try {
ds must beAnInstanceOf[JDBCDataStore]

val sft = SimpleFeatureTypes.renameSft(this.sft, "attrtest")
ds.getTypeNames.toSeq must not(contain(sft.getTypeName))
ds.createSchema(sft)

val schema = Try(ds.getSchema(sft.getTypeName)).getOrElse(null)
schema must not(beNull)
schema.getUserData.asScala must containAllOf(sft.getUserData.asScala.toSeq)
logger.debug(s"Schema: ${SimpleFeatureTypes.encodeType(schema)}")

ObjectType.selectType(schema.getDescriptor("name")) mustEqual Seq(ObjectType.LIST, ObjectType.STRING)
ObjectType.selectType(schema.getDescriptor("props")) mustEqual Seq(ObjectType.STRING, ObjectType.JSON)
ObjectType.selectType(schema.getDescriptor("dtg")) mustEqual Seq(ObjectType.DATE)
ObjectType.selectType(schema.getDescriptor("geom")) mustEqual Seq(ObjectType.GEOMETRY, ObjectType.POINT)
} finally {
ds.dispose()
}
}

"support idle_in_transaction_session_timeout" in {
val sft = SimpleFeatureTypes.renameSft(this.sft, "timeout")

Expand Down

0 comments on commit b9e2900

Please sign in to comment.