From d3d9fa2a7823758aae8584b1de672219b58c12d1 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Wed, 25 Oct 2017 15:51:07 +0100 Subject: [PATCH] use mapValues instead of map --- .../java/io/confluent/ksql/structured/SchemaKStream.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index 7dd74ab268fa..61424897889f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -108,17 +108,15 @@ public SchemaKStream filter(final Expression filterExpression) throws Exception } public SchemaKStream select(final Schema selectSchema) { - final KStream projectedKStream = - kstream.map((KeyValueMapper>) (key, row) -> { + kstream.mapValues(row -> { List newColumns = new ArrayList<>(); for (Field schemaField : selectSchema.fields()) { newColumns.add( row.getColumns().get(SchemaUtil.getFieldIndexByName(schema, schemaField.name()))); } - GenericRow newRow = new GenericRow(newColumns); - return new KeyValue<>(key, newRow); + return new GenericRow(newColumns); }); return new SchemaKStream(selectSchema, projectedKStream, keyField, Collections.singletonList(this),