-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathmeta_row.py
137 lines (118 loc) · 4.6 KB
/
meta_row.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import csv
import os
from pathlib import Path
from typing import Union, List, Generator
class MetaRow:
    """One row of the meta markup, converted and validated from a raw ``dict``.

    The class-level annotations list the expected columns and the type each
    value is converted to; ``__init__`` requires the keys of the input row to
    match them exactly.
    """

    Id: int
    FileID: str
    Domain: str
    RepoName: str
    FilePath: str
    LineStart: int
    LineEnd: int
    GroundTruth: str
    WithWords: str
    ValueStart: int
    ValueEnd: int
    InURL: str
    InRuntimeParameter: str
    CharacterSet: str
    CryptographyKey: str
    PredefinedPattern: str
    VariableNameType: str
    Entropy: float
    Length: int
    Base64Encode: str
    HexEncode: str
    URLEncode: str
    Category: str

    def __init__(self, row: dict):
        """Populate the instance from ``row`` and validate the result.

        Args:
            row: mapping whose key set equals the annotated field names.
                For ``int`` and ``float`` fields a falsy value (such as an
                empty string) is stored as ``-1`` / ``0.0``; ``str`` fields
                must already be strings.

        Raises:
            RuntimeError: if ``row`` is not a dict, its keys differ from the
                annotated fields, a value is ``None``, a ``str`` field holds a
                non-string, or one of the Category / GroundTruth / line and
                value position checks fails.
            ValueError: if a non-empty ``int`` or ``float`` value cannot be
                converted.
        """
        if not isinstance(row, dict) or self.__annotations__.keys() != row.keys():
            raise RuntimeError(f"ERROR: wrong row {row}")
        for key, typ in self.__annotations__.items():
            if key.startswith("__"):
                continue
            row_val = row.get(key)
            if row_val is None:
                # csv.DictReader pads a short line with None. Leaving the
                # attribute unset would only surface later as an
                # AttributeError, so report the broken row right away.
                raise RuntimeError(f"ERROR: {key} is not set in row {row}")
            if typ is int:
                # empty value is stored as the -1 marker
                val = typ(row_val) if row_val else -1
            elif typ is float:
                val = typ(row_val) if row_val else 0.0
            elif typ is str and isinstance(row_val, str):
                val = row_val
            else:
                raise RuntimeError(f"ERROR: Unsupported {typ}")
            setattr(self, key, val)

        if not self.Category:
            raise RuntimeError(f"ERROR: Category must be set {row}")
        if ':' in self.Category:
            # several rules are joined with ':'
            rules = self.Category.split(':')
            rule_set = set(rules)
            if len(rules) != len(rule_set):
                raise RuntimeError(f"ERROR: Each rule must be once in Category {row}")
            if "Other" in rule_set:
                raise RuntimeError(f"ERROR: 'Other' Category must be single rule in markup {row}")

        allowed_GroundTruth = ['T', 'F', "Template"]
        if self.GroundTruth not in allowed_GroundTruth:
            raise RuntimeError(f"ERROR: GroundTruth must be in {allowed_GroundTruth} {row}")

        if 0 > self.LineStart or 0 > self.LineEnd:
            raise RuntimeError(f"ERROR: LineStart and LineEnd must be positive {row}")
        elif self.LineStart > self.LineEnd:
            raise RuntimeError(f"ERROR: LineStart must be lower than LineEnd {row}")
        elif self.LineStart == self.LineEnd and 0 <= self.ValueStart and 0 <= self.ValueEnd < self.ValueStart:
            # multiline value positions are independent
            # negative positions are skipped by the 0 <= bounds
            raise RuntimeError(f"ERROR: ValueStart must be lower than ValueEnd for single line {row}")

    def __str__(self) -> str:
        """Return the stored values joined by commas, in the order they were set."""
        return ','.join(str(x) for x in self.__dict__.values())

    def __repr__(self):
        """Return the same text as ``str(self)``."""
        return str(self)
def _meta_from_file(meta_path: Path) -> Generator[dict, None, None]:
    """Yield each data row of a ``.csv`` meta file as a dict keyed by the header.

    A path whose suffix is not ``.csv`` is skipped with a printed warning and
    yields nothing.

    Raises:
        RuntimeError: if the reader produces a row that is not a dict.
    """
    if ".csv" != meta_path.suffix:
        # *.csv.orig artifacts after git merge
        print(f"WARNING: skip {meta_path} file")
        return
    # newline="" leaves line handling to the csv module, so line breaks inside
    # quoted fields reach the reader unchanged instead of being translated.
    with open(meta_path, newline="") as f:
        for row in csv.DictReader(f):
            if not isinstance(row, dict):
                raise RuntimeError(f"ERROR: wrong row '{row}' in {meta_path}")
            yield row
def _meta_from_dir(meta_path: Path) -> Generator[dict, None, None]:
    """Yield the rows of every file located directly in ``meta_path``.

    Only the top level of the directory is visited; each file found there is
    handed to ``_meta_from_file``, in sorted name order.
    """
    for root, _dirs, files in os.walk(meta_path):
        root_path = Path(root)
        # Directory listings come back in arbitrary order; sorting keeps the
        # resulting row order the same from run to run.
        for file in sorted(files):
            yield from _meta_from_file(root_path / file)
        # meta dir is flat
        break
def _get_source_gen(meta_path: Union[Path]) -> Generator[dict, None, None]:
    """Yield raw meta rows from ``meta_path``, which is a directory or a file.

    A directory is delegated to ``_meta_from_dir`` and a regular file to
    ``_meta_from_file``. Because this is a generator, the checks below run
    when iteration starts.

    Raises:
        RuntimeError: if ``meta_path`` is not a ``Path``, does not exist, or is
            neither a directory nor a regular file.
    """
    if not isinstance(meta_path, Path):
        raise RuntimeError(f"ERROR: unsupported source {meta_path} type {type(meta_path)}")
    if not meta_path.exists():
        raise RuntimeError(f"ERROR: {meta_path} does not exist")

    if meta_path.is_dir():
        yield from _meta_from_dir(meta_path)
        return
    if meta_path.is_file():
        yield from _meta_from_file(meta_path)
        return
    raise RuntimeError(f"ERROR: unsupported {meta_path} file type")
def read_meta(meta_dir: Union[str, Path]) -> List[MetaRow]:
    """Read a meta file or directory and return its rows as ``MetaRow`` objects.

    Rows are returned in the order they are read.

    Raises:
        RuntimeError: if two rows carry the same ``Id``.
    """
    # An insertion-ordered dict doubles as the duplicate check and the result.
    rows_by_id = {}
    for raw in _get_source_gen(Path(meta_dir)):
        parsed = MetaRow(raw)
        if parsed.Id in rows_by_id:
            raise RuntimeError(f"ERROR: duplicate Id row {raw}")
        rows_by_id[parsed.Id] = parsed
    return list(rows_by_id.values())