-
-
Notifications
You must be signed in to change notification settings - Fork 569
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: Extract Android backups, yield devices instead of just echoing #80
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,226 @@ | ||
#!/usr/bin/env python | ||
# -*- coding: utf-8 -*- | ||
# This is a fork from original work by BlueC0re <coding@bluec0re.eu> | ||
# https://github.com/bluec0re/android-backup-tools | ||
# License: GPLv3 | ||
import tarfile | ||
import zlib | ||
import enum | ||
import io | ||
import pickle | ||
import os | ||
import binascii | ||
|
||
try: | ||
from Crypto.Cipher import AES | ||
from Crypto.Protocol.KDF import PBKDF2 | ||
from Crypto import Random | ||
except ImportError: | ||
AES = None | ||
|
||
|
||
class CompressionType(enum.IntEnum): | ||
NONE = 0 | ||
ZLIB = 1 | ||
|
||
|
||
class EncryptionType(enum.Enum): | ||
NONE = 'none' | ||
AES256 = 'AES-256' | ||
|
||
|
||
class AndroidBackup: | ||
def __init__(self, fname=None): | ||
if fname: | ||
self.open(fname) | ||
|
||
def open(self, fname, mode='rb'): | ||
self.fp = open(fname, mode) | ||
|
||
def close(self): | ||
self.fp.close() | ||
|
||
def parse(self): | ||
self.fp.seek(0) | ||
magic = self.fp.readline() | ||
assert magic == b'ANDROID BACKUP\n' | ||
self.version = int(self.fp.readline().strip()) | ||
self.compression = CompressionType(int(self.fp.readline().strip())) | ||
self.encryption = EncryptionType(self.fp.readline().strip().decode()) | ||
|
||
def is_encrypted(self): | ||
return self.encryption == EncryptionType.AES256 | ||
|
||
def _decrypt(self, enc, password): | ||
if AES is None: | ||
raise ImportError("PyCrypto required") | ||
|
||
user_salt, enc = enc.split(b'\n', 1) | ||
user_salt = binascii.a2b_hex(user_salt) | ||
ck_salt, enc = enc.split(b'\n', 1) | ||
ck_salt = binascii.a2b_hex(ck_salt) | ||
rounds, enc = enc.split(b'\n', 1) | ||
rounds = int(rounds) | ||
iv, enc = enc.split(b'\n', 1) | ||
iv = binascii.a2b_hex(iv) | ||
master_key, enc = enc.split(b'\n', 1) | ||
master_key = binascii.a2b_hex(master_key) | ||
|
||
user_key = PBKDF2(password, user_salt, dkLen=256//8, count=rounds) | ||
cipher = AES.new(user_key, | ||
mode=AES.MODE_CBC, | ||
IV=iv) | ||
|
||
master_key = list(cipher.decrypt(master_key)) | ||
l = master_key.pop(0) | ||
master_iv = bytes(master_key[:l]) | ||
master_key = master_key[l:] | ||
l = master_key.pop(0) | ||
mk = bytes(master_key[:l]) | ||
master_key = master_key[l:] | ||
l = master_key.pop(0) | ||
master_ck = bytes(master_key[:l]) | ||
|
||
# gen checksum | ||
|
||
# double encode utf8 | ||
utf8mk = self.encode_utf8(mk) | ||
calc_ck = PBKDF2(utf8mk, ck_salt, dkLen=256//8, count=rounds) | ||
assert calc_ck == master_ck | ||
|
||
cipher = AES.new(mk, | ||
mode=AES.MODE_CBC, | ||
IV=master_iv) | ||
|
||
dec = cipher.decrypt(enc) | ||
pad = dec[-1] | ||
|
||
return dec[:-pad] | ||
|
||
@staticmethod | ||
def encode_utf8(mk): | ||
utf8mk = mk.decode('raw_unicode_escape') | ||
utf8mk = list(utf8mk) | ||
for i in range(len(utf8mk)): | ||
c = ord(utf8mk[i]) | ||
# fix java encoding (add 0xFF00 to non ascii chars) | ||
if 0x7f < c < 0x100: | ||
c += 0xff00 | ||
utf8mk[i] = chr(c) | ||
return ''.join(utf8mk).encode('utf-8') | ||
|
||
def _encrypt(self, dec, password): | ||
if AES is None: | ||
raise ImportError("PyCrypto required") | ||
|
||
master_key = Random.get_random_bytes(32) | ||
master_salt = Random.get_random_bytes(64) | ||
user_salt = Random.get_random_bytes(64) | ||
master_iv = Random.get_random_bytes(16) | ||
user_iv = Random.get_random_bytes(16) | ||
rounds = 10000 | ||
|
||
l = len(dec) | ||
pad = 16 - (l % 16) | ||
dec += bytes([pad] * pad) | ||
cipher = AES.new(master_key, IV=master_iv, mode=AES.MODE_CBC) | ||
enc = cipher.encrypt(dec) | ||
|
||
master_ck = PBKDF2(self.encode_utf8(master_key), | ||
master_salt, dkLen=256//8, count=rounds) | ||
|
||
user_key = PBKDF2(password, | ||
user_salt, dkLen=256//8, count=rounds) | ||
|
||
master_dec = b"\x10" + master_iv + b"\x20" + master_key + b"\x20" + master_ck | ||
l = len(master_dec) | ||
pad = 16 - (l % 16) | ||
master_dec += bytes([pad] * pad) | ||
cipher = AES.new(user_key, IV=user_iv, mode=AES.MODE_CBC) | ||
master_enc = cipher.encrypt(master_dec) | ||
|
||
enc = binascii.b2a_hex(user_salt).upper() + b"\n" + \ | ||
binascii.b2a_hex(master_salt).upper() + b"\n" + \ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. continuation line over-indented for visual indent |
||
str(rounds).encode() + b"\n" + \ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. continuation line over-indented for visual indent |
||
binascii.b2a_hex(user_iv).upper() + b"\n" + \ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. continuation line over-indented for visual indent |
||
binascii.b2a_hex(master_enc).upper() + b"\n" + enc | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. continuation line over-indented for visual indent |
||
|
||
return enc | ||
|
||
def read_data(self, password): | ||
"""Reads from the file and returns a TarFile object.""" | ||
data = self.fp.read() | ||
|
||
if self.encryption == EncryptionType.AES256: | ||
if password is None: | ||
raise Exception("Password need to be provided to extract encrypted archives") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. line too long (93 > 79 characters) |
||
data = self._decrypt(data, password) | ||
|
||
if self.compression == CompressionType.ZLIB: | ||
data = zlib.decompress(data, zlib.MAX_WBITS) | ||
|
||
tar = tarfile.TarFile(fileobj=io.BytesIO(data)) | ||
return tar | ||
|
||
def unpack(self, target_dir=None, password=None): | ||
tar = self.read_data(password) | ||
|
||
members = tar.getmembers() | ||
|
||
if target_dir is None: | ||
target_dir = os.path.basename(self.fp.name) + '_unpacked' | ||
pickle_fname = os.path.basename(self.fp.name) + '.pickle' | ||
if not os.path.exists(target_dir): | ||
os.mkdir(target_dir) | ||
|
||
tar.extractall(path=target_dir) | ||
|
||
with open(pickle_fname, 'wb') as fp: | ||
pickle.dump(members, fp) | ||
|
||
def list(self, password=None): | ||
tar = self.read_tar(password) | ||
return tar.list() | ||
|
||
def pack(self, fname, password=None): | ||
target_dir = os.path.basename(fname) + '_unpacked' | ||
pickle_fname = os.path.basename(fname) + '.pickle' | ||
|
||
data = io.BytesIO() | ||
tar = tarfile.TarFile(name=fname, | ||
fileobj=data, | ||
mode='w', | ||
format=tarfile.PAX_FORMAT) | ||
|
||
with open(pickle_fname, 'rb') as fp: | ||
members = pickle.load(fp) | ||
|
||
os.chdir(target_dir) | ||
for member in members: | ||
if member.isreg(): | ||
tar.addfile(member, open(member.name, 'rb')) | ||
else: | ||
tar.addfile(member) | ||
|
||
tar.close() | ||
|
||
data.seek(0) | ||
if self.compression == CompressionType.ZLIB: | ||
data = zlib.compress(data.read()) | ||
if self.encryption == EncryptionType.AES256: | ||
data = self._encrypt(data, password) | ||
|
||
with open(fname, 'wb') as fp: | ||
fp.write(b'ANDROID BACKUP\n') | ||
fp.write('{}\n'.format(self.version).encode()) | ||
fp.write('{:d}\n'.format(self.compression).encode()) | ||
fp.write('{}\n'.format(self.encryption.value).encode()) | ||
|
||
fp.write(data) | ||
|
||
def __exit__(self, *args, **kwargs): | ||
self.close() | ||
|
||
def __enter__(self): | ||
self.parse() | ||
return self |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,33 @@ | ||
import logging | ||
import click | ||
import tarfile | ||
import tempfile | ||
import sqlite3 | ||
from Crypto.Cipher import AES | ||
from pprint import pformat as pf | ||
import attr | ||
from .android_backup import AndroidBackup | ||
|
||
logging.basicConfig(level=logging.INFO) | ||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
@attr.s | ||
class DeviceConfig: | ||
name = attr.ib() | ||
mac = attr.ib() | ||
ip = attr.ib() | ||
token = attr.ib() | ||
model = attr.ib() | ||
|
||
|
||
class BackupDatabaseReader: | ||
def __init__(self, dump_all=False, dump_raw=False): | ||
self.dump_all = dump_all | ||
def __init__(self, dump_raw=False): | ||
self.dump_raw = dump_raw | ||
|
||
@staticmethod | ||
def dump_raw(dev): | ||
raw = {k: dev[k] for k in dev.keys()} | ||
click.echo(pf(raw)) | ||
_LOGGER.info(pf(raw)) | ||
|
||
@staticmethod | ||
def decrypt_ztoken(ztoken): | ||
|
@@ -29,7 +42,7 @@ def decrypt_ztoken(ztoken): | |
return token.decode() | ||
|
||
def read_apple(self): | ||
click.echo("Reading tokens from Apple DB") | ||
_LOGGER.info("Reading tokens from Apple DB") | ||
c = self.conn.execute("SELECT * FROM ZDEVICE WHERE ZTOKEN IS NOT '';") | ||
for dev in c.fetchall(): | ||
if self.dump_raw: | ||
|
@@ -39,11 +52,12 @@ def read_apple(self): | |
model = dev['ZMODEL'] | ||
name = dev['ZNAME'] | ||
token = BackupDatabaseReader.decrypt_ztoken(dev['ZTOKEN']) | ||
if ip or self.dump_all: | ||
click.echo("%s\n\tModel: %s\n\tIP address: %s\n\tToken: %s\n\tMAC: %s" % (name, model, ip, token, mac)) | ||
|
||
config = DeviceConfig(name=name, mac=mac, ip=ip, model=model, token=token) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. line too long (86 > 79 characters) |
||
yield config | ||
|
||
def read_android(self): | ||
click.echo("Reading tokens from Android DB") | ||
_LOGGER.info("Reading tokens from Android DB") | ||
c = self.conn.execute("SELECT * FROM devicerecord WHERE token IS NOT '';") | ||
for dev in c.fetchall(): | ||
if self.dump_raw: | ||
|
@@ -53,57 +67,77 @@ def read_android(self): | |
model = dev['model'] | ||
name = dev['name'] | ||
token = dev['token'] | ||
if ip or self.dump_all: | ||
click.echo("%s\n\tModel: %s\n\tIP address: %s\n\tToken: %s\n\tMAC: %s" % (name, model, ip, token, mac)) | ||
|
||
def dump_to_file(self, fp): | ||
fp.open() | ||
self.db.seek(0) # go to the beginning | ||
click.echo("Saving db to %s" % fp) | ||
fp.write(self.db.read()) | ||
config = DeviceConfig(name=name, ip=ip, mac=mac, | ||
model=model, token=token) | ||
yield config | ||
|
||
def read_tokens(self, db): | ||
self.db = db | ||
_LOGGER.info("Reading database from %s" % db) | ||
self.conn = sqlite3.connect(db) | ||
|
||
self.conn.row_factory = sqlite3.Row | ||
with self.conn: | ||
is_android = self.conn.execute( | ||
"SELECT name FROM sqlite_master WHERE type='table' AND name='devicerecord';").fetchone() is not None | ||
is_apple = self.conn.execute( | ||
"SELECT name FROM sqlite_master WHERE type='table' AND name='ZDEVICE'").fetchone() is not None | ||
if is_android: | ||
self.read_android() | ||
yield from self.read_android() | ||
elif is_apple: | ||
self.read_apple() | ||
yield from self.read_apple() | ||
else: | ||
click.echo("Error, unknown database type!") | ||
_LOGGER.error("Error, unknown database type!") | ||
|
||
|
||
@click.command() | ||
@click.argument('backup') | ||
@click.option('--write-to-disk', type=click.File('wb'), help='writes sqlite3 db to a file for debugging') | ||
@click.option('--dump-all', is_flag=True, default=False, help='dump devices without ip addresses') | ||
@click.option('--write-to-disk', type=click.File('wb'), | ||
help='writes sqlite3 db to a file for debugging') | ||
@click.option('--password', type=str, | ||
help='password if the android database is encrypted') | ||
@click.option('--dump-all', is_flag=True, default=False, | ||
help='dump devices without ip addresses') | ||
@click.option('--dump-raw', is_flag=True, help='dumps raw rows') | ||
def main(backup, write_to_disk, dump_all, dump_raw): | ||
def main(backup, write_to_disk, password, dump_all, dump_raw): | ||
"""Reads device information out from an sqlite3 DB. | ||
If the given file is a .tar file, the file will be extracted | ||
and the database automatically located (out of Android backups). | ||
If the given file is an Android backup (.ab), the database | ||
will be extracted automatically. | ||
If the given file is an iOS backup, the tokens will be | ||
extracted (and decrypted if needed) automatically. | ||
""" | ||
reader = BackupDatabaseReader(dump_all, dump_raw) | ||
if backup.endswith(".tar"): | ||
|
||
reader = BackupDatabaseReader(dump_raw) | ||
if backup.endswith(".ab"): | ||
DBFILE = "apps/com.xiaomi.smarthome/db/miio2.db" | ||
with tarfile.open(backup) as f: | ||
click.echo("Opened %s" % backup) | ||
db = f.extractfile(DBFILE) | ||
with tempfile.NamedTemporaryFile() as fp: | ||
click.echo("Extracting to %s" % fp.name) | ||
with AndroidBackup(backup) as f: | ||
tar = f.read_data(password) | ||
try: | ||
db = tar.extractfile(DBFILE) | ||
except KeyError as ex: | ||
click.echo("Unable to extract the database file %s: %s" % (DBFILE, ex)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. line too long (87 > 79 characters) |
||
return | ||
if write_to_disk: | ||
file = write_to_disk | ||
else: | ||
file = tempfile.NamedTemporaryFile() | ||
with file as fp: | ||
click.echo("Saving database to %s" % fp.name) | ||
fp.write(db.read()) | ||
if write_to_disk: | ||
reader.dump_to_file(write_to_disk) | ||
|
||
reader.read_tokens(fp.name) | ||
devices = list(reader.read_tokens(fp.name)) | ||
else: | ||
reader.read_tokens(backup) | ||
devices = list(reader.read_tokens(backup)) | ||
|
||
for dev in devices: | ||
if dev.ip or dump_all: | ||
click.echo("%s\n" | ||
"\tModel: %s\n" | ||
"\tIP address: %s\n" | ||
"\tToken: %s\n" | ||
"\tMAC: %s" % (dev.name, dev.model, | ||
dev.ip, dev.token, dev.mac)) | ||
|
||
|
||
if __name__ == "__main__": | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line too long (85 > 79 characters)