Skip to content

Commit

Permalink
[fix] keep field order when writing struct type data (apache#169)
Browse files Browse the repository at this point in the history
Co-authored-by: gnehil <gnehil489@github>
  • Loading branch information
gnehil and gnehil authored Dec 8, 2023
1 parent 4849ee7 commit 3c723a5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.slf4j.LoggerFactory

import java.sql.Timestamp
import java.time.format.DateTimeFormatter
import java.time.{LocalDateTime, ZoneOffset}
import java.util.Locale
import scala.collection.JavaConversions._
import scala.collection.mutable

Expand Down Expand Up @@ -194,11 +190,11 @@ private[spark] object SchemaUtils {
}
case st: StructType =>
val structData = row.getStruct(ordinal, st.length)
val map = mutable.HashMap[String, Any]()
val map = new java.util.TreeMap[String, Any]()
var i = 0
while (i < structData.numFields) {
val field = st.get(i)
map += field.name -> rowColumnValue(structData, i, field.dataType)
map.put(field.name, rowColumnValue(structData, i, field.dataType))
i += 1
}
MAPPER.writeValueAsString(map)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,41 @@

package org.apache.doris.spark.sql

import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
import org.junit.{Assert, Ignore, Test}
import org.apache.spark.unsafe.types.UTF8String
import org.junit.{Assert, Test}

import java.sql.{Date, Timestamp}

@Ignore
class SchemaUtilsTest {

@Test
def rowColumnValueTest(): Unit = {

val spark = SparkSession.builder().master("local").getOrCreate()
val row = InternalRow(
1,
CatalystTypeConverters.convertToCatalyst(Date.valueOf("2023-09-08")),
CatalystTypeConverters.convertToCatalyst(Timestamp.valueOf("2023-09-08 17:00:00")),
ArrayData.toArrayData(Array(1, 2, 3)),
CatalystTypeConverters.convertToCatalyst(Map[String, String]("a" -> "1")),
InternalRow(UTF8String.fromString("a"), 1)
)

val rdd = spark.sparkContext.parallelize(Seq(
Row(1, Date.valueOf("2023-09-08"), Timestamp.valueOf("2023-09-08 17:00:00"), Array(1, 2, 3),
Map[String, String]("a" -> "1"), Row("a", 1))
))
val df = spark.createDataFrame(rdd, new StructType().add("c1", IntegerType)
val schema = new StructType().add("c1", IntegerType)
.add("c2", DateType)
.add("c3", TimestampType)
.add("c4", ArrayType.apply(IntegerType))
.add("c5", MapType.apply(StringType, StringType))
.add("c6", StructType.apply(Seq(StructField("a", StringType), StructField("b", IntegerType)))))

val schema = df.schema

df.queryExecution.toRdd.foreach(row => {

val fields = schema.fields
Assert.assertEquals(1, SchemaUtils.rowColumnValue(row, 0, fields(0).dataType))
Assert.assertEquals("2023-09-08", SchemaUtils.rowColumnValue(row, 1, fields(1).dataType))
Assert.assertEquals("2023-09-08 17:00:00.0", SchemaUtils.rowColumnValue(row, 2, fields(2).dataType))
Assert.assertEquals("[1,2,3]", SchemaUtils.rowColumnValue(row, 3, fields(3).dataType))
Assert.assertEquals("{\"a\":\"1\"}", SchemaUtils.rowColumnValue(row, 4, fields(4).dataType))
Assert.assertEquals("{\"a\":\"a\",\"b\":1}", SchemaUtils.rowColumnValue(row, 5, fields(5).dataType))

})
.add("c6", StructType.apply(Seq(StructField("a", StringType), StructField("b", IntegerType))))

Assert.assertEquals(1, SchemaUtils.rowColumnValue(row, 0, schema.fields(0).dataType))
Assert.assertEquals("2023-09-08", SchemaUtils.rowColumnValue(row, 1, schema.fields(1).dataType))
Assert.assertEquals("2023-09-08 17:00:00.0", SchemaUtils.rowColumnValue(row, 2, schema.fields(2).dataType))
Assert.assertEquals("[1,2,3]", SchemaUtils.rowColumnValue(row, 3, schema.fields(3).dataType))
Assert.assertEquals("{\"a\":\"1\"}", SchemaUtils.rowColumnValue(row, 4, schema.fields(4).dataType))
Assert.assertEquals("{\"a\":\"a\",\"b\":1}", SchemaUtils.rowColumnValue(row, 5, schema.fields(5).dataType))

}

Expand Down

0 comments on commit 3c723a5

Please sign in to comment.