Skip to content

Commit

Permalink
Support avro union types
Browse files Browse the repository at this point in the history
  • Loading branch information
cavemandaveman committed Jul 17, 2018
1 parent c9d6642 commit 14fbc41
Show file tree
Hide file tree
Showing 13 changed files with 59 additions and 32 deletions.
6 changes: 3 additions & 3 deletions nifi-encrypt-value-nar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
<parent>
<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-encrypt-value-bundle</artifactId>
<version>18.06.3</version>
<version>18.07.1</version>
</parent>

<artifactId>nifi-encrypt-value-nar</artifactId>
<version>18.06.3</version>
<version>18.07.1</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
Expand All @@ -34,7 +34,7 @@
<dependency>
<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-encrypt-value-processors</artifactId>
<version>18.06.3</version>
<version>18.07.1</version>
</dependency>
</dependencies>

Expand Down
2 changes: 1 addition & 1 deletion nifi-encrypt-value-processors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-encrypt-value-bundle</artifactId>
<version>18.06.3</version>
<version>18.07.1</version>
</parent>

<artifactId>nifi-encrypt-value-processors</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.nineteen04labs.processors.util.Encryption;
import com.nineteen04labs.processors.util.FormatStream;

import org.apache.avro.Schema;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
Expand Down Expand Up @@ -108,8 +110,11 @@ public void process(InputStream in, OutputStream out) throws IOException {
JsonParser jsonParser;
JsonGenerator jsonGen = jsonFactory.createGenerator(baos);

if (flowFormat == "AVRO")
in = FormatStream.avroToJson(in, schemaString);
Schema schema = null;
if (flowFormat == "AVRO") {
schema = new Schema.Parser().parse(schemaString);
in = FormatStream.avroToJson(in, schema);
}

Reader r = new InputStreamReader(in);
BufferedReader br = new BufferedReader(r);
Expand All @@ -119,18 +124,43 @@ public void process(InputStream in, OutputStream out) throws IOException {
jsonParser = jsonFactory.createParser(line);
while (jsonParser.nextToken() != null) {
jsonGen.copyCurrentEvent(jsonParser);
if(fieldNames.contains(jsonParser.getCurrentName())) {
jsonParser.nextToken();
String hashedValue = Encryption.hashValue(jsonParser.getText(), algorithm);
jsonGen.writeString(hashedValue);
String tokenName = jsonParser.getCurrentName();
if(fieldNames.contains(tokenName)) {
String valueToHash = null;
String hashedValue = null;
//This is too fragile and ugly
if (schema != null) {
if (schema.getField(tokenName).schema().getType() == Schema.Type.UNION) {
if (jsonParser.currentToken() == JsonToken.START_OBJECT) {
while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
valueToHash = jsonParser.getText();
}
hashedValue = Encryption.hashValue(valueToHash, algorithm);
jsonGen.writeFieldName("string");
jsonGen.writeString(hashedValue);
jsonGen.writeEndObject();
}
} else {
jsonParser.nextToken();
valueToHash = jsonParser.getText();
hashedValue = Encryption.hashValue(valueToHash, algorithm);
jsonGen.writeString(hashedValue);
}
}
else {
jsonParser.nextToken();
valueToHash = jsonParser.getText();
hashedValue = Encryption.hashValue(valueToHash, algorithm);
jsonGen.writeString(hashedValue);
}
}
}
jsonGen.writeRaw("\n");
}
jsonGen.flush();

if (flowFormat == "AVRO")
baos = FormatStream.jsonToAvro(baos, schemaString);
baos = FormatStream.jsonToAvro(baos, schema);

baos.writeTo(out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class EncryptValueProperties {
.description("Specify the schema if the FlowFile format is Avro.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();

public static final PropertyDescriptor FIELD_NAMES = new PropertyDescriptor
Expand All @@ -46,6 +47,7 @@ public class EncryptValueProperties {
.description("Comma separated list of fields whose values to encrypt.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();

public static final PropertyDescriptor HASH_ALG = new PropertyDescriptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ public class FormatStream {

private static final Logger logger = LoggerFactory.getLogger(FormatStream.class);

public static InputStream avroToJson(InputStream in, String schemaString) throws IOException {
Schema schema = new Schema.Parser().parse(schemaString);

public static InputStream avroToJson(InputStream in, Schema schema) throws IOException {
GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
DataFileStream<Object> streamReader = new DataFileStream<Object>(in, reader);
DatumWriter<Object> writer = new GenericDatumWriter<Object>(schema);
Expand All @@ -62,9 +60,7 @@ public static InputStream avroToJson(InputStream in, String schemaString) throws
return convertStream(baos);
}

public static ByteArrayOutputStream jsonToAvro(ByteArrayOutputStream jsonStream, String schemaString) throws IOException {
Schema schema = new Schema.Parser().parse(schemaString);

public static ByteArrayOutputStream jsonToAvro(ByteArrayOutputStream jsonStream, Schema schema) throws IOException {
InputStream input = convertStream(jsonStream);
ByteArrayOutputStream baos = new ByteArrayOutputStream();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ public void setSchema() throws IOException {

@Test
public void testSHA512() throws IOException {
Path sha512File = Paths.get("src/test/resources/sha512.avro");
testEncryption("SHA-512", sha512File);
testEncryption("SHA-512");
}

@Test
public void testNoEncryption() throws IOException {
runner.setProperty(EncryptValueProperties.FLOW_FORMAT, "AVRO");
runner.setProperty(EncryptValueProperties.AVRO_SCHEMA, avroSchema);
runner.setProperty(EncryptValueProperties.HASH_ALG, "SHA-512");
runner.setValidateExpressionUsage(false);

runner.enqueue(unencryptedFile);

Expand All @@ -62,21 +62,18 @@ public void testNoEncryption() throws IOException {
outFile.assertContentEquals(unencryptedFile);
}

private void testEncryption(final String hashAlgorithm, final Path encryptedFile) throws IOException {
runner.setProperty(EncryptValueProperties.FIELD_NAMES, "card_number,last_name");
private void testEncryption(final String hashAlgorithm) throws IOException {
runner.setProperty(EncryptValueProperties.FIELD_NAMES, "first_name,last_name,card_number");
runner.setProperty(EncryptValueProperties.FLOW_FORMAT, "AVRO");
runner.setProperty(EncryptValueProperties.AVRO_SCHEMA, avroSchema);
runner.setProperty(EncryptValueProperties.HASH_ALG, hashAlgorithm);
runner.setValidateExpressionUsage(false);

runner.enqueue(unencryptedFile);

runner.run();
runner.assertQueueEmpty();
runner.assertAllFlowFilesTransferred(EncryptValueRelationships.REL_SUCCESS, 1);

//final MockFlowFile outFile = runner.getFlowFilesForRelationship(EncryptValueRelationships.REL_SUCCESS).get(0);

//outFile.assertContentEquals(encryptedFile);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public void testSHA512() throws IOException {
public void testNoEncryption() throws IOException {
runner.setProperty(EncryptValueProperties.FLOW_FORMAT, "JSON");
runner.setProperty(EncryptValueProperties.HASH_ALG, "SHA-512");
runner.setValidateExpressionUsage(false);

runner.enqueue(unencryptedFile);

Expand All @@ -53,9 +54,10 @@ public void testNoEncryption() throws IOException {
}

private void testEncryption(final String hashAlgorithm, final Path encryptedFile) throws IOException {
runner.setProperty(EncryptValueProperties.FIELD_NAMES, "card_number,last_name");
runner.setProperty(EncryptValueProperties.FIELD_NAMES, "first_name,last_name,card_number");
runner.setProperty(EncryptValueProperties.FLOW_FORMAT, "JSON");
runner.setProperty(EncryptValueProperties.HASH_ALG, hashAlgorithm);
runner.setValidateExpressionUsage(false);

runner.enqueue(unencryptedFile);

Expand Down
Binary file not shown.
4 changes: 2 additions & 2 deletions nifi-encrypt-value-processors/src/test/resources/sha512.json
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"status":"active","location":{"state":"CA","country":"US"},"first_name":"Catherine ","last_name":"20ff29ae253bf483fb5f3d95e3aaea1cb4d62c95972827838a1b4debfd05470c138e8e9b1b1eca74eb46da6f4d578279a88b452bb2f6b2e8ae2e473e37a054e1","title":"Ms","created_at":"2015-09-03T01:23:20.605354","updated_at":"2016-09-18T20:05:20.747376","entity":"bed56c6310c6497b8c456b9244c2a427","position":"Chair, BILETA ","id":"ffabd37094c24626a6901a03799c35d2","card_number":"4572b7b391c982302250b0c9312bcd4304d42d4214a968a749666f28b9db16abb9536a89470a2f4399d31c17d9ed6237c59a3cb9503883e95cb15d35b53c4919"}
{"status":"active","location":{"state":"MO","country":"US"},"first_name":"James ","last_name":"2f41f4845f1be07652c5888a45b327b5c0b9ef324f7e9cf840721161af425afdd2cd574f7ef9d9877ef43bcd076b2640135d40d49e26b8134e43e822ff070680","title":"Mr","created_at":"2016-03-11T11:11:11.986462","updated_at":"2017-12-10T10:10:10.9047382","entity":"klo36c6310c6497b8c456b9244c0pl4n","position":"Chair, BILETA ","id":"ud96d37094c24626a6901a03798jnm5g","card_number":"3569541bdd41ea2880590b0b38a6439f3e2c674dd2f27bfb81673dc920408af13b43bf4cca8803d8b91652e7a6944896cf0906818e1c9e97d455643ba9871ee9"}
{"status":"active","location":{"state":"CA","country":"US"},"first_name":"019542970f4628243c4353bc2cdda0e17c42acc8a532d1ac0bfb5fdbe2afe143434f6d03a3e0586dea72fc78dc6c9607d05250fc8906f7428e756cf9020bd84b","last_name":"20ff29ae253bf483fb5f3d95e3aaea1cb4d62c95972827838a1b4debfd05470c138e8e9b1b1eca74eb46da6f4d578279a88b452bb2f6b2e8ae2e473e37a054e1","title":"Ms","created_at":"2015-09-03T01:23:20.605354","updated_at":"2016-09-18T20:05:20.747376","entity":"bed56c6310c6497b8c456b9244c2a427","position":"Chair, BILETA","id":"ffabd37094c24626a6901a03799c35d2","card_number":"4572b7b391c982302250b0c9312bcd4304d42d4214a968a749666f28b9db16abb9536a89470a2f4399d31c17d9ed6237c59a3cb9503883e95cb15d35b53c4919"}
{"status":"active","location":{"state":"MO","country":"US"},"first_name":"123c86e1f2ac255ba31f1ad742defe23d194269669d2aac0d2572e20e9378e395976f84db305caeba1f91e7996463031d4c49365a7a9f4c7dc404873ad330974","last_name":"2f41f4845f1be07652c5888a45b327b5c0b9ef324f7e9cf840721161af425afdd2cd574f7ef9d9877ef43bcd076b2640135d40d49e26b8134e43e822ff070680","title":"Mr","created_at":"2016-03-11T11:11:11.986462","updated_at":"2017-12-10T10:10:10.9047382","entity":"klo36c6310c6497b8c456b9244c0pl4n","position":"Chair, BILETA","id":"ud96d37094c24626a6901a03798jnm5g","card_number":"3569541bdd41ea2880590b0b38a6439f3e2c674dd2f27bfb81673dc920408af13b43bf4cca8803d8b91652e7a6944896cf0906818e1c9e97d455643ba9871ee9"}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
}
}, {
"name" : "first_name",
"type" : "string"
"type" : ["null","string"]
}, {
"name" : "last_name",
"type" : "string"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"status":"active","location":{"state":"CA","country":"US"},"first_name":"Catherine ","last_name":"Easton","title":"Ms","created_at":"2015-09-03T01:23:20.605354","updated_at":"2016-09-18T20:05:20.747376","entity":"bed56c6310c6497b8c456b9244c2a427","position":"Chair, BILETA ","id":"ffabd37094c24626a6901a03799c35d2","card_number":"1234567891011123"}
{"status":"active","location":{"state":"MO","country":"US"},"first_name":"James ","last_name":"Smith","title":"Mr","created_at":"2016-03-11T11:11:11.986462","updated_at":"2017-12-10T10:10:10.9047382","entity":"klo36c6310c6497b8c456b9244c0pl4n","position":"Chair, BILETA ","id":"ud96d37094c24626a6901a03798jnm5g","card_number":"9876543210987654"}
{"status":"active","location":{"state":"CA","country":"US"},"first_name":"Catherine","last_name":"Easton","title":"Ms","created_at":"2015-09-03T01:23:20.605354","updated_at":"2016-09-18T20:05:20.747376","entity":"bed56c6310c6497b8c456b9244c2a427","position":"Chair, BILETA","id":"ffabd37094c24626a6901a03799c35d2","card_number":"1234567891011123"}
{"status":"active","location":{"state":"MO","country":"US"},"first_name":"James","last_name":"Smith","title":"Mr","created_at":"2016-03-11T11:11:11.986462","updated_at":"2017-12-10T10:10:10.9047382","entity":"klo36c6310c6497b8c456b9244c0pl4n","position":"Chair, BILETA","id":"ud96d37094c24626a6901a03798jnm5g","card_number":"9876543210987654"}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-encrypt-value-bundle</artifactId>
<version>18.06.3</version>
<version>18.07.1</version>
<packaging>pom</packaging>

<modules>
Expand Down

0 comments on commit 14fbc41

Please sign in to comment.