-
Notifications
You must be signed in to change notification settings - Fork 3
/
connector.py
100 lines (83 loc) · 3.46 KB
/
connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from cassandra.cluster import Cluster
from cassandra.metadata import Metadata
from cassandra.metadata import KeyspaceMetadata
from cassandra.metadata import TableMetadata
import re
import sys
from io import open
from schema_parser import SchemaParser
from cypher_queries_generator import CypherQueriesGenerator
import csv
import codecs
import argparse
class CassandraConnector(object):
    """
    Handles exporting data from Cassandra to Neo4j using CSV as intermediary.

    The constructor opens a driver session on one keyspace. ``parse()`` hands
    the keyspace's CQL schema text to ``SchemaParser``; ``export()`` writes each
    selected table to ``<table>_results.csv`` and passes those file names to
    ``CypherQueriesGenerator``.
    """

    # Mapping file used by export() when no schema_file was given; matches the
    # default advertised by the command-line --file option.
    DEFAULT_SCHEMA_FILE = "schema.yaml"

    def __init__(self, keyspace_name, tables=None, schema_file=None, queries=None):
        """
        :param keyspace_name: Cassandra keyspace to connect to.
        :param tables: optional list of table names to export; when None or
            empty, every table in the keyspace is exported.
        :param schema_file: Cassandra-to-Neo4j mapping file used by export();
            falls back to DEFAULT_SCHEMA_FILE when None or empty.
        :param queries: accepted for backward compatibility; not used.
        """
        self.KEYSPACE = keyspace_name
        self.session = Cluster().connect(self.KEYSPACE)
        # Keyspace metadata captured at connect time; export() hands this
        # object to CypherQueriesGenerator.
        self.keyspace_metadata = self.session.cluster.metadata.keyspaces[self.KEYSPACE]
        self.tables = tables
        self.schema_file = schema_file

    def _keyspace(self):
        """Return the driver's current metadata object for this keyspace."""
        return self.session.cluster.metadata.keyspaces[self.KEYSPACE]

    def _schema_file_or_default(self):
        """Return the configured schema file, or DEFAULT_SCHEMA_FILE if unset."""
        return self.schema_file or self.DEFAULT_SCHEMA_FILE

    def getTables(self):
        """Return the names of the tables to export.

        Uses the explicit ``tables`` value when one was given, otherwise a list
        of every table the driver's metadata records for the keyspace.
        """
        if self.tables:
            return self.tables
        return list(self._keyspace().tables.keys())

    def getColumnsForTable(self, table):
        """Return the column names the driver's metadata records for ``table``.

        Raises KeyError if ``table`` is not in the keyspace metadata.
        """
        return list(self._keyspace().tables[table].columns.keys())

    def parse(self):
        """Run SchemaParser over this keyspace's CQL schema (export_as_string)."""
        parser = SchemaParser(self._keyspace().export_as_string())
        parser.parse()
        # Report success on stdout and return normally: sys.exit(<message>)
        # prints to stderr and terminates with exit status 1, which makes a
        # successful run look like a failure to shells and callers.
        print("Generated schema.yaml file.")

    def export(self):
        """Write each selected table to ``<table>_results.csv`` and generate
        the Cypher queries that load those files."""
        table_names = list(self.getTables())
        file_names = [name + "_results.csv" for name in table_names]
        for table, file_name in zip(table_names, file_names):
            self._write_table_csv(table, file_name)
        # Use the schema file configured on this instance rather than a
        # module-level global, so export() also works when the class is imported.
        cypher_queries_gen = CypherQueriesGenerator(self.keyspace_metadata,
                                                    self._schema_file_or_default())
        cypher_queries_gen.generate()
        cypher_queries_gen.build_queries(table_names, file_names)

    def _write_table_csv(self, table, file_name):
        """Write a header row plus every row of ``table`` to ``file_name`` (UTF-8)."""
        # CQL bind markers only stand in for values, not identifiers, so the
        # table name is concatenated. NOTE(review): names come from --tables or
        # the driver's metadata; validate them if untrusted input is possible.
        rows = self.session.execute('SELECT * FROM ' + table)
        # The with-block closes (and therefore flushes) the file before
        # export() goes on to generate queries that reference it.
        with codecs.open(file_name, encoding='utf-8', mode='w+') as results_file:
            writer = csv.writer(results_file)
            # NOTE(review): the header comes from table metadata while the values
            # come from SELECT *; confirm both use the same column order.
            writer.writerow(self.getColumnsForTable(table))
            # Each row is iterated for its values; streaming the result set avoids
            # materialising every row in memory first.
            writer.writerows(rows)
class Neo4jNodeBuilder(object):
    """Plain holder that pairs a keyspace with a schema.

    NOTE(review): stores its two constructor arguments and nothing else, and
    is not referenced by the other code visible in this module. An identical
    Neo4jNodeBuilder class is declared again later in the file; consider
    keeping only one copy.
    """

    def __init__(self, keyspace, schema):
        # Expose both arguments unchanged under attributes of the same name.
        self.keyspace, self.schema = keyspace, schema
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("action", type=str, help="Specify action, 'parse' to generate schema.yaml file. Then 'export' to export to Neo4j.", choices=["parse", "export"])
    parser.add_argument("-k", "--keyspace", type=str, help="Specify the Cassandra keyspace. If no keyspace is specified then 'playlist' will be used by default")
    parser.add_argument("-t", "--tables", type=str, help="Specify Cassandra table(s) to export to Neo4j in a comma separated list. If not specified then all tables will be exported.")
    parser.add_argument("-f", "--file", type=str, help="Specify the schema file which defines the Cassandra to Neo4j property graph data model mapping. If not specified then schema.yaml will be used.")
    args = parser.parse_args()

    # Fall back to the defaults promised in the --keyspace / --file help text
    # (an empty value also falls back, as before).
    keyspace = args.keyspace or 'playlist'
    schema_file = args.file or "schema.yaml"

    if args.action == "parse":
        connector = CassandraConnector(keyspace)
        connector.parse()
    elif args.action == 'export':
        tables = None
        if args.tables:
            # Tolerate spaces around commas ("a, b") and drop empty entries
            # ("a,,b" or a trailing comma) instead of querying table ''.
            tables = [name.strip() for name in args.tables.split(",") if name.strip()]
            if not tables:
                parser.error("--tables was given but contains no table names")
        # Pass schema_file whether or not --tables was used, so --file is
        # honoured when exporting every table too.
        connector = CassandraConnector(keyspace, tables=tables, schema_file=schema_file)
        connector.export()
class Neo4jNodeBuilder(object):
    """Keeps the keyspace and schema it was constructed with.

    NOTE(review): this duplicates the Neo4jNodeBuilder class declared earlier
    in the module and follows the ``__main__`` block. Assuming both are at
    module level, this later definition is the one the name refers to after
    import. Consider deleting one of the two copies.
    """

    def __init__(self, keyspace, schema):
        """Record *keyspace* and *schema* as same-named attributes."""
        self.keyspace, self.schema = keyspace, schema