Triton is an opinionated set of tooling for building a data pipeline around an AWS stack including Kinesis and S3.
It provides the necessary glue for building real applications on top of the type of architecture.
As your application collects data, write it to Kinesis streams as a series of events. Other applications in your infrastructure read from this stream providing a solid pattern for services to share data.
Triton aims to provide a level of tooling, glue and utility to make this ecosystem easy to use. Namely:
- Standardized, uniform method of determining what streams to write to.
- Process (and standard format) for archiving stream data to S3.
- Higher-level APIs for processing streams from Kinesis or S3.
Triton applications should share a configuration file for defining streams. Its yaml formatted, and looks something like:
my_stream:
name: my_stream_v2
partition_key: value
region: us-west-1
The name configuration allows for a decoupling between application-level
and logical stream names. So your application may only know about a stream
named user_activity
but the underlying AWS configured stream name may change.
Events in Triton are Serialized using Message Pack
When stored to S3, they are also framed and compressed using Snappy
This combination is good tradeoff between flexibility and performance.
Archive files in S3 are organized by date and time. For example:
20150710/user_activity_prod-store-1436553581.tri
This indicates the stream user_activity_prod
, triton client store
stored at unix timestamp 1436553581
.
The date and time specify when the event was processed, not emitted. There is no guarantee that each file will contain a specific hour of data.
Triton uses an external database (postgresql or sqlite) for clients to maintain their stream position.
For example, each time triton store
writes data to S3, it updates the
database with the last sequence number for each shard in the stream it's
processing. In this way, if the process (or instance) dies, it can resume from
where it left off ensuring uninterrupted data stored in S3.
This checkpoint mechanism is available as a library.
This repository includes a command line tool triton
which provides some
tooling for using Kinesis streams.
The triton store
command runs a process to store a Kinesis stream into S3.
Events are batched together and uploaded to S3 in something close to hourly files.
Example usage:
$ export TRITON_CONFIG=/etc/triton.yaml
$ export TRITON_DB="postgres://user:password@triton.rds.amazon.com"
$ triton store --bucket=triton-prod --stream=user_activity
The command line client in this package also acts as an example client.
Your client application will need to load the configuration to know how to connect to streams. Something like:
f, _ := os.Open("triton.yaml")
c, _ := triton.NewConfigFromFile(f)
sc, _ := c.ConfigForName("my_stream")
A live client would connect to the Kinesis shards and process records like:
kSvc := kinesis.New(config)
stream, err := triton.NewStreamReader(kSvc, sc.StreamName, nil)
for {
rec, err := stream.ReadRecord()
if err != nil {
if err == io.EOF {
break
} else {
panic(err)
}
}
b, err := json.Marshal(rec)
if err != nil {
panic(err)
}
fmt.Println(string(b))
}
There are two global variables that help control the interaction with Kinesis:
- MinPollInterval: minimum amount of time between Kinesis GetRecords api calls
- RequestLimit: Maximum amount of records to return for each GetRecords call
A very similar API is available for processing from S3:
s3Svc := s3.New(&aws.Config{Region: aws.String("us-west-1")})
start := time.Time(2015, 10, 1, 0, 0, 0, 0, nil)
end := time.Time(2015, 11, 1, 0, 0, 0, 0, nil)
stream, _ := triton.NewStoreReader(s3Svc, bucketName, "store", sc.StreamName, start, end)
for {
rec, err := stream.ReadRecord()
if err != nil {
if err == io.EOF {
break
} else {
panic(err)
}
}
b, err := json.Marshal(rec)
if err != nil {
panic(err)
}
fmt.Println(string(b))
}
A critical feature for building kinesis stream processors is to store the cursor position. If your client exits, or fails, you'll likely want to resume from where you left off.
By providing a 'Checkpointer' to the StreamReader, you get this extra functionality:
c, _ := triton.NewCheckpointer("myclient", sc.StreamName, db)
stream, _ := triton.NewStreamReader(kSvc, sc.StreamName, c)
for {
rec, _ := stream.ReadRecord()
stream.Checkpoint()
...
}
The core of the Triton stack is built in Go. It's assumed that Triton will be used with applications running in other languages besides go. With that in mind, see also triton-python
This package ships a single command: triton
Assuming a normal go workspace (placing this code in
$GOROOT/src/github.com/postmates/go-triton
), you can use the Makefile:
make
Standard go build commands of course also work.
- Metrics/Reporting hooks for easier status checks
- Better handle Kinesis shard splitting and combining
- Better patterns for dealing with arbitrary
map[string]interface{}
data