forked from thepaul/cassandra-dtest
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththrift_hsha_test.py
130 lines (108 loc) · 4.47 KB
/
thrift_hsha_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import glob
import os
import shlex
import subprocess
import time
import unittest
import pytest
import logging
from dtest import DEFAULT_DIR, Tester, create_ks
from thrift_test import get_thrift_client
from tools.jmxutils import JolokiaAgent, make_mbean, remove_perf_disable_shared_mem
since = pytest.mark.since
logger = logging.getLogger(__name__)
JNA_PATH = '/usr/share/java/jna.jar'
ATTACK_JAR = 'lib/cassandra-attack.jar'
# Use jna.jar in {CASSANDRA_DIR,DEFAULT_DIR}/lib/, since >=2.1 needs correct version
try:
if glob.glob('%s/lib/jna-*.jar' % os.environ['CASSANDRA_DIR']):
logger.debug('Using jna.jar in CASSANDRA_DIR/lib..')
JNA_IN_LIB = glob.glob('%s/lib/jna-*.jar' % os.environ['CASSANDRA_DIR'])
JNA_PATH = JNA_IN_LIB[0]
except KeyError:
if glob.glob('%s/lib/jna-*.jar' % DEFAULT_DIR):
print ('Using jna.jar in DEFAULT_DIR/lib/..')
JNA_IN_LIB = glob.glob('%s/lib/jna-*.jar' % DEFAULT_DIR)
JNA_PATH = JNA_IN_LIB[0]
@since('2.0', max_version='4')
class TestThriftHSHA(Tester):
def test_closing_connections(self):
"""
@jira_ticket CASSANDRA-6546
Test CASSANDRA-6546 - do connections get closed when disabling / renabling thrift service?
"""
cluster = self.cluster
cluster.set_configuration_options(values={
'start_rpc': 'true',
'rpc_server_type': 'hsha',
'rpc_max_threads': 20
})
cluster.populate(1)
(node1,) = cluster.nodelist()
remove_perf_disable_shared_mem(node1)
cluster.start(wait_for_binary_proto=True)
session = self.patient_cql_connection(node1)
create_ks(session, 'test', 1)
session.execute("CREATE TABLE \"CF\" (key text PRIMARY KEY, val text) WITH COMPACT STORAGE;")
def make_connection():
host, port = node1.network_interfaces['thrift']
client = get_thrift_client(host, port)
client.transport.open()
return client
pools = []
connected_thrift_clients = make_mbean('metrics', type='Client', name='connectedThriftClients')
for i in range(10):
logger.debug("Creating connection pools..")
for x in range(3):
pools.append(make_connection())
logger.debug("Disabling/Enabling thrift iteration #{i}".format(i=i))
node1.nodetool('disablethrift')
node1.nodetool('enablethrift')
logger.debug("Closing connections from the client side..")
for client in pools:
client.transport.close()
with JolokiaAgent(node1) as jmx:
num_clients = jmx.read_attribute(connected_thrift_clients, "Value")
assert int(num_clients) == 0, "There are still open Thrift connections after stopping service " + str(num_clients)
@unittest.skipIf(not os.path.exists(ATTACK_JAR), "No attack jar found")
@unittest.skipIf(not os.path.exists(JNA_PATH), "No JNA jar found")
def test_6285(self):
"""
@jira_ticket CASSANDRA-6285
Test CASSANDRA-6285 with Viktor Kuzmin's attack jar.
This jar file is not a part of this repository, you can
compile it yourself from sources found on CASSANDRA-6285. This
test will be skipped if the jar file is not found.
"""
cluster = self.cluster
cluster.set_configuration_options(values={
'start_rpc': 'true',
'rpc_server_type': 'hsha',
'rpc_max_threads': 20
})
# Enable JNA:
with open(os.path.join(self.test_path, 'test', 'cassandra.in.sh'), 'w') as f:
f.write('CLASSPATH={jna_path}:$CLASSPATH\n'.format(
jna_path=JNA_PATH))
cluster.populate(2)
nodes = (node1, node2) = cluster.nodelist()
[n.start(use_jna=True) for n in nodes]
logger.debug("Cluster started.")
session = self.patient_cql_connection(node1)
create_ks(session, 'tmp', 2)
session.execute("""CREATE TABLE "CF" (
key blob,
column1 timeuuid,
value blob,
PRIMARY KEY (key, column1)
) WITH COMPACT STORAGE;
""")
logger.debug("running attack jar...")
p = subprocess.Popen(shlex.split("java -jar {attack_jar}".format(attack_jar=ATTACK_JAR)))
p.communicate()
logger.debug("Stopping cluster..")
cluster.stop()
logger.debug("Starting cluster..")
cluster.start(no_wait=True)
logger.debug("Waiting 10 seconds before we're done..")
time.sleep(10)