Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Extract Android backups, yield devices instead of just echoing #80

Merged
merged 3 commits into from
Oct 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 226 additions & 0 deletions mirobo/android_backup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This is a fork from original work by BlueC0re <coding@bluec0re.eu>
# https://github.com/bluec0re/android-backup-tools
# License: GPLv3
import tarfile
import zlib
import enum
import io
import pickle
import os
import binascii

try:
from Crypto.Cipher import AES
from Crypto.Protocol.KDF import PBKDF2
from Crypto import Random
except ImportError:
AES = None


class CompressionType(enum.IntEnum):
NONE = 0
ZLIB = 1


class EncryptionType(enum.Enum):
NONE = 'none'
AES256 = 'AES-256'


class AndroidBackup:
def __init__(self, fname=None):
if fname:
self.open(fname)

def open(self, fname, mode='rb'):
self.fp = open(fname, mode)

def close(self):
self.fp.close()

def parse(self):
self.fp.seek(0)
magic = self.fp.readline()
assert magic == b'ANDROID BACKUP\n'
self.version = int(self.fp.readline().strip())
self.compression = CompressionType(int(self.fp.readline().strip()))
self.encryption = EncryptionType(self.fp.readline().strip().decode())

def is_encrypted(self):
return self.encryption == EncryptionType.AES256

def _decrypt(self, enc, password):
if AES is None:
raise ImportError("PyCrypto required")

user_salt, enc = enc.split(b'\n', 1)
user_salt = binascii.a2b_hex(user_salt)
ck_salt, enc = enc.split(b'\n', 1)
ck_salt = binascii.a2b_hex(ck_salt)
rounds, enc = enc.split(b'\n', 1)
rounds = int(rounds)
iv, enc = enc.split(b'\n', 1)
iv = binascii.a2b_hex(iv)
master_key, enc = enc.split(b'\n', 1)
master_key = binascii.a2b_hex(master_key)

