Skip to content

Commit 8900b5f

Browse files
committed
packer
1 parent 54adec5 commit 8900b5f

File tree

5 files changed

+77
-0
lines changed

5 files changed

+77
-0
lines changed

.idea/modules/kafka-transform-decrypt_main.iml

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/modules/kafka-transform-decrypt_test.iml

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

build.gradle

+3
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,7 @@ dependencies {
2929
// kafka
3030
//compile 'org.apache.kafka:kafka-clients:0.10.0.1'
3131
compile 'org.apache.kafka:kafka-clients:0.9.0.1'
32+
33+
// msgpack
34+
compile 'org.msgpack:msgpack-core:0.8.9'
3235
}

src/main/kotlin/main.kt

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import org.apache.kafka.clients.producer.Producer
44
import org.apache.kafka.clients.producer.ProducerConfig
55
import org.apache.kafka.clients.producer.ProducerRecord
66
import org.apache.kafka.common.errors.WakeupException
7+
import org.msgpack.core.MessagePack
78
import java.io.File
89
import java.util.*
910
import java.util.concurrent.atomic.AtomicBoolean

src/main/kotlin/packer.kt

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import org.msgpack.core.MessagePack
2+
import org.msgpack.core.buffer.ArrayBufferInput
3+
import org.msgpack.core.buffer.ArrayBufferOutput
4+
5+
/**
6+
* NOT THREAD SAFE
7+
*/
8+
class Packer(val transform: (buffer: ByteArray, len: Int) -> String) {
9+
private var buffer: ByteArray = ByteArray(256)
10+
private val unpacker = MessagePack.newDefaultUnpacker(buffer)
11+
private val packer = MessagePack.newDefaultBufferPacker()
12+
private val output = ArrayBufferOutput(256)
13+
14+
/**
15+
* Creates new msgpack array from `packed`
16+
*
17+
* Value to transform must be at index 1 (second item)
18+
* Transformed value will be returned at index 2 (third item)
19+
* `packed` must have nil at index 2
20+
*
21+
* The returned array will have the same number of items as `packed`
22+
* items at index 0, 1, 3+ will be reproduced exactly as is
23+
* index 2 will be the only difference between the two arrays
24+
*
25+
* @param packed an encoded msgpack array
26+
* @return an encoded msgpack array
27+
*/
28+
fun repack(packed: ByteArray): ByteArray {
29+
// reset everything
30+
val unpacker = unpacker
31+
val packer = packer
32+
unpacker.reset(ArrayBufferInput(packed))
33+
packer.reset(output) // TODO make sure we're not shrinking
34+
35+
36+
// array length
37+
packer.packArrayHeader(unpacker.unpackArrayHeader())
38+
39+
// index 0 MUST int (version)
40+
packer.packInt(unpacker.unpackInt())
41+
42+
// index 1 MUST byte array
43+
val len = unpacker.unpackBinaryHeader()
44+
val buffer = getBuffer(len)
45+
unpacker.readPayload(buffer, 0, len)
46+
packer.packBinaryHeader(len)
47+
packer.writePayload(buffer, 0, len)
48+
49+
// index 2 transform result
50+
val result = transform(buffer, len)
51+
packer.packString(result)
52+
unpacker.unpackNil()
53+
54+
// (cheat and) copy the rest verbatim
55+
val remaining = packed.size - unpacker.totalReadBytes
56+
packer.writePayload(packed, unpacker.totalReadBytes as Int, remaining as Int)
57+
58+
return packer.toByteArray()
59+
}
60+
61+
private fun getBuffer(len: Int): ByteArray {
62+
var buffer = buffer
63+
if (buffer.size >= len)
64+
return buffer
65+
66+
// use next power of 2
67+
buffer = ByteArray(Integer.highestOneBit(len) * 2)
68+
this.buffer = buffer
69+
return buffer
70+
}
71+
}

0 commit comments

Comments
 (0)