Proposed changes
This PR enables the splitting of the SQL rows to insert into batches. If the size of the list of rows the Translator has to insert is greater than a configured value `M`, the rows get split into smaller batches (lists), each having a size no greater than `M`, and each batch gets inserted separately, i.e. the Translator issues a separate SQL bulk insert for each batch. We do this since some backends (e.g. Crate) limit how much data you can shovel in a single SQL (bulk) insert statement---see #445 about it.

Splitting happens as explained in the notes below, using a cost function to compute how much data each row to insert holds in memory and a maximum batch size `M` (= cost in bytes) read from the env---see the notes below about configuration.

Types of changes
Checklist
Further comments
Splitting spec
We split a stream into batches so that the cumulative cost of each batch is within a set cost goal. Given an input stream `s` and a cost function `c`, we want to produce a sequence of streams `b` such that joining the `b` streams yields `s` and, for each `b` stream of length > 1, mapping `c` to each element and summing the costs yields a value ≤ `M`, where `M` is a configured cost goal. In symbols:

1. `b[0] ++ b[1] ++ ... == s`
2. `length(b[k]) > 1  =>  c(b[k][0]) + c(b[k][1]) + ... <= M`

Notice it can happen that, to make batches satisfying (1) and (2), some `b[k]` contains just one element `x` with cost `c(x) > M`, since that doesn't violate (1) and (2).
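To make the spec concrete, here's a tiny made-up example (the numbers and the identity cost function are purely illustrative):

```python
# Illustration only: take c to be the identity function, so each element's
# cost is its own value, and let the cost goal be M = 350.
s = [100, 300, 50, 400]

# One batching that satisfies (1) and (2):
b = [[100], [300, 50], [400]]
# (1) [100] ++ [300, 50] ++ [400] gives back s.
# (2) the only batch with more than one element is [300, 50], and
#     300 + 50 = 350 <= M; [400] exceeds M on its own but stands alone,
#     which the spec explicitly allows.
```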
Implementation

We use Python streams to process data in constant space and linear time. Working with Python streams is anything but easy in my opinion, so the implementation looks quite involved, but the concept is fairly simple. In fact, for the mathematically inclined soul out there, the Python implementation is doing what this recursively defined function does (using Haskell-y syntax for lists), only in a more obscure way.
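The PR's own recursive definition isn't reproduced here; as a rough stand-in, here is a minimal iterative sketch of the same batching behaviour in plain Python (names like `split_into_batches` are made up for illustration, not taken from the codebase):

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar('T')


def split_into_batches(stream: Iterable[T],
                       cost: Callable[[T], int],
                       max_cost: int) -> Iterator[List[T]]:
    """Yield consecutive batches whose cumulative cost stays within max_cost.

    Joining the yielded batches gives back the input stream, and any batch
    with more than one element has a total cost <= max_cost. A single
    element costing more than max_cost still gets a batch of its own,
    since a one-element batch can't be split any further.
    """
    batch: List[T] = []
    batch_cost = 0
    for x in stream:
        x_cost = cost(x)
        if batch and batch_cost + x_cost > max_cost:
            yield batch                      # close the current batch
            batch, batch_cost = [], 0
        batch.append(x)
        batch_cost += x_cost
    if batch:
        yield batch                          # flush the last batch


# Example: with the identity cost function and max_cost = 350,
# [100, 300, 50, 400] splits into [[100], [300, 50], [400]].
print(list(split_into_batches([100, 300, 50, 400], lambda x: x, 350)))
```

This runs in time linear in the number of rows and only ever holds one batch in memory at a time, which is the property the stream-based implementation is after.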
Notice this isn't a solution to #193, but it's certainly one piece of the puzzle if we want to move towards a stream-based architecture. Why should we care? Well, even if we split the insert into batches, we still have two huge datasets in memory: the Python representation of the input NGSI JSON doc and its translation to tabular format. Ouch, not exactly a big-data-friendly design. In an ideal world, the `notify` endpoint would work in constant space and linear time...
Configuration
There's a new `INSERT_MAX_SIZE` env var to turn on the splitting into batches. If set, this variable limits how much data you can shovel in a single SQL bulk insert to a value `M`---see above for the details of how data gets split into batches of at most size `M`. We read this variable in on each API call to the `notify` endpoint, so it's sort of dynamic that way and will affect every later insert operation. Accepted values are sizes in bytes (B) or 2^10 multiples (KiB, MiB, GiB), e.g. `10 B`, `1.2 KiB`, `0.9 GiB`. (Technically, anything bitmath can digest will do, e.g. `MB`, `kB`, and friends.) If the variable isn't set (or the set value isn't valid), the Translator processes SQL inserts normally, without splitting data into batches.