diff --git a/docs/new_protocol.asciidoc b/docs/new_protocol.asciidoc index a51e50a7b89..38f9962e04c 100644 --- a/docs/new_protocol.asciidoc +++ b/docs/new_protocol.asciidoc @@ -366,6 +366,122 @@ integration tests: this is the table name. For key-value stores, this is the key. If nothing seems to make sense to put in this field, use the empty string. +==== Helpers + +In libbeat some helpers exist to implement parsers for binary and text based +protocols. The Bytes_Ntoh... functions being the most low level helpers for +binary protocols using network byte order can be found in the libbeat/common +module. In addition to these very low level helpers a stream buffer for parsing +tcp based streams, or simply udp packets with integrated error handling is provided by +libbeat/common/streambuf. This demonstrates it's usage for parsing the memcached +protocols udp header: + +[source,go] +---------------------------------------------------------------------- +func parseUdpHeader(buf *streambuf.Buffer) (mcUdpHeader, error) { + var h mcUdpHeader + h.requestId, _ = buf.ReadNetUint16() + h.seqNumber, _ = buf.ReadNetUint16() + h.numDatagrams, _ = buf.ReadNetUint16() + buf.Advance(2) // ignore reserved + return h, buf.Err() +} +---------------------------------------------------------------------- + +The stream buffer is also used to implement the binary and text based protocols +for memcache. + +[source,go] +---------------------------------------------------------------------- + header := buf.Snapshot() + buf.Advance(memcacheHeaderSize) + + msg := parser.message + if msg.IsRequest { + msg.vbucket, _ = header.ReadNetUint16At(6) + } else { + msg.status, _ = header.ReadNetUint16At(6) + } + + cas, _ := header.ReadNetUint64At(16) + if cas != 0 { + setCasUnique(msg, cas) + } + msg.opaque, _ = header.ReadNetUint32At(12) + + // check message length + + extraLen, _ := header.ReadNetUint8At(4) + keyLen, _ := header.ReadNetUint16At(2) + totalLen, _ := header.ReadNetUint32At(8) + + ... + + if extraLen > 0 { + tmp, _ := buf.Collect(int(extraLen)) + extras := streambuf.NewFixed(tmp) + var err error + if msg.IsRequest && requestArgs != nil { + err = parseBinaryArgs(parser, requestArgs, header, extras) + } else if responseArgs != nil { + err = parseBinaryArgs(parser, responseArgs, header, extras) + } + if err != nil { + msg.AddNotes(err.Error()) + } + } + + if keyLen > 0 { + key, _ := buf.Collect(int(keyLen)) + keys := []memcacheString{memcacheString{key}} + msg.keys = keys + } + + if valueLen == 0 { + return parser.yield(buf.BufferConsumed()) + } +---------------------------------------------------------------------- + +It also implements a number of interfaces defined in the standard "io" package +and can easily be used to serialize some packets for testing parsers (see +protos/memcache/binary_test.go). + +In addition packetbeat provides the module packetbeat/protos/applayer with +common definitions among all application layer protocols. For example using the +Transaction type from applayer guarantees the final document to have all common +required fields defined. Just embedd the applayer.Transaction with you own +application layer transaction type to make use of it (from memcache protocol): + +[source,go] +---------------------------------------------------------------------- + type transaction struct { + applayer.Transaction + + command *commandType + + request *message + response *message + } + + func (t *transaction) Event(event common.MapStr) error { + // use applayer.Transaction to write common required fields + if err := t.Transaction.Event(event); err != nil { + logp.Warn("error filling generic transaction fields: %v", err) + return err + } + + mc := common.MapStr{} + event["memcache"] = mc + + ... + + return nil + } +---------------------------------------------------------------------- + +Use applayer.Message in conjunction with applayer.Transaction for creating the +transaction and applayer.Stream to manage your stream buffers for parsing. + === Testing