
Commit

Parse order preserving cassandra token.
Fixes #202.
clutchski committed Sep 27, 2012
1 parent 47ab7b1 commit 34600c2
Showing 3 changed files with 84 additions and 12 deletions.
54 changes: 44 additions & 10 deletions checks/cassandra.py
@@ -1,6 +1,7 @@
"""Check cassandra cluster health via nodetool.
"""
from subprocess import Popen, PIPE
from collections import deque
import os.path
import re
import itertools
@@ -35,8 +36,44 @@ def _find(lines, regex, postprocess=_fst, all=False):
return None
else:
return res[0]


def _parseInfoToken(self, lines):
"""
Extract the token from the cassandra info
v 0.7
36299342986353445520010708318471778930
v 0.8
Token : 51022655878160265769426795515063697984
With order preserving partitioner
Token : Token(bytes[56713727820156410577229101240436610842])
"""

# The first regex to match will return the token.
regexes = deque([
r"^(\d+)$", # Version 0.7
r"Token.*\(bytes\[(\d+)\]\)", # Version 0.8 with order preserving
r"^Token[^:]+: ([0-9]+)$" # Version 0.8
])

token = None
while not token and regexes:
r = regexes.popleft()
token = self._find(lines, r)

if not token:
raise Exception("Couldn't find token in:\n %s" % "\n".join(lines))

# Convert token to a float since it does not fit in a 2**64 value.
# The loss of precision does not really matter since a well-balanced cluster
# will have markedly different tokens across all nodes.
return float(token)


- def _parseInfo(self, info, results):
+ def _parseInfo(self, info, results, logger):
"""
v 0.7
@@ -66,14 +103,11 @@ def convert_size(g):
return str(int(float(size) * self.UNITS_FACTOR[unit]))

lines = info.split("\n")
- # Convert token to a float since it does not fit in a 2**64 value.
- # The loss of precision does not really matter since a well-balanced cluster
- # will have markedly different tokens across all nodes.
- t = Cassandra._find(lines, r"^(\d+)$")
- if t: # v0.7
-     results["token"] = float(t)
- else: # v0.8
-     results["token"] = float(Cassandra._find(lines, r"^Token[^:]+: ([0-9]+)$"))

+ try:
+     results["token"] = self._parseInfoToken(lines)
+ except:
+     logger.exception("Unable to parse Cassandra token. Continuing. Stacktrace:")

results["load"] = float(Cassandra._find(lines,
r"^Load[^:]+:\s+([0-9.]+).*([KMGT]B|bytes)$", postprocess=convert_size))
@@ -293,7 +327,7 @@ def check(self, logger, agentConfig):

# nodetool info
pipe = Popen("%s %s" % (nodetool_cmd, "info"), shell=True, universal_newlines=True, bufsize=bufsize, stdout=PIPE, stderr=None).stdout
- self._parseInfo(pipe.read(), results)
+ self._parseInfo(pipe.read(), results, logger)
logger.debug("Cassandra info: %s" % results)
pipe.close()

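As a quick illustration of the parsing logic added above, the sketch below runs the same three regexes against the sample lines from the _parseInfoToken docstring. It is a standalone approximation rather than the check itself: the helper name parse_token is made up for the example, and the _find lookup is replaced with a plain re.search loop.

import re
from collections import deque

def parse_token(lines):
    # Try each pattern in turn; the first one that matches any line wins.
    regexes = deque([
        r"^(\d+)$",                    # v0.7: the token is a bare integer on its own line
        r"Token.*\(bytes\[(\d+)\]\)",  # v0.8 with the order preserving partitioner
        r"^Token[^:]+: ([0-9]+)$",     # v0.8: "Token : <integer>"
    ])
    while regexes:
        pattern = regexes.popleft()
        for line in lines:
            match = re.search(pattern, line)
            if match:
                # Tokens are wider than 64 bits, so they are kept as floats; the loss of
                # precision is acceptable when comparing tokens across nodes.
                return float(match.group(1))
    raise Exception("Couldn't find token in:\n%s" % "\n".join(lines))

# The three sample formats from the _parseInfoToken docstring:
print(parse_token(["36299342986353445520010708318471778930"]))
print(parse_token(["Token : 51022655878160265769426795515063697984"]))
print(parse_token(["Token : Token(bytes[56713727820156410577229101240436610842])"]))

The printed values correspond to the token assertions in tests/test_cassandra.py below (roughly 3.63e+37, 5.10e+37 and 5.67e+37).
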
10 changes: 10 additions & 0 deletions tests/cassandra/info.opp
@@ -0,0 +1,10 @@
Token : Token(bytes[56713727820156410577229101240436610842])
Generation No : 1331653944
Gossip active : true
Load : 283.87 GB
Generation No : 1331653944
Uptime (seconds) : 188319
Heap Memory (MB) : 2527.04 / 3830.00
Data Center : 28
Rack : 76
Exceptions : 0
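
The Token line in this fixture uses the order preserving partitioner format the new regex has to handle. Converting it the same way the check does should give the value asserted in testParseInfoOpp below; a quick sanity check:

token = float("56713727820156410577229101240436610842")
assert token == 5.6713727820156407e+37  # matches the expectation in tests/test_cassandra.py
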
32 changes: 30 additions & 2 deletions tests/test_cassandra.py
@@ -1,25 +1,51 @@
import logging
import unittest
import os
import os.path

from nose.plugins.attrib import attr

from checks.cassandra import Cassandra



logger = logging.getLogger(__name__)

class TestCassandra(unittest.TestCase):
def setUp(self):
self.info = open(os.path.join(os.path.dirname(__file__), "cassandra", "info"), "r").read()
self.info8 = open(os.path.join(os.path.dirname(__file__), "cassandra", "info.8"), "r").read()
self.tpstats = open(os.path.join(os.path.dirname(__file__), "cassandra", "tpstats"), "r").read()
self.tpstats8 = open(os.path.join(os.path.dirname(__file__), "cassandra", "tpstats.8"), "r").read()
self.cfstats = open(os.path.join(os.path.dirname(__file__), "cassandra", "cfstats"), "r").read()
self.info_opp = open(os.path.join(os.path.dirname(__file__), "cassandra", "info.opp"), "r").read()
self.c = Cassandra()

def tearDown(self):
pass

@attr('cassandra')
def testParseInfoOpp(self):
# Assert we can parse tokens from nodes using the order preserving
# partitioner.
res = {}
self.c._parseInfo(self.info_opp, res, logger)
self.assertNotEquals(len(res.keys()), 0)
self.assertEquals(res.get("token"), 5.6713727820156407e+37)
# self.assertEquals(res.get("load"), 304803091578.0)
# self.assertEquals(res.get("uptime"), 188319)
# self.assertEquals(res.get("heap_used"), 2527.04)
# self.assertEquals(res.get("heap_total"), 3830.0)
# self.assertEquals(res.get("datacenter"), 28)
# self.assertEquals(res.get("rack"), 76)
# self.assertEquals(res.get("exceptions"), 0)
#

@attr('cassandra')
def testParseInfo(self):
res = {}
# v0.7
- self.c._parseInfo(self.info, res)
+ self.c._parseInfo(self.info, res, logger)
self.assertNotEquals(len(res.keys()), 0)
self.assertEquals(res.get("load"), 467988.0)
self.assertEquals(res.get("token"), 3.6299342986353447e+37)
@@ -28,7 +54,7 @@ def testParseInfo(self):
self.assertEquals(res.get("heap_total"), 1019.88)
# v0.8
res = {}
- self.c._parseInfo(self.info8, res)
+ self.c._parseInfo(self.info8, res, logger)
self.assertNotEquals(len(res.keys()), 0)
self.assertEquals(res.get("load"), 304803091578.0)
self.assertEquals(res.get("token"), 5.102265587816026e+37)
self.assertEquals(res.get("rack"), 76)
self.assertEquals(res.get("exceptions"), 0)

@attr('cassandra')
def testParseCfstats(self):
res = {}
self.c._parseCfstats(self.cfstats, res)
self.assertNotEquals(len(res.keys()), 0)

@attr('cassandra')
def testParseTpstats(self):
res = {}
self.c._parseTpstats(self.tpstats, res)
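For a quick manual run of the new code path outside the unittest harness, something like the following sketch can be used, assuming it is run from the repository root (the logger name is arbitrary):

import logging
from checks.cassandra import Cassandra

logging.basicConfig(level=logging.DEBUG)
results = {}
info_text = open("tests/cassandra/info.opp").read()
Cassandra()._parseInfo(info_text, results, logging.getLogger("cassandra"))
print(results.get("token"))  # expected to be about 5.67e+37 per testParseInfoOpp

The tests themselves carry the nose attrib tag, so they can be selected on their own with something like nosetests -a cassandra tests/test_cassandra.py.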

