
0.0.13 to 1.0.0 Migration Guide

Mike Sukmanowsky edited this page Aug 25, 2014 · 3 revisions

The move from 0.0.13 to 1.0.0 introduces a few breaking changes for streamparse users. This guide lists what to watch out for when upgrading:

New runner

streamparse now has a much better way of running your Python-based spouts and bolts: a new streamparse runner, along with our own python-spout-spec and python-bolt-spec Clojure functions.

First, to get access to the new functionality, update your streamparse Clojure library to [com.parsely/streamparse "0.0.4-SNAPSHOT"].

The new runner also requires two changes to topology definitions: the definition itself must be a function instead of a variable, and Python spouts and bolts must use the new DSL calls. Using the wordcount example, we now have:

(ns wordcount
  (:use     [streamparse.specs])
  (:gen-class))

(defn wordcount [options] ;; now takes the ``options`` argument
   [
    ;; spout configuration
    {"word-spout" (python-spout-spec ;; uses ``python-spout-spec`` -- note the change in arguments
          options                    ;; options that were passed into the topology definition
          "spouts.words.WordSpout"   ;; class name to run
          ["word"]                   ;; same as before
          )
    }
    ;; bolt configuration
    {"count-bolt" (python-bolt-spec  ;; uses ``python-bolt-spec`` -- note the change in arguments
          options                    ;; options passed into topology definition
          {"word-spout" :shuffle}    ;; same as before
          "bolts.wordcount.WordCounter"  ;; class name to run
          ["word" "count"]           ;; same as before
          :p 2                       ;; same as before
          )
    }
  ]
)

There are some pretty big benefits with this change:

  • Nested folders within src/
  • Multiple bolt/spout classes per file
  • No need for an if __name__ == '__main__': BoltClass().run() block in each file
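The first two benefits follow from naming each component by a dotted class path such as "bolts.wordcount.WordCounter". The sketch below shows one way such a path can be turned into a class using the standard library's importlib. It assumes the runner does something similar and is not streamparse's actual runner code; load_component_class is a hypothetical name.

```python
import importlib


def load_component_class(dotted_path):
    """Resolve "package.module.ClassName" to a class object (hypothetical helper)."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError("expected 'module.ClassName', got %r" % (dotted_path,))
    # import_module handles arbitrarily nested packages, e.g. "bolts.text.cleaning".
    module = importlib.import_module(module_path)
    # Any class defined in that module can be picked out by name, so one file
    # can hold several bolt or spout classes.
    return getattr(module, class_name)
```

For example, load_component_class("logging.handlers.RotatingFileHandler") picks one class out of a nested standard-library module that defines several, which is the same situation as a bolt living alongside others in a nested file under src/.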

For more information about the two new functions, see the quickstart documentation.

New Bolt API

Starting in 1.0.0, all Bolts have three configuration parameters:

  • auto_ack - after process (or process_batch) is called, ack the processed tuple(s) (default: True)
  • auto_anchor - any calls to .emit or .emit_many will anchor the emitted tuple to the current tuple (default: True)
  • auto_fail - if an exception is raised during process or process_batch, streamparse will fail the tuple before exiting (default: True)
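To make the three flags concrete, here is a minimal, self-contained simulation of how they interact with process, emit, ack and fail for a single input tuple. SketchBolt, handle, Upper, Broken and OldStyleUpper are hypothetical names, and the run loop is a deliberate simplification; streamparse's real Bolt talks to Storm over the multi-lang protocol and does considerably more.

```python
class SketchBolt:
    """Hypothetical stand-in for a 1.0.0 Bolt; NOT streamparse's implementation."""

    auto_ack = True
    auto_anchor = True
    auto_fail = True

    def __init__(self):
        self.acked, self.failed, self.emitted = [], [], []
        self._current_tup = None

    def process(self, tup):
        raise NotImplementedError

    def emit(self, values, anchors=None):
        # auto_anchor: with no explicit anchors, anchor to the tuple being processed.
        if anchors is None:
            anchors = [self._current_tup] if self.auto_anchor else []
        self.emitted.append((values, anchors))

    def ack(self, tup):
        self.acked.append(tup)

    def fail(self, tup):
        self.failed.append(tup)

    def handle(self, tup):
        """One pass of a simplified run loop for a single input tuple."""
        self._current_tup = tup
        try:
            self.process(tup)
        except Exception:
            if self.auto_fail:
                self.fail(tup)  # fail the tuple first...
            raise               # ...then let the error end the process
        else:
            if self.auto_ack:
                self.ack(tup)
        finally:
            self._current_tup = None


class Upper(SketchBolt):
    def process(self, tup):
        self.emit([tup.upper()])


class OldStyleUpper(Upper):
    # Same flag values as the non-breaking version of an old Bolt.
    auto_ack = False
    auto_anchor = False
    auto_fail = False


class Broken(SketchBolt):
    def process(self, tup):
        raise ValueError("boom")
```

With the defaults, Upper().handle("hi") emits ["HI"] anchored to "hi" and then acks "hi"; OldStyleUpper does neither, leaving anchoring and acking to your own code as the old Bolt did.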

All Bolt subclasses have these parameters set to True by default. To keep your existing Bolts behaving exactly as before, use the following reference to create non-breaking versions of them:

Old Bolt

New param    | Old equivalent value
auto_ack     | no
auto_anchor  | no
auto_fail    | no

Non-breaking new version of an old Bolt:

    from streamparse.bolt import Bolt

    class MyBolt(Bolt):

        auto_ack = False
        auto_anchor = False
        auto_fail = False

Old BasicBolt

New param    | Old equivalent value
auto_ack     | yes
auto_anchor  | yes
auto_fail    | no

Non-breaking new version of an old BasicBolt:

    from streamparse.bolt import Bolt

    class MyBolt(Bolt):

        auto_fail = False

Old BatchingBolt

New param    | Old equivalent value
auto_ack     | yes
auto_anchor  | no
auto_fail    | no

Non-breaking new version of an old BatchingBolt:

    from streamparse.bolt import BatchingBolt

    class MyBatchingBolt(BatchingBolt):

        auto_anchor = False
        auto_fail = False

BatchingBolt's SECS_BETWEEN_BATCHES attribute has also been renamed to secs_between_batches, to match the other new flags. Update this setting in your BatchingBolts:

    from streamparse.bolt import BatchingBolt

    class MyBatchingBolt(BatchingBolt):

        secs_between_batches = 3
        auto_anchor = False
        auto_fail = False
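As a rough picture of what secs_between_batches controls, the sketch below buffers tuples and hands each pending batch to process_batch once that many seconds have passed. It is a hypothetical, single-threaded simplification (SketchBatchingBolt, add, maybe_flush and the group_key hook are illustrative names) with an injectable clock, so the behavior can be checked without sleeping. It is not how streamparse's BatchingBolt is implemented.

```python
import time
from collections import defaultdict


class SketchBatchingBolt:
    """Hypothetical illustration of time-based batching; NOT streamparse's code."""

    secs_between_batches = 3  # lower-case name, as in 1.0.0

    def __init__(self, clock=time.monotonic):
        self._clock = clock  # injectable so the example is deterministic
        self._batches = defaultdict(list)
        self._last_flush = clock()
        self.processed = []

    def group_key(self, tup):
        # Hypothetical hook: every tuple goes into a single batch.
        return None

    def process_batch(self, key, tups):
        self.processed.append((key, list(tups)))

    def add(self, tup):
        self._batches[self.group_key(tup)].append(tup)

    def maybe_flush(self):
        """Hand every pending batch to process_batch once the interval has elapsed."""
        now = self._clock()
        if now - self._last_flush < self.secs_between_batches:
            return False
        for key, tups in self._batches.items():
            self.process_batch(key, tups)
        self._batches.clear()
        self._last_flush = now
        return True
```

With secs_between_batches = 3, tuples added at time 0 stay buffered at time 2 and are handed over together at time 3.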

Python Logging

Python logging is now enabled for streamparse projects. To use it, add a new "log" dictionary to each environment in your project's config.json file:

{
    "library": "",
    "topology_specs": "topologies/",
    "virtualenv_specs": "virtualenvs/",
    "envs": {
        "prod": {
            "user": "",
            "nimbus": "",
            "workers": [],


            "log": {
                "path": "",
                "max_bytes": 1000000,
                "backup_count": 10,
                "level": "info"
            },


            "virtualenv_root": ""
        }
    }
}
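If you have several environments to upgrade, a small script can handle the mechanical part of the change: moving each environment's old log_path value into the new "log" dictionary as "path". The helper below is a hypothetical sketch that works on the parsed config as a plain dict. It does not invent values for max_bytes, backup_count or level; it reports which ones you still need to fill in.

```python
# The three settings the guide lists as new in 1.0.0; their values are project-specific.
NEW_LOG_KEYS = ("max_bytes", "backup_count", "level")


def migrate_log_settings(config):
    """Move each env's old "log_path" into a "log" dict; report keys still to fill in."""
    missing = {}
    for env_name, env in config.get("envs", {}).items():
        log = env.setdefault("log", {})
        if "log_path" in env:
            old_path = env.pop("log_path")
            log.setdefault("path", old_path)  # an existing log.path wins
        absent = [key for key in NEW_LOG_KEYS if key not in log]
        if absent:
            missing[env_name] = absent
    return missing
```

To use it, load config.json with json.load, call migrate_log_settings on the result, write it back with json.dump, and then fill in whatever settings it reports as missing.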

In the above snippet, log.path replaces the old log_path setting, and three new settings have been added:

  • max_bytes - maximum size of each log file, in bytes
  • backup_count - number of backup files to keep for each log file
  • level - the logging level to set for the root logger (logging.getLogger()); one of: critical, error, warning, info or debug
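The names max_bytes and backup_count mirror the maxBytes and backupCount arguments of the standard library's logging.handlers.RotatingFileHandler. Assuming the settings are applied that way, the sketch below shows roughly how one environment's "log" dictionary could configure the root logger. configure_root_logger and the log format string are hypothetical, not streamparse API.

```python
import logging
import os
from logging.handlers import RotatingFileHandler

# The level names the guide accepts, mapped to stdlib logging constants.
LEVELS = {
    "critical": logging.CRITICAL,
    "error": logging.ERROR,
    "warning": logging.WARNING,
    "info": logging.INFO,
    "debug": logging.DEBUG,
}


def configure_root_logger(log_conf, filename):
    """Attach a size-rotated file handler to the root logger (illustrative sketch)."""
    handler = RotatingFileHandler(
        os.path.join(log_conf["path"], filename),
        maxBytes=log_conf["max_bytes"],        # rotate once the file reaches this size
        backupCount=log_conf["backup_count"],  # keep this many rotated files
        delay=True,                            # don't open the file until the first record
    )
    # The format string is an arbitrary choice for this sketch.
    handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
    root = logging.getLogger()  # the root logger, as described for "level"
    root.setLevel(LEVELS[log_conf["level"]])
    root.addHandler(handler)
    return handler
```

For example, calling configure_root_logger(config["envs"]["prod"]["log"], "example.log") attaches the handler. After that, any logging.getLogger(__name__).info(...) call in a bolt, at or above the configured level, ends up in that file.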

streamparse writes all messages sent through the logging module to the following file path on Storm workers, where /path/to/logs is the configured log.path:

/path/to/logs/streamparse_<topology_name>_<component_name>_<task_id>_<process_id>.log
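When you go looking for a particular component's log on a worker, the file name can be rebuilt from the pattern above. log_file_path is a hypothetical helper, and the directory, topology and component names in the example are made up.

```python
import os


def log_file_path(log_dir, topology_name, component_name, task_id, process_id=None):
    """Build a path following streamparse_<topology>_<component>_<task>_<pid>.log."""
    if process_id is None:
        process_id = os.getpid()  # default to the current process, for illustration
    filename = "streamparse_{}_{}_{}_{}.log".format(
        topology_name, component_name, task_id, process_id
    )
    return os.path.join(log_dir, filename)


# Example with made-up values: task 7 of the "count-bolt" component in "wordcount".
# On POSIX this prints /var/log/storm/streamparse_wordcount_count-bolt_7_12345.log
print(log_file_path("/var/log/storm", "wordcount", "count-bolt", 7, 12345))
```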

More information can be found in the 1.0.0 docs.
