|
14 | 14 |
|
15 | 15 | PRINTER_NAME = "sdw-printer"
|
16 | 16 | PRINTER_WAIT_TIMEOUT = 60
|
17 |
| -DEVICE = "/dev/sda1" |
| 17 | +DEVICE = "/dev/sda" |
18 | 18 | MOUNTPOINT = "/media/usb"
|
19 | 19 | ENCRYPTED_DEVICE = "encrypted_volume"
|
20 | 20 | BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv"
|
@@ -164,76 +164,84 @@ def extract_tarball(self):
|
164 | 164 | self.exit_gracefully(msg)
|
165 | 165 |
|
166 | 166 | def check_usb_connected(self):
|
167 |
| - |
168 | 167 | # If the USB is not attached via qvm-usb attach, lsusb will return empty string and a
|
169 | 168 | # return code of 1
|
170 | 169 | logging.info('Performing usb preflight')
|
171 | 170 | try:
|
172 |
| - p = subprocess.check_output(["lsusb", "-s", "{}:".format(self.pci_bus_id)]) |
173 |
| - logging.info("lsusb -s {} : {}".format(self.pci_bus_id, p.decode("utf-8"))) |
| 171 | + subprocess.check_output( |
| 172 | + ["lsblk", "-p", "-o", "KNAME", "--noheadings", "--inverse", DEVICE], |
| 173 | + stderr=subprocess.PIPE) |
| 174 | + self.exit_gracefully("USB_CONNECTED") |
174 | 175 | except subprocess.CalledProcessError:
|
175 |
| - msg = "ERROR_USB_CONFIGURATION" |
176 |
| - self.exit_gracefully(msg) |
177 |
| - n_usb = len(p.decode("utf-8").rstrip().split("\n")) |
178 |
| - # If there is one device, it is the root hub. |
179 |
| - if n_usb == 1: |
180 |
| - logging.info('usb preflight - no external devices connected') |
181 |
| - msg = "USB_NOT_CONNECTED" |
182 |
| - self.exit_gracefully(msg) |
183 |
| - # If there are two devices, it's the root hub and another device (presumably for export) |
184 |
| - elif n_usb == 2: |
185 |
| - logging.info('usb preflight - external device connected') |
186 |
| - msg = "USB_CONNECTED" |
187 |
| - self.exit_gracefully(msg) |
188 |
| - # Else the result is unexpected |
189 |
| - else: |
190 |
| - msg = "ERROR_USB_CHECK" |
| 176 | + self.exit_gracefully("USB_NOT_CONNECTED") |
| 177 | + |
| 178 | + def set_extracted_device_name(self): |
| 179 | + try: |
| 180 | + device_and_partitions = subprocess.check_output( |
| 181 | + ["lsblk", "-o", "TYPE", "--noheadings", DEVICE], stderr=subprocess.PIPE) |
| 182 | + |
| 183 | + # we don't support multiple partitions |
| 184 | + partition_count = device_and_partitions.decode('utf-8').split('\n').count('part') |
| 185 | + if partition_count > 1: |
| 186 | + logging.debug("multiple partitions not supported") |
| 187 | + self.exit_gracefully("USB_NO_SUPPORTED_ENCRYPTION") |
| 188 | + |
| 189 | + # set device to /dev/sda if disk is encrypted, /dev/sda1 if partition encrypted |
| 190 | + self.device = DEVICE if partition_count == 0 else DEVICE + '1' |
| 191 | + except subprocess.CalledProcessError: |
| 192 | + msg = "USB_NO_SUPPORTED_ENCRYPTION" |
191 | 193 | self.exit_gracefully(msg)
|
192 | 194 |
|
193 | 195 | def check_luks_volume(self):
|
194 | 196 | logging.info('Checking if volume is luks-encrypted')
|
195 | 197 | try:
|
196 |
| - # cryptsetup isLuks returns 0 if the device is a luks volume |
197 |
| - # subprocess with throw if the device is not luks (rc !=0) |
198 |
| - subprocess.check_call(["sudo", "cryptsetup", "isLuks", DEVICE]) |
199 |
| - msg = "USB_ENCRYPTED" |
200 |
| - self.exit_gracefully(msg) |
| 198 | + self.set_extracted_device_name() |
| 199 | + logging.debug("checking if {} is luks encrypted".format(self.device)) |
| 200 | + subprocess.check_call(["sudo", "cryptsetup", "isLuks", self.device]) |
| 201 | + self.exit_gracefully("USB_ENCRYPTED") |
201 | 202 | except subprocess.CalledProcessError:
|
202 | 203 | msg = "USB_NO_SUPPORTED_ENCRYPTION"
|
203 | 204 | self.exit_gracefully(msg)
|
204 | 205 |
|
205 | 206 | def unlock_luks_volume(self, encryption_key):
|
206 |
| - # the luks device is not already unlocked |
207 |
| - logging.info('Unlocking luks volume {}'.format(self.encrypted_device)) |
208 |
| - if not os.path.exists(os.path.join("/dev/mapper/", self.encrypted_device)): |
209 |
| - p = subprocess.Popen( |
210 |
| - ["sudo", "cryptsetup", "luksOpen", self.device, self.encrypted_device], |
211 |
| - stdin=subprocess.PIPE, |
212 |
| - stdout=subprocess.PIPE, |
213 |
| - stderr=subprocess.PIPE |
214 |
| - ) |
215 |
| - logging.info('Passing key') |
216 |
| - p.communicate(input=str.encode(encryption_key, "utf-8")) |
217 |
| - rc = p.returncode |
218 |
| - if rc != 0: |
219 |
| - logging.error('Bad phassphrase for {}'.format(self.encrypted_device)) |
220 |
| - msg = "USB_BAD_PASSPHRASE" |
221 |
| - self.exit_gracefully(msg) |
| 207 | + try: |
| 208 | + # get the encrypted device name |
| 209 | + self.set_extracted_device_name() |
| 210 | + luks_header = subprocess.check_output(["sudo", "cryptsetup", "luksDump", self.device]) |
| 211 | + luks_header_list = luks_header.decode('utf-8').split('\n') |
| 212 | + for line in luks_header_list: |
| 213 | + items = line.split('\t') |
| 214 | + if 'UUID' in items[0]: |
| 215 | + self.encrypted_device = 'luks-' + items[1] |
| 216 | + |
| 217 | + # the luks device is not already unlocked |
| 218 | + if not os.path.exists(os.path.join("/dev/mapper/", self.encrypted_device)): |
| 219 | + logging.debug('Unlocking luks volume {}'.format(self.encrypted_device)) |
| 220 | + p = subprocess.Popen( |
| 221 | + ["sudo", "cryptsetup", "luksOpen", self.device, self.encrypted_device], |
| 222 | + stdin=subprocess.PIPE, |
| 223 | + stdout=subprocess.PIPE, |
| 224 | + stderr=subprocess.PIPE |
| 225 | + ) |
| 226 | + logging.debug('Passing key') |
| 227 | + p.communicate(input=str.encode(encryption_key, "utf-8")) |
| 228 | + rc = p.returncode |
| 229 | + if rc != 0: |
| 230 | + logging.error('Bad phassphrase for {}'.format(self.encrypted_device)) |
| 231 | + msg = "USB_BAD_PASSPHRASE" |
| 232 | + self.exit_gracefully(msg) |
| 233 | + except subprocess.CalledProcessError: |
| 234 | + self.exit_gracefully("USB_NO_SUPPORTED_ENCRYPTION") |
222 | 235 |
|
223 | 236 | def mount_volume(self):
|
224 |
| - # mount target not created |
225 |
| - if not os.path.exists(self.mountpoint): |
226 |
| - subprocess.check_call(["sudo", "mkdir", self.mountpoint]) |
227 | 237 | try:
|
228 |
| - logging.info('Mounting {} to {}'.format(self.encrypted_device, self.mountpoint)) |
229 |
| - subprocess.check_call( |
230 |
| - [ |
231 |
| - "sudo", |
232 |
| - "mount", |
233 |
| - os.path.join("/dev/mapper/", self.encrypted_device), |
234 |
| - self.mountpoint, |
235 |
| - ] |
236 |
| - ) |
| 238 | + # mount target not created |
| 239 | + if not os.path.exists(self.mountpoint): |
| 240 | + subprocess.check_call(["sudo", "mkdir", self.mountpoint]) |
| 241 | + |
| 242 | + mapped_device_path = os.path.join("/dev/mapper/", self.encrypted_device) |
| 243 | + logging.info('Mounting {}'.format(mapped_device_path)) |
| 244 | + subprocess.check_call(["sudo", "mount", mapped_device_path, self.mountpoint]) |
237 | 245 | subprocess.check_call(["sudo", "chown", "-R", "user:user", self.mountpoint])
|
238 | 246 | except subprocess.CalledProcessError:
|
239 | 247 | # clean up
|
|
0 commit comments