diff --git a/.travis.yml b/.travis.yml
index 8fcdb272..befcb768 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,7 +11,7 @@ install:
- sudo apt-get -qq update
- sudo pip install --upgrade -qq pip
- sudo apt-get -qq install cdparanoia cdrdao flac libcdio-dev libiso9660-dev libsndfile1-dev python-cddb python-gobject python-musicbrainzngs python-mutagen python-setuptools sox swig
- - sudo pip install pycdio
+ - sudo pip install pycdio requests
# Testing dependencies
- sudo apt-get -qq install python-twisted-core
diff --git a/README.md b/README.md
index 72195f07..e1dd7baf 100644
--- a/README.md
+++ b/README.md
@@ -75,6 +75,7 @@ Whipper relies on the following packages in order to run correctly and provide a
- [python-cddb](http://cddb-py.sourceforge.net/), for showing but not using metadata if disc not available in the MusicBrainz DB
- [pycdio](https://pypi.python.org/pypi/pycdio/) (to avoid bugs please use `pycdio` **0.20** & `libcdio` >= **0.90** or, with previous `libcdio` versions, `pycdio` **0.17**), for drive identification
- Required for drive offset and caching behavior to be stored in the configuration file
+- [requests](https://pypi.python.org/pypi/requests) for retrieving AccurateRip database entries
- [libsndfile](http://www.mega-nerd.com/libsndfile/), for reading wav files
- [flac](https://xiph.org/flac/), for reading flac files
- [sox](http://sox.sourceforge.net/), for track peak detection
diff --git a/whipper/command/accurip.py b/whipper/command/accurip.py
index 29b473db..a7b5b80e 100644
--- a/whipper/command/accurip.py
+++ b/whipper/command/accurip.py
@@ -21,7 +21,7 @@
import sys
from whipper.command.basecommand import BaseCommand
-from whipper.common import accurip
+from whipper.common.accurip import get_db_entry, ACCURATERIP_URL
import logging
logger = logging.getLogger(__name__)
@@ -38,20 +38,18 @@ def add_arguments(self):
help="accuraterip URL to load data from")
def do(self):
- url = self.options.url
- cache = accurip.AccuCache()
- responses = cache.retrieve(url)
+ responses = get_db_entry(self.options.url.lstrip(ACCURATERIP_URL))
- count = responses[0].trackCount
+ count = responses[0].num_tracks
sys.stdout.write("Found %d responses for %d tracks\n\n" % (
len(responses), count))
for (i, r) in enumerate(responses):
- if r.trackCount != count:
+ if r.num_tracks != count:
sys.stdout.write(
"Warning: response %d has %d tracks instead of %d\n" % (
- i, r.trackCount, count))
+ i, r.num_tracks, count))
# checksum and confidence by track
for track in range(count):
@@ -59,11 +57,11 @@ def do(self):
checksums = {}
for (i, r) in enumerate(responses):
- if r.trackCount != count:
+ if r.num_tracks != count:
continue
- assert len(r.checksums) == r.trackCount
- assert len(r.confidences) == r.trackCount
+ assert len(r.checksums) == r.num_tracks
+ assert len(r.confidences) == r.num_tracks
entry = {}
entry["confidence"] = r.confidences[track]
diff --git a/whipper/command/cd.py b/whipper/command/cd.py
index 037d9c6d..9c86de7b 100644
--- a/whipper/command/cd.py
+++ b/whipper/command/cd.py
@@ -19,16 +19,15 @@
# along with whipper. If not, see .
import argparse
+import cdio
import os
import glob
-import urllib2
-import socket
import sys
import logging
import gobject
from whipper.command.basecommand import BaseCommand
from whipper.common import (
- accurip, common, config, drive, program, task
+ accurip, config, drive, program, task
)
from whipper.program import cdrdao, cdparanoia, utils
from whipper.result import result
@@ -41,7 +40,7 @@
SILENT = 1e-10
MAX_TRIES = 5
-DEFAULT_TRACK_TEMPLATE = u'%r/%A - %d/%t. %a - %n'
+DEFAULT_TRACK_TEMPLATE = u'%r/%A - %d/%t. %a - %n.%x'
DEFAULT_DISC_TEMPLATE = u'%r/%A - %d/%A - %d'
TEMPLATE_DESCRIPTION = '''
@@ -68,12 +67,6 @@
class _CD(BaseCommand):
-
- """
- @type program: L{program.Program}
- @ivar eject: whether to eject the drive after completing
- """
-
eject = True
@staticmethod
@@ -150,21 +143,11 @@ def do(self):
"--cdr not passed")
return -1
- # FIXME ?????
- # Hackish fix for broken commit
- offset = 0
- info = drive.getDeviceInfo(self.device)
- if info:
- try:
- offset = self.config.getReadOffset(*info)
- except KeyError:
- pass
-
# now, read the complete index table, which is slower
self.itable = self.program.getTable(self.runner,
self.ittoc.getCDDBDiscId(),
self.ittoc.getMusicBrainzDiscId(),
- self.device, offset)
+ self.device, self.options.offset)
assert self.itable.getCDDBDiscId() == self.ittoc.getCDDBDiscId(), \
"full table's id %s differs from toc id %s" % (
@@ -174,10 +157,10 @@ def do(self):
"full table's mb id %s differs from toc id mb %s" % (
self.itable.getMusicBrainzDiscId(),
self.ittoc.getMusicBrainzDiscId())
- assert self.itable.getAccurateRipURL() == \
- self.ittoc.getAccurateRipURL(), \
+ assert self.itable.accuraterip_path() == \
+ self.ittoc.accuraterip_path(), \
"full table's AR URL %s differs from toc AR URL %s" % (
- self.itable.getAccurateRipURL(), self.ittoc.getAccurateRipURL())
+ self.itable.accuraterip_url(), self.ittoc.accuraterip_url())
if self.program.metadata:
self.program.metadata.discid = self.ittoc.getMusicBrainzDiscId()
@@ -200,15 +183,9 @@ def do(self):
self.program.result.title = self.program.metadata \
and self.program.metadata.title \
or 'Unknown Title'
- try:
- import cdio
- _, self.program.result.vendor, self.program.result.model, \
- self.program.result.release = \
- cdio.Device(self.device).get_hwinfo()
- except ImportError:
- raise ImportError("Pycdio module import failed.\n"
- "This is a hard dependency: if not "
- "available please install it")
+ _, self.program.result.vendor, self.program.result.model, \
+ self.program.result.release = \
+ cdio.Device(self.device).get_hwinfo()
self.doCommand()
@@ -348,41 +325,27 @@ def doCommand(self):
self.program.result.overread = self.options.overread
self.program.result.logger = self.options.logger
- # write disc files
- disambiguate = False
- while True:
- discName = self.program.getPath(self.program.outdir,
- self.options.disc_template,
- self.mbdiscid, 0,
- disambiguate=disambiguate)
- dirname = os.path.dirname(discName)
- if os.path.exists(dirname):
- sys.stdout.write("Output directory %s already exists\n" %
- dirname.encode('utf-8'))
- logs = glob.glob(os.path.join(dirname, '*.log'))
- if logs:
- sys.stdout.write(
- "Output directory %s is a finished rip\n" %
- dirname.encode('utf-8'))
- if not disambiguate:
- disambiguate = True
- continue
- return
- else:
- break
-
+ discName = self.program.getPath(self.program.outdir,
+ self.options.disc_template,
+ self.mbdiscid,
+ self.program.metadata)
+ dirname = os.path.dirname(discName)
+ if os.path.exists(dirname):
+ logs = glob.glob(os.path.join(dirname, '*.log'))
+ if logs:
+ msg = ("output directory %s is a finished rip" %
+ dirname.encode('utf-8'))
+ logger.critical(msg)
+ raise RuntimeError(msg)
else:
- sys.stdout.write("Creating output directory %s\n" %
+ sys.stdout.write("output directory %s already exists\n" %
dirname.encode('utf-8'))
- os.makedirs(dirname)
- break
-
- # FIXME: say when we're continuing a rip
- # FIXME: disambiguate if the pre-existing rip is different
+ print("creating output directory %s" % dirname.encode('utf-8'))
+ os.makedirs(dirname)
# FIXME: turn this into a method
- def ripIfNotRipped(number):
+ def _ripIfNotRipped(number):
logger.debug('ripIfNotRipped for track %d' % number)
# we can have a previous result
trackResult = self.program.result.getTrackResult(number)
@@ -395,9 +358,9 @@ def ripIfNotRipped(number):
path = self.program.getPath(self.program.outdir,
self.options.track_template,
- self.mbdiscid, number,
- disambiguate=disambiguate) \
- + '.' + 'flac'
+ self.mbdiscid,
+ self.program.metadata,
+ track_number=number)
logger.debug('ripIfNotRipped: path %r' % path)
trackResult.number = number
@@ -464,13 +427,11 @@ def ripIfNotRipped(number):
"track can't be ripped. "
"Rip attempts number is equal to 'MAX_TRIES'")
if trackResult.testcrc == trackResult.copycrc:
- sys.stdout.write('Checksums match for track %d\n' %
- number)
+ sys.stdout.write('CRCs match for track %d\n' % number)
else:
- sys.stdout.write(
- 'ERROR: checksums did not match for track %d\n' %
- number)
- raise
+ raise RuntimeError(
+ "CRCs did not match for track %d\n" % number
+ )
sys.stdout.write(
'Peak level: {:.2%} \n'.format(trackResult.peak))
@@ -503,113 +464,37 @@ def ripIfNotRipped(number):
self.program.saveRipResult()
# check for hidden track one audio
- htoapath = None
htoa = self.program.getHTOA()
if htoa:
start, stop = htoa
- sys.stdout.write(
- 'Found Hidden Track One Audio from frame %d to %d\n' % (
- start, stop))
-
- # rip it
- ripIfNotRipped(0)
- htoapath = self.program.result.tracks[0].filename
+ print('found Hidden Track One Audio from frame %d to %d' % (
+ start, stop))
+ _ripIfNotRipped(0)
for i, track in enumerate(self.itable.tracks):
# FIXME: rip data tracks differently
if not track.audio:
- sys.stdout.write(
- 'WARNING: skipping data track %d, not implemented\n' % (
- i + 1, ))
+ print 'skipping data track %d, not implemented' % (i + 1)
# FIXME: make it work for now
track.indexes[1].relative = 0
continue
-
- ripIfNotRipped(i + 1)
-
- # write disc files
- discName = self.program.getPath(self.program.outdir,
- self.options.disc_template,
- self.mbdiscid, 0,
- disambiguate=disambiguate)
- dirname = os.path.dirname(discName)
- if not os.path.exists(dirname):
- os.makedirs(dirname)
+ _ripIfNotRipped(i + 1)
logger.debug('writing cue file for %r', discName)
self.program.writeCue(discName)
- # write .m3u file
logger.debug('writing m3u file for %r', discName)
- m3uPath = u'%s.m3u' % discName
- handle = open(m3uPath, 'w')
- u = u'#EXTM3U\n'
- handle.write(u.encode('utf-8'))
-
- def writeFile(handle, path, length):
- targetPath = common.getRelativePath(path, m3uPath)
- u = u'#EXTINF:%d,%s\n' % (length, targetPath)
- handle.write(u.encode('utf-8'))
- u = '%s\n' % targetPath
- handle.write(u.encode('utf-8'))
-
- if htoapath:
- writeFile(handle, htoapath,
- self.itable.getTrackStart(1) / common.FRAMES_PER_SECOND)
+ self.program.write_m3u(discName)
- for i, track in enumerate(self.itable.tracks):
- if not track.audio:
- continue
-
- path = self.program.getPath(self.program.outdir,
- self.options.track_template,
- self.mbdiscid, i + 1,
- disambiguate=disambiguate
- ) + '.' + 'flac'
- writeFile(handle, path,
- (self.itable.getTrackLength(i + 1) /
- common.FRAMES_PER_SECOND))
-
- handle.close()
-
- # verify using accuraterip
- url = self.ittoc.getAccurateRipURL()
- sys.stdout.write("AccurateRip URL %s\n" % url)
-
- accucache = accurip.AccuCache()
try:
- responses = accucache.retrieve(url)
- except urllib2.URLError, e:
- if isinstance(e.args[0], socket.gaierror):
- if e.args[0].errno == -2:
- sys.stdout.write("Warning: network error: %r\n" % (
- e.args[0], ))
- responses = None
- else:
- raise
- else:
- raise
-
- if not responses:
- sys.stdout.write('Album not found in AccurateRip database\n')
-
- if responses:
- sys.stdout.write('%d AccurateRip reponses found\n' %
- len(responses))
-
- if responses[0].cddbDiscId != self.itable.getCDDBDiscId():
- sys.stdout.write(
- "AccurateRip response discid different: %s\n" %
- responses[0].cddbDiscId)
-
- self.program.verifyImage(self.runner, responses)
+ self.program.verifyImage(self.runner, self.ittoc)
+ except accurip.EntryNotFound:
+ print('AccurateRip entry not found')
- sys.stdout.write("\n".join(
- self.program.getAccurateRipResults()) + "\n")
+ accurip.print_report(self.program.result)
self.program.saveRipResult()
- # write log file
self.program.writeLog(discName, self.logger)
diff --git a/whipper/command/image.py b/whipper/command/image.py
index 510a56b0..25d4f6d2 100644
--- a/whipper/command/image.py
+++ b/whipper/command/image.py
@@ -117,16 +117,12 @@ def add_arguments(self):
def do(self):
prog = program.Program(config.Config())
runner = task.SyncRunner()
- cache = accurip.AccuCache()
for arg in self.options.cuefile:
arg = arg.decode('utf-8')
cueImage = image.Image(arg)
cueImage.setup(runner)
- url = cueImage.table.getAccurateRipURL()
- responses = cache.retrieve(url)
-
# FIXME: this feels like we're poking at internals.
prog.cuePath = arg
prog.result = result.RipResult()
@@ -135,9 +131,14 @@ def do(self):
tr.number = track.number
prog.result.tracks.append(tr)
- prog.verifyImage(runner, responses)
-
- print "\n".join(prog.getAccurateRipResults()) + "\n"
+ verified = False
+ try:
+ verified = prog.verifyImage(runner, cueImage.table)
+ except accurip.EntryNotFound:
+ print('AccurateRip entry not found')
+ accurip.print_report(prog.result)
+ if not verified:
+ sys.exit(1)
class Image(BaseCommand):
diff --git a/whipper/command/main.py b/whipper/command/main.py
index 553f7c87..6e025478 100644
--- a/whipper/command/main.py
+++ b/whipper/command/main.py
@@ -36,8 +36,13 @@ def main():
cmd.options.eject in ('failure', 'always')):
eject_device(e.device)
return 255
+ except RuntimeError, e:
+ print(e)
+ return 1
+ except KeyboardInterrupt:
+ return 2
except ImportError, e:
- raise ImportError(e)
+ raise
except task.TaskException, e:
if isinstance(e.exception, ImportError):
raise ImportError(e.exception)
diff --git a/whipper/command/offset.py b/whipper/command/offset.py
index 75207577..77585ee4 100644
--- a/whipper/command/offset.py
+++ b/whipper/command/offset.py
@@ -27,8 +27,7 @@
from whipper.command.basecommand import BaseCommand
from whipper.common import accurip, common, config, drive
from whipper.common import task as ctask
-from whipper.program import cdrdao, cdparanoia, utils
-from whipper.common import checksum
+from whipper.program import arc, cdrdao, cdparanoia, utils
from whipper.extern.task import task
gobject.threads_init()
@@ -92,27 +91,14 @@ def do(self):
table = t.table
logger.debug("CDDB disc id: %r", table.getCDDBDiscId())
- url = table.getAccurateRipURL()
- logger.debug("AccurateRip URL: %s", url)
-
- # FIXME: download url as a task too
- responses = []
- import urllib2
+ responses = None
try:
- handle = urllib2.urlopen(url)
- data = handle.read()
- responses = accurip.getAccurateRipResponses(data)
- except urllib2.HTTPError, e:
- if e.code == 404:
- sys.stdout.write(
- 'Album not found in AccurateRip database.\n')
- return 1
- else:
- raise
+ responses = accurip.get_db_entry(table.accuraterip_path())
+ except accurip.EntryNotFound:
+ print('Accuraterip entry not found')
if responses:
logger.debug('%d AccurateRip responses found.' % len(responses))
-
if responses[0].cddbDiscId != table.getCDDBDiscId():
logger.warning("AccurateRip response discid different: %s",
responses[0].cddbDiscId)
@@ -120,17 +106,19 @@ def do(self):
# now rip the first track at various offsets, calculating AccurateRip
# CRC, and matching it against the retrieved ones
- def match(archecksum, track, responses):
+ # archecksums is a tuple of accuraterip checksums: (v1, v2)
+ def match(archecksums, track, responses):
for i, r in enumerate(responses):
- if archecksum == r.checksums[track - 1]:
- return archecksum, i
+ for checksum in archecksums:
+ if checksum == r.checksums[track - 1]:
+ return checksum, i
return None, None
for offset in self._offsets:
sys.stdout.write('Trying read offset %d ...\n' % offset)
try:
- archecksum = self._arcs(runner, table, 1, offset)
+ archecksums = self._arcs(runner, table, 1, offset)
except task.TaskException, e:
# let MissingDependency fall through
@@ -149,9 +137,9 @@ def match(archecksum, track, responses):
'WARNING: cannot rip with offset %d...\n' % offset)
continue
- logger.debug('AR checksum calculated: %s' % archecksum)
+ logger.debug('AR checksums calculated: %s %s' % archecksums)
- c, i = match(archecksum, 1, responses)
+ c, i = match(archecksums, 1, responses)
if c:
count = 1
logger.debug('MATCHED against response %d' % i)
@@ -163,7 +151,7 @@ def match(archecksum, track, responses):
# last one (to avoid readers that can't do overread
for track in range(2, (len(table.tracks) + 1) - 1):
try:
- archecksum = self._arcs(runner, table, track, offset)
+ archecksums = self._arcs(runner, table, track, offset)
except task.TaskException, e:
if isinstance(e.exception, cdparanoia.FileSizeError):
sys.stdout.write(
@@ -171,7 +159,7 @@ def match(archecksum, track, responses):
offset)
continue
- c, i = match(archecksum, track, responses)
+ c, i = match(archecksums, track, responses)
if c:
logger.debug('MATCHED track %d against response %d' % (
track, i))
@@ -188,9 +176,8 @@ def match(archecksum, track, responses):
sys.stdout.write('No matching offset found.\n')
sys.stdout.write('Consider trying again with a different disc.\n')
- # TODO MW: Update this further for ARv2 code
def _arcs(self, runner, table, track, offset):
- # rips the track with the given offset, return the arcs checksum
+ # rips the track with the given offset, return the arcs checksums
logger.debug('Ripping track %r with offset %d ...', track, offset)
fd, path = tempfile.mkstemp(
@@ -207,15 +194,15 @@ def _arcs(self, runner, table, track, offset):
track, offset)
runner.run(t)
- # TODO MW: Update this to also use the v2 checksum(s)
- t = checksum.FastAccurateRipChecksumTask(path,
- trackNumber=track,
- trackCount=len(table.tracks),
- wave=True, v2=False)
- runner.run(t)
+ v1 = arc.accuraterip_checksum(
+ path, track, len(table.tracks), wave=True, v2=False
+ )
+ v2 = arc.accuraterip_checksum(
+ path, track, len(table.tracks), wave=True, v2=True
+ )
os.unlink(path)
- return "%08x" % t.checksum
+ return ("%08x" % v1, "%08x" % v2)
def _foundOffset(self, device, offset):
sys.stdout.write('\nRead offset of device is: %d.\n' %
diff --git a/whipper/common/accurip.py b/whipper/common/accurip.py
index 1ff2e03a..0c6c1ebe 100644
--- a/whipper/common/accurip.py
+++ b/whipper/common/accurip.py
@@ -1,6 +1,7 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_accurip -*-
# vi:si:et:sw=4:sts=4:ts=4
+# Copyright (C) 2017 Samantha Baldwin
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of whipper.
@@ -18,128 +19,261 @@
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see .
-import errno
-import os
+import requests
import struct
-import urlparse
-import urllib2
+from errno import EEXIST
+from os import makedirs
+from os.path import dirname, exists, join
from whipper.common import directory
+from whipper.program.arc import accuraterip_checksum
import logging
logger = logging.getLogger(__name__)
-_CACHE_DIR = directory.cache_path()
+ACCURATERIP_URL = "http://www.accuraterip.com/accuraterip/"
+_CACHE_DIR = join(directory.cache_path(), 'accurip')
-class AccuCache:
- def __init__(self):
- if not os.path.exists(_CACHE_DIR):
- logger.debug('Creating cache directory %s', _CACHE_DIR)
- os.makedirs(_CACHE_DIR)
+class EntryNotFound(Exception):
+ pass
- def _getPath(self, url):
- # split path starts with /
- return os.path.join(_CACHE_DIR, urlparse.urlparse(url)[2][1:])
- def retrieve(self, url, force=False):
- logger.debug("Retrieving AccurateRip URL %s", url)
- path = self._getPath(url)
- logger.debug("Cached path: %s", path)
- if force:
- logger.debug("forced to download")
- self.download(url)
- elif not os.path.exists(path):
- logger.debug("%s does not exist, downloading", path)
- self.download(url)
-
- if not os.path.exists(path):
- logger.debug("%s does not exist, not in database", path)
- return None
-
- data = self._read(url)
-
- return getAccurateRipResponses(data)
-
- def download(self, url):
- # FIXME: download url as a task too
- try:
- handle = urllib2.urlopen(url)
- data = handle.read()
-
- except urllib2.HTTPError, e:
- if e.code == 404:
- return None
- else:
- raise
-
- self._cache(url, data)
- return data
-
- def _cache(self, url, data):
- path = self._getPath(url)
- try:
- os.makedirs(os.path.dirname(path))
- except OSError, e:
- logger.debug('Could not make dir %s: %r' % (
- path, str(e)))
- if e.errno != errno.EEXIST:
- raise
-
- handle = open(path, 'wb')
- handle.write(data)
- handle.close()
-
- def _read(self, url):
- logger.debug("Reading %s from cache", url)
- path = self._getPath(url)
- handle = open(path, 'rb')
- data = handle.read()
- handle.close()
- return data
-
-
-def getAccurateRipResponses(data):
- ret = []
-
- while data:
- trackCount = struct.unpack("B", data[0])[0]
- nbytes = 1 + 12 + trackCount * (1 + 8)
-
- ret.append(AccurateRipResponse(data[:nbytes]))
- data = data[nbytes:]
-
- return ret
-
-
-class AccurateRipResponse(object):
+class _AccurateRipResponse(object):
"""
- I represent the response of the AccurateRip online database.
+ An AccurateRip response contains a collection of metadata identifying a
+ particular digital audio compact disc.
- @type checksums: list of str
- """
+ For disc level metadata it contains the track count, two internal disc
+ IDs, and the CDDB disc ID.
- trackCount = None
- discId1 = ""
- discId2 = ""
- cddbDiscId = ""
- confidences = None
- checksums = None
+ A checksum and a confidence score is stored sequentially for each track in
+ the disc index, which excludes any audio hidden in track pre-gaps (such as
+ HTOA).
+ The response is stored as a packed binary structure.
+ """
def __init__(self, data):
- self.trackCount = struct.unpack("B", data[0])[0]
+ """
+ The checksums and confidences arrays are indexed by relative track
+ position, so track 1 will have array index 0, track 2 will have array
+ index 1, and so forth. HTOA and other hidden tracks are not included.
+ """
+ self.num_tracks = struct.unpack("B", data[0])[0]
self.discId1 = "%08x" % struct.unpack(" track.AR[v]['DBConfidence']:
+ track.AR[v]['DBCRC'] = r.checksums[i]
+ track.AR[v]['DBConfidence'] = r.confidences[i]
+ logger.debug(
+ 'track %d matched response %s in AccurateRip'
+ ' database: %s crc %s confidence %s' %
+ (i, r.cddbDiscId, v, track.AR[v]['DBCRC'],
+ track.AR[v]['DBConfidence'])
+ )
+ return any((
+ all([t.AR['v1']['DBCRC'] for t in tracks]),
+ all([t.AR['v2']['DBCRC'] for t in tracks])
+ ))
+
+
+def verify_result(result, responses, checksums):
+ """
+ Verify track AccurateRip checksums against database responses.
+ Stores track checksums and database values on result.
+ """
+ if not (result and responses and checksums):
+ return False
+ # exclude HTOA from AccurateRip verification
+ # NOTE: if pre-gap hidden audio support is expanded to include
+ # tracks other than HTOA, this is invalid.
+ tracks = filter(lambda t: t.number != 0, result.tracks)
+ if not tracks:
+ return False
+ _assign_checksums_and_confidences(tracks, checksums, responses)
+ return _match_responses(tracks, responses)
+
+
+def print_report(result):
+ """
+ Print AccurateRip verification results to stdout.
+ """
+ for i, track in enumerate(result.tracks):
+ status = 'rip NOT accurate'
+ conf = '(not found)'
+ db = 'notfound'
+ if track.AR['DBMaxConfidence'] is not None:
+ db = track.AR['DBMaxConfidenceCRC']
+ conf = '(max confidence %3d)' % track.AR['DBMaxConfidence']
+ if track.AR['v1']['DBCRC'] or track.AR['v2']['DBCRC']:
+ status = 'rip accurate'
+ db = ', '.join(filter(None, (
+ track.AR['v1']['DBCRC'],
+ track.AR['v2']['DBCRC']
+ )))
+ max_conf = max(
+ [track.AR[v]['DBConfidence'] for v in ('v1', 'v2')]
+ )
+ if max_conf:
+ if max_conf < track.AR['DBMaxConfidence']:
+ conf = '(confidence %3d of %3d)' % (
+ max_conf, track.AR['DBMaxConfidence']
+ )
+ # htoa tracks (i == 0) do not have an ARCRC
+ if track.number == 0:
+ print('track 0: unknown (not tracked)')
+ continue
+ if not (track.AR['v1']['CRC'] or track.AR['v2']['CRC']):
+ logger.error(
+ 'no track AR CRC on non-HTOA track %d' % track.number
+ )
+ print('track %2d: unknown (error)' % track.number)
+ else:
+ print('track %2d: %-16s %-23s v1 [%s], v2 [%s], DB [%s]' % (
+ track.number, status, conf,
+ track.AR['v1']['CRC'], track.AR['v2']['CRC'], db
+ ))
diff --git a/whipper/common/checksum.py b/whipper/common/checksum.py
index 790d7377..56ed7b45 100644
--- a/whipper/common/checksum.py
+++ b/whipper/common/checksum.py
@@ -24,8 +24,6 @@
from whipper.extern.task import task as etask
-from whipper.program.arc import accuraterip_checksum
-
import logging
logger = logging.getLogger(__name__)
@@ -49,27 +47,3 @@ def _crc32(self):
self.checksum = binascii.crc32(d) & 0xffffffff
self.stop()
-
-
-class FastAccurateRipChecksumTask(etask.Task):
- description = 'Calculating (Fast) AccurateRip checksum'
-
- def __init__(self, path, trackNumber, trackCount, wave, v2=False):
- self.path = path
- self.trackNumber = trackNumber
- self.trackCount = trackCount
- self._wave = wave
- self._v2 = v2
- self.checksum = None
-
- def start(self, runner):
- etask.Task.start(self, runner)
- self.schedule(0.0, self._arc)
-
- def _arc(self):
- arc = accuraterip_checksum(self.path, self.trackNumber,
- self.trackCount,
- self._wave, self._v2)
- self.checksum = arc
-
- self.stop()
diff --git a/whipper/common/program.py b/whipper/common/program.py
index ea223de3..4a3f3fdb 100644
--- a/whipper/common/program.py
+++ b/whipper/common/program.py
@@ -23,12 +23,12 @@
"""
import musicbrainzngs
+import re
import os
import sys
import time
-from whipper.common import common, mbngs, cache, path
-from whipper.common import checksum
+from whipper.common import accurip, cache, checksum, common, mbngs, path
from whipper.program import cdrdao, cdparanoia
from whipper.image import image
from whipper.extern.task import task
@@ -178,34 +178,34 @@ def addDisambiguation(self, template_part, metadata):
template_part += ' (%s)' % metadata.barcode
return template_part
- def getPath(self, outdir, template, mbdiscid, i, disambiguate=False):
+ def getPath(self, outdir, template, mbdiscid, metadata, track_number=None):
"""
- Based on the template, get a complete path for the given track,
- minus extension.
- Also works for the disc name, using disc variables for the template.
-
- @param outdir: the directory where to write the files
- @type outdir: unicode
- @param template: the template for writing the file
- @type template: unicode
- @param i: track number (0 for HTOA, or for disc)
- @type i: int
-
- @rtype: unicode
+ Return disc or track path relative to outdir according to
+ template. Track paths do not include extension.
+
+ Tracks are named according to the track template, filling in
+ the variables and adding the file extension. Variables
+ exclusive to the track template are:
+ - %t: track number
+ - %a: track artist
+ - %n: track title
+ - %s: track sort name
+
+ Disc files (.cue, .log, .m3u) are named according to the disc
+ template, filling in the variables and adding the file
+ extension. Variables for both disc and track template are:
+ - %A: album artist
+ - %S: album sort name
+ - %d: disc title
+ - %y: release year
+ - %r: release type, lowercase
+ - %R: Release type, normal case
+ - %x: audio extension, lowercase
+ - %X: audio extension, uppercase
"""
assert type(outdir) is unicode, "%r is not unicode" % outdir
assert type(template) is unicode, "%r is not unicode" % template
-
- # the template is similar to grip, except for %s/%S/%r/%R
- # see #gripswitches
-
- # returns without extension
-
v = {}
-
- v['t'] = '%02d' % i
-
- # default values
v['A'] = 'Unknown Artist'
v['d'] = mbdiscid # fallback for title
v['r'] = 'unknown'
@@ -215,59 +215,38 @@ def getPath(self, outdir, template, mbdiscid, i, disambiguate=False):
v['x'] = 'flac'
v['X'] = v['x'].upper()
v['y'] = '0000'
+ if track_number is not None:
+ v['a'] = v['A']
+ v['t'] = '%02d' % track_number
+ if track_number == 0:
+ v['n'] = 'Hidden Track One Audio'
+ else:
+ v['n'] = 'Unknown Track %d' % track_number
- v['a'] = v['A']
- if i == 0:
- v['n'] = 'Hidden Track One Audio'
- else:
- v['n'] = 'Unknown Track %d' % i
-
- if self.metadata:
- release = self.metadata.release or '0000'
+ if metadata:
+ release = metadata.release or '0000'
v['y'] = release[:4]
- v['A'] = self._filter.filter(self.metadata.artist)
- v['S'] = self._filter.filter(self.metadata.sortName)
- v['d'] = self._filter.filter(self.metadata.title)
- v['B'] = self.metadata.barcode
- v['C'] = self.metadata.catalogNumber
- if self.metadata.releaseType:
- v['R'] = self.metadata.releaseType
- v['r'] = self.metadata.releaseType.lower()
- if i > 0:
- try:
- v['a'] = self._filter.filter(
- self.metadata.tracks[i - 1].artist)
- v['s'] = self._filter.filter(
- self.metadata.tracks[i - 1].sortName)
- v['n'] = self._filter.filter(
- self.metadata.tracks[i - 1].title)
- except IndexError, e:
- print 'ERROR: no track %d found, %r' % (i, e)
- raise
- else:
+ v['A'] = self._filter.filter(metadata.artist)
+ v['S'] = self._filter.filter(metadata.sortName)
+ v['d'] = self._filter.filter(metadata.title)
+ v['B'] = metadata.barcode
+ v['C'] = metadata.catalogNumber
+ if metadata.releaseType:
+ v['R'] = metadata.releaseType
+ v['r'] = metadata.releaseType.lower()
+ if track_number > 0:
+ v['a'] = self._filter.filter(
+ metadata.tracks[track_number - 1].artist)
+ v['s'] = self._filter.filter(
+ metadata.tracks[track_number - 1].sortName)
+ v['n'] = self._filter.filter(
+ metadata.tracks[track_number - 1].title)
+ elif track_number == 0:
# htoa defaults to disc's artist
- v['a'] = self._filter.filter(self.metadata.artist)
-
- # when disambiguating, use catalogNumber then barcode
- if disambiguate:
- templateParts = template.split(os.sep)
- # Find the section of the template with the release name
- for i, part in enumerate(templateParts):
- if "%d" in part:
- templateParts[i] = self.addDisambiguation(part, self.metadata) # noqa: E501
- break
- else:
- # No parts of the template contain the release
- templateParts[-1] = self.addDisambiguation(templateParts[-1], self.metadata) # noqa: E501
- template = os.path.join(*templateParts)
- logger.debug('Disambiguated template to %r' % template)
+ v['a'] = self._filter.filter(metadata.artist)
- import re
template = re.sub(r'%(\w)', r'%(\1)s', template)
-
- ret = os.path.join(outdir, template % v)
-
- return ret
+ return os.path.join(outdir, template % v)
def getCDDB(self, cddbdiscid):
"""
@@ -579,118 +558,55 @@ def retagImage(self, runner, taglists):
t = image.ImageRetagTask(cueImage, taglists)
runner.run(t)
- def verifyImage(self, runner, responses):
+ def verifyImage(self, runner, table):
"""
+ verify table against accuraterip and cue_path track lengths
Verify our image against the given AccurateRip responses.
Needs an initialized self.result.
Will set accurip and friends on each TrackResult.
- """
-
- logger.debug('verifying Image against %d AccurateRip responses',
- len(responses or []))
+ Populates self.result.tracks with above TrackResults.
+ """
cueImage = image.Image(self.cuePath)
+ # assigns track lengths
verifytask = image.ImageVerifyTask(cueImage)
- cuetask = image.AccurateRipChecksumTask(cueImage)
runner.run(verifytask)
- runner.run(cuetask)
-
- self._verifyImageWithChecksums(responses, cuetask.checksums)
-
- def _verifyImageWithChecksums(self, responses, checksums):
- # loop over tracks to set our calculated AccurateRip CRC's
- for i, csum in enumerate(checksums):
- trackResult = self.result.getTrackResult(i + 1)
- trackResult.ARCRC = csum
-
- if not responses:
- logger.warning('No AccurateRip responses, cannot verify.')
- return
-
- # now loop to match responses
- for i, csum in enumerate(checksums):
- trackResult = self.result.getTrackResult(i + 1)
-
- confidence = None
- response = None
-
- # match against each response's checksum for this track
- for j, r in enumerate(responses):
- if "%08x" % csum == r.checksums[i]:
- response = r
- logger.debug(
- "Track %02d matched response %d of %d in "
- "AccurateRip database",
- i + 1, j + 1, len(responses))
- trackResult.accurip = True
- # FIXME: maybe checksums should be ints
- trackResult.ARDBCRC = int(r.checksums[i], 16)
- # arsum = csum
- confidence = r.confidences[i]
- trackResult.ARDBConfidence = confidence
-
- if not trackResult.accurip:
- logger.warning("Track %02d: not matched in "
- "AccurateRip database", i + 1)
-
- # I have seen AccurateRip responses with 0 as confidence
- # for example, Best of Luke Haines, disc 1, track 1
- maxConfidence = -1
- maxResponse = None
- for r in responses:
- if r.confidences[i] > maxConfidence:
- maxConfidence = r.confidences[i]
- maxResponse = r
-
- logger.debug('Track %02d: found max confidence %d' % (
- i + 1, maxConfidence))
- trackResult.ARDBMaxConfidence = maxConfidence
- if not response:
- logger.warning('Track %02d: none of the responses matched.',
- i + 1)
- trackResult.ARDBCRC = int(
- maxResponse.checksums[i], 16)
- else:
- trackResult.ARDBCRC = int(response.checksums[i], 16)
-
- # TODO MW: Update this further for ARv2 code
- def getAccurateRipResults(self):
- """
- @rtype: list of str
- """
- res = []
-
- # loop over tracks
- for i, trackResult in enumerate(self.result.tracks):
- status = 'rip NOT accurate'
-
- if trackResult.accurip:
- status = 'rip accurate '
-
- c = "(not found) "
- ar = ", DB [notfound]"
- if trackResult.ARDBMaxConfidence:
- c = "(max confidence %3d)" % trackResult.ARDBMaxConfidence
- if trackResult.ARDBConfidence is not None:
- if trackResult.ARDBConfidence \
- < trackResult.ARDBMaxConfidence:
- c = "(confidence %3d of %3d)" % (
- trackResult.ARDBConfidence,
- trackResult.ARDBMaxConfidence)
-
- ar = ", DB [%08x]" % trackResult.ARDBCRC
- # htoa tracks (i == 0) do not have an ARCRC
- if trackResult.ARCRC is None:
- assert trackResult.number == 0, \
- 'no trackResult.ARCRC on non-HTOA track %d' % \
- trackResult.number
- res.append("Track 0: unknown (not tracked)")
- else:
- res.append("Track %2d: %s %s [%08x]%s" % (
- trackResult.number, status, c, trackResult.ARCRC, ar))
-
- return res
+ if verifytask.exception:
+ logger.error(verifytask.exceptionMessage)
+ return False
+
+ responses = accurip.get_db_entry(table.accuraterip_path())
+ logger.info('%d AccurateRip response(s) found' % len(responses))
+
+ checksums = accurip.calculate_checksums([
+ os.path.join(os.path.dirname(self.cuePath), t.indexes[1].path)
+ for t in filter(lambda t: t.number != 0, cueImage.cue.table.tracks)
+ ])
+ if not (checksums and any(checksums['v1']) and any(checksums['v2'])):
+ return False
+ return accurip.verify_result(self.result, responses, checksums)
+
+ def write_m3u(self, discname):
+ m3uPath = u'%s.m3u' % discname
+ with open(m3uPath, 'w') as f:
+ f.write(u'#EXTM3U\n'.encode('utf-8'))
+ for track in self.result.tracks:
+ if not track.filename:
+ # false positive htoa
+ continue
+ if track.number == 0:
+ length = (self.result.table.getTrackStart(1) /
+ common.FRAMES_PER_SECOND)
+ else:
+ length = (self.result.table.getTrackLength(track.number) /
+ common.FRAMES_PER_SECOND)
+
+ target_path = common.getRelativePath(track.filename, m3uPath)
+ u = u'#EXTINF:%d,%s\n' % (length, target_path)
+ f.write(u.encode('utf-8'))
+ u = '%s\n' % target_path
+ f.write(u.encode('utf-8'))
def writeCue(self, discName):
assert self.result.table.canCue()
diff --git a/whipper/image/image.py b/whipper/image/image.py
index dfa7d66e..95549611 100644
--- a/whipper/image/image.py
+++ b/whipper/image/image.py
@@ -26,7 +26,6 @@
from whipper.common import encode
from whipper.common import common
-from whipper.common import checksum
from whipper.image import cue, table
from whipper.extern.task import task
from whipper.program.soxi import AudioLengthTask
@@ -108,47 +107,6 @@ def setup(self, runner):
logger.debug('setup image done')
-class AccurateRipChecksumTask(task.MultiSeparateTask):
- """
- I calculate the AccurateRip checksums of all tracks.
- """
-
- description = "Checksumming tracks"
-
- # TODO MW: Update this further for V2 code
- def __init__(self, image):
- task.MultiSeparateTask.__init__(self)
-
- self._image = image
- cue = image.cue
- self.checksums = []
-
- logger.debug('Checksumming %d tracks' % len(cue.table.tracks))
- for trackIndex, track in enumerate(cue.table.tracks):
- index = track.indexes[1]
- length = cue.getTrackLength(track)
- if length < 0:
- logger.debug('track %d has unknown length' %
- (trackIndex + 1, ))
- else:
- logger.debug('track %d is %d samples long' % (
- trackIndex + 1, length))
-
- path = image.getRealPath(index.path)
-
- checksumTask = checksum.FastAccurateRipChecksumTask(
- path,
- trackNumber=trackIndex + 1,
- trackCount=len(cue.table.tracks),
- wave=True, v2=False)
-
- self.addTask(checksumTask)
-
- def stop(self):
- self.checksums = [t.checksum for t in self.tasks]
- task.MultiSeparateTask.stop(self)
-
-
class ImageVerifyTask(task.MultiSeparateTask):
"""
I verify a disk image and get the necessary track lengths.
diff --git a/whipper/image/table.py b/whipper/image/table.py
index 3c8cd4f8..28dd04eb 100644
--- a/whipper/image/table.py
+++ b/whipper/image/table.py
@@ -475,51 +475,6 @@ def _getMusicBrainzValues(self):
logger.debug('MusicBrainz values: %r', result)
return result
- def getAccurateRipIds(self):
- """
- Calculate the two AccurateRip ID's.
-
- @returns: the two 8-character hexadecimal disc ID's
- @rtype: tuple of (str, str)
- """
- # AccurateRip does not take into account data tracks,
- # but does count the data track to determine the leadout offset
- discId1 = 0
- discId2 = 0
-
- for track in self.tracks:
- if not track.audio:
- continue
- offset = self.getTrackStart(track.number)
- discId1 += offset
- discId2 += (offset or 1) * track.number
-
- # also add end values, where leadout offset is one past the end
- # of the last track
- last = self.tracks[-1]
- offset = self.getTrackEnd(last.number) + 1
- discId1 += offset
- discId2 += offset * (self.getAudioTracks() + 1)
-
- discId1 &= 0xffffffff
- discId2 &= 0xffffffff
-
- return ("%08x" % discId1, "%08x" % discId2)
-
- def getAccurateRipURL(self):
- """
- Return the full AccurateRip URL.
-
- @returns: the AccurateRip URL
- @rtype: str
- """
- discId1, discId2 = self.getAccurateRipIds()
-
- return "http://www.accuraterip.com/accuraterip/" \
- "%s/%s/%s/dBAR-%.3d-%s-%s-%s.bin" % (
- discId1[-1], discId1[-2], discId1[-3],
- self.getAudioTracks(), discId1, discId2, self.getCDDBDiscId())
-
def cue(self, cuePath='', program='whipper'):
"""
@param cuePath: path to the cue file to be written. If empty,
@@ -851,6 +806,41 @@ def hasTOC(self):
return True
+ def accuraterip_ids(self):
+ """
+ returns both AccurateRip disc ids as a tuple of 8-char
+ hexadecimal strings (discid1, discid2)
+ """
+ # AccurateRip does not take into account data tracks,
+ # but does count the data track to determine the leadout offset
+ discId1 = 0
+ discId2 = 0
+
+ for track in self.tracks:
+ if not track.audio:
+ continue
+ offset = self.getTrackStart(track.number)
+ discId1 += offset
+ discId2 += (offset or 1) * track.number
+
+ # also add end values, where leadout offset is one past the end
+ # of the last track
+ offset = self.getTrackEnd(self.tracks[-1].number) + 1
+ discId1 += offset
+ discId2 += offset * (self.getAudioTracks() + 1)
+
+ discId1 &= 0xffffffff
+ discId2 &= 0xffffffff
+
+ return ("%08x" % discId1, "%08x" % discId2)
+
+ def accuraterip_path(self):
+ discId1, discId2 = self.accuraterip_ids()
+ return "%s/%s/%s/dBAR-%.3d-%s-%s-%s.bin" % (
+ discId1[-1], discId1[-2], discId1[-3],
+ self.getAudioTracks(), discId1, discId2, self.getCDDBDiscId()
+ )
+
def canCue(self):
"""
Check if this table can be used to generate a .cue file
diff --git a/whipper/program/arc.py b/whipper/program/arc.py
index 02ba8662..b5f41adb 100644
--- a/whipper/program/arc.py
+++ b/whipper/program/arc.py
@@ -7,21 +7,26 @@
FLAC = 'flac'
-def accuraterip_checksum(f, track, tracks, wave=False, v2=False):
+def _execute(cmd, **redirects):
+ logger.debug('executing %r', cmd)
+ return Popen(cmd, **redirects)
+
+
+def accuraterip_checksum(f, track_number, total_tracks, wave=False, v2=False):
v = '--accuraterip-v1'
if v2:
v = '--accuraterip-v2'
- track, tracks = str(track), str(tracks)
+ track_number, total_tracks = str(track_number), str(total_tracks)
- if not wave:
- flac = Popen([FLAC, '-cds', f], stdout=PIPE)
-
- arc = Popen([ARB, v, '/dev/stdin', track, tracks],
- stdin=flac.stdout, stdout=PIPE, stderr=PIPE)
+ if wave:
+ cmd = [ARB, v, f, track_number, total_tracks]
+ redirects = dict(stdout=PIPE, stderr=PIPE)
else:
- arc = Popen([ARB, v, f, track, tracks],
- stdout=PIPE, stderr=PIPE)
+ flac = _execute([FLAC, '-cds', f], stdout=PIPE)
+ cmd = [ARB, v, '/dev/stdin', track_number, total_tracks]
+ redirects = dict(stdin=flac.stdout, stdout=PIPE, stderr=PIPE)
+ arc = _execute(cmd, **redirects)
if not wave:
flac.stdout.close()
@@ -30,23 +35,24 @@ def accuraterip_checksum(f, track, tracks, wave=False, v2=False):
if not wave:
flac.wait()
- flac_rc = flac.returncode
-
- arc_rc = arc.returncode
-
- if not wave and flac_rc != 0:
- logger.warning('ARC calculation failed: flac return code is non zero')
- return None
-
- if arc_rc != 0:
- logger.warning('ARC calculation failed: arc return code is non zero')
+ if flac.returncode != 0:
+ logger.warning(
+ 'ARC calculation failed: flac return code is non zero: %r' %
+ flac.returncode
+ )
+ return None
+
+ if arc.returncode != 0:
+ logger.warning(
+ 'ARC calculation failed: arc return code is non zero: %r' %
+ arc.returncode
+ )
return None
- out = out.strip()
try:
- outh = int('0x%s' % out, base=16)
+ checksum = int('0x%s' % out.strip(), base=16)
+ logger.debug('returned %r', checksum)
+ return checksum
except ValueError:
logger.warning('ARC output is not usable')
return None
-
- return outh
diff --git a/whipper/result/logger.py b/whipper/result/logger.py
index 6a4b0bc1..e5eaabc3 100644
--- a/whipper/result/logger.py
+++ b/whipper/result/logger.py
@@ -202,23 +202,29 @@ def trackLog(self, trackResult):
lines.append(" Copy CRC: %08X" % trackResult.copycrc)
# AccurateRip track status
- # Currently there's no support for AccurateRip V2
- if trackResult.accurip:
- lines.append(" AccurateRip V1:")
- self._inARDatabase += 1
- if trackResult.ARCRC == trackResult.ARDBCRC:
- lines.append(" Result: Found, exact match")
- self._accuratelyRipped += 1
- else:
- lines.append(" Result: Found, NO exact match")
- lines.append(" Confidence: %d" %
- trackResult.ARDBConfidence)
- lines.append(" Local CRC: %08X" % trackResult.ARCRC)
- lines.append(" Remote CRC: %08X" % trackResult.ARDBCRC)
- elif trackResult.number != 0:
- lines.append(" AccurateRip V1:")
- lines.append(" Result: Track not present in "
- "AccurateRip database")
+ for v in ('v1', 'v2'):
+ if trackResult.AR[v]['DBCRC']:
+ lines.append(" AccurateRip %s:" % v)
+ self._inARDatabase += 1
+ if trackResult.AR[v]['CRC'] == trackResult.AR[v]['DBCRC']:
+ lines.append(" Result: Found, exact match")
+ self._accuratelyRipped += 1
+ else:
+ lines.append(" Result: Found, NO exact match")
+ lines.append(
+ " Confidence: %d" % trackResult.AR[v]['DBConfidence']
+ )
+ lines.append(
+ " Local CRC: %s" % trackResult.AR[v]['CRC'].upper()
+ )
+ lines.append(
+ " Remote CRC: %s" % trackResult.AR[v]['DBCRC'].upper()
+ )
+ elif trackResult.number != 0:
+ lines.append(" AccurateRip %s:" % v)
+ lines.append(
+ " Result: Track not present in AccurateRip database"
+ )
# Check if Test & Copy CRCs are equal
if trackResult.testcrc == trackResult.copycrc:
diff --git a/whipper/result/result.py b/whipper/result/result.py
index f7250a7a..f56d91e3 100644
--- a/whipper/result/result.py
+++ b/whipper/result/result.py
@@ -23,56 +23,46 @@
class TrackResult:
- """
- @type filename: unicode
- @ivar testcrc: 4-byte CRC for the test read
- @type testcrc: int
- @ivar copycrc: 4-byte CRC for the copy read
- @type copycrc: int
-
- @var accurip: whether this track's AR CRC was found in the
- database, and thus whether the track is considered
- accurately ripped.
- If false, it can be ripped wrong, not exist in
- the database, ...
- @type accurip: bool
-
- @var ARCRC: our calculated 4 byte AccurateRip CRC for this
- track.
- @type ARCRC: int
-
- @var ARDBCRC: the 4-byte AccurateRip CRC this
- track did or should have matched in the database.
- If None, the track is not in the database.
- @type ARDBCRC: int
- @var ARDBConfidence: confidence for the matched AccurateRip CRC for
- this track in the database.
- If None, the track is not in the database.
- @var ARDBMaxConfidence: maximum confidence in the AccurateRip database for
- this track; can still be 0.
- If None, the track is not in the database.
- """
number = None
filename = None
pregap = 0 # in frames
pre_emphasis = None
-
peak = 0.0
quality = 0.0
testspeed = 0.0
copyspeed = 0.0
testduration = 0.0
copyduration = 0.0
+ # 4 byte CRCs for the test and copy reads
testcrc = None
copycrc = None
- accurip = False # whether it's in the database
- ARCRC = None
- ARDBCRC = None
- ARDBConfidence = None
- ARDBMaxConfidence = None
-
+ AR = None
classVersion = 3
+ def __init__(self):
+ """
+ CRC: calculated 4 byte AccurateRip CRC
+ DBCRC: 4 byte AccurateRip CRC from the AR database
+ DBConfidence: confidence for the matched AccurateRip DB CRC
+
+ DBMaxConfidence: track's maximum confidence in the AccurateRip DB
+ DBMaxConfidenceCRC: maximum confidence CRC
+ """
+ self.AR = {
+ 'v1': {
+ 'CRC': None,
+ 'DBCRC': None,
+ 'DBConfidence': None,
+ },
+ 'v2': {
+ 'CRC': None,
+ 'DBCRC': None,
+ 'DBConfidence': None,
+ },
+ 'DBMaxConfidence': None,
+ 'DBMaxConfidenceCRC': None,
+ }
+
class RipResult:
"""
diff --git a/whipper/test/dBAR-002-0000f21c-00027ef8-05021002.bin b/whipper/test/dBAR-002-0000f21c-00027ef8-05021002.bin
new file mode 100644
index 00000000..6ff761c6
Binary files /dev/null and b/whipper/test/dBAR-002-0000f21c-00027ef8-05021002.bin differ
diff --git a/whipper/test/test_common_accurip.py b/whipper/test/test_common_accurip.py
index 9affb5b0..316ab93d 100644
--- a/whipper/test/test_common_accurip.py
+++ b/whipper/test/test_common_accurip.py
@@ -1,31 +1,316 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_accurip -*-
# vi:si:et:sw=4:sts=4:ts=4
-import os
+import sys
+from StringIO import StringIO
+from os import chmod, makedirs
+from os.path import dirname, exists, join
+from shutil import copy, rmtree
+from tempfile import mkdtemp
+from unittest import TestCase
from whipper.common import accurip
+from whipper.common.accurip import (
+ calculate_checksums, get_db_entry, print_report, verify_result,
+ _split_responses, EntryNotFound
+)
+from whipper.result.result import RipResult, TrackResult
-from whipper.test import common as tcommon
+class TestAccurateRipResponse(TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.path = 'c/1/2/dBAR-002-0000f21c-00027ef8-05021002.bin'
+ cls.entry = _split_responses(
+ open(join(dirname(__file__), cls.path[6:])).read()
+ )
+ cls.other_path = '4/8/2/dBAR-011-0010e284-009228a3-9809ff0b.bin'
-class AccurateRipResponseTestCase(tcommon.TestCase):
+ def setUp(self):
+ self.cache_dir = mkdtemp(suffix='whipper_accurip_cache_test')
+ accurip._CACHE_DIR = self.cache_dir
- def testResponse(self):
- path = os.path.join(os.path.dirname(__file__),
- 'dBAR-011-0010e284-009228a3-9809ff0b.bin')
- data = open(path, "rb").read()
+ def cleanup(cachedir):
+ chmod(cachedir, 0755)
+ rmtree(cachedir)
+ self.addCleanup(cleanup, self.cache_dir)
- responses = accurip.getAccurateRipResponses(data)
- self.assertEquals(len(responses), 3)
+ def test_uses_cache_dir(self):
+ # copy normal entry into other entry's place
+ makedirs(dirname(join(self.cache_dir, self.other_path)))
+ copy(
+ join(dirname(__file__), self.path[6:]),
+ join(self.cache_dir, self.other_path)
+ )
+ # ask cache for other entry and assert cached entry equals normal entry
+ self.assertEquals(self.entry, get_db_entry(self.other_path))
- response = responses[0]
+ def test_raises_entrynotfound_for_no_entry(self):
+ with self.assertRaises(EntryNotFound):
+ get_db_entry('definitely_a_404')
- self.assertEquals(response.trackCount, 11)
- self.assertEquals(response.discId1, "0010e284")
- self.assertEquals(response.discId2, "009228a3")
- self.assertEquals(response.cddbDiscId, "9809ff0b")
+ def test_can_return_entry_without_saving(self):
+ chmod(self.cache_dir, 0)
+ self.assertEqual(get_db_entry(self.path), self.entry)
+ chmod(self.cache_dir, 0755)
+ self.assertFalse(exists(join(self.cache_dir, self.path)))
- for i in range(11):
- self.assertEquals(response.confidences[i], 35)
- self.assertEquals(response.checksums[0], "beea32c8")
- self.assertEquals(response.checksums[10], "acee98ca")
+ def test_retrieves_and_saves_accuraterip_entry(self):
+ # for path, entry in zip(self.paths[0], self.entries):
+ self.assertFalse(exists(join(self.cache_dir, self.path)))
+ self.assertEquals(get_db_entry(self.path), self.entry)
+ self.assertTrue(exists(join(self.cache_dir, self.path)))
+
+ def test_AccurateRipResponse_parses_correctly(self):
+ responses = get_db_entry(self.path)
+ self.assertEquals(len(responses), 2)
+
+ self.assertEquals(responses[0].num_tracks, 2)
+ self.assertEquals(responses[0].discId1, '0000f21c')
+ self.assertEquals(responses[0].discId2, '00027ef8')
+ self.assertEquals(responses[0].cddbDiscId, '05021002')
+ self.assertEquals(responses[0].confidences[0], 12)
+ self.assertEquals(responses[0].confidences[1], 20)
+ self.assertEquals(responses[0].checksums[0], '284fc705')
+ self.assertEquals(responses[0].checksums[1], '9cc1f32e')
+
+ self.assertEquals(responses[1].num_tracks, 2)
+ self.assertEquals(responses[1].discId1, '0000f21c')
+ self.assertEquals(responses[1].discId2, '00027ef8')
+ self.assertEquals(responses[1].cddbDiscId, '05021002')
+ self.assertEquals(responses[1].confidences[0], 4)
+ self.assertEquals(responses[1].confidences[1], 4)
+ self.assertEquals(responses[1].checksums[0], 'dc77f9ab')
+ self.assertEquals(responses[1].checksums[1], 'dd97d2c3')
+
+# XXX: test arc.py
+
+
+class TestCalculateChecksums(TestCase):
+ def test_returns_none_for_bad_files(self):
+ self.assertEquals(
+ calculate_checksums(['/does/not/exist']),
+ {'v1': [None], 'v2': [None]}
+ )
+
+ # TODO: test success when file exists
+
+
+class TestVerifyResult(TestCase):
+ @classmethod
+ def setUpClass(cls):
+ path = 'c/1/2/dBAR-002-0000f21c-00027ef8-05021002.bin'
+ cls.responses = _split_responses(
+ open(join(dirname(__file__), path[6:])).read()
+ )
+ cls.checksums = {
+ 'v1': ['284fc705', '9cc1f32e'],
+ 'v2': ['dc77f9ab', 'dd97d2c3'],
+ }
+
+ def setUp(self):
+ self.result = RipResult()
+ for n in range(1, 2+1):
+ track = TrackResult()
+ track.number = n
+ self.result.tracks.append(track)
+
+ def test_empty_result_returns_false(self):
+ self.assertEquals(
+ verify_result(RipResult(), self.responses, self.checksums),
+ False
+ )
+
+ def test_empty_responses_returns_false(self):
+ self.assertEquals(
+ verify_result(self.result, [], self.checksums),
+ False
+ )
+
+ # XXX: would this happen?
+ def test_empty_checksums_returns_false(self):
+ self.assertEquals(
+ verify_result(self.result, self.responses, {}),
+ False
+ )
+
+ def test_wrong_checksums_returns_false(self):
+ self.assertEquals(
+ verify_result(self.result, self.responses, {
+ 'v1': ['deadbeef', '89abcdef'],
+ 'v2': ['76543210', '01234567']
+ }),
+ False
+ )
+
+ def test_incomplete_checksums(self):
+ self.assertEquals(
+ verify_result(self.result, self.responses, {
+ 'v1': ['284fc705', '9cc1f32e'],
+ 'v2': [None, 'dd97d2c3'],
+ }),
+ True
+ )
+ self.assertEquals(
+ verify_result(self.result, self.responses, {
+ 'v1': ['284fc705', None],
+ 'v2': ['dc77f9ab', 'dd97d2c3'],
+ }),
+ True
+ )
+ self.assertEquals(
+ verify_result(self.result, self.responses, {
+ 'v1': ['284fc705', None],
+ 'v2': [None, 'dd97d2c3'],
+ }),
+ True
+ )
+
+ def test_matches_only_v1_or_v2_responses(self):
+ self.assertEquals(
+ verify_result(
+ self.result, [self.responses[0]], self.checksums
+ ),
+ True
+ )
+ self.assertEquals(
+ verify_result(
+ self.result, [self.responses[1]], self.checksums
+ ),
+ True
+ )
+
+ def test_passes_with_htoa(self):
+ htoa = TrackResult()
+ htoa.number = 0
+ self.result.tracks.append(htoa)
+ self.assertEquals(
+ verify_result(self.result, self.responses, self.checksums),
+ True
+ )
+
+ def test_stores_accuraterip_results_on_result(self):
+ self.assertEquals(
+ verify_result(self.result, self.responses, self.checksums),
+ True
+ )
+ self.assertEquals(self.result.tracks[0].AR, {
+ 'v1': {
+ 'CRC': '284fc705',
+ 'DBCRC': '284fc705',
+ 'DBConfidence': 12,
+ },
+ 'v2': {
+ 'CRC': 'dc77f9ab',
+ 'DBCRC': 'dc77f9ab',
+ 'DBConfidence': 4,
+ },
+ 'DBMaxConfidence': 12,
+ 'DBMaxConfidenceCRC': '284fc705',
+ })
+ self.assertEquals(self.result.tracks[1].AR, {
+ 'v1': {
+ 'CRC': '9cc1f32e',
+ 'DBCRC': '9cc1f32e',
+ 'DBConfidence': 20,
+ },
+ 'v2': {
+ 'CRC': 'dd97d2c3',
+ 'DBCRC': 'dd97d2c3',
+ 'DBConfidence': 4,
+ },
+ 'DBMaxConfidence': 20,
+ 'DBMaxConfidenceCRC': '9cc1f32e',
+ })
+
+
+class TestAccurateRipReport(TestCase):
+ def setUp(self):
+ sys.stdout = StringIO()
+ self.result = RipResult()
+ track = TrackResult()
+ track.number = 1
+ track.AR = {
+ 'v1': {
+ 'CRC': '284fc705',
+ 'DBCRC': '284fc705',
+ 'DBConfidence': 12,
+ },
+ 'v2': {
+ 'CRC': 'dc77f9ab',
+ 'DBCRC': 'dc77f9ab',
+ 'DBConfidence': 4,
+ },
+ 'DBMaxConfidence': 12,
+ 'DBMaxConfidenceCRC': '284fc705',
+ }
+ self.result.tracks.append(track)
+
+ def tearDown(self):
+ sys.stdout = sys.__stdout__
+
+ def test_report_no_result(self):
+ track = TrackResult()
+ track.number = 1
+ self.result.tracks[0] = track
+ print_report(self.result)
+ self.assertEquals(
+ sys.stdout.getvalue(),
+ 'track 1: unknown (error)\n'
+ )
+
+ def test_track_not_found(self):
+ self.result.tracks[0].AR['DBMaxConfidence'] = None
+ print_report(self.result)
+ self.assertEquals(
+ sys.stdout.getvalue(),
+ 'track 1: rip NOT accurate (not found) '
+ ' v1 [284fc705], v2 [dc77f9ab], DB [notfound]\n'
+ )
+
+ def test_htoa_not_tracked(self):
+ self.result.tracks[0].number = 0
+ self.result.tracks[0].AR['v1']['CRC'] = None
+ self.result.tracks[0].AR['v2']['CRC'] = None
+ print_report(self.result)
+ self.assertEquals(
+ sys.stdout.getvalue(),
+ 'track 0: unknown (not tracked)\n'
+ )
+
+ def test_report_v1_only(self):
+ self.result.tracks[0].AR['v2']['DBCRC'] = None
+ self.result.tracks[0].AR['v2']['DBConfidence'] = None
+ print_report(self.result)
+ self.assertEquals(
+ sys.stdout.getvalue(),
+ 'track 1: rip accurate (max confidence 12)'
+ ' v1 [284fc705], v2 [dc77f9ab], DB [284fc705]\n'
+ )
+
+ def test_report_v2_only(self):
+ self.result.tracks[0].AR['v1']['DBCRC'] = None
+ self.result.tracks[0].AR['v1']['DBConfidence'] = None
+ print_report(self.result)
+ self.assertEquals(
+ sys.stdout.getvalue(),
+ 'track 1: rip accurate (confidence 4 of 12)'
+ ' v1 [284fc705], v2 [dc77f9ab], DB [dc77f9ab]\n'
+ )
+
+ def test_report_v1_and_v2_max_confidence(self):
+ print_report(self.result)
+ self.assertEquals(
+ sys.stdout.getvalue(),
+ 'track 1: rip accurate (max confidence 12)'
+ ' v1 [284fc705], v2 [dc77f9ab], DB [284fc705, dc77f9ab]\n'
+ )
+
+ def test_report_v1_and_v2(self):
+ self.result.tracks[0].AR['DBMaxConfidence'] = 66
+ print_report(self.result)
+ self.assertEquals(
+ sys.stdout.getvalue(),
+ 'track 1: rip accurate (confidence 12 of 66)'
+ ' v1 [284fc705], v2 [dc77f9ab], DB [284fc705, dc77f9ab]\n'
+ )
diff --git a/whipper/test/test_common_program.py b/whipper/test/test_common_program.py
index dfa1867c..2ba2a84d 100644
--- a/whipper/test/test_common_program.py
+++ b/whipper/test/test_common_program.py
@@ -2,96 +2,19 @@
# vi:si:et:sw=4:sts=4:ts=4
-import os
-import pickle
-
import unittest
-from whipper.result import result
-from whipper.common import program, accurip, mbngs, config
+from whipper.common import program, mbngs, config
from whipper.command.cd import DEFAULT_DISC_TEMPLATE
-class TrackImageVerifyTestCase(unittest.TestCase):
- # example taken from a rip of Luke Haines Is Dead, disc 1
- # AccurateRip database has 0 confidence for 1st track
- # Rip had a wrong result for track 9
-
- def testVerify(self):
- path = os.path.join(os.path.dirname(__file__),
- 'dBAR-020-002e5023-029d8e49-040eaa14.bin')
- data = open(path, "rb").read()
- responses = accurip.getAccurateRipResponses(data)
-
- # these crc's were calculated from an actual rip
- checksums = [1644890007, 2945205445, 3983436658, 1528082495,
- 1203704270, 1163423644, 3649097244, 100524219,
- 1583356174, 373652058, 1842579359, 2850056507,
- 1329730252, 2526965856, 2525886806, 209743350,
- 3184062337, 2099956663, 2943874164, 2321637196]
-
- prog = program.Program(config.Config())
- prog.result = result.RipResult()
- # fill it with empty trackresults
- for i, c in enumerate(checksums):
- r = result.TrackResult()
- r.number = i + 1
- prog.result.tracks.append(r)
-
- prog._verifyImageWithChecksums(responses, checksums)
-
- # now check if the results were filled in properly
- tr = prog.result.getTrackResult(1)
- self.assertEquals(tr.accurip, False)
- self.assertEquals(tr.ARDBMaxConfidence, 0)
- self.assertEquals(tr.ARDBCRC, 0)
- self.assertEquals(tr.ARDBCRC, 0)
-
- tr = prog.result.getTrackResult(2)
- self.assertEquals(tr.accurip, True)
- self.assertEquals(tr.ARDBMaxConfidence, 2)
- self.assertEquals(tr.ARDBCRC, checksums[2 - 1])
-
- tr = prog.result.getTrackResult(10)
- self.assertEquals(tr.accurip, False)
- self.assertEquals(tr.ARDBMaxConfidence, 2)
- # we know track 10 was ripped wrong
- self.assertNotEquals(tr.ARDBCRC, checksums[10 - 1])
-
- res = prog.getAccurateRipResults()
- self.assertEquals(res[1 - 1],
- "Track 1: rip NOT accurate (not found) "
- "[620b0797], DB [notfound]")
- self.assertEquals(res[2 - 1],
- "Track 2: rip accurate (max confidence 2) "
- "[af8c44c5], DB [af8c44c5]")
- self.assertEquals(res[10 - 1],
- "Track 10: rip NOT accurate (max confidence 2) "
- "[16457a5a], DB [eb6e55b4]")
-
-
-class HTOATestCase(unittest.TestCase):
-
- def setUp(self):
- path = os.path.join(os.path.dirname(__file__),
- 'silentalarm.result.pickle')
- self._tracks = pickle.load(open(path, 'rb'))
-
- def testGetAccurateRipResults(self):
- prog = program.Program(config.Config())
- prog.result = result.RipResult()
- prog.result.tracks = self._tracks
-
- prog.getAccurateRipResults()
-
-
class PathTestCase(unittest.TestCase):
def testStandardTemplateEmpty(self):
prog = program.Program(config.Config())
path = prog.getPath(u'/tmp', DEFAULT_DISC_TEMPLATE,
- 'mbdiscid', 0)
+ 'mbdiscid', None)
self.assertEquals(path,
unicode('/tmp/unknown/Unknown Artist - mbdiscid/'
'Unknown Artist - mbdiscid'))
@@ -101,10 +24,9 @@ def testStandardTemplateFilled(self):
md = mbngs.DiscMetadata()
md.artist = md.sortName = 'Jeff Buckley'
md.title = 'Grace'
- prog.metadata = md
path = prog.getPath(u'/tmp', DEFAULT_DISC_TEMPLATE,
- 'mbdiscid', 0)
+ 'mbdiscid', md, 0)
self.assertEquals(path,
unicode('/tmp/unknown/Jeff Buckley - Grace/'
'Jeff Buckley - Grace'))
@@ -114,92 +36,7 @@ def testIssue66TemplateFilled(self):
md = mbngs.DiscMetadata()
md.artist = md.sortName = 'Jeff Buckley'
md.title = 'Grace'
- prog.metadata = md
- path = prog.getPath(u'/tmp', u'%A/%d', 'mbdiscid', 0)
+ path = prog.getPath(u'/tmp', u'%A/%d', 'mbdiscid', md, 0)
self.assertEquals(path,
u'/tmp/Jeff Buckley/Grace')
-
- def testDisambiguateOnRelease(self):
- """Test that disambiguation gets placed in the same part of the path
- as the release name.
-
- See https://github.com/JoeLametta/whipper/issues/127"""
- prog = program.Program(config.Config())
- md = mbngs.DiscMetadata()
- md.artist = 'Guy Davis'
- md.sortName = 'Davis, Guy'
- md.title = 'Call Down the Thunder'
- md.release = '1996'
- md.catalogNumber = 'RHR CD 89'
- prog.metadata = md
- templates = {
- u'%A/%d - %y': u'Guy Davis/Call Down the Thunder - 1996 (RHR CD 89)', # noqa: E501
- u'%A - %d - %y': u'Guy Davis - Call Down the Thunder - 1996 (RHR CD 89)', # noqa: E501
- u'%A/%y/%d': u'Guy Davis/1996/Call Down the Thunder (RHR CD 89)',
- u'%y/%d/%A': u'1996/Call Down the Thunder (RHR CD 89)/Guy Davis',
- u'%d/%A/%y': u'Call Down the Thunder (RHR CD 89)/Guy Davis/1996',
- }
-
- for template, expected_path in templates.iteritems():
- path = prog.getPath(u'/tmp', template, 'mbdiscid', 0, disambiguate=True) # noqa: E501
- self.assertEquals(path, u'/tmp/' + expected_path)
-
- def testDisambiguateOnReleaseOnlyOnce(self):
- """Test that disambiguation gets added only once."""
- prog = program.Program(config.Config())
- md = mbngs.DiscMetadata()
- md.artist = 'Guy Davis'
- md.sortName = 'Davis, Guy'
- md.title = 'Call Down the Thunder'
- md.release = '1996'
- md.catalogNumber = 'RHR CD 89'
- prog.metadata = md
- template = u'%A/%d - %y/%d/%d'
-
- path = prog.getPath(u'/tmp', template, 'mbdiscid', 0, disambiguate=True) # noqa: E501
- self.assertEquals(path,
- u'/tmp/Guy Davis/Call Down the Thunder - 1996 (RHR CD 89)/Call Down the Thunder/Call Down the Thunder') # noqa: E501
-
- def testDisambiguateOnNoReleaseTitle(self):
- """Test that disambiguation gets added even if there's no release
- title in the template."""
- prog = program.Program(config.Config())
- md = mbngs.DiscMetadata()
- md.artist = 'Guy Davis'
- md.sortName = 'Davis, Guy'
- md.title = 'Call Down the Thunder'
- md.release = '1996'
- md.catalogNumber = 'RHR CD 89'
- prog.metadata = md
- templates = {
- u'%A/%y': u'Guy Davis/1996 (RHR CD 89)',
- u'%A - %y': u'Guy Davis - 1996 (RHR CD 89)',
- u'%y/%A': u'1996/Guy Davis (RHR CD 89)',
- }
-
- for template, expected_path in templates.iteritems():
- path = prog.getPath(u'/tmp', template, 'mbdiscid', 0, disambiguate=True) # noqa: E501
- self.assertEquals(path, u'/tmp/' + expected_path)
-
- def testAddDisambiguationUnitTest(self):
- """Unit test for Program.addDisambiguation()."""
- prog = program.Program(config.Config())
- md = mbngs.DiscMetadata()
-
- # No relevant disambiguation metadata
- self.assertEquals(
- prog.addDisambiguation(u'Test', md),
- u'Test')
-
- # Only barcode available
- md.barcode = '033651008927'
- self.assertEquals(
- prog.addDisambiguation(u'Test', md),
- u'Test (033651008927)')
-
- # Both catalog number and barcode available
- md.catalogNumber = 'RHR CD 89'
- self.assertEquals(
- prog.addDisambiguation(u'Test', md),
- u'Test (RHR CD 89)')
diff --git a/whipper/test/test_image_table.py b/whipper/test/test_image_table.py
index 68455c25..e6b1b6ce 100644
--- a/whipper/test/test_image_table.py
+++ b/whipper/test/test_image_table.py
@@ -62,11 +62,10 @@ def testMusicBrainz(self):
"KnpGsLhvH.lPrNc1PBL21lb9Bg4-")
def testAccurateRip(self):
- self.assertEquals(self.table.getAccurateRipIds(), (
+ self.assertEquals(self.table.accuraterip_ids(), (
"0013bd5a", "00b8d489"))
- self.assertEquals(self.table.getAccurateRipURL(),
- "http://www.accuraterip.com/accuraterip/a/5/d/"
- "dBAR-012-0013bd5a-00b8d489-c60af50d.bin")
+ self.assertEquals(self.table.accuraterip_path(),
+ "a/5/d/dBAR-012-0013bd5a-00b8d489-c60af50d.bin")
def testDuration(self):
self.assertEquals(self.table.duration(), 2761413)
diff --git a/whipper/test/test_image_toc.py b/whipper/test/test_image_toc.py
index 926b7f80..4a27c822 100644
--- a/whipper/test/test_image_toc.py
+++ b/whipper/test/test_image_toc.py
@@ -89,8 +89,8 @@ def testConvertCue(self):
common.diffStrings(ref, cue)
# we verify it because it has failed in readdisc in the past
- self.assertEquals(self.toc.table.getAccurateRipURL(),
- 'http://www.accuraterip.com/accuraterip/3/c/4/dBAR-013-0019d4c3-00fe8924-b90c650d.bin') # noqa: E501
+ self.assertEquals(self.toc.table.accuraterip_path(),
+ '3/c/4/dBAR-013-0019d4c3-00fe8924-b90c650d.bin')
def testGetRealPath(self):
self.assertRaises(KeyError, self.toc.getRealPath, u'track01.wav')
@@ -164,8 +164,8 @@ def testCDDBId(self):
def testAccurateRip(self):
# we verify it because it has failed in readdisc in the past
- self.assertEquals(self.toc.table.getAccurateRipURL(),
- 'http://www.accuraterip.com/accuraterip/e/d/2/dBAR-013-001af2de-0105994e-ad0be00d.bin') # noqa: E501
+ self.assertEquals(self.toc.table.accuraterip_path(),
+ 'e/d/2/dBAR-013-001af2de-0105994e-ad0be00d.bin')
# The Breeders - Mountain Battles has CDText