This repository has been archived by the owner on Sep 20, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 57
/
IoManager.py
289 lines (260 loc) · 13.6 KB
/
IoManager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
# -*- coding: utf-8 -*-
"""
:mod:`EdgarRenderer.IoManager`
~~~~~~~~~~~~~~~~~~~
Edgar(tm) Renderer was created by staff of the U.S. Securities and Exchange Commission.
Data and content created by government employees within the scope of their employment
are not subject to domestic copyright protection. 17 U.S.C. 105.
"""
from os import getpid, remove, makedirs, listdir # , getenv
from os.path import basename, isfile, abspath, isdir, dirname, exists, join, splitext, normpath
from io import IOBase
import json, shutil, sys, datetime, os, zipfile
import regex as re
import arelle.XbrlConst
from lxml.etree import tostring as treeToString
from . import Utils
# Indentation passed as ``indent=`` to json.dump by writeJsonDoc.
jsonIndent = 1 # None for most compact, 0 for left aligned (newlines only, no leading spaces)
def genpath(filename):
    """Return a folder stub of the form ``<timestamp>-<stem>-<pid>``.

    The timestamp is ``str(datetime.datetime.now())`` with ':', '-', '\\' and
    '.' removed and the blank between date and time turned into '-'.  The stem
    is the base name of ``filename`` without its extension, and the pid is the
    current process id zero-padded to at least six digits.

    A ``filename`` of '.' is replaced by the name of the current directory.
    """
    if filename == '.':
        # '.' has no usable stem of its own, so borrow the directory's name.
        filename = basename(abspath(filename))
    stamp = re.sub(r'[:\-\\.]', '', str(datetime.datetime.now()))
    stamp = stamp.replace(' ', '-')
    stem = splitext(basename(filename))[0]
    return "{0}-{1}-{2:06d}".format(stamp, stem, getpid())
def createNewFolder(controller,path,stubname="."):
    """Compose a fresh folder path under ``path`` and register it for cleanup.

    The name comes from ``genpath(stubname)``.  The resulting path is added to
    ``controller.createdFolders`` and returned.  Nothing is created on disk
    here; this function only builds and records the path.
    """
    folder = join(path, genpath(stubname))
    controller.createdFolders += [folder]
    return folder
def cleanupNewfolders(controller):
    """Delete every folder tree recorded in ``controller.createdFolders``.

    Removal errors (including folders that do not exist) are ignored, and the
    list itself is left unchanged.
    """
    for folder in controller.createdFolders:
        shutil.rmtree(folder, ignore_errors=True)
def absPathOnPythonPath(controller, filename):
    """Resolve a relative ``filename`` against this module's folder and sys.path.

    ``None`` and absolute paths are returned unchanged.  Otherwise the folder
    containing this module is searched first, then each ``sys.path`` entry
    that is an existing directory.  The absolute path of the first hit is
    returned.  If nothing matches, a debug message is logged through
    ``controller.logDebug`` and ``None`` is returned.
    """
    if filename is None or os.path.isabs(filename):
        return filename
    # for plugin configuration look in the plugin's own directory first
    candidates = [os.path.dirname(__file__)]
    candidates.extend(sys.path)
    searchDirs = [folder for folder in candidates if os.path.isdir(folder)]
    for folder in searchDirs:
        candidate = os.path.join(folder, filename)
        if exists(candidate):
            return os.path.abspath(candidate)
    controller.logDebug("No such location {} found in sys path dirs {}.".format(filename, searchDirs))
    return None
def writeXmlDoc(filing, etree, reportZip, reportFolder, filename, zipDir=""):
    """Serialize an element's whole tree as XML and store it under ``filename``.

    The root tree of ``etree`` is pretty-printed as UTF-8 with an XML
    declaration.  If ``reportZip`` is truthy the bytes are written into it as
    ``zipDir + filename``.  Otherwise, if ``reportFolder`` is not None, they
    are handed to ``filing.writeFile`` for ``reportFolder/filename``.  With
    neither destination the serialized text is discarded.
    """
    serialized = treeToString(
        etree.getroottree(),
        method='xml',
        with_tail=False,
        pretty_print=True,
        encoding='utf-8',
        xml_declaration=True,
    )
    if reportZip:
        reportZip.writestr(zipDir + filename, serialized)
        return
    if reportFolder is not None:
        filing.writeFile(os.path.join(reportFolder, filename), serialized)
