From b9e2900546f4fb923c541931f5d8b59e23f1de76 Mon Sep 17 00:00:00 2001 From: Emilio Lahr-Vivaz Date: Thu, 21 Sep 2023 13:57:30 +0000 Subject: [PATCH] GEOMESA-3301 Partitioned PostGIS - fix Avro export * Set correct sub-type hints in feature type user data --- .../dialect/PartitionedPostgisDialect.scala | 37 +++++++++++++++++++ .../dialect/PartitionedPostgisPsDialect.scala | 2 + .../PartitionedPostgisDataStoreTest.scala | 27 +++++++++++++- 3 files changed, 65 insertions(+), 1 deletion(-) diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala index cbcd52c596cd..da9594b380fc 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala @@ -123,6 +123,43 @@ class PartitionedPostgisDialect(store: JDBCDataStore) extends PostGISDialect(sto } } + override def postCreateAttribute( + att: AttributeDescriptor, + tableName: String, + schemaName: String, + cx: Connection): Unit = { + + def withCol(fn: ResultSet => Unit): Unit = { + val meta = cx.getMetaData + def escape(name: String): String = store.escapeNamePattern(meta, name) + WithClose(meta.getColumns(cx.getCatalog, escape(schemaName), escape(tableName), escape(att.getLocalName))) { cols => + if (cols.next()) { + fn(cols) + } else { + logger.warn(s"Could not retrieve column metadata for attribute ${att.getLocalName}") + } + } + } + + if (classOf[String].isAssignableFrom(att.getType.getBinding)) { + withCol { cols => + val typeName = cols.getString("TYPE_NAME") + if ("json".equalsIgnoreCase(typeName) || "jsonb".equalsIgnoreCase(typeName)) { + att.getUserData.put(SimpleFeatureTypes.AttributeOptions.OptJson, "true") + } + } + } else if (classOf[java.util.List[_]].isAssignableFrom(att.getType.getBinding)) { + withCol { cols => + val arrayType = super.getMapping(cols, cx) + if (arrayType.isArray) { + att.getUserData.put(SimpleFeatureTypes.AttributeConfigs.UserDataListType, arrayType.getComponentType.getName) + } else { + logger.warn(s"Found a list-type attribute but database type was not an array for ${att.getLocalName}") + } + } + } + } + override def postCreateFeatureType( sft: SimpleFeatureType, metadata: DatabaseMetaData, diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisPsDialect.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisPsDialect.scala index ccab21d6e761..7217ac189079 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisPsDialect.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisPsDialect.scala @@ -97,6 +97,8 @@ class PartitionedPostgisPsDialect(store: JDBCDataStore, delegate: PartitionedPos override def encodeTableName(raw: String, sql: StringBuffer): Unit = delegate.encodeTableName(raw, sql) override def encodePostCreateTable(tableName: String, sql: StringBuffer): Unit = delegate.encodePostCreateTable(tableName, sql) + override def postCreateAttribute(att: AttributeDescriptor, tableName: String, schemaName: String, cx: Connection): Unit = + delegate.postCreateAttribute(att, tableName, schemaName, cx) override def postCreateFeatureType( featureType: SimpleFeatureType, metadata: DatabaseMetaData, diff --git a/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala b/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala index 6ac54c648be1..3d5fa6a504b8 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala @@ -20,7 +20,7 @@ import org.locationtech.geomesa.gt.partition.postgis.dialect.procedures.{DropAge import org.locationtech.geomesa.gt.partition.postgis.dialect.tables.{PartitionTablespacesTable, PrimaryKeyTable, SequenceTable, UserDataTable} import org.locationtech.geomesa.gt.partition.postgis.dialect.{PartitionedPostgisDialect, PartitionedPostgisPsDialect, TableConfig, TypeInfo} import org.locationtech.geomesa.utils.collection.SelfClosingIterator -import org.locationtech.geomesa.utils.geotools.{FeatureUtils, SimpleFeatureTypes} +import org.locationtech.geomesa.utils.geotools.{FeatureUtils, ObjectType, SimpleFeatureTypes} import org.locationtech.geomesa.utils.io.WithClose import org.locationtech.geomesa.utils.text.WKTUtils import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType} @@ -531,6 +531,31 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll } } + "set appropriate user data for list and json attributes" in { + val ds = DataStoreFinder.getDataStore(params.asJava) + ds must not(beNull) + + try { + ds must beAnInstanceOf[JDBCDataStore] + + val sft = SimpleFeatureTypes.renameSft(this.sft, "attrtest") + ds.getTypeNames.toSeq must not(contain(sft.getTypeName)) + ds.createSchema(sft) + + val schema = Try(ds.getSchema(sft.getTypeName)).getOrElse(null) + schema must not(beNull) + schema.getUserData.asScala must containAllOf(sft.getUserData.asScala.toSeq) + logger.debug(s"Schema: ${SimpleFeatureTypes.encodeType(schema)}") + + ObjectType.selectType(schema.getDescriptor("name")) mustEqual Seq(ObjectType.LIST, ObjectType.STRING) + ObjectType.selectType(schema.getDescriptor("props")) mustEqual Seq(ObjectType.STRING, ObjectType.JSON) + ObjectType.selectType(schema.getDescriptor("dtg")) mustEqual Seq(ObjectType.DATE) + ObjectType.selectType(schema.getDescriptor("geom")) mustEqual Seq(ObjectType.GEOMETRY, ObjectType.POINT) + } finally { + ds.dispose() + } + } + "support idle_in_transaction_session_timeout" in { val sft = SimpleFeatureTypes.renameSft(this.sft, "timeout")