-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
89 lines (72 loc) · 2.97 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from __future__ import absolute_import
import sys
import os
import signal
sys.stderr = None
from scapy.all import wrpcap
sys.stderr = sys.__stderr__
from dx_to_pcap.arg_parser import DxToPcapArgParser
from dx_to_pcap.dx_packet_parser import DxPacketParser
from collections import OrderedDict
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
def signal_handler(signal, frame):
    """Announce the interrupt and terminate the process with exit status 0.

    This function is installed as the SIGINT handler. Neither argument is
    used by the body.
    """
    print('Caught Ctrl+C! Quitting...')
    # Equivalent to sys.exit(0): both raise SystemExit with code 0.
    raise SystemExit(0)
# Route Ctrl+C (SIGINT) to signal_handler.
signal.signal(signal.SIGINT, signal_handler)

# Command-line options.
parser = DxToPcapArgParser()
args = parser.parse_args()
in_files, max_output_size, dx_format = (
    args.dx_files,
    args.max_pcap_size,
    args.dx_format,
)

# MP number -> list of SP numbers seen for it, kept in first-seen order.
mp_nums = OrderedDict()

# Output name template; formatted with (mp, sp, output file index).
output_file_name = 'mp{}.sp{}.{}.pcap'
def fill_mp_sp_arrays():
    """Populate ``mp_nums`` from the names of the input DX files.

    Each name in ``in_files`` is split on ``.``. The third and fourth
    components, with their first two characters dropped, are parsed as the
    MP and SP numbers (``dx_write.tmp.mp1.sp2`` yields MP 1, SP 2). Every SP
    number is recorded once per MP, in the order it is first seen.

    If a name cannot be parsed, a message is printed and the process exits
    with status 1.
    """
    try:
        for dx_file in in_files:
            name_parts = dx_file.split('.')
            mp_num = int(name_parts[2][2:])
            sp_num = int(name_parts[3][2:])
            sp_list = mp_nums.setdefault(mp_num, [])
            if sp_num not in sp_list:
                sp_list.append(sp_num)
    except (AttributeError, IndexError, TypeError, ValueError):
        # Only the errors that malformed input can raise are handled here.
        # A bare ``except`` would also trap SystemExit and KeyboardInterrupt,
        # turning a Ctrl+C into a misleading "wrong convention" exit.
        print('One or more of the file names are in the wrong convention, Quitting...')
        sys.exit(1)
def create_new_pcap_file(pcap_name):
    """Create ``pcap_name`` as an empty file, truncating it if it exists."""
    # Opening in 'w' mode creates or truncates; nothing is written.
    open(pcap_name, 'w').close()
def _decode_dx_line(raw_line):
    """Decode one raw line of a DX dump to text.

    UTF-8 is tried first. A line that is not valid UTF-8 is decoded as
    Latin-1, which maps every byte to the code point with the same value
    and therefore cannot fail.
    """
    try:
        return raw_line.decode('utf8')
    except UnicodeDecodeError:
        # Produces the same string as a per-byte chr() join. A hex fallback
        # guarded by ``except UnicodeDecodeError`` around chr() can never
        # run, because chr() on a byte value does not raise that error.
        return raw_line.decode('latin-1')


fill_mp_sp_arrays()

for mp, sps in mp_nums.items():
    for sp in sps:
        output_file_index = 1
        pcap_name = output_file_name.format(mp, sp, output_file_index)
        create_new_pcap_file(pcap_name=pcap_name)

        # Walk the dump files from suffix ".50" down to the un-suffixed file
        # (index 0). NOTE(review): presumably higher suffixes are older
        # rotations, so this is oldest-first - confirm against the writer.
        for i in range(50, -1, -1):
            suffix = '.{}'.format(i) if i > 0 else ''
            dx_file = 'dx_write.tmp.mp{}.sp{}{}'.format(mp, sp, suffix)

            # Only a missing dump file is tolerated; errors raised while
            # parsing or writing must not be silently swallowed.
            try:
                dx_handle = open(dx_file, 'rb')
            except FileNotFoundError:
                continue

            with dx_handle:
                print('Parsing ', dx_file)
                packet_parser = DxPacketParser(dx_plugin=dx_format)
                # Iterate the handle directly so the dump is streamed rather
                # than loaded into memory in one piece.
                for raw_line in dx_handle:
                    res = packet_parser.consume_line(_decode_dx_line(raw_line).strip())
                    if res is None:
                        continue
                    # Start a new pcap once the current one reaches the
                    # configured size limit.
                    if os.stat(pcap_name).st_size >= max_output_size:
                        output_file_index += 1
                        pcap_name = output_file_name.format(mp, sp, output_file_index)
                        create_new_pcap_file(pcap_name=pcap_name)
                    wrpcap(pcap_name, res, append=True)