def writeHtmlDoc(filing, root, reportZip, reportFolder, filename, zipDir=""):
    """Serialize ``root`` as HTML and store it under ``filename``.

    The element is pretty-printed as UTF-8 using the 'html' method.  If
    ``reportZip`` is truthy the bytes are written into it as
    ``zipDir + filename``.  Otherwise, if ``reportFolder`` is not None, they
    are handed to ``filing.writeFile`` for ``reportFolder/filename``.  With
    neither destination the serialized text is discarded.
    """
    serialized = treeToString(
        root,
        method='html',
        with_tail=False,
        pretty_print=True,
        encoding='utf-8',
    )
    if reportZip:
        reportZip.writestr(zipDir + filename, serialized)
        return
    if reportFolder is not None:
        filing.writeFile(os.path.join(reportFolder, filename), serialized)
def writeJsonDoc(lines, pathOrStream, sort_keys=True):
    """Dump ``lines`` as JSON to a file path or to an already-open stream.

    A ``str`` is treated as a path and opened for writing (truncating any
    existing file).  An ``IOBase`` instance is written to directly and left
    open.  Any other kind of ``pathOrStream`` is silently ignored.  Output is
    indented by the module-level ``jsonIndent``.
    """
    if isinstance(pathOrStream, str):
        with open(pathOrStream, mode='w') as sink:
            json.dump(lines, sink, sort_keys=sort_keys, indent=jsonIndent)
        return
    if isinstance(pathOrStream, IOBase): # caller supplied an open file
        json.dump(lines, pathOrStream, sort_keys=sort_keys, indent=jsonIndent)
def moveToZip(zf, abspath, zippath):
    """Move one regular, non-hidden file from disk into the open zip ``zf``.

    The file at ``abspath`` is stored deflated under the archive name
    ``zippath`` and then deleted from disk.  Paths that are not regular files,
    or that ``isFileHidden`` reports as hidden, are left untouched.
    """
    if not isfile(abspath) or isFileHidden(abspath):
        return
    zf.write(abspath, zippath, zipfile.ZIP_DEFLATED)
    remove(abspath)
def move_clobbering_file(src, dst): # this works across Windows drives, simple rename does not.
    """Move ``src`` to ``dst`` by copy-then-delete, replacing an existing target.

    Args:
        src: path of the file to move.
        dst: either an existing directory (the file keeps its base name) or
            the full destination file path.  A bare file name is placed in
            the current working directory.

    Returns:
        The destination path on success.  ``None`` when the copy succeeded
        but the source could not be removed afterwards (the copy is left in
        place).

    Raises:
        OSError: if the destination cannot be replaced or the copy fails.
    """
    if isdir(dst):
        dstfolder, dstfile = dst, basename(src)
    else:
        dstfolder, dstfile = dirname(dst), basename(dst)
    # dirname() of a bare file name is '', and makedirs('') raises, so only
    # create the folder when there is a folder component to create.
    if dstfolder and not exists(dstfolder):
        makedirs(dstfolder, exist_ok=True)
    destination = join(dstfolder, dstfile)
    if exists(destination):
        # Moving a file onto itself must not delete it before the copy.
        if os.path.samefile(src, destination):
            return destination
        remove(destination)
    shutil.copy2(src, destination)
    try:
        remove(src)
    except OSError:
        # HF: fix msg in next release ("Non fatal Cleanup problem: {}".format(_err))
        return None
    return destination
