Skip to content

Commit

Permalink
GEOMESA-3289 Fix ingest for schemas with > 479 attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Jul 27, 2023
1 parent 56e0cfa commit d3883a6
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,14 @@ package object kryo {
output.writeShort(count) // track the number of attributes
output.write(size) // size of each offset
val offset = output.position()
output.setPosition(offset + (size * (count + 1)) + (IntBitSet.size(count) * 4))
val position = offset + (size * (count + 1)) + (IntBitSet.size(count) * 4)
// setting the position to greater than the buffer size results in errors when trying to write some values later on
if (output.getBuffer.length < position) {
val buf = Array.ofDim[Byte](position * 2)
System.arraycopy(output.getBuffer, 0, buf, 0, offset)
output.setBuffer(buf, -1)
}
output.setPosition(position)
offset
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.specs2.runner.JUnitRunner
import java.nio.charset.StandardCharsets
import java.util
import java.util.{Collections, Date, UUID}
import scala.util.{Failure, Try}

@RunWith(classOf[JUnitRunner])
class KryoFeatureSerializerTest extends Specification with LazyLogging {
Expand Down Expand Up @@ -427,6 +428,24 @@ class KryoFeatureSerializerTest extends Specification with LazyLogging {
deserialized.getUserData.asScala must beEmpty
}

"correctly expand the buffer for large feature types" in {
val spec = "*geom:Point:srid=4326,dtg:Date," + Seq.tabulate(1000)(i => f"a$i%02d:Int").mkString(",")
val sft = SimpleFeatureTypes.createType("test", spec)
val sf = ScalaSimpleFeature.create(sft, "fid-0", "POINT(45.0 49.0)", "2013-01-02T00:00:00.000Z")
val serializer = KryoFeatureSerializer(sft, SerializationOptions.withoutId)
var serialized: Try[Array[Byte]] = Failure(new IllegalArgumentException("thread not invoked?"))
// we run the serialize in a new thread to avoid any buffer caching that can cause this test to not be reproducible
// buffers are cached in thread-locals, so a new thread should ensure a fresh buffer
val thread = new Thread(new Runnable() { override def run(): Unit = serialized = Try(serializer.serialize(sf)) })
thread.start()
thread.join()

serialized must beASuccessfulTry
val deserialized = serializer.deserialize(serialized.get)
deserialized.getAttributes mustEqual sf.getAttributes
deserialized.getUserData.asScala must beEmpty
}

"be backwards compatible" in {
val spec = "dtg:Date,*geom:Point:srid=4326"
val sft = SimpleFeatureTypes.createType("testType", spec)
Expand Down

0 comments on commit d3883a6

Please sign in to comment.