#!/usr/bin/env python3
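"""sf_export - export the DDL for Snowflake schema objects to flat files.

Each selected DATABASE.SCHEMA is exported into its own subdirectory under
the export directory, one file per object, with a file extension derived
from the object type (see the `objects` map below). Schemas are selected
with --database_schema, or --all for every schema; --list just prints the
available DATABASE.SCHEMA pairs, and --delete removes files whose object
no longer exists. Exact option spellings are defined in cmdlineparse.py.
"""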
import os
import re
from datetime import datetime, timezone
from pathlib import Path

from snowflake.connector.errors import ProgrammingError

from cmdlineparse import CmdlineParseExport
from sfconfig import SfConfig
from sfconn import SfConn
from sflogger import SfLogger
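
# Load the configuration, parse the command line, and open the Snowflake connection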
sf_cfg = SfConfig('config.json')
cmdline = CmdlineParseExport()
logger = SfLogger(cmdline.args.log_level, __file__)
sf_conn = SfConn(sf_cfg.config, logger)
db_sc = cmdline.args.database_schema
export_dir = cmdline.args.export_dir
# Global variables
file_stats = {}
objects = {
    'TABLE': '.tbl',
    'EXTERNAL TABLE': '.tbl',
    'DYNAMIC TABLE': '.tbl',
    'VIEW': '.vw',
    'PROCEDURE': '.pr',
    'TASK': '.tsk',
    'STREAM': '.stm',
    'FILE_FORMAT': '.ff',
    'PIPE': '.pipe',
    'FUNCTION': '.func',
    'SEQUENCE': '.seq'
}
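# The extension is appended to the object name when its DDL is exported,
# e.g. a table named CUSTOMER (an illustrative name) would land in CUSTOMER.tbl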
# TODO:
# 1. Validate that db_sc contains only valid DATABASE.SCHEMA entries.
#    Name syntax is validated; existence of the names is not yet checked.
# 2. Support extracts of quoted databases/schemas and quoted objects.
#    Requires fixing cmdlineparse.py:db_sc_validate (call it db_sc_quoted_validate).
# 3. Support % wildcards in the database/schema, e.g. YOURNAMESPACE_PRD_%.%
# --list overrides everything else
if cmdline.args.list is not False:
    db_sc = sf_conn.list_db_sc()
    for row in db_sc:
        print(f"{row[0]}.{row[1]}")
    sf_conn.close_conn()
    exit(0)
# --all uses the same functionality as --list to derive the list of schemas to extract
if cmdline.args.all is not False:
    db_sc = sf_conn.list_db_sc()
logger.debug(f"Creating {export_dir} if it does not exist")
os.makedirs(export_dir, 0o770, exist_ok=True)
script_dir = os.getcwd()
logger.debug(f"Changing directory to {export_dir}")
os.chdir(export_dir)
working_dir = os.getcwd()
logger.debug(f"Script Dir: {script_dir} Working Dir: {working_dir}")
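
# db_sc_dir tracks the per-schema subdirectory currently being exported;
# the main loop below sets it once per DATABASE.SCHEMA pair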
db_sc_dir = ''
# TODO: additional validation is needed before we get here; cmdlineparse.py
# only checks that the names are well-formed, not that they exist
if cmdline.args.list is False and cmdline.args.all is False:
    # db_sc came from --database_schema; beyond name syntax there is no
    # validation yet (future: allow wildcards here)
    logger.debug("Need to validate all entries in db_sc")

def write_file(file_nm, content, mode='w'):
    """Write (mode 'w') or append (mode 'a') content plus a trailing newline to file_nm."""
    with open(file_nm, mode) as fobj:
        fobj.write(content)
        fobj.write('\n')

def populate_file_stats():
    """Record last-modified time and a 'seen' flag for every file in db_sc_dir.

    file_last_modified lets export_object skip files newer than the database
    object; 'seen' flags files that no longer match any object and may need
    to be removed.
    """
    file_stats.clear()
    for file in Path(db_sc_dir).glob('*.*'):
        if file.is_file():
            file_name = str(os.path.relpath(file, db_sc_dir))
            file_last_modified = datetime.fromtimestamp(file.stat().st_mtime, tz=timezone.utc)
            file_stats[file_name] = {"seen": 0, "file_last_modified": file_last_modified}

def export_object(db_nm, sc_nm, obj_type, obj_name, obj_last_modified, arguments=None):
    """Extract the DDL for one object and write it to <obj_name><extension>.

    Overloaded procedures/functions share a file name, so second and later
    signatures are appended rather than overwriting the first.
    """
    mode = 'w'  # switched to 'a' when multiple procs/functions share the file
    file_nm = f"{obj_name}{objects[obj_type]}"
    file = Path(file_nm)
    if file.is_file():
        if file_nm in file_stats:
            file_last_modified = file_stats[file_nm]['file_last_modified']
            file_stats[file_nm]['seen'] = 1
            if file_last_modified >= obj_last_modified:
                logger.info(f"No change - {obj_type} {obj_name} modified {obj_last_modified} <= file modified {file_last_modified}")
                return
            else:
                cur_last_modified = datetime.fromtimestamp(file.stat().st_mtime, tz=timezone.utc)
                logger.warning(f"Changed: {obj_type} {obj_name} modified {obj_last_modified} > file modified {file_last_modified}")
                # The file was already rewritten earlier in this run, so append instead
                if cur_last_modified > file_last_modified:
                    logger.warning(f"Multiple objects being appended to {file_nm} setting file write mode='a'")
                    mode = 'a'
        else:
            logger.warning(f"File {file_nm} exists, but has no file_stats entry - append for multiple procs/functions")
            mode = 'a'
    content = ''
    try:
        if arguments is not None:
            # Strip the RETURN clause so only the argument signature is passed to GET_DDL
            sproc = re.sub(' RETURN .*', '', arguments)
            logger.info(f"Extracting {obj_type}:{obj_name}{sproc}")
            content = sf_conn.get_ddl(db_nm, sc_nm, obj_type, f"{obj_name}{sproc}")
        else:
            content = sf_conn.get_ddl(db_nm, sc_nm, obj_type, obj_name)
    except ProgrammingError as e:
        logger.warning(f"Failure to get_ddl for {obj_type} {obj_name}: {e}")
        return  # don't overwrite an existing export with empty content
    # write it to the file
    logger.info(f"Writing {obj_type}:{obj_name} to {file_nm}")
    write_file(file_nm, content, mode)  # append for proc/function

def export_schema(db_nm, sc_nm):
    """Export the DDL for every supported object type in db_nm.sc_nm."""
    curs = sf_conn.cursor(db_nm, sc_nm)
    populate_file_stats()
    for obj_type in objects:
        logger.debug(f"Extracting {obj_type}s in {db_nm}.{sc_nm}")
        schema_objects = []
        try:
            schema_objects = sf_conn.get_objs_by_type(db_nm, sc_nm, obj_type)
        except ProgrammingError as e:
            logger.warning(f"Failure to extract {obj_type} in {db_nm}.{sc_nm}: {e}")
        if schema_objects is not None:
            for schema_obj in schema_objects:
                # schema_obj[:3] = (obj_name, obj_last_modified, arguments)
                export_object(db_nm, sc_nm, obj_type, *schema_obj[:3])
    # Check for files in the db.sc directory that no longer match a database object
    for file in file_stats:
        if file_stats[file]['seen'] == 0:
            if cmdline.args.delete:
                logger.warning(f"{db_nm}.{sc_nm}:{file} not seen - deleting")
                os.remove(file)
            else:
                logger.warning(f"{db_nm}.{sc_nm}:{file} not seen in this run")

# Main processing
for row in db_sc:
    db, sc = row
    logger.info(f"Extracting {db}.{sc}")
    logger.debug(f"Creating directory {working_dir}/{db}.{sc}")
    # TODO: confirm this works with uppercase/mixed-case names
    os.makedirs(f"{db}.{sc}/", 0o770, exist_ok=True)
    logger.debug(f"Changing directory to {db}.{sc}/")
    os.chdir(f"{db}.{sc}")
    db_sc_dir = os.getcwd()
    logger.debug(f"Extracting {db}.{sc} to directory {db_sc_dir}")
    # Dump out all the objects in the schema
    export_schema(db, sc)
    logger.debug(f"Changing directory to {working_dir}")
    os.chdir(working_dir)
sf_conn.close_conn()
exit(0)