-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathterminalreplay.py
218 lines (183 loc) · 7.88 KB
/
terminalreplay.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import logging
import sys
from time import sleep
from terminalparser import TermLogParser
from vtparser import VT500Parser
LOG = logging.getLogger()
class VT2Output(VT500Parser.DefaultTerminalOutputHandler, VT500Parser.DefaultControlSequenceHandler,
TermLogParser.DefaultEventListener):
"""
Output class that writes the console session log to stdout, recreating coloring, etc.
It suppresses vim's terminal query control functions that would trigger terminal responses.
The output of commands entered at the prompt are printed out simulating typing with delays.
"""
def __init__(self):
self.speed = 3
self.cleanup_cmdline = True
self.print_vim = False
self.command_line = []
self.cmd_line_pos = 0
self.in_prompt = False
self.in_vim = False
def print(self, code):
"""The current code should be mapped to a glyph according to the character set mappings and shift states
in effect, and that glyph should be displayed.
We print normal output to stdout. Only delayed when in the prompt."""
if self.in_prompt:
if self.cleanup_cmdline:
self.build_cmd_line_print(code)
else:
sleep(0.2 * (1.0/self.speed))
sys.stdout.write(chr(code))
sys.stdout.flush()
elif self.in_vim:
if self.print_vim:
if 0x21 <= code <= 0x7d:
sleep(0.2 * (0.5 / self.speed))
sys.stdout.write(chr(code))
sys.stdout.flush()
else:
sys.stdout.write(chr(code))
def execute(self, code):
"""The C0 or C1 control function should be executed, which may have any one of a variety of effects,
including changing the cursor position, suspending or resuming communications or changing the
shift states in effect. There are no parameters to this action.
We print control directly to stdout. Except when in the prompt of when ending the prompt. Then
a delay is added."""
if self.in_prompt:
if self.cleanup_cmdline:
self.build_cmd_line_ctrl(code)
else:
if code == 0x0d: # Wait at CR, because this might be the end of the command input
sleep(0.8)
sys.stdout.write(chr(code))
sleep(0.1 * (1.0/self.speed))
sys.stdout.flush()
elif self.in_vim and not self.print_vim:
pass
else:
sys.stdout.write(chr(code))
def esc_dispatch(self, intermediate, final):
"""Execute all control sequences"""
if self.in_vim and not self.print_vim:
return
ctrlstring = f"\x1b{intermediate}{final}"
LOG.info("Emit to stdout full ESC control function: %s", ctrlstring)
sys.stdout.write(ctrlstring)
def csi_dispatch(self, private, param, interm, final):
"""Only certain control sequences are caught an discarded. Namely the ones that would trigger
terminal responses. These are used by vim, but since vim is not running, no one is listening to
the responses."""
if final == "n":
LOG.info("Discard Device Status Report CSI sequence %s%s", interm, final)
return
elif final == "c" and (param == '' or param == '0'):
LOG.info("Discard Device Status Report CSI sequence %s%s", interm, final)
return
ctrlstring = f"\x1b[{private}{param}{interm}{final}"
LOG.info("Emit to stdout full CSI control function: %s", ctrlstring)
if self.in_prompt:
if self.cleanup_cmdline:
self.build_cmd_line_csi(private, param, interm, final)
else:
sleep(0.1 * (1.0/self.speed))
sys.stdout.write(ctrlstring)
sys.stdout.flush()
elif self.in_vim and not self.print_vim:
pass
else:
sys.stdout.write(ctrlstring)
def build_cmd_line_print(self, code):
if self.cmd_line_pos >= len(self.command_line):
self.command_line.insert(self.cmd_line_pos, code)
else:
self.command_line[self.cmd_line_pos] = code
self.cmd_line_pos += 1
def build_cmd_line_ctrl(self, code):
if code == 0x08: # BS
self.cmd_line_pos -= (1 if self.cmd_line_pos > 0 else 0) # Go back one character
elif code == 0x0D: # CR
self.cmd_line_pos = 0 # Back to start of line
elif code == 0x0A: # LF
# This should terminate the command line. Add it so it gets printed.
self.command_line.insert(len(self.command_line), code)
self.cmd_line_pos +=1
# Everything else is discarded, as we do not need it to build the command line
def build_cmd_line_csi(self, private, param, interm, final):
# Now it get's interesting.
# Filter out the codes that effect the command line and discard all the rest.
if final == '@' and interm == '': # Insert blank characters
self.command_line.insert(self.cmd_line_pos, ' ' if param == '' else ' ' * int(param))
elif final == 'C': # Cursor forward
self.cmd_line_pos += 1 if param == '' else int(param)
elif final == 'D': # Cursor backward
p = 1 if param == '' else int(param)
while self.cmd_line_pos >= 0 and p:
self.cmd_line_pos -= 1
p -= 1
elif final == 'K': # Erase in line
if param == '' or param == '0':
del self.command_line[self.cmd_line_pos:]
else:
# We need to handle this somehow should it appear
raise NotImplementedError("Control sequence for Erase in Line not implemented: " + param + final)
elif final == 'P': # Delete Character
p = 1 if param == '' else int(param)
self.command_line[self.cmd_line_pos:self.cmd_line_pos+p] = []
def print_cmd_line(self):
# Start with the prompt and pause
i = self.command_line.index(ord(' '))
for code in self.command_line[:i+1]:
sys.stdout.write(chr(code))
sys.stdout.flush()
sleep(0.8)
for code in self.command_line[i+1:]:
if code == 0x0A:
# Pause before we end the line
sleep(0.8)
sys.stdout.write(chr(code))
sleep(0.2 * (1.0/self.speed))
sys.stdout.flush()
def prompt_active(self):
if not self.cleanup_cmdline:
sys.stdout.flush()
sleep(0.8)
self.in_prompt = True
self.command_line = []
self.cmd_line_pos = 0
def prompt_end(self):
if self.cleanup_cmdline:
self.print_cmd_line()
sys.stdout.flush()
self.in_prompt = False
def vim_start(self):
self.in_vim = True
def vim_end(self):
self.in_vim = False
def parse(logfile):
"""Read the input file byte by byte and output as plain text to stdout"""
parser = TermLogParser()
output_processor = VT2Output()
parser.terminal_output_handler = output_processor
parser.control_sequence_handler = output_processor
parser.tlp_event_listener = output_processor
line = logfile.readline()
while line:
parser.parse(line)
line = logfile.readline()
# Gather statistics and dump to log
parser.log_statistics()
def main():
if len(sys.argv) <= 1:
print("Log file missing. Specify log file to parse.")
exit()
with open(sys.argv[1], 'rb') as logfile:
LOG.info("PlainOut:: Parsing file %s", sys.argv[1])
parse(logfile)
if __name__ == '__main__':
LOG_FORMAT = "%(levelname)s :%(module)s - %(message)s"
logging.basicConfig(filename="parser.log",
level=logging.DEBUG,
format=LOG_FORMAT,
filemode='w')
main()