def handleFolder(controller, folderName, mustBeEmpty, forceClean):
    """Make sure ``folderName`` is a usable directory, optionally emptying it.

    * Existing directory: when ``forceClean`` is set, every entry inside it
      (files and sub-trees) is deleted; otherwise, when ``mustBeEmpty`` is set
      and entries are present, a debug message is logged.
    * Existing non-directory: a debug message is logged.
    * Missing: the directory is created, including any missing parents.

    No value is returned on any path; problems are reported only through
    ``controller.logDebug``.
    """
    if isdir(folderName):
        entries = listdir(folderName)
        if forceClean:
            for entry in entries:
                entryPath = join(folderName, entry)
                if isdir(entryPath):
                    shutil.rmtree(entryPath)
                else:
                    remove(entryPath)
        elif mustBeEmpty and entries:
            controller.logDebug(_("Folder {} exists and is not empty.").format(folderName), file=basename(__file__))
        return
    if exists(folderName):
        # Something is already there, but it is a file rather than a folder.
        controller.logDebug(_("Folder {} exists and is not a directory.").format(folderName), file=basename(__file__))
        return
    makedirs(folderName, exist_ok=True)
def getConfigFile(controller, options):
    """Locate the configuration file named by ``options.configFile``.

    Returns ``None`` when no configuration file was requested.  A file found
    relative to the current working directory wins; otherwise the result of
    ``absPathOnPythonPath`` is returned, which may itself be ``None``.
    """
    requested = options.configFile
    if requested is None:
        return None
    localCandidate = os.path.join(os.getcwd(), requested)
    if os.path.exists(localCandidate):
        return localCandidate
    return absPathOnPythonPath(controller, requested)
def logConfigFile(controller, options):
    """Log the configuration file's contents and the command line.

    When ``getConfigFile`` finds a configuration file, its path and then each
    of its lines (stripped of surrounding whitespace) are sent to
    ``controller.logInfo``.  ``sys.argv`` is logged afterwards.
    """
    thisFile = basename(__file__)
    configPath = getConfigFile(controller, options)
    if configPath:
        controller.logInfo("Contents of configuration file '{}':".format(configPath), file=thisFile)
        with open(configPath, "r") as cfg:
            for rawLine in cfg:
                controller.logInfo(rawLine.strip(), file=thisFile)
    controller.logInfo("sys.argv {}".format(sys.argv), file=thisFile)
def isFileHidden(p):
    """Report whether the file at path ``p`` should be treated as hidden.

    Only the base name is examined: names starting with '.' are hidden on
    every platform, and on Windows the name "Thumbs.db" is hidden as well.
    File-system attributes are not consulted.
    """
    name = basename(p)
    if name.startswith('.'):
        return True
    # A win32api attribute check (FILE_ATTRIBUTE_HIDDEN / FILE_ATTRIBUTE_SYSTEM)
    # was considered here; only the well-known thumbnail cache name is tested.
    return sys.platform.startswith("win") and name == "Thumbs.db"
