diff --git a/src/apps/pcap/README.md b/src/apps/pcap/README.md index 346bd0b770..8017717ed4 100644 --- a/src/apps/pcap/README.md +++ b/src/apps/pcap/README.md @@ -1,4 +1,6 @@ -# PcapReader and PcapWriter Apps (apps.pcap.pcap) +# Pcap Savefile Apps + +## PcapReader and PcapWriter Apps (apps.pcap.pcap) The `PcapReader` and `PcapWriter` apps can be used to inject and log raw packet data into and out of the app network using the @@ -14,10 +16,50 @@ port to a PCAP file. | | | | +------------+ +------------+ -## Configuration +### Configuration Both `PcapReader` and `PcapWriter` expect a filename string as their configuration arguments to read from and write to respectively. `PcapWriter` will alternatively accept an array as its configuration argument, with the first element being the filename and the second element being a *mode* argument to `io.open`. + +## Tap (apps.pcap.tap) + +The `Tap` app is a simple in-band packet tap that writes packets that it +sees to a pcap savefile. It can optionally only write packets that pass +a pcap filter, and optionally subsample so it can write only every /n/th +packet. + + DIAGRAM: pcaptap + +-------------------+ + input | | output + ---->* apps.pcap.tap.Tap *----> + | | + +-------------------+ + +### Configuration + +The `Tap` app accepts a table as its configuration argument. The +following keys are defined: + +— Key **filename** + +*Required*. The name of the file to which to write the packets. + +— Key **mode** + +*Optional*. Either `"truncate"` or `"append"`, indicating whether the +savefile will be truncated (the default) or appended to. + +— Key **filter** + +*Optional*. A pflang filter expression to select packets for tapping. +Only packets that pass this filter will be sampled for the packet tap. + +— Key **sample** + +*Optional*. A sampling period. Defaults to 1, indicating that every +packet seen by the tap and passing the optional filter string will be +written. 
Setting this value to 2 will capture every second packet, and
+so on.
diff --git a/src/apps/pcap/tap.lua b/src/apps/pcap/tap.lua
new file mode 100644
index 0000000000..f18171136b
--- /dev/null
+++ b/src/apps/pcap/tap.lua
@@ -0,0 +1,112 @@
+-- Use of this source code is governed by the Apache 2.0 license; see COPYING.
+
+module(...,package.seeall)
+
+local ffi = require("ffi")
+
+local app = require("core.app")
+local lib = require("core.lib")
+local link = require("core.link")
+local pcap = require("lib.pcap.pcap")
+local pf = require("pf")
+
+-- In-band packet tap: forwards every packet from its "input" link to
+-- its "output" link and records (a sample of) them in a pcap savefile.
+Tap = {}
+
+local tap_config_params = {
+   -- Name of file to which to write packets.
+   filename = { required=true },
+   -- "truncate" to truncate the file, or "append" to add to the file.
+   mode = { default = "truncate" },
+   -- Only packets that match this pflang filter will be captured.
+   filter = { },
+   -- Only write every Nth packet that matches the filter.
+   sample = { default=1 },
+}
+
+-- Create a Tap from the configuration table conf (see
+-- tap_config_params).  Raises an error if the mode or the sampling
+-- period is invalid, or if the savefile cannot be opened.
+function Tap:new(conf)
+   local o = lib.parse(conf, tap_config_params)
+   local mode = assert(({truncate='w+b', append='a+b'})[o.mode],
+                       'mode must be "truncate" or "append"')
+   -- push() compares the period with a numeric counter using ==, so a
+   -- non-numeric, fractional or non-positive period would never match.
+   o.sample = tonumber(o.sample)
+   assert(o.sample and o.sample >= 1 and o.sample % 1 == 0,
+          "sample must be a positive integer")
+   o.file = assert(io.open(o.filename, mode))
+   -- Write the pcap file header only if the savefile is empty.  Seek to
+   -- the end to learn the size: a stream opened in "a+" mode may report
+   -- position 0 even when the file already has contents.
+   if o.file:seek("end") == 0 then pcap.write_file_header(o.file) end
+   if o.filter then o.filter = pf.compile_filter(o.filter) end
+   -- Start one short of the period so the first match is written.
+   o.n = o.sample - 1
+   return setmetatable(o, {__index = Tap})
+end
+
+-- Forward all pending packets from input to output, writing every
+-- sample-th packet that passes the optional filter to the savefile.
+function Tap:push ()
+   local n = self.n
+   while not link.empty(self.input.input) do
+      local p = link.receive(self.input.input)
+      if not self.filter or self.filter(p.data, p.length) then
+         n = n + 1
+         if n == self.sample then
+            n = 0
+            pcap.write_record(self.file, p.data, p.length)
+         end
+      end
+      link.transmit(self.output.output, p)
+   end
+   self.n = n
+end
+
+-- Close the savefile when the app is stopped.
+function Tap:stop ()
+   if self.file then
+      self.file:close()
+      self.file = nil
+   end
+end
+
+function selftest ()
+   print('selftest: apps.pcap.tap')
+
+   local config = require("core.config")
+   local Sink = require("apps.basic.basic_apps").Sink
+   local PcapReader = require("apps.pcap.pcap").PcapReader
+
+   local function run(filter, sample)
+      local tmp = os.tmpname()
+      local c = config.new()
+      -- Re-use example from packet filter test.
+      config.app(c, "source", PcapReader, "apps/packet_filter/samples/v6.pcap")
+      config.app(c, "tap", Tap, {filename=tmp, filter=filter, sample=sample})
+      config.app(c, "sink", Sink )
+
+      config.link(c, "source.output -> tap.input")
+      config.link(c, "tap.output -> sink.input")
+      app.configure(c)
+      while not app.app_table.source.done do app.breathe() end
+
+      local n = 0
+      for packet, record in pcap.records(tmp) do n = n + 1 end
+      os.remove(tmp)
+
+      app.configure(config.new())
+
+      return n
+   end
+
+   assert(run() == 161)
+   assert(run("icmp6") == 49)
+   assert(run(nil, 2) == 81)
+   assert(run("icmp6", 2) == 25)
+
+   print('selftest: ok')
+end
diff --git a/src/apps/rss/README.md b/src/apps/rss/README.md
new file mode 100644
index 0000000000..e92b0373b8
--- /dev/null
+++ b/src/apps/rss/README.md
@@ -0,0 +1,388 @@
+# RSS app (apps.rss.rss)
+
+The `rss` app implements the basic functionality needed to provide
+generic *receive side scaling* to other apps. In essence, the `rss`
+app takes packets from an arbitrary number `n` of input links and
+distributes them to an arbitrary number `m` of output links
+
+    DIAGRAM: rss
+                 +--------+
+    input_1 ---->*        *----> output_1
+            .    |  rss   |    .
+            .    |        |    .
+    input_n ---->*        *----> output_m
+                 +--------+
+
+The distribution algorithm has the property that all packets belonging
+to the same *flow* are guaranteed to be mapped to the same output
+link, where a flow is identified by the value of certain fields of the
+packet header, depending on the type of packet.
+
+For IPv4 and IPv6, the basic classifier is given by the 3-tuple
+(`source address`, `destination address`, `protocol`), where
+`protocol` is the value of the protocol field of the IPv4 header or
+the value of the next-header field that identifies the "upper-layer
+protocol" of the IPv6 header (which may be preceded by any number of
+extension headers).
+ +If the protocol is either TCP (protocol #6), UDP (protocol #17) or +SCTP (protocol #132), the list of header fields is augmented by the +port numbers to yield the 5-tuple (`source address`, `destination +address`, `protocol`, `source port`, `destination port`). + +The output link is determined by applying a hash function to the set +of header fields + +``` +out_link = ( hash(flow_fields) % m ) + 1 +``` + +All other packets are not classified into flows and are always mapped +to the first output link. + +The actual scaling property is achieved by running the receivers in +separate processes and use specialized inter-process links to connect +them to the `rss` app. + +In addition to this basic functionality, the `rss` app also implements +the following set of extensions. + +## Flow-director + +The output links can be grouped into equivalence classes with respect +to matching conditions in terms of arbitrary pflang expressions as +provided by the `pf` module. Matching packets are only distributed to +the output links that belong to the equivalence class. By default, a +single equivalence class exists which matches all packets. It is +special in the sense that the matching condition cannot be expressed +in pflang. This default class is the only one that can receive non-IP +packets. + +Classes are specified in an explicit order when an instance of the +`rss` app is created. The default class is created implicitly as the +last element in the list. Each packet is matched against the filter +expressions, starting with the first one. If a match is found, the +packet is assigned to the corresponding equivalence class and +processing of the list stops. + +The default class can be disabled by configuration. In that case, +packets not assigned to any class are dropped. + +## Packet replication + +The standard flow-director assigns a packet to at most one class. Any +class can also be marked with the attribute `continue` to allow +matches to multiple classes. 
When a packet is matched to such a +class, it is distributed to the set of output links associated with +that class but processing of the remaining filter expressions +continues. If the packet matches a subsequent class, a copy is +created and distributed to the corresponding set of output links. +Processing stops when the packet matches a class that does not have +the `continue` attribute. + +## Weighted links + +By default, all output links in a class are treated the same. In +other words, if the input consists of a sufficiently large sample of +random flows, all links will receive about the same share of them. It +is possible to introduce a bias for certain links by assigning a +*weight* to them, given by a positive integer `w`. If the number of +links is `m` and the weight of link `i` (`1 <= i <= m`) is `w_i`, the +share of traffic received by it is given by + +``` +share_i = w_i/(w_1 + w_2 + ... + w_m) +``` + +For example, if `m = 2` and `w_1 = 1, w_2 = 2`, link #1 will get 1/3 +and link #2 will get 2/3 of the traffic. + +## Packet meta-data + +In order to compute the hash over the header fields, the `rss` app +must parse the packets to a certain extent. Internally, the result of +this analysis is appended as a block of data to the end of the actual +packet data. Because this data can be useful to other apps downstream +of the `rss` app, it is exposed as part of the API. + +The meta-data is structured as follows + +``` + struct { + uint16_t magic; + uint16_t ethertype; + uint16_t vlan; + uint16_t total_length; + uint8_t *filter_start; + uint16_t filter_length; + uint8_t *l3; + uint8_t *l4; + uint16_t filter_offset; + uint16_t l3_offset; + uint16_t l4_offset; + uint8_t proto; + uint8_t frag_offset; + int16_t length_delta; + } +``` + +* `magic` + + This field contains the constant `0x5abb` to mark the start of a + valid meta-data block. The **get** API function asserts that this + value is correct. 
+ +* `ethertype` + + This is the Ethertype contained in the Ethernet header of the + packet. If the frame is of type 802.1q, i.e. the Ethertype is + `0x8100`, the `ethertype` field is set to the effective Ethertype + following the 802.1q header. Only one level of tagging is + recognised, i.e. for double-tagged frames, `ethertype` will contain + the value `0x8100`. + +* `vlan` + + If the frame contains an 802.1q tag, `vlan` is set to the value of + the `VID` field of the 802.1q header. Otherwise it is set to 0. + +* `total_length` + + If `ethertype` identifies the frame as either an IPv4 or IPv6 packet + (i.e. the values `0x0800` and `0x86dd`, respectively), + `total_length` is the size of the L3 payload of the Ethernet frame + according to the L3 header, including the L3 header itself. For + IPv4, this is the value of the header's *Total Length* field. For + IPv6, it is the sum of the header's *Payload Length* field and the + size of the basic header (40 bytes). + + For all other values of `ethertype`, `total_length` is set to the + effective size of the packet (according to the `length` field of the + `packet` data structure) minus the size of the Ethernet header + (14 bytes for untagged frames and 18 bytes for 802.1q tagged + frames). + +* `filter_start` + + This is a pointer into the packet that can be passed as first + argument to a BPF matching function generated by + **pf.compile_filter**. + + For untagged frames, this is a pointer to the proper Ethernet + header. + + For 802.1q tagged frames, an offset of 4 bytes is added to skip the + 802.1q header. The reason for this is that the `pf` module does not + implement the `vlan` primitive of the standard BPF syntax. The + additional 4-byte offset places the effective Ethertype (i.e. the + same value as in the `ethertype` meta-data field) at the position of + an untagged Ethernet frame. Note that this makes the original MAC + addresses unavailable to the filter. 
+ +* `filter_length` + + This value is the size of the chunk of data pointed to by + `filter_start` and can be passed as second argument to a BPF + matching function generated by **pf.compile_filter**. It is equal + to the size of the packet if the frame is untagged or 4 bytes less + than that if the frame is 802.1q tagged. + +* `l3` + + This is a pointer to the start of the L3 header in the packet. + +* `l4` + + This is a pointer to the start of the L4 header in the packet. For + IPv4 and IPv6, it points to the first byte following the L3 header. + For all other packets, it is equal to `l3`. + +* `filter_offset`, `l3_offset`, `l4_offset` + + These values are the offsets of `filter_start`, `l3`, and `l4` + relative to the start of the packet. They are used by the **copy** + API call to re-calculate the pointers after the meta-data block has + been relocated. + +* `proto` + + For IPv4 and IPv6, the `proto` field contains the identifier of the + *upper layer protocol* carried in the payload of the packet. For + all other packets, its value is undefined. + + For IPv4, the upper layer protocol is given by the value of the + *Protocol* field of the header. For IPv6, it is the value of the + *Next Header* field of the last extension header in the packet's + header chain. The `rss` app recognizes the following protocol + identifiers as extension headers according to the [IANA + ipv6-parameters + registry](http://www.iana.org/assignments/ipv6-parameters) + + * 0 IPv6 Hop-by-Hop Option + * 43 Routing Header for IPv6 + * 44 Fragment Header for IPv6 + * 51 Authentication Header + * 60 Destination Options for IPv6 + * 135 Mobility Header + * 139 Host Identity Protocol + * 140 Shim6 Protocol + + Note that the protocols 50 (Encapsulating Security Payload, ESP), + 253 and 254 (reserved for experimentation and testing) are treated + as upper layer protocols, even though, technically, they are + classified as extension headers. 
+ +* `frag_offset` + + For fragmented IPv4 and IPv6 packets, the `frag_offset` field + contains the offset of the fragment in the original packet's payload + in 8-byte units. A value of zero indicates that the packet is + either not fragmented at all or is the initial fragment. + + For non-IP packets, the value is undefined. + +* `length_delta` + + This field contains the difference of the packet's effective length + (as given by the `length` field of the packet data structure) and + the size of the packet calculated from the IP header, i.e. the sum + of `l3_offset` and `total_length`. For a regular packet, this + difference is zero. + + A negative value indicates that the packet has been truncated. A + typical scenario where this is expected to occur is a setup + involving a port-mirror that truncates packets either due to + explicit configuration or due to a hardware limitation. The + `length_delta` field can be used by a downstream app to determine + whether it has received a complete packet. + + A positive value indicates that the packet contains additional data + which is not part of the protocol data unit. This is not expected + to occur under normal circumstances. However, it has been observed + that some devices perform this kind of padding when port-mirroring + is configured with packet truncation and the mirrored packet is + smaller than the truncation limit. + + For non-IP packets, `length_delta` is always zero. + +## IPv6 extension header elimination + +The `pf` module does not implement the `protochain` primitive for +IPv6. The only extension header it can deal with is the fragmentation +header (protocol 44). As a consequence, packets containing arbitrary +extension headers can not be matched against filter expressions. + +To overcome this limitation, the meta-data generator of the `rss` app +removes all extension headers from a packet by default, leaving only +the basic IPv6 header followed immediately by the upper layer +protocol. 
The values of the *Payload Length* and *Next Header* fields +of the basic IPv6 header as well as the packet length are adjusted +accordingly. + +## VLAN pseudo-tagging + +Since the `rss` app can accept packets from multiple sources, the +information on which link the packet was received is not trivially +available to receiving apps unless the packets contain a unique +identifier of some sort, e.g. a particular VLAN tag. If such an +identifier is not available, the `rss` app can be configured to attach +a pseudo VLAN tag to packets arriving on a particular input link. It +is called "pseudo tagging" because the VLAN is only added to the +packet's meta-data, not the packet itself. As a consequence, a +receiving app only sees this kind of tag when it examines the +meta-data provided by the `rss` app. Such a pseudo-tag also overrides +any native VLAN tag that a packet might have. + +The pseudo-tagging is enabled by following a convention for the naming +of input links as described below. + +If proper VLAN tagging is required, the `vlan.vlan.Tagger` app can be +pushed between the packet source and the input link. + +## Configuration + +The `rss` app accepts the following arguments. + +— Key **default_class** + +*Optional*. A boolean that specifies whether the default filter class +should be enabled. The default is `true`. The name of the default +class is *default*. + +— Key **classes** + +*Optional*. An ordered list of class specifications. Each +specification must be a table with the following keys. + + * Key **name** + + *Required*. The name of the class. It must be unique among all + classes and it must match the Lua regular expression `%w+`. + + * Key **filter** + + *Required*. A string containing a pflang filter expression. + + * Key **continue** + + *Optional*. A boolean that specifies whether processing of classes + should continue if a packet has matched the filter of this class. + The default is `false`. + +— Key **remove_extension_headers** + +*Optional*. 
A boolean that specifies whether IPv6 extension headers +should be removed from packets. The default is `true`. + +The **classes** configuration option specifies the set of classes +known to an instance of the `rss` app. The assignment of links to +classes is done implicitly by connecting other apps using the +convention `<class>_<instance>` for the name of the links, where +`<class>` is the name of the class to which the links should be +assigned exactly as specified by the **name** parameter of the class +definition. The `<instance>` specifier can be any string (adhering to +the naming convention for links) that distinguishes the links within a +class. + +If the instance specifier is formatted as `<id>_<weight>`, where +`<id>` is restricted to the pattern `%w+` and `<weight>` must be +a number, the link's weight is set to the value `<weight>`. The +default weight for a link is 1. + +If the `rss` app detects an output link whose name does not match any +of the configured classes, it issues a warning message and ignores the +link. Classes to which no output links are assigned are ignored. + +The names of the input links are arbitrary unless the VLAN +pseudo-tagging feature should be used. In that case, the link must be +named `vlan<vlan-id>`, where `<vlan-id>` must be a number between 1 +and 4094 and will be placed in the `vlan` meta-data field of every +packet received on the link (irrespective of whether the packet has a +real VLAN ID or not). + +## Meta-data API + +The meta-data functionality is provided by the module +`apps.rss.metadata` and provides the following API. + +— Function **add** *packet*, *remove_extension_headers*, *vlan* + +Analyzes *packet* and adds a meta-data block starting immediately +after the packet data. If the boolean *remove_extension_headers* is +`true`, IPv6 extension headers are removed from the packet. The +optional *vlan* overrides the value of the `vlan` meta-data field +extracted from the packet, irrespective of whether the packet actually +has a tag or not. 
+
+An error is raised if there is not enough room for the meta-data block
+in the packet.
+
+— Function **get** *packet*
+
+Returns a pointer to the meta-data in *packet*. An error is raised if
+the meta-data block does not start with the magic number (`0x5abb`).
+
+— Function **copy** *packet*
+
+Creates a copy of *packet* including the meta-data block. Returns a
+pointer to the new packet.
diff --git a/src/apps/rss/metadata.lua b/src/apps/rss/metadata.lua
new file mode 100644
index 0000000000..96ab01fc49
--- /dev/null
+++ b/src/apps/rss/metadata.lua
@@ -0,0 +1,349 @@
+-- Use of this source code is governed by the Apache 2.0 license; see COPYING.
+
+module(..., package.seeall)
+
+local ffi = require("ffi")
+local lib = require("core.lib")
+local consts = require("apps.lwaftr.constants")
+
+local ntohs = lib.ntohs
+local htons = lib.htons
+
+local ethertype_ipv4 = consts.ethertype_ipv4
+local ethertype_ipv6 = consts.ethertype_ipv6
+local ethernet_header_size = consts.ethernet_header_size
+local o_ipv4_total_length = consts.o_ipv4_total_length
+local o_ipv4_ver_and_ihl = consts.o_ipv4_ver_and_ihl
+local o_ipv4_flags = consts.o_ipv4_flags
+local o_ipv4_proto = consts.o_ipv4_proto
+local ipv6_fixed_header_size = consts.ipv6_fixed_header_size
+local o_ipv6_payload_len = consts.o_ipv6_payload_len
+local o_ipv6_next_header = consts.o_ipv6_next_header
+
+local uint16_ptr_t = ffi.typeof('uint16_t *')
+
+-- Accessors for IPv4 and IPv6 header fields.  The l3 argument is a
+-- byte pointer to the start of the L3 header.  16-bit fields are
+-- converted between network and host byte order.
+local function get_ipv4_total_length(l3)
+   return ntohs(ffi.cast(uint16_ptr_t, l3 + o_ipv4_total_length)[0])
+end
+
+-- Low nibble of the first header byte (used by add() as the header
+-- length in units of 4 bytes)
+local function get_ipv4_ihl(l3)
+   return (bit.band((l3 + o_ipv4_ver_and_ihl)[0], 0x0f))
+end
+
+-- Low 13 bits of the flags/fragment offset word
+local function get_ipv4_offset(l3)
+   local flags_offset = ntohs(ffi.cast(uint16_ptr_t, l3 + o_ipv4_flags)[0])
+   return (bit.band(0x1fff, flags_offset))
+end
+
+local function get_ipv4_protocol(l3)
+   return l3[o_ipv4_proto]
+end
+
+local function get_ipv6_payload_length(l3)
+   return ntohs(ffi.cast(uint16_ptr_t, l3 + o_ipv6_payload_len)[0])
+end
+
+local function set_ipv6_payload_length(l3, length)
+   (ffi.cast(uint16_ptr_t, l3 + o_ipv6_payload_len))[0] = htons(length)
+end
+
+local function get_ipv6_next_header(l3)
+   return l3[o_ipv6_next_header]
+end
+
+local function set_ipv6_next_header(l3, next_header)
+   l3[o_ipv6_next_header] = next_header
+end
+
+-- Return the ctype "pointer to ctype"
+local function ptr_to(ctype)
+   return ffi.typeof('$*', ctype)
+end
+
+local ipv6_ext_hdr_t = ffi.typeof([[
+   struct {
+      uint8_t next_header;
+      uint8_t length;
+      uint8_t data[0];
+   } __attribute__((packed))
+]])
+local ipv6_ext_hdr_ptr_t = ptr_to(ipv6_ext_hdr_t)
+
+local ipv6_frag_hdr_t = ffi.typeof([[
+   struct {
+      uint8_t next_header;
+      uint8_t reserved;
+      uint16_t offset_flags;
+      uint32_t identificaton;
+   } __attribute__((packed))
+]])
+local ipv6_frag_hdr_ptr_t = ptr_to(ipv6_frag_hdr_t)
+
+-- Inspector for extension headers with the generic layout.  Returns
+-- the size of the header in bytes and the type of the next header.
+local function ipv6_generic_ext_hdr(ptr)
+   local ext_hdr = ffi.cast(ipv6_ext_hdr_ptr_t, ptr)
+   local next_header = ext_hdr.next_header
+   local length = ext_hdr.length
+   -- Length in units of 8 bytes, not including the first 8 bytes
+   return length * 8 + 8, next_header
+end
+
+-- The fragmentation header inspector sets this upvalue as a side
+-- effect. Only at most one fragmentation header is expected in a
+-- header chain.
+local ipv6_frag_offset
+
+-- Inspectors for the supported extension headers, indexed by
+-- protocol number.  Each one takes a pointer to the start of the
+-- header and returns its size in bytes and the next header type.
+local ipv6_ext_hdr_fns = {
+   [0] =
+      -- Hop-by-hop
+      ipv6_generic_ext_hdr,
+   [43] =
+      -- Routing
+      ipv6_generic_ext_hdr,
+   [44] =
+      -- Fragmentation, fixed size (8 bytes)
+      function(ptr)
+         local frag_hdr = ffi.cast(ipv6_frag_hdr_ptr_t, ptr)
+         local next_header = frag_hdr.next_header
+         ipv6_frag_offset = bit.rshift(ntohs(frag_hdr.offset_flags), 3)
+         return 8, next_header
+      end,
+   [51] =
+      -- IPSec authentication header RFC4302. Next header and length
+      -- fields are the same as for a generic header, but the units of
+      -- the length differs.
+      function(ptr)
+         local ext_hdr = ffi.cast(ipv6_ext_hdr_ptr_t, ptr)
+         local next_header = ext_hdr.next_header
+         -- RFC 4302: the Payload Len field holds the length of the
+         -- header in units of 4 bytes minus 2, so the size in bytes
+         -- is (Payload Len + 2) * 4
+         local payload_len = ext_hdr.length
+         return (payload_len + 2) * 4, next_header
+      end,
+   [59] =
+      -- No next header
+      function(ptr)
+         return 0, 255
+      end,
+   [60] =
+      -- Destination
+      ipv6_generic_ext_hdr,
+   [135] =
+      -- Mobility RFC6275
+      ipv6_generic_ext_hdr,
+   [139] =
+      -- HIP RFC7401
+      ipv6_generic_ext_hdr,
+   [140] =
+      -- Shim6 RFC5533
+      ipv6_generic_ext_hdr,
+}
+
+-- Walk the chain of IPv6 extension headers that follows the basic
+-- header at l3.  If squash is true and the chain could be traversed,
+-- the extension headers are removed from pkt and the Payload Length
+-- and Next Header fields of the basic header are adjusted.  Returns
+-- the (possibly adjusted) payload length and the upper layer
+-- protocol.  Sets the upvalue ipv6_frag_offset as a side effect.
+local function traverse_extension_headers(pkt, l3, squash)
+   local payload = l3 + ipv6_fixed_header_size
+   local payload_length = get_ipv6_payload_length(l3)
+   -- This differs from payload_length if the packet is truncated
+   local eff_payload_length = pkt.data + pkt.length - payload
+   local ulp = get_ipv6_next_header(l3)
+
+   local next_header = ulp
+   local ext_hdrs_size = 0
+   ipv6_frag_offset = 0
+   local ipv6_ext_hdr_fn = ipv6_ext_hdr_fns[next_header]
+   while ipv6_ext_hdr_fn do
+      -- Declared local so that it does not leak into the module table
+      local hdr_size
+      hdr_size, next_header = ipv6_ext_hdr_fn(payload + ext_hdrs_size)
+      ext_hdrs_size = ext_hdrs_size + hdr_size
+      if ext_hdrs_size < 0 or ext_hdrs_size > eff_payload_length then
+         -- The extension header has lead us out of the packet, bail
+         -- out and leave the packet unmodified. The ulp returned to
+         -- the caller is the next header field of the basic header.
+         goto exit
+      end
+      ipv6_ext_hdr_fn = ipv6_ext_hdr_fns[next_header]
+   end
+   -- All extension headers known to us have been skipped. next_header
+   -- contains what we consider as the "upper layer protocol".
+   ulp = next_header
+   if ext_hdrs_size > 0 and squash then
+      pkt.length = pkt.length - ext_hdrs_size
+      payload_length = payload_length - ext_hdrs_size
+      set_ipv6_next_header(l3, ulp)
+      set_ipv6_payload_length(l3, payload_length)
+      ffi.C.memmove(payload, payload + ext_hdrs_size,
+                    eff_payload_length - ext_hdrs_size
+                       + ffi.sizeof(pkt_meta_data_t))
+   end
+   ::exit::
+   return payload_length, ulp
+end
+
+ether_header_t = ffi.typeof([[
+   struct {
+      uint8_t dhost[6];
+      uint8_t shost[6];
+      union {
+         struct {
+            uint16_t type;
+         } ether;
+         struct {
+            uint16_t tpid;
+            uint16_t tci;
+            uint16_t type;
+         } dot1q;
+      };
+   } __attribute__((packed))
+]])
+ether_header_ptr_t = ptr_to(ether_header_t)
+
+local magic_number = 0x5ABB
+
+-- NOTE(review): frag_offset is only 8 bits wide while get_ipv4_offset()
+-- and the IPv6 fragment header inspector produce 13-bit values, so
+-- larger offsets appear to be truncated on assignment -- confirm
+-- whether consumers rely on the full offset.
+pkt_meta_data_t = ffi.typeof([[
+   struct {
+      uint16_t magic;
+      /* Actual ethertype for single-tagged frames */
+      uint16_t ethertype;
+      /* vlan == 0 if untagged frame */
+      uint16_t vlan;
+      /* Total size, excluding the L2 header */
+      uint16_t total_length;
+      /* Pointer and length that can be passed directly to a pflua filter */
+      uint8_t *filter_start;
+      uint16_t filter_length;
+      /* Pointers to the L3 and L4 headers */
+      uint8_t *l3;
+      uint8_t *l4;
+      /* Offsets of the respective pointers relative to the
+         start of the packet. Used to re-calculate the
+         pointers by copy() */
+      uint16_t filter_offset;
+      uint16_t l3_offset;
+      uint16_t l4_offset;
+      uint8_t proto;
+      /* Fragment offset in units of 8 bytes. Equals 0 if not fragmented
+         or initial fragment */
+      uint8_t frag_offset;
+      /* Difference between packet length and length
+         according to the l3 header, negative if the
+         packet is truncated, == 0 if not. A positive value
+         would indicate that the packet contains some kind
+         of padding. This should not occur under normal
+         circumstances. */
+      int16_t length_delta;
+      /* Used by the rss app */
+      uint16_t ref;
+      uint16_t hash;
+   } __attribute__((packed))
+]])
+pkt_meta_data_ptr_t = ptr_to(pkt_meta_data_t)
+
+-- Return a pointer to the location of the meta-data block, i.e. the
+-- first byte after the packet data.  Asserts that the block fits.
+local function md_ptr (pkt)
+   assert(ffi.C.PACKET_PAYLOAD_SIZE - pkt.length >= ffi.sizeof(pkt_meta_data_t))
+   return ffi.cast(pkt_meta_data_ptr_t, pkt.data + pkt.length)
+end
+
+-- (Re-)calculate the pointers in md from the stored offsets relative
+-- to the data of pkt
+local function set_pointers (md, pkt)
+   local data = pkt.data
+   md.filter_start = data + md.filter_offset
+   md.l3 = data + md.l3_offset
+   md.l4 = data + md.l4_offset
+end
+
+-- Return a pointer to the meta-data block of pkt.  Asserts that the
+-- block starts with the magic number.
+function get (pkt)
+   local md = md_ptr(pkt)
+   assert(md.magic == magic_number)
+   return md
+end
+
+-- Return a clone of pkt with a copy of its meta-data block whose
+-- pointers refer to the clone
+function copy (pkt)
+   local smd = get(pkt)
+   local cpkt = packet.clone(pkt)
+   local dmd = md_ptr(cpkt)
+   ffi.copy(dmd, smd, ffi.sizeof(pkt_meta_data_t))
+   set_pointers(dmd, cpkt)
+   return cpkt
+end
+
+-- Analyze pkt and write a meta-data block immediately after the packet
+-- data.  If rm_ext_headers is true, IPv6 extension headers are removed
+-- from the packet.  vlan_override, if given, replaces the VLAN ID
+-- extracted from the packet.  Returns a pointer to the meta-data
+-- block.
+function add (pkt, rm_ext_headers, vlan_override)
+   local vlan = 0
+   local filter_offset = 0
+   local l3_offset = ethernet_header_size
+   local hdr = ffi.cast(ether_header_ptr_t, pkt.data)
+   local ethertype = lib.ntohs(hdr.ether.type)
+   if ethertype == 0x8100 then
+      ethertype = lib.ntohs(hdr.dot1q.type)
+      vlan = bit.band(lib.ntohs(hdr.dot1q.tci), 0xFFF)
+      filter_offset = 4
+      l3_offset = l3_offset + filter_offset
+   end
+
+   local md = md_ptr(pkt)
+   md.magic = magic_number
+   md.ref = 0
+   md.ethertype = ethertype
+   md.vlan = vlan_override or vlan
+   md.l3_offset = l3_offset
+   local l3 = pkt.data + l3_offset
+
+   if ethertype == ethertype_ipv4 then
+      md.total_length = get_ipv4_total_length(l3)
+      md.l4_offset = l3_offset + 4 * get_ipv4_ihl(l3)
+      md.frag_offset = get_ipv4_offset(l3)
+      md.proto = get_ipv4_protocol(l3)
+   elseif ethertype == ethertype_ipv6 then
+      -- Optionally remove all extension headers from the packet and
+      -- track the position of the metadata block
+      local payload_length, next_header =
+         traverse_extension_headers(pkt, l3, rm_ext_headers)
+      md = get(pkt)
+      md.total_length = payload_length + ipv6_fixed_header_size
+      md.l4_offset = l3_offset + ipv6_fixed_header_size
+      md.frag_offset = ipv6_frag_offset
+      md.proto = next_header
+   else
+      md.total_length = pkt.length - l3_offset
+      md.l4_offset = l3_offset
+   end
+
+   md.filter_offset = filter_offset
+   md.filter_length = pkt.length - filter_offset
+   md.length_delta = pkt.length - l3_offset - md.total_length
+   set_pointers(md, pkt)
+
+   return md
+end
diff --git a/src/apps/rss/rss.lua b/src/apps/rss/rss.lua
new file mode 100644
index 0000000000..24efd2d1cf
--- /dev/null
+++ b/src/apps/rss/rss.lua
@@ -0,0 +1,418 @@
+-- Use of this source code is governed by the Apache 2.0 license; see COPYING.
+
+module(..., package.seeall)
+
+local packet = require("core.packet")
+local lib = require("core.lib")
+local counter = require("core.counter")
+local siphash = require("lib.hash.siphash")
+local metadata = require("apps.rss.metadata")
+local pf = require("pf")
+local ffi = require("ffi")
+
+local rshift = bit.rshift
+local receive, transmit = link.receive, link.transmit
+local nreadable = link.nreadable
+local free, clone = packet.free, packet.clone
+local mdadd, mdget, mdcopy = metadata.add, metadata.get, metadata.copy
+
+local transport_proto_p = {
+   -- TCP
+   [6] = true,
+   -- UDP
+   [17] = true,
+   -- SCTP
+   [132] = true
+}
+
+rss = {
+   config = {
+      default_class = { default = true },
+      classes = { default = {} },
+      remove_extension_headers = { default = true }
+   },
+   shm = {
+      rxpackets = { counter, 0},
+      rxdrops_filter = { counter, 0}
+   }
+}
+local class_config = {
+   name = { required = true },
+   filter = { required = true },
+   continue = { default = false }
+}
+
+local hash_info = {
+   -- IPv4
+   [0x0800] = {
+      addr_offset = 12,
+      addr_size = 8
+   },
+   -- IPv6
+   [0x86dd] = {
+      addr_offset = 8,
+      addr_size = 32
+   },
+}
+
+function rss:new (config)
+   local o = { classes = {},
+               links_configured = {},
+               queue = link.new("queue"),
+               rxpackets = 0,
+               rxdrops_filter = 0,
+               sync_timer = lib.throttle(1),
+               rm_ext_headers = config.remove_extension_headers
+             }
+
+   for _, info in
pairs(hash_info) do
+      -- Per address family: a key buffer holding the address pair,
+      -- the ports and the protocol, and a SipHash function over it
+      -- with a random key
+      info.key_t = ffi.typeof([[
+         struct {
+            uint8_t addrs[$];
+            uint32_t ports;
+            uint8_t proto;
+         } __attribute__((packed))
+      ]], info.addr_size)
+      info.key = info.key_t()
+      info.hash_fn =
+         siphash.make_hash({ size = ffi.sizeof(info.key),
+                             key = siphash.random_sip_hash_key() })
+   end
+
+   -- Append a class to o.classes; classes are matched in the order in
+   -- which they are added.
+   -- NOTE(review): the pattern is not anchored, so any name containing
+   -- at least one alphanumeric character passes -- confirm whether
+   -- "^%w+$" was intended.
+   local function add_class (name, match_fn, continue)
+      assert(name:match("%w+"), "Illegal class name: "..name)
+      table.insert(o.classes, {
+                      name = name,
+                      match_fn = match_fn,
+                      continue = continue,
+                      input = link.new(name),
+                      output = { n = 0 }
+      })
+   end
+
+   -- "default" is reserved up front so that a configured class cannot
+   -- use that name
+   local classes = { default = true }
+   for _, class in ipairs(config.classes) do
+      local config = lib.parse(class, class_config)
+      assert(not classes[config.name],
+             "Duplicate filter class: "..config.name)
+      classes[config.name] = true
+      add_class(config.name, pf.compile_filter(config.filter),
+                config.continue)
+   end
+   if config.default_class then
+      -- Catch-all default filter
+      add_class("default", function () return true end)
+   end
+
+   return setmetatable(o, { __index = self })
+end
+
+-- Assign output links to classes according to their names, collect
+-- the classes that have at least one output link and determine the
+-- pseudo VLAN tag of every input link.
+function rss:link ()
+   for name, l in pairs(self.output) do
+      if type(name) == "string" then
+         -- Each output link is processed only once
+         if not self.links_configured[name] then
+            self.links_configured[name] = true
+            local match = false
+            for _, class in ipairs(self.classes) do
+               local instance = name:match("^"..class.name.."_(.*)")
+               if instance then
+                  match = true
+                  -- The weight is the captured digit string or 1.  The
+                  -- link is inserted that many times, which gives it a
+                  -- proportional share of the hash space in distribute()
+                  local weight = instance:match("^%w+_(%d+)$") or 1
+                  for _ = 1, weight do
+                     table.insert(class.output, l)
+                  end
+                  -- Avoid calls to lj_tab_len() in distribute()
+                  class.output.n = #class.output
+               end
+            end
+            if not match then
+               print("Ignoring link (does not match any filters): "..name)
+            end
+         end
+      end
+   end
+
+   -- Only classes with output links take part in push()
+   self.classes_active = {}
+   for _, class in ipairs(self.classes) do
+      if #class.output > 0 then
+         table.insert(self.classes_active, class)
+      end
+   end
+
+   -- Input links named "vlan<number>" carry a pseudo VLAN tag; for all
+   -- other names vlan remains nil
+   self.input_tagged = {}
+   for name, link in pairs(self.input) do
+      if type(name) == "string" then
+         local vlan = name:match("^vlan(%d+)$")
+         if vlan then
+            vlan = tonumber(vlan)
+            assert(vlan > 0 and vlan < 4095, "Illegal VLAN id: "..vlan)
+         end
+         table.insert(self.input_tagged, { link = link, vlan = vlan })
+      end
+   end
+end
+
+-- Compute the flow hash for the packet described by the meta-data
+-- block md and store it in md.hash.  Ethertypes without an entry in
+-- hash_info get the hash 0.
+local function hash (md)
+   local info = hash_info[md.ethertype]
+   local hash = 0
+   if info then
+      ffi.copy(info.key.addrs, md.l3 + info.addr_offset, info.addr_size)
+      -- The ports are only part of the key for the protocols listed in
+      -- transport_proto_p; they are read as the first 32 bits at md.l4
+      if transport_proto_p[md.proto] then
+         info.key.ports = ffi.cast("uint32_t *", md.l4)[0]
+      else
+         info.key.ports = 0
+      end
+      info.key.proto = md.proto
+      -- Our SipHash implementation produces only even numbers to satisfy some
+      -- ctable internals.
+      hash = rshift(info.hash_fn(info.key), 1)
+   end
+   md.hash = hash
+end
+
+-- Transmit p on the link selected by scaling hash to an index in the
+-- range 1..links.n
+local function distribute (p, links, hash)
+   -- This relies on the hash being a 16-bit value
+   local index = rshift(hash * links.n, 16) + 1
+   transmit(links[index], p)
+end
+
+-- Receive the packets from all input links, match them against the
+-- active classes in order and distribute them to the output links of
+-- the classes they matched.
+function rss:push ()
+   local queue = self.queue
+
+   -- Add meta-data and the flow hash to every received packet and put
+   -- it on the internal queue
+   for _, input in ipairs(self.input_tagged) do
+      local link, vlan = input.link, input.vlan
+      local npackets = nreadable(link)
+      self.rxpackets = self.rxpackets + npackets
+      for _ = 1, npackets do
+         local p = receive(link)
+         hash(mdadd(p, self.rm_ext_headers, vlan))
+         transmit(queue, p)
+      end
+   end
+
+   for _, class in ipairs(self.classes_active) do
+      -- Apply the filter to all packets. If a packet matches, it is
+      -- put on the class' input queue. If the class is of type
+      -- "continue" or the packet doesn't match the filter, it is put
+      -- back onto the main queue for inspection by the next class.
+      for _ = 1, nreadable(queue) do
+         local p = receive(queue)
+         local md = mdget(p)
+         if class.match_fn(md.filter_start, md.filter_length) then
+            -- md.ref counts the classes that hold a reference to p
+            md.ref = md.ref + 1
+            transmit(class.input, p)
+            if class.continue then
+               transmit(queue, p)
+            end
+         else
+            transmit(queue, p)
+         end
+      end
+   end
+
+   -- Drain the main queue.  Packets that were not matched by any class
+   -- are dropped; the others are still held by a class' input queue.
+   for _ = 1, nreadable(queue) do
+      local p = receive(queue)
+      local md = mdget(p)
+      if md.ref == 0 then
+         self.rxdrops_filter = self.rxdrops_filter + 1
+         free(p)
+      end
+   end
+
+   -- A packet held by more than one class is copied for all but the
+   -- last class that processes it, which transmits the original
+   for _, class in ipairs(self.classes_active) do
+      for _ = 1, nreadable(class.input) do
+         local p = receive(class.input)
+         local md = mdget(p)
+         if md.ref > 1 then
+            md.ref = md.ref - 1
+            distribute(mdcopy(p), class.output, md.hash)
+         else
+            distribute(p, class.output, md.hash)
+         end
+      end
+   end
+
+   if self.sync_timer() then
+      counter.set(self.shm.rxpackets, self.rxpackets)
+      counter.set(self.shm.rxdrops_filter, self.rxdrops_filter)
+   end
+end
+
+function selftest ()
+   local vlan_id = 123
+   local addr_ip = ffi.new("uint8_t[4]")
+   local addr_ip6 = ffi.new("uint8_t[16]")
+   local function random_ip(addr)
+      for i = 0, ffi.sizeof(addr) - 1 do
+         addr[i] = math.random(255)
+      end
+      return addr
+   end
+
+   local ext_hdr = ffi.new([[
+     struct {
+        uint8_t next_header;
+        uint8_t length;
+        uint8_t data[14];
+     } __attribute__((packed))
+   ]])
+   local function push_ext_hdr(dgram, next_header)
+      local p = dgram:packet()
+      ext_hdr.next_header = next_header
+      ext_hdr.length = 1
+      local length = ffi.sizeof(ext_hdr)
+      p = packet.prepend(p, ext_hdr, length)
+      dgram:new(p)
+      return length
+   end
+
+   local Source = {}
+
+   function Source:new()
+      local o = {
+         eth = require("lib.protocol.ethernet"):new({}),
+         ip = require("lib.protocol.ipv4"):new({ protocol = 17 }),
+         ip6 = require("lib.protocol.ipv6"):new({ next_header = 17 }),
+         udp = require("lib.protocol.udp"):new({}),
+         dgram = require("lib.protocol.datagram"):new()
+      }
+      return setmetatable(o, {__index=Source})
+   end
+
+   function Source:random_packet()
+      local p =
packet.allocate() + local payload_size = math.random(9000) + p.length = payload_size + self.dgram:new(p) + self.udp:src_port(math.random(2^16-1)) + self.udp:dst_port(math.random(2^16-1)) + self.dgram:push(self.udp) + if math.random() > 0.5 then + self.ip:src(random_ip(addr_ip)) + self.ip:dst(random_ip(addr_ip)) + self.ip:total_length(self.ip:sizeof() + self.udp:sizeof() + + payload_size) + self.dgram:push(self.ip) + self.eth:type(0x0800) + else + local next_header = 17 + local ext_hdr_size = 0 + for _ = 1, math.ceil(math.random(3)) do + ext_hdr_size = ext_hdr_size + + push_ext_hdr(self.dgram, next_header) + next_header = 0 -- Hop-by-hop header + end + self.ip6:payload_length(ext_hdr_size + self.udp:sizeof() + + payload_size) + self.ip6:next_header(next_header) + self.ip6:src(random_ip(addr_ip6)) + self.ip6:dst(random_ip(addr_ip6)) + self.dgram:push(self.ip6) + self.eth:type(0x86dd) + end + self.dgram:push(self.eth) + return self.dgram:packet() + end + + function Source:pull () + for _, o in ipairs(self.output) do + for i = 1, engine.pull_npackets do + transmit(o, self:random_packet()) + end + end + end + + local Sink = {} + + function Sink:new () + return setmetatable({}, { __index = Sink }) + end + + function Sink:push () + for _, i in ipairs(self.input) do + for _ = 1, link.nreadable(i) do + local p = receive(i) + local md = mdget(p) + assert(md.ethertype == 0x0800 or md.ethertype == 0x86dd, + md.ethertype) + assert(md.vlan == 0 or md.vlan == vlan_id) + local offset = md.vlan == 0 and 0 or 4 + assert(md.filter_offset == offset, md.filter_offset) + assert(md.filter_start == p.data + offset) + assert(md.l3 == p.data + 14 + offset) + assert(md.total_length == p.length - 14 - offset) + assert(md.filter_length == p.length - offset) + if md.ethertype == 0x0800 then + assert(md.l4 == md.l3 + 20) + else + assert(md.l4 == md.l3 + 40) + end + assert(md.proto == 17) + assert(md.frag_offset == 0) + assert(md.length_delta == 0, md.length_delta) + packet.free(p) + end + end + 
end + + local graph = config.new() + config.app(graph, "rss", rss, { classes = { + { name = "ip", + filter = "ip", + continue = true }, + { name = "ip6", + filter = "ip6", + continue = true } } }) + config.app(graph, "source1", Source) + config.app(graph, "source2", Source) + config.app(graph, "vlan", require("apps.vlan.vlan").Tagger, + { tag = vlan_id }) + config.link(graph, "source1.output -> rss.input_plain") + config.link(graph, "source2.output -> vlan.input") + config.link(graph, "vlan.output -> rss.input_vlan") + + local sink_groups = { + { name = "default", n = 4}, + { name = "ip", n = 4 }, + { name = "ip6", n = 4 }, + } + for g, group in ipairs(sink_groups) do + for i = 1, group.n do + local sink_name = "sink"..g..i + config.app(graph, sink_name, Sink) + config.link(graph, "rss."..group.name.."_"..i + .." -> "..sink_name..".input") + end + end + + engine.configure(graph) + engine.main({ duration = 2, report = { showlinks = true } }) + + local function pkts(name, dir) + local app = engine.app_table[name] + if dir == "out" then + return tonumber(counter.read(app.output.output.stats.rxpackets)) + else + return tonumber(counter.read(app.input.input.stats.rxpackets)) + end + end + + local npackets = pkts("source1", "out") + pkts("source2", "out") + for g, group in ipairs(sink_groups) do + for i = 1, group.n do + local share = npackets/group.n + if group.name ~= "default" then + share = share/2 + end + local sink_name = "sink"..g..i + local pkts = pkts(sink_name, "in") + local threshold = 0.05 + local value = math.abs(1.0 - pkts/share) + if value >= threshold then + error(string.format("Unexpected traffic share on %s " + .."(expected %f, got %f)", + sink_name, threshold, value)) + end + end + end +end diff --git a/src/doc/genbook.sh b/src/doc/genbook.sh index ca23aadd1a..89a58590be 100755 --- a/src/doc/genbook.sh +++ b/src/doc/genbook.sh @@ -74,6 +74,8 @@ $(cat $mdroot/apps/test/README.md) $(cat $mdroot/apps/wall/README.md) +$(cat $mdroot/apps/rss/README.md) + # 
Libraries $(cat $mdroot/lib/README.checksum.md) diff --git a/src/lib/protocol/ipv4.lua b/src/lib/protocol/ipv4.lua index 8e6e40af24..4ecd82a9bf 100644 --- a/src/lib/protocol/ipv4.lua +++ b/src/lib/protocol/ipv4.lua @@ -35,6 +35,7 @@ ipv4._ulp = { [17] = "lib.protocol.udp", [47] = "lib.protocol.gre", [58] = "lib.protocol.icmp.header", + [1] = "lib.protocol.icmp.header", }, method = 'protocol' } ipv4:init( diff --git a/src/lib/protocol/tcp.lua b/src/lib/protocol/tcp.lua index 8e5aec189a..8df38aaf95 100644 --- a/src/lib/protocol/tcp.lua +++ b/src/lib/protocol/tcp.lua @@ -36,8 +36,8 @@ function tcp:new (config) local o = tcp:superClass().new(self) o:src_port(config.src_port) o:dst_port(config.dst_port) - o:seq_num(config.seq) - o:ack_num(config.ack) + o:seq_num(config.seq_num) + o:ack_num(config.ack_num) o:window_size(config.window_size) o:header().pad = 0 o:offset(config.offset or 0)