# 0.0.13 to 1.0.0 Migration Guide
The move from 0.0.13 to 1.0.0 introduces a few breaking changes for streamparse users. This guide covers the things to watch out for when upgrading.
streamparse now has a much better way of running your Python-based spouts and bolts. We're introducing a new streamparse runner along with our own `python-spout-spec` and `python-bolt-spec` Clojure functions.

First up, to get access to the new functionality, you'll have to update your streamparse Clojure library dependency to `[com.parsely/streamparse "0.0.4-SNAPSHOT"]`.
Then, this requires two changes to topology definitions: the definition itself needs to be a function instead of a variable, and Python spouts/bolts need to use the new DSL calls. Using the wordcount example, we now have:
```clojure
(ns wordcount
  (:use [streamparse.specs])
  (:gen-class))

(defn wordcount [options]             ;; now takes the ``options`` argument
  [
    ;; spout configuration
    {"word-spout" (python-spout-spec  ;; uses ``python-spout-spec`` -- note the change in arguments
        options                       ;; options that were passed into the topology definition
        "spouts.words.WordSpout"      ;; class name to run
        ["word"]                      ;; same as before
        )
    }
    ;; bolt configuration
    {"count-bolt" (python-bolt-spec   ;; uses ``python-bolt-spec`` -- note the change in arguments
        options                       ;; options passed into topology definition
        {"word-spout" :shuffle}       ;; same as before
        "bolts.wordcount.WordCounter" ;; class name to run
        ["word" "count"]              ;; same as before
        :p 2                          ;; same as before
        )
    }
  ]
)
```
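For context, here is a minimal, standalone sketch of the two Python components the topology names (`spouts.words.WordSpout` and `bolts.wordcount.WordCounter`). These are plain-Python stand-ins for illustration only — the real classes subclass streamparse's `Spout` and `Bolt` base classes and emit tuples through the runner rather than returning them:

```python
from collections import Counter

class WordSpout:
    """Stand-in for spouts.words.WordSpout; produces single-field tuples."""
    def __init__(self, words):
        self.words = list(words)

    def next_tuple(self):
        # A real spout emits via the runner; here we just return the tuple.
        return [self.words.pop(0)] if self.words else None

class WordCounter:
    """Stand-in for bolts.wordcount.WordCounter; produces ["word", "count"]."""
    def __init__(self):
        self.counts = Counter()

    def process(self, tup):
        word = tup[0]
        self.counts[word] += 1
        return [word, self.counts[word]]

# Drive the spout's output through the bolt, as the topology wiring would.
spout = WordSpout(["apple", "banana", "apple"])
bolt = WordCounter()
results = []
while True:
    tup = spout.next_tuple()
    if tup is None:
        break
    results.append(bolt.process(tup))
print(results)  # [['apple', 1], ['banana', 1], ['apple', 2]]
```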
There are some pretty big benefits with this change:

- Nested folders within `src/`
- Multiple bolt/spout classes per file
- No need for the `if __name__ == '__main__': BoltClass().run()` boilerplate in each file
For more information about the two new functions, see the quickstart documentation.
Starting in 1.0.0, all Bolts have three configuration parameters:

- `auto_ack` - after `process` (or `process_batch`) is called, ack the processed tuple(s) (default: `True`)
- `auto_anchor` - any calls to `.emit` or `.emit_many` will anchor the emitted tuple to the current tuple (default: `True`)
- `auto_fail` - if an exception is raised during `process` or `process_batch`, streamparse will fail the tuple before exiting (default: `True`)
By default, all Bolt subclasses have these parameters set to `True`. To maintain a one-to-one mapping with your existing Bolts, we've written the following handy reference showing how to create non-breaking versions of your existing Bolts:
| New Param | Old Equivalent Value |
|---|---|
| `auto_ack` | no |
| `auto_anchor` | no |
| `auto_fail` | no |

Non-breaking new version:

```python
class MyBolt(Bolt):
    auto_ack = False
    auto_anchor = False
    auto_fail = False
```
| New Param | Old Equivalent Value |
|---|---|
| `auto_ack` | yes |
| `auto_anchor` | yes |
| `auto_fail` | no |

Non-breaking new version of the old `BasicBolt`:

```python
class MyBolt(Bolt):
    auto_fail = False
```
| New Param | Old Equivalent Value |
|---|---|
| `auto_ack` | yes |
| `auto_anchor` | no |
| `auto_fail` | no |

Non-breaking new version of the old `BatchingBolt`:

```python
class MyBatchingBolt(BatchingBolt):
    auto_anchor = False
    auto_fail = False
```
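To make the flag semantics concrete, here is a minimal sketch — not streamparse's actual runner code — of how a runner could honor the three flags around a single `process` call. `FakeStorm`, `run_one_tuple`, and `EchoBolt` are invented names for illustration only:

```python
class FakeStorm:
    """Records ack/fail/emit calls so we can see what the flags change."""
    def __init__(self):
        self.calls = []

    def ack(self, tup):
        self.calls.append(("ack", tup))

    def fail(self, tup):
        self.calls.append(("fail", tup))

    def emit(self, values, anchor=None):
        self.calls.append(("emit", tuple(values), anchor))

def run_one_tuple(bolt, storm, tup):
    """Process a single tuple, honoring the bolt's auto_* flags."""
    try:
        bolt.process(storm, tup)
    except Exception:
        # auto_fail: fail the tuple when process() raises.
        if bolt.auto_fail:
            storm.fail(tup)
        return
    # auto_ack: ack the tuple once process() returns cleanly.
    if bolt.auto_ack:
        storm.ack(tup)

class EchoBolt:
    auto_ack = True
    auto_anchor = True
    auto_fail = True

    def process(self, storm, tup):
        # auto_anchor: emits are anchored to the current tuple.
        storm.emit(["hello"], anchor=tup if self.auto_anchor else None)

storm = FakeStorm()
run_one_tuple(EchoBolt(), storm, "t1")
print(storm.calls)  # [('emit', ('hello',), 't1'), ('ack', 't1')]
```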
`BatchingBolt` also had the `SECS_BETWEEN_BATCHES` param renamed to `secs_between_batches` to fall in line with the other flags we've added. In your `BatchingBolt` subclasses, you'll need to update this setting:

```python
class MyBatchingBolt(BatchingBolt):
    secs_between_batches = 3
    auto_anchor = False
    auto_fail = False
```
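As a rough illustration of what `secs_between_batches` controls, the sketch below — a plain-Python simulation, not streamparse's implementation — accumulates tuples until the interval elapses and then hands them to `process_batch` all at once. The fake timestamps stand in for real wall-clock time:

```python
class SketchBatchingBolt:
    secs_between_batches = 3

    def __init__(self):
        self._batch = []
        self._last_flush = 0.0
        self.batches_seen = []

    def process_batch(self, tups):
        # In a real BatchingBolt this receives the whole batch of tuples.
        self.batches_seen.append(list(tups))

    def handle_tuple(self, tup, now):
        self._batch.append(tup)
        # Flush once secs_between_batches has elapsed since the last flush.
        if now - self._last_flush >= self.secs_between_batches:
            self.process_batch(self._batch)
            self._batch = []
            self._last_flush = now

bolt = SketchBatchingBolt()
for now, tup in [(1.0, "a"), (2.0, "b"), (3.5, "c"), (4.0, "d")]:
    bolt.handle_tuple(tup, now)
print(bolt.batches_seen)  # [['a', 'b', 'c']]  ("d" waits for the next flush)
```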
Python logging is now enabled for streamparse projects. To get started with it, modify your project's `config.json` file and add a new `"log"` dictionary per environment:
```json
{
    "library": "",
    "topology_specs": "topologies/",
    "virtualenv_specs": "virtualenvs/",
    "envs": {
        "prod": {
            "user": "",
            "nimbus": "",
            "workers": [],
            "log": {
                "path": "",
                "max_bytes": 1000000,
                "backup_count": 10,
                "level": "info"
            },
            "virtualenv_root": ""
        }
    }
}
```
In the above snippet, `log.path` has replaced the old `log_path` setting, and three new settings have been added:

- `max_bytes` - max bytes per log file
- `backup_count` - number of backups kept per log file
- `level` - the logging level to set for the root logger (`logging.getLogger()`); can be one of: critical, error, warning, info, or debug

streamparse will send all log messages sent via the `logging` module to the following file path on Storm workers:

```
/path/to/logs/streamparse_<topology_name>_<component_name>_<task_id>_<process_id>.log
```
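These three settings map naturally onto Python's stdlib `logging.handlers.RotatingFileHandler`. The sketch below shows one plausible way to apply them — `configure_logging` is an illustrative helper, not a streamparse function:

```python
import logging
import logging.handlers
import os
import tempfile

def configure_logging(log_settings, logfile):
    """Apply "log" config values to the root logger via stdlib logging."""
    handler = logging.handlers.RotatingFileHandler(
        logfile,
        maxBytes=log_settings["max_bytes"],        # size cap per log file
        backupCount=log_settings["backup_count"],  # rotated backups to keep
    )
    root = logging.getLogger()
    root.addHandler(handler)
    # "level" applies to the root logger, per the settings described above.
    root.setLevel(getattr(logging, log_settings["level"].upper()))
    return root

settings = {"max_bytes": 1000000, "backup_count": 10, "level": "info"}
logfile = os.path.join(tempfile.mkdtemp(), "example.log")
root = configure_logging(settings, logfile)
root.info("hello from streamparse logging")  # written to example.log
```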
More information can be found in the 1.0.0 docs.