-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
208 lines (151 loc) · 5.72 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import os
import heapq
class HuffmanCoding:
    """Huffman compressor/decompressor for a single text file.

    ``compress()`` reads ``self.path``, builds a Huffman code table for its
    characters and writes the packed bits to ``<stem>.bin``.
    ``decompress()`` turns such a file back into text using the code table
    held in ``self.reverse_mapping``.

    NOTE: the code table is kept only in memory (it is not written into the
    ``.bin`` file), so ``decompress()`` must be called on the same instance
    that produced the file with ``compress()``.
    """

    def __init__(self, path):
        """Remember the input *path* and start with empty coding state."""
        self.path = path
        self.heap = []              # min-heap of HeapNode, ordered by freq
        self.codes = {}             # character -> bit string
        self.reverse_mapping = {}   # bit string -> character

    class HeapNode:
        """Node of the Huffman tree; leaves carry a character, inner nodes None."""

        def __init__(self, char, freq):
            self.char = char
            self.freq = freq
            self.left = None
            self.right = None

        def __lt__(self, other):
            # heapq only needs "<"; nodes are ordered by frequency alone.
            return self.freq < other.freq

        def __eq__(self, other):
            # Look the class up through the outer class: the bare name
            # "HeapNode" is not in scope from inside a nested class body.
            if not isinstance(other, HuffmanCoding.HeapNode):
                return False
            return self.freq == other.freq

        # Defining __eq__ already makes instances unhashable; say so explicitly.
        __hash__ = None

    def make_frequency_dict(self, text):
        """Return a dict mapping each character of *text* to its count."""
        frequency = {}
        for character in text:
            frequency[character] = frequency.get(character, 0) + 1
        return frequency

    def make_heap(self, frequency):
        """Push one leaf node per entry of *frequency* onto ``self.heap``."""
        for key, count in frequency.items():
            heapq.heappush(self.heap, self.HeapNode(key, count))

    def merge_nodes(self):
        """Build the Huffman tree in place; its root ends up alone in the heap."""
        while len(self.heap) > 1:
            node1 = heapq.heappop(self.heap)
            node2 = heapq.heappop(self.heap)
            merged = self.HeapNode(None, node1.freq + node2.freq)
            merged.left = node1
            merged.right = node2
            heapq.heappush(self.heap, merged)

    def make_codes_helper(self, root, current_code):
        """Record codes for every leaf below *root*, prefixed by *current_code*."""
        if root is None:
            return
        if root.char is not None:
            # Leaf: the path taken so far is this character's code.
            self.codes[root.char] = current_code
            self.reverse_mapping[current_code] = root.char
            return
        self.make_codes_helper(root.left, current_code + "0")
        self.make_codes_helper(root.right, current_code + "1")

    def make_codes(self):
        """Pop the tree root from the heap and fill the code tables.

        An empty heap (empty input) leaves the tables empty. A tree that is a
        single leaf (input with one distinct character) gets the code "0";
        an empty code would encode to nothing and could not be decoded.
        """
        if not self.heap:
            return
        root = heapq.heappop(self.heap)
        start_code = "0" if root.char is not None else ""
        self.make_codes_helper(root, start_code)

    def get_encoded_text(self, text):
        """Return *text* with every character replaced by its bit code.

        Raises KeyError for a character that has no entry in ``self.codes``.
        """
        return "".join(self.codes[character] for character in text)

    def pad_encoded_text(self, encoded_text):
        """Pad to a whole number of bytes and prepend an 8-bit padding count.

        The count is 1..8: an already byte-aligned input still receives a
        full byte of padding. This is kept so the file format is unchanged.
        """
        extra_padding = 8 - len(encoded_text) % 8
        padded_info = "{0:08b}".format(extra_padding)
        return padded_info + encoded_text + "0" * extra_padding

    def get_byte_array(self, padded_encoded_text):
        """Convert a string of '0'/'1' into a bytearray, 8 bits per byte.

        Raises ValueError if the length is not a multiple of 8.
        """
        if len(padded_encoded_text) % 8 != 0:
            raise ValueError("Encoded text not padded properly")
        return bytearray(
            int(padded_encoded_text[i:i + 8], 2)
            for i in range(0, len(padded_encoded_text), 8)
        )

    def compress(self):
        """Compress ``self.path`` into ``<stem>.bin`` and return that path.

        Trailing whitespace of the input is stripped before encoding, so it
        is not restored by ``decompress()``.
        """
        filename, _ = os.path.splitext(self.path)
        output_path = filename + ".bin"

        # Read-only access is enough; 'r+' would fail on read-only files.
        with open(self.path, 'r') as file:
            text = file.read()
        text = text.rstrip()

        # Start from a clean slate so codes from an earlier compress() call
        # cannot leak into this one.
        self.heap.clear()
        self.codes.clear()
        self.reverse_mapping.clear()

        frequency = self.make_frequency_dict(text)
        self.make_heap(frequency)
        self.merge_nodes()
        self.make_codes()

        encoded_text = self.get_encoded_text(text)
        padded_encoded_text = self.pad_encoded_text(encoded_text)
        b = self.get_byte_array(padded_encoded_text)

        with open(output_path, 'wb') as output:
            output.write(bytes(b))

        print("Compressed")
        return output_path

    def remove_padding(self, bit_string):
        """Strip the 8-bit padding header and the trailing padding bits.

        Raises ValueError if *bit_string* is too short to hold the header.
        """
        if len(bit_string) < 8:
            raise ValueError("Bit string is too short to contain padding info")
        extra_padding = int(bit_string[:8], 2)
        payload = bit_string[8:]
        if extra_padding == 0:
            # payload[:-0] would be the empty string, not the whole payload.
            return payload
        return payload[:-extra_padding]

    def decode_text(self, encoded_text):
        """Decode a bit string to text using ``self.reverse_mapping``."""
        decoded_chars = []
        current_code = ""
        for bit in encoded_text:
            current_code += bit
            if current_code in self.reverse_mapping:
                decoded_chars.append(self.reverse_mapping[current_code])
                current_code = ""
        return "".join(decoded_chars)

    def decompress(self, input_path):
        """Decode *input_path* into ``<stem>_decompress.txt`` and return that path.

        ``<stem>`` is taken from ``self.path``, not from *input_path*.
        """
        filename, _ = os.path.splitext(self.path)
        output_path = filename + "_decompress" + ".txt"

        with open(input_path, 'rb') as file:
            raw = file.read()
        # Expand every byte to its 8-character binary representation.
        bit_string = "".join("{0:08b}".format(byte) for byte in raw)

        encoded_text = self.remove_padding(bit_string)
        decoded_text = self.decode_text(encoded_text)

        with open(output_path, 'w') as output:
            output.write(decoded_text)

        print("Decompressed")
        return output_path
def main():
    """Demo: compress sample.txt, decompress it, and print both results."""
    path = 'sample.txt'
    H = HuffmanCoding(path)
    # Use the paths the methods report rather than hard-coding them.
    compressed_path = H.compress()
    decompressed_path = H.decompress(compressed_path)

    # Show the raw compressed bytes and their type.
    with open(compressed_path, mode="rb") as f:
        data = f.read()
    print(type(data))
    print(data)

    # Print the decompressed text line by line, without the newline character.
    with open(decompressed_path, 'r') as file1:
        for count, line in enumerate(file1, start=1):
            print("Line{}: {}".format(count, line.strip()))


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()