-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathutil.py
171 lines (120 loc) · 4.49 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/env python3
# Source: https://github.com/O-X-L/nftables_addon_dns
# Copyright (C) 2024 Rath Pascal
# License: MIT
from os import listdir, getuid
from time import time
from pathlib import Path
from hashlib import md5 as md5_hash
from subprocess import Popen as subprocess_popen
from subprocess import PIPE as subprocess_pipe
from json import loads as json_loads
from json import JSONDecodeError
CONFIG = '/etc/nftables.conf'
BASE_DIR = '/etc/nftables.d'
ADDON_DIR = '/etc/nftables.d/addons'
CONFIG_EXT = 'nft'
APPENDIX_4 = 'v4'
APPENDIX_6 = 'v6'
IS_ROOT = getuid() == 0
SUDO = '' if IS_ROOT else 'sudo '
CMD_RELOAD = f'{SUDO}systemctl reload nftables.service' # has to be changed if no systemd is available
VAR_SINGLE_END = '_1'
if not CONFIG_EXT.startswith('.'):
CONFIG_EXT = f'.{CONFIG_EXT}'
FALLBACK_VAR_VALUE = {
4: '0.0.0.0',
6: '::',
}
FILE_TMP_PREFIX = '/tmp/nftables_'
FILE_HEADER = '# Auto-Generated config - DO NOT EDIT MANUALLY!\n\n'
def ensure_list(data: (str, list)) -> list:
if isinstance(data, list):
return data
return [data]
def format_var(name: str, data: list, version: int, fallback: str = None) -> str:
if version not in FALLBACK_VAR_VALUE:
version = 4
if name.endswith(VAR_SINGLE_END) and len(data) > 0:
data = [data[0]]
append = APPENDIX_4 if version == 4 else APPENDIX_6
if append not in [None, ' ', '']:
name = f'{name}_{append}'
if len(data) > 1:
raw = f"define { name } = {{ %s }}"
else:
raw = f"define { name } = %s"
if len(data) == 0:
if fallback is None:
return raw % FALLBACK_VAR_VALUE[version]
return raw % fallback
return raw % ', '.join(map(str, data))
def load_config(key: str) -> (dict, list, None):
with open(f'{ADDON_DIR}/{key}.json', 'r', encoding='utf-8') as _cnf:
try:
if key is None:
return json_loads(_cnf.read())
return json_loads(_cnf.read())[key]
except JSONDecodeError:
return None
def _exec(cmd: (str, list)) -> int:
if isinstance(cmd, str):
cmd = cmd.split(' ')
with subprocess_popen(cmd, stdout=subprocess_pipe) as p:
_ = p.communicate()[0]
return p.returncode
def _reload() -> bool:
print('INFO: Reloading NFTables!')
return _exec(CMD_RELOAD) == 0
def _validate(file: str) -> bool:
cmd = f'{SUDO}/usr/sbin/nft -cf {file}'
return _exec(cmd.split(' ')) == 0
def _write(file: str, content: str):
with open(file, 'w', encoding='utf-8') as config:
config.write(content + '\n\n')
_exec(['chmod', '640', file])
def _file_hash(file: str) -> str:
if Path(file).exists():
with open(file, 'rb') as _c:
return md5_hash(_c.read()).hexdigest()
else:
return md5_hash(b'').hexdigest()
def validate_and_write(key: str, lines: list):
file_out = f'{key}{CONFIG_EXT}'
file_out_path = f'{ADDON_DIR}/{file_out}'
file_tmp = f'{FILE_TMP_PREFIX}{key}_{time()}{CONFIG_EXT}'
file_tmp_main = f'{FILE_TMP_PREFIX}main_{time()}{CONFIG_EXT}'
content = FILE_HEADER + '\n'.join(lines) + '\n'
_write(file=file_tmp, content=content)
config_hash = {
'before': _file_hash(file=file_out_path),
'after': _file_hash(file=file_tmp),
}
config_changed = config_hash['before'] != config_hash['after']
if config_changed:
# create config to include existing main-config; must be valid in combination with new one
addon_includes = ''
for inc in listdir(ADDON_DIR):
if inc.endswith(CONFIG_EXT) and inc != file_out:
addon_includes += f'include "{ADDON_DIR}/{inc}"\n'
if BASE_DIR not in ['', ' ']:
addon_includes += f'include "{BASE_DIR}/*{CONFIG_EXT}"\n'
_write(
file=file_tmp_main,
content=f'include "{file_tmp}"\n'
f'{addon_includes}\n'
)
if _validate(file=file_tmp_main):
print('INFO: Test-config validated successfully!')
_write(file=file_out_path, content=content)
if _validate(file=CONFIG):
print('INFO: Real-config validated successfully!')
_reload()
else:
raise SystemExit('ERROR: Failed to validate real-config!')
else:
raise SystemExit('WARN: Failed to validate test-config!')
_exec(['rm', file_tmp_main])
else:
print('INFO: Config unchanged - nothing to do.')
_exec(['rm', file_tmp])