def unpackInput(controller, options, filesource): # success
    """Copy or scan the input files and classify them; return success.

    The entry point is handled as one of:
      1. a zip processed in place (``controller.processInZip``): each member
         is opened through ``filesource`` and classified, nothing is copied;
      2. a zip file (``filesource.isZip``): each member is extracted into
         ``controller.processingFolder`` and classified;
      3. a single file or a folder: every visible regular file in
         ``controller.entrypointFolder`` is copied into
         ``controller.processingFolder`` and classified.  For a single file,
         only that file may be accepted as the instance/inline document.

    Side effects: ``controller.instanceList``, ``inlineList``,
    ``otherXbrlList`` and ``supplementList`` are reset here and filled by
    ``isSurvivor``.  On the "nothing found" failure ``controller.entrypoint``
    is set to the base name of ``options.entrypoint``.

    Returns:
        True when at least one instance or inline document was found.  False
        when an exception occurred while unpacking (it is logged, not raised)
        or when no instance or inline document was found.
    """
    unpacked = 0
    controller.instanceList = []
    controller.inlineList = []
    controller.otherXbrlList = []
    controller.supplementList = []
    # an absolute path for processing folder root can be specified in the configuration file.
    # HF: controller.originalProcessingFolder = join(getenv("TEMP"), controller.processingFolder)
    # HF: controller.processingFolder = createNewFolder(controller,controller.originalProcessingFolder, options.entrypoint)
    knownSingleInput = None
    try:
        if controller.processInZip:
            # Case 1: entry point is a zip file that is read in place.
            for base in filesource.dir:
                if base.startswith('.'):
                    continue
                fileStream, _encoding = filesource.file(filesource.baseurl + "/" + base)
                try:
                    if isSurvivor(controller, "zip", base, None, fileStream):
                        unpacked += 1
                finally:
                    # Close the member stream even if classification fails.
                    fileStream.close()
        elif filesource.isZip:
            # Case 2: entry point is a zip file that is extracted to disk.
            controller.logDebug(_("Extracting from zip file."), file=basename(__file__))
            with zipfile.ZipFile(options.entrypoint, 'r') as zf:
                for member in zf.namelist():
                    base = member
                    if base.startswith('./'): # prevent errors arising from windows file system foolishness
                        base = normpath(base)
                    target = join(controller.processingFolder, base)
                    with open(target, 'wb') as fp:
                        # Read by the archive's own member name; the
                        # normalized name is not a key in the archive.
                        fp.write(zf.read(member)) # unzip to the processing folder.
                    if isSurvivor(controller, "zip", base, None, target):
                        unpacked += 1
        else: # Not a zip file.
            # Case 3a: Entry point is a single file.
            # Treat it as if the entrypoint were its parent folder.
            # This does create a problem if there are multiple instances, because it
            # will copy extra non-instance files for processing.
            # TODO: give a warning when XBRL files are copied to target but then not used in the DTS.
            if isfile(options.entrypoint) and not isdir(options.entrypoint):
                knownSingleInput = basename(options.entrypoint)
            # Case 3b: Entry point is a folder. Copy everything except unknown instances and inlines
            controller.logDebug(_("Copying from Input folder {}").format(controller.entrypointFolder), file=basename(__file__))
            for base in listdir(controller.entrypointFolder):
                if base.startswith("."):
                    continue
                source = join(controller.entrypointFolder, base)
                if isFileHidden(source) or isdir(source):
                    continue
                target = join(controller.processingFolder, base)
                shutil.copy(source, target)
                if isSurvivor(controller, "folder", base, knownSingleInput, target):
                    unpacked += 1
    except Exception as e:
        # Any failure while unpacking is reported as an overall failure.
        controller.logError(_("Exception raised during file unpacking: {}").format(e), file='IoManager.py')
        return False
    if not controller.instanceList and not controller.inlineList:
        controller.entrypoint = basename(options.entrypoint)
        controller.logError(_("No instance or inline document found!"))
        return False
    controller.logDebug(_("{} Files copied to processing folder {}").format(unpacked, controller.processingFolder), file=basename(__file__))
    return True
