From d3883a6bcec261956e9b2e88fa6838a2e9ffd358 Mon Sep 17 00:00:00 2001 From: Emilio Lahr-Vivaz Date: Thu, 27 Jul 2023 18:25:01 +0000 Subject: [PATCH] GEOMESA-3289 Fix ingest for schemas with > 479 attributes --- .../geomesa/features/kryo/package.scala | 9 ++++++++- .../kryo/KryoFeatureSerializerTest.scala | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/geomesa-features/geomesa-feature-kryo/src/main/scala/org/locationtech/geomesa/features/kryo/package.scala b/geomesa-features/geomesa-feature-kryo/src/main/scala/org/locationtech/geomesa/features/kryo/package.scala index ef5c9e4e0322..0c97203a415f 100644 --- a/geomesa-features/geomesa-feature-kryo/src/main/scala/org/locationtech/geomesa/features/kryo/package.scala +++ b/geomesa-features/geomesa-feature-kryo/src/main/scala/org/locationtech/geomesa/features/kryo/package.scala @@ -92,7 +92,14 @@ package object kryo { output.writeShort(count) // track the number of attributes output.write(size) // size of each offset val offset = output.position() - output.setPosition(offset + (size * (count + 1)) + (IntBitSet.size(count) * 4)) + val position = offset + (size * (count + 1)) + (IntBitSet.size(count) * 4) + // setting the position to greater than the buffer size results in errors when trying to write some values later on + if (output.getBuffer.length < position) { + val buf = Array.ofDim[Byte](position * 2) + System.arraycopy(output.getBuffer, 0, buf, 0, offset) + output.setBuffer(buf, -1) + } + output.setPosition(position) offset } } diff --git a/geomesa-features/geomesa-feature-kryo/src/test/scala/org/locationtech/geomesa/features/kryo/KryoFeatureSerializerTest.scala b/geomesa-features/geomesa-feature-kryo/src/test/scala/org/locationtech/geomesa/features/kryo/KryoFeatureSerializerTest.scala index d42e4441421e..40890d7e0ae2 100644 --- a/geomesa-features/geomesa-feature-kryo/src/test/scala/org/locationtech/geomesa/features/kryo/KryoFeatureSerializerTest.scala +++ b/geomesa-features/geomesa-feature-kryo/src/test/scala/org/locationtech/geomesa/features/kryo/KryoFeatureSerializerTest.scala @@ -24,6 +24,7 @@ import org.specs2.runner.JUnitRunner import java.nio.charset.StandardCharsets import java.util import java.util.{Collections, Date, UUID} +import scala.util.{Failure, Try} @RunWith(classOf[JUnitRunner]) class KryoFeatureSerializerTest extends Specification with LazyLogging { @@ -427,6 +428,24 @@ class KryoFeatureSerializerTest extends Specification with LazyLogging { deserialized.getUserData.asScala must beEmpty } + "correctly expand the buffer for large feature types" in { + val spec = "*geom:Point:srid=4326,dtg:Date," + Seq.tabulate(1000)(i => f"a$i%02d:Int").mkString(",") + val sft = SimpleFeatureTypes.createType("test", spec) + val sf = ScalaSimpleFeature.create(sft, "fid-0", "POINT(45.0 49.0)", "2013-01-02T00:00:00.000Z") + val serializer = KryoFeatureSerializer(sft, SerializationOptions.withoutId) + var serialized: Try[Array[Byte]] = Failure(new IllegalArgumentException("thread not invoked?")) + // we run the serialize in a new thread to avoid any buffer caching that can cause this test to not be reproducible + // buffers are cached in thread-locals, so a new thread should ensure a fresh buffer + val thread = new Thread(new Runnable() { override def run(): Unit = serialized = Try(serializer.serialize(sf)) }) + thread.start() + thread.join() + + serialized must beASuccessfulTry + val deserialized = serializer.deserialize(serialized.get) + deserialized.getAttributes mustEqual sf.getAttributes + deserialized.getUserData.asScala must beEmpty + } + "be backwards compatible" in { val spec = "dtg:Date,*geom:Point:srid=4326" val sft = SimpleFeatureTypes.createType("testType", spec)