Skip to content
This repository has been archived by the owner on Apr 22, 2022. It is now read-only.

add new sample, avro data convert to json as a new topic #5

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions divolte-kafka-streams/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Log file
*.log

# BlueJ files
*.ctxt

# Mobile Tools for Java (J2ME)
.mtj.tmp/

# Package Files #
*.jar
*.war
*.ear
*.zip
*.tar.gz
*.rar

target/

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

# IDE
.idea/
*.iml
11 changes: 11 additions & 0 deletions divolte-kafka-streams/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
### Convert Avro data to JSON as a new topic

```
mvn clean install
mvn clean package
```

#### Run Consumer
```
java -jar target/divolte-kafka-streams-1.0-SNAPSHOT-jar-with-dependencies.jar
```
133 changes: 133 additions & 0 deletions divolte-kafka-streams/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.divolte.kafka.streams</groupId>
<artifactId>divolte-kafka-streams</artifactId>
<version>1.0-SNAPSHOT</version>


<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<properties>
<java.version>1.8</java.version>
<kafka.version>0.11.0.0-cp1</kafka.version>
<kafka.scala.version>2.11</kafka.scala.version>
<scala.version>${kafka.scala.version}.8</scala.version>
<confluent.version>3.3.0</confluent.version>
<scalatest.version>2.2.6</scalatest.version>
<algebird.version>0.13.0</algebird.version>
<avro.version>1.8.2</avro.version>
<chill.version>0.9.2</chill.version>
<jetty.version>9.2.12.v20150709</jetty.version>
<jackson.version>2.8.8</jackson.version>
<jersey.version>2.25</jersey.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>bijection-avro_2.10</artifactId>
<version>0.9.2</version>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>${confluent.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.11.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>

<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
<!--force java 8-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>


<!--package as one fat jar-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.2</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>com.divolte.kafka.streams.StreamsDivolteApp</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.divolte.kafka.streams;

import java.io.IOException;
import java.io.EOFException;
import java.io.InputStream;
import java.io.*;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericData;
import org.apache.avro.io.*;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.Schema;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

/**
 * Utility for decoding binary-encoded Avro records into their JSON string form.
 *
 * <p>The record schema is loaded from {@code src/main/resources/MyEventRecord.avsc}
 * and cached after the first successful parse, since schema parsing touches the
 * filesystem and the schema does not change at runtime.
 */
public final class AvroUtil {

    /** Path of the Avro schema file, relative to the working directory. */
    private static final String SCHEMA_PATH = "src/main/resources/MyEventRecord.avsc";

    /** Lazily-initialized cached schema; benign race — worst case it is parsed twice. */
    private static volatile Schema cachedSchema;

    private AvroUtil() {
        // Utility class — no instances.
    }

    /**
     * Decodes a binary Avro payload into the JSON rendering of the record.
     *
     * @param value raw Avro bytes as consumed from the Kafka topic
     * @return the record as a JSON string, or the empty string if decoding fails
     */
    public static String transform(byte[] value) {
        try {
            Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema());
            GenericRecord record = recordInjection.invert(value).get();
            // GenericRecord.toString() emits the record as JSON.
            return record.toString();
        } catch (Exception e) {
            // Do not swallow silently: surface the failure but keep the stream alive
            // by returning an empty value for the poisoned message.
            System.err.println("Failed to decode Avro record: " + e);
            return "";
        }
    }

    /** Returns the cached schema, parsing the schema file on first use. */
    private static Schema schema() throws IOException {
        Schema s = cachedSchema;
        if (s == null) {
            s = new Schema.Parser().parse(new File(SCHEMA_PATH));
            cachedSchema = s;
        }
        return s;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.divolte.kafka.streams;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;

/**
 * Kafka Streams application that reads binary Avro events from the
 * {@code divolte-data} topic, converts each value to its JSON rendering via
 * {@link AvroUtil#transform(byte[])}, and writes the result to the
 * {@code divolte-json} topic.
 *
 * <p>Usage: {@code java -jar ... [bootstrapServers]} — the optional first
 * argument overrides the default broker address.
 */
public class StreamsDivolteApp {

    /** Fallback broker address used when no argument is supplied (dev setup). */
    private static final String DEFAULT_BOOTSTRAP_SERVERS = "192.168.1.102:9092";

    public static void main(String[] args) {

        // Allow the broker list to be passed on the command line instead of
        // being hard-coded; callers with no arguments keep the old behavior.
        final String bootstrapServers = args.length > 0 ? args[0] : DEFAULT_BOOTSTRAP_SERVERS;

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-divolte-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Start from the beginning of the topic when no committed offset exists.
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KStreamBuilder builder = new KStreamBuilder();
        final Serde<String> stringSerde = Serdes.String();
        final Serde<byte[]> byteArraySerde = Serdes.ByteArray();

        // Keys are strings; values arrive as raw Avro bytes and leave as JSON strings.
        builder.stream(stringSerde, byteArraySerde, "divolte-data")
                .mapValues(value -> AvroUtil.transform(value))
                .to("divolte-json");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.cleanUp(); // only do this in dev - not in prod
        streams.start();

        // print the topology
        System.out.println(streams.toString());

        // shutdown hook to correctly close the streams application
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
41 changes: 41 additions & 0 deletions divolte-kafka-streams/src/main/resources/MyEventRecord.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"namespace": "io.divolte.record",
"type": "record",
"name": "MyEventRecord",
"fields": [
{ "name": "userId", "type": ["null", "int"], "default": null },
{ "name": "q", "type": ["null", "string"], "default": null },
{ "name": "page", "type": ["null", "string"], "default": null },
{ "name": "n", "type": ["null", "int"], "default": null },
{ "name": "cookieCustom", "type": ["null", "string"], "default": null },
{ "name": "detectedDuplicate", "type": ["null", "boolean"], "default": null },
{ "name": "detectedCorruption", "type": ["null", "boolean"], "default": null },
{ "name": "firstInSession", "type": ["null", "boolean"], "default": null },
{ "name": "timestamp", "type": ["null", "long"], "default": null },
{ "name": "clientTimestamp", "type": ["null", "long"], "default": null },
{ "name": "remoteHost", "type": ["null", "string"], "default": null },
{ "name": "referer", "type": ["null", "string"], "default": null },
{ "name": "location", "type": ["null", "string"], "default": null },
{ "name": "viewportPixelWidth", "type": ["null", "int"], "default": null },
{ "name": "viewportPixelHeight", "type": ["null", "int"], "default": null },
{ "name": "screenPixelWidth", "type": ["null", "int"], "default": null },
{ "name": "screenPixelHeight", "type": ["null", "int"], "default": null },
{ "name": "devicePixelRatio", "type": ["null", "int"], "default": null },
{ "name": "partyId", "type": ["null", "string"], "default": null },
{ "name": "sessionId", "type": ["null", "string"], "default": null },
{ "name": "pageViewId", "type": ["null", "string"], "default": null },
{ "name": "eventType", "type": "string", "default": "unknown" },
{ "name": "eventId", "type": "string", "default": "unknown" },
{ "name": "localPath", "type": ["null", "string"], "default": null },
{ "name": "userAgentString", "type": ["null", "string"], "default": null },
{ "name": "userAgentName", "type": ["null", "string"], "default": null },
{ "name": "userAgentFamily", "type": ["null", "string"], "default": null },
{ "name": "userAgentVendor", "type": ["null", "string"], "default": null },
{ "name": "userAgentType", "type": ["null", "string"], "default": null },
{ "name": "userAgentVersion", "type": ["null", "string"], "default": null },
{ "name": "userAgentDeviceCategory", "type": ["null", "string"], "default": null },
{ "name": "userAgentOsFamily", "type": ["null", "string"], "default": null },
{ "name": "userAgentOsVersion", "type": ["null", "string"], "default": null },
{ "name": "userAgentOsVendor", "type": ["null", "string"], "default": null }
]
}
5 changes: 5 additions & 0 deletions divolte-kafka-streams/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%p %m (%c:%L) %n
45 changes: 45 additions & 0 deletions divolte-kafka-streams/src/main/resources/mapping.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Divolte Collector mapping script: maps click-stream event attributes onto the
// fields of the MyEventRecord Avro schema (src/main/resources/MyEventRecord.avsc).
// Field names on the right of 'onto' must match the schema exactly.
mapping {
// Custom event parameter sent by the tracker; parsed from string to int32.
map {parse eventParameters().value('userId') to int32 } onto 'userId'

// Standard Divolte event metadata.
map eventType() onto 'eventType'
map firstInSession() onto 'firstInSession'
map timestamp() onto 'timestamp'
map remoteHost() onto 'remoteHost'
map duplicate() onto 'detectedDuplicate'
map corrupt() onto 'detectedCorruption'
map clientTimestamp() onto 'clientTimestamp'
map eventId() onto 'eventId'
// Value of the browser cookie named 'cookieCustom'.
map cookie('cookieCustom') onto 'cookieCustom'

// Page / browser context.
map referer() onto 'referer'
map location() onto 'location'
map viewportPixelWidth() onto 'viewportPixelWidth'
map viewportPixelHeight() onto 'viewportPixelHeight'
map screenPixelWidth() onto 'screenPixelWidth'
map screenPixelHeight() onto 'screenPixelHeight'
map devicePixelRatio() onto 'devicePixelRatio'
map partyId() onto 'partyId'
map sessionId() onto 'sessionId'
map pageViewId() onto 'pageViewId'

// User-agent string plus its parsed components.
map userAgentString() onto 'userAgentString'
def ua = userAgent()
map ua.name() onto 'userAgentName'
map ua.family() onto 'userAgentFamily'
map ua.vendor() onto 'userAgentVendor'
map ua.type() onto 'userAgentType'
map ua.version() onto 'userAgentVersion'
map ua.deviceCategory() onto 'userAgentDeviceCategory'
map ua.osFamily() onto 'userAgentOsFamily'
map ua.osVersion() onto 'userAgentOsVersion'
map ua.osVendor() onto 'userAgentOsVendor'

// The page URL's fragment is itself treated as a URI (single-page-app style
// routing, e.g. http://host/#/path?q=...): its path and query parameters are
// extracted into dedicated fields.
def locationUri = parse location() to uri
def localUri = parse locationUri.rawFragment() to uri
map localUri.path() onto 'localPath'

def localQuery = localUri.query()
map localQuery.value('q') onto 'q'
map localQuery.value('page') onto 'page'
map { parse localQuery.value('n') to int32 } onto 'n'
}