def isSurvivor(controller, original, base, entry, targetOrStream): # return boolean
    """Classify one unpacked file and decide whether it is kept.

    Args:
        controller: object receiving log calls and the classification lists
            (``supplementList``, ``inlineList``, ``instanceList``,
            ``otherXbrlList``).
        original: label of where the file came from ("zip" or "folder"); used
            only in log messages.
        base: file name being classified.
        entry: when not None, the only file name that may be accepted as an
            inline or instance document.
        targetOrStream: path of the unpacked copy (``str``) or an open stream.
            Only a path can be deleted when the file is rejected.

    Returns:
        True when the file is kept, False when it is ignored.  Files of
        unknown name type or unknown root element are also deleted from disk
        when ``targetOrStream`` is a path.
    """
    oktocopy = Utils.isImageFilename(base) or Utils.isXmlFilename(base) or Utils.isInlineFilename(base)
    if not oktocopy: # Found a file that doesn't fit
        controller.logInfo(_("Ignoring file {} of unknown type found in folder or zip.").format(base), file=basename(__file__))
        if isinstance(targetOrStream, str): # file name, not filesource
            remove(targetOrStream)
        return False
    if Utils.isImageFilename(base):
        controller.logDebug("Found Image in {0}: {1}".format(original, base), file=basename(__file__))
        controller.supplementList += [base]
        return True
    # Everything else is classified by its root element and bound namespaces.
    result = getQName(controller, targetOrStream)
    ns = ln = ixns = None
    if result is not None:
        ns, ln, ixns = result
    if (ns, ln, ixns) == (arelle.XbrlConst.xhtml, 'html', arelle.XbrlConst.ixbrl11):
        if entry is None or entry == base:
            controller.logDebug("Found Inline 1.1 Doc in {0}: {1}".format(original, base), file=basename(__file__))
            controller.inlineList += [base]
        else:
            controller.logDebug("Ignoring Inline 1.1 Doc in {0} not the specified entry {1}: {2}"
                                .format(original, entry, base), file=basename(__file__))
            return False
    elif (ns, ln) == (arelle.XbrlConst.xhtml, 'html') and ixns in arelle.XbrlConst.ixbrlAll:
        # The message reads "doc {0} in {1}", so the file name goes first.
        controller.logDebug("Only Inline 1.1 is supported, ignoring Inline 1.0 doc {0} in {1}".format(base, original), file=basename(__file__))
    elif (ns, ln) == (arelle.XbrlConst.xbrli, 'xbrl'):
        if entry is None or entry == base:
            controller.logDebug("Found Instance Doc in {0}: {1}".format(original, base), file=basename(__file__))
            controller.instanceList += [base]
        else:
            controller.logDebug("Ignoring Instance Doc in {0} not the specified entry {1}: {2}"
                                .format(original, entry, base), file=basename(__file__))
            return False
    elif (ns, ln) == (arelle.XbrlConst.link, 'linkbase'):
        controller.logDebug("Found Linkbase in {}: {}".format(original, base), file=basename(__file__))
        controller.otherXbrlList += [base]
    elif (ns, ln) == (arelle.XbrlConst.xsd, 'schema'):
        controller.logDebug("Found schema in {}: {}".format(original, base), file=basename(__file__))
        controller.otherXbrlList += [base]
    else:
        controller.logDebug("Ignoring unknown file {} in {}".format(base,original), file=basename(__file__))
        if isinstance(targetOrStream, str): # file name, not filesource
            remove(targetOrStream)
        return False
    return True # you made it
def getQName(controller, pathname): # return ns, localname, and inline namespace if found
    """Read just enough of an XML document to identify its root element.

    Args:
        controller: object whose ``logDebug`` receives any parse error.
        pathname: a file path (``str``), or an already-open stream.  A text
            stream is read through its ``.buffer``; a stream without that
            attribute is read directly.

    Returns:
        A tuple ``(rootNamespace, rootElement, inlineNamespaceBound)``.  The
        first two come from the first element start event.  The third is the
        last namespace URI found in ``arelle.XbrlConst.ixbrlAll`` among the
        namespace declarations seen before parsing stops, or None.  All three
        are None when the document cannot be opened or parsed; the error is
        logged, not raised.
    """
    from lxml import etree
    rootElement = rootNamespace = inlineNamespaceBound = None
    openedHere = isinstance(pathname, str)
    f = None
    try:
        if openedHere:
            f = open(pathname, 'rb')
        else: # stream, already is open
            f = pathname
        # iterparse needs bytes: unwrap a text stream, use anything else as-is.
        source = getattr(f, 'buffer', f)
        for event, element in etree.iterparse(source, events=('start','start-ns')):
            if event == 'start-ns':
                _ignore, uri = element
                if uri in arelle.XbrlConst.ixbrlAll:
                    inlineNamespaceBound = uri
            elif event == 'start':
                qname = etree.QName(element.tag)
                rootNamespace = qname.namespace
                rootElement = qname.localname
                break # the root element is all that is needed
    except Exception as e:
        controller.logDebug("EXCEPTION ON {}: {}".format(pathname, e))
    finally:
        # Close only what was opened here; a caller's stream stays open.
        if openedHere and f is not None:
            f.close()
        sys.stderr.flush()
    return (rootNamespace, rootElement, inlineNamespaceBound)