user_key = PBKDF2(password, user_salt, dkLen=256//8, count=rounds)
cipher = AES.new(user_key,
mode=AES.MODE_CBC,
IV=iv)

master_key = list(cipher.decrypt(master_key))
l = master_key.pop(0)
master_iv = bytes(master_key[:l])
master_key = master_key[l:]
l = master_key.pop(0)
mk = bytes(master_key[:l])
master_key = master_key[l:]
l = master_key.pop(0)
master_ck = bytes(master_key[:l])

# gen checksum

# double encode utf8
utf8mk = self.encode_utf8(mk)
calc_ck = PBKDF2(utf8mk, ck_salt, dkLen=256//8, count=rounds)
assert calc_ck == master_ck

cipher = AES.new(mk,
mode=AES.MODE_CBC,
IV=master_iv)

dec = cipher.decrypt(enc)
pad = dec[-1]

return dec[:-pad]

@staticmethod
def encode_utf8(mk):
utf8mk = mk.decode('raw_unicode_escape')
utf8mk = list(utf8mk)
for i in range(len(utf8mk)):
c = ord(utf8mk[i])
# fix java encoding (add 0xFF00 to non ascii chars)
if 0x7f < c < 0x100:
c += 0xff00
utf8mk[i] = chr(c)
return ''.join(utf8mk).encode('utf-8')

def _encrypt(self, dec, password):
if AES is None:
raise ImportError("PyCrypto required")

master_key = Random.get_random_bytes(32)
master_salt = Random.get_random_bytes(64)
user_salt = Random.get_random_bytes(64)
master_iv = Random.get_random_bytes(16)
user_iv = Random.get_random_bytes(16)
rounds = 10000

l = len(dec)
pad = 16 - (l % 16)
dec += bytes([pad] * pad)
cipher = AES.new(master_key, IV=master_iv, mode=AES.MODE_CBC)
enc = cipher.encrypt(dec)

master_ck = PBKDF2(self.encode_utf8(master_key),
master_salt, dkLen=256//8, count=rounds)

user_key = PBKDF2(password,
user_salt, dkLen=256//8, count=rounds)

master_dec = b"\x10" + master_iv + b"\x20" + master_key + b"\x20" + master_ck

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (85 > 79 characters)

l = len(master_dec)
pad = 16 - (l % 16)
master_dec += bytes([pad] * pad)
cipher = AES.new(user_key, IV=user_iv, mode=AES.MODE_CBC)
master_enc = cipher.encrypt(master_dec)

enc = binascii.b2a_hex(user_salt).upper() + b"\n" + \
binascii.b2a_hex(master_salt).upper() + b"\n" + \

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continuation line over-indented for visual indent

str(rounds).encode() + b"\n" + \

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continuation line over-indented for visual indent

binascii.b2a_hex(user_iv).upper() + b"\n" + \

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continuation line over-indented for visual indent

binascii.b2a_hex(master_enc).upper() + b"\n" + enc

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continuation line over-indented for visual indent


return enc

def read_data(self, password):
"""Reads from the file and returns a TarFile object."""
data = self.fp.read()

if self.encryption == EncryptionType.AES256:
if password is None:
raise Exception("Password need to be provided to extract encrypted archives")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (93 > 79 characters)

data = self._decrypt(data, password)

if self.compression == CompressionType.ZLIB:
data = zlib.decompress(data, zlib.MAX_WBITS)

tar = tarfile.TarFile(fileobj=io.BytesIO(data))
return tar

def unpack(self, target_dir=None, password=None):
tar = self.read_data(password)

members = tar.getmembers()

if target_dir is None:
target_dir = os.path.basename(self.fp.name) + '_unpacked'
pickle_fname = os.path.basename(self.fp.name) + '.pickle'
if not os.path.exists(target_dir):
os.mkdir(target_dir)

tar.extractall(path=target_dir)

with open(pickle_fname, 'wb') as fp:
pickle.dump(members, fp)

def list(self, password=None):
tar = self.read_tar(password)
return tar.list()

def pack(self, fname, password=None):
target_dir = os.path.basename(fname) + '_unpacked'
pickle_fname = os.path.basename(fname) + '.pickle'

data = io.BytesIO()
tar = tarfile.TarFile(name=fname,
fileobj=data,
mode='w',
format=tarfile.PAX_FORMAT)

with open(pickle_fname, 'rb') as fp:
members = pickle.load(fp)

os.chdir(target_dir)
for member in members:
if member.isreg():
tar.addfile(member, open(member.name, 'rb'))
else:
tar.addfile(member)

tar.close()

data.seek(0)
if self.compression == CompressionType.ZLIB:
data = zlib.compress(data.read())
if self.encryption == EncryptionType.AES256:
data = self._encrypt(data, password)

with open(fname, 'wb') as fp:
fp.write(b'ANDROID BACKUP\n')
fp.write('{}\n'.format(self.version).encode())
fp.write('{:d}\n'.format(self.compression).encode())
fp.write('{}\n'.format(self.encryption.value).encode())

fp.write(data)

def __exit__(self, *args, **kwargs):
self.close()

def __enter__(self):
self.parse()
return self
102 changes: 68 additions & 34 deletions mirobo/extract_tokens.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,33 @@
import logging
import click
import tarfile
import tempfile
import sqlite3
from Crypto.Cipher import AES
from pprint import pformat as pf
import attr
from .android_backup import AndroidBackup

logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)


@attr.s
class DeviceConfig:
name = attr.ib()
mac = attr.ib()
ip = attr.ib()
token = attr.ib()
model = attr.ib()


class BackupDatabaseReader:
def __init__(self, dump_all=False, dump_raw=False):
self.dump_all = dump_all
def __init__(self, dump_raw=False):
self.dump_raw = dump_raw

@staticmethod
def dump_raw(dev):
raw = {k: dev[k] for k in dev.keys()}
click.echo(pf(raw))
_LOGGER.info(pf(raw))

@staticmethod
def decrypt_ztoken(ztoken):
Expand All @@ -29,7 +42,7 @@ def decrypt_ztoken(ztoken):
return token.decode()

def read_apple(self):
click.echo("Reading tokens from Apple DB")
_LOGGER.info("Reading tokens from Apple DB")
c = self.conn.execute("SELECT * FROM ZDEVICE WHERE ZTOKEN IS NOT '';")
for dev in c.fetchall():
if self.dump_raw:
Expand All @@ -39,11 +52,12 @@ def read_apple(self):
model = dev['ZMODEL']
name = dev['ZNAME']
token = BackupDatabaseReader.decrypt_ztoken(dev['ZTOKEN'])
if ip or self.dump_all:
click.echo("%s\n\tModel: %s\n\tIP address: %s\n\tToken: %s\n\tMAC: %s" % (name, model, ip, token, mac))

config = DeviceConfig(name=name, mac=mac, ip=ip, model=model, token=token)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (86 > 79 characters)

yield config

def read_android(self):
click.echo("Reading tokens from Android DB")
_LOGGER.info("Reading tokens from Android DB")
c = self.conn.execute("SELECT * FROM devicerecord WHERE token IS NOT '';")
for dev in c.fetchall():
if self.dump_raw:
Expand All @@ -53,57 +67,77 @@ def read_android(self):
model = dev['model']
name = dev['name']
token = dev['token']
if ip or self.dump_all:
click.echo("%s\n\tModel: %s\n\tIP address: %s\n\tToken: %s\n\tMAC: %s" % (name, model, ip, token, mac))

def dump_to_file(self, fp):
fp.open()
self.db.seek(0) # go to the beginning
click.echo("Saving db to %s" % fp)
fp.write(self.db.read())
config = DeviceConfig(name=name, ip=ip, mac=mac,
model=model, token=token)
yield config

def read_tokens(self, db):
self.db = db
_LOGGER.info("Reading database from %s" % db)
self.conn = sqlite3.connect(db)

self.conn.row_factory = sqlite3.Row
with self.conn:
is_android = self.conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='devicerecord';").fetchone() is not None
is_apple = self.conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='ZDEVICE'").fetchone() is not None
if is_android:
self.read_android()
yield from self.read_android()
elif is_apple:
self.read_apple()
yield from self.read_apple()
else:
click.echo("Error, unknown database type!")
_LOGGER.error("Error, unknown database type!")


@click.command()
@click.argument('backup')
@click.option('--write-to-disk', type=click.File('wb'), help='writes sqlite3 db to a file for debugging')
@click.option('--dump-all', is_flag=True, default=False, help='dump devices without ip addresses')
@click.option('--write-to-disk', type=click.File('wb'),
help='writes sqlite3 db to a file for debugging')
@click.option('--password', type=str,
help='password if the android database is encrypted')
@click.option('--dump-all', is_flag=True, default=False,
help='dump devices without ip addresses')
@click.option('--dump-raw', is_flag=True, help='dumps raw rows')
def main(backup, write_to_disk, dump_all, dump_raw):
def main(backup, write_to_disk, password, dump_all, dump_raw):
"""Reads device information out from an sqlite3 DB.
If the given file is a .tar file, the file will be extracted
and the database automatically located (out of Android backups).
If the given file is an Android backup (.ab), the database
will be extracted automatically.
If the given file is an iOS backup, the tokens will be
extracted (and decrypted if needed) automatically.
"""
reader = BackupDatabaseReader(dump_all, dump_raw)
if backup.endswith(".tar"):

reader = BackupDatabaseReader(dump_raw)
if backup.endswith(".ab"):
DBFILE = "apps/com.xiaomi.smarthome/db/miio2.db"
with tarfile.open(backup) as f:
click.echo("Opened %s" % backup)
db = f.extractfile(DBFILE)
with tempfile.NamedTemporaryFile() as fp:
click.echo("Extracting to %s" % fp.name)
with AndroidBackup(backup) as f:
tar = f.read_data(password)
try:
db = tar.extractfile(DBFILE)
except KeyError as ex:
click.echo("Unable to extract the database file %s: %s" % (DBFILE, ex))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (87 > 79 characters)

return
if write_to_disk:
file = write_to_disk
else:
file = tempfile.NamedTemporaryFile()
with file as fp:
click.echo("Saving database to %s" % fp.name)
fp.write(db.read())
if write_to_disk:
reader.dump_to_file(write_to_disk)

reader.read_tokens(fp.name)
devices = list(reader.read_tokens(fp.name))
else:
reader.read_tokens(backup)
devices = list(reader.read_tokens(backup))

for dev in devices:
if dev.ip or dump_all:
click.echo("%s\n"
"\tModel: %s\n"
"\tIP address: %s\n"
"\tToken: %s\n"
"\tMAC: %s" % (dev.name, dev.model,
dev.ip, dev.token, dev.mac))


if __name__ == "__main__":
Expand Down