Skip to content

Commit

Permalink
Catch exceptions deserializing on-disk messages from the queue
Browse files Browse the repository at this point in the history
Fixes #1129
  • Loading branch information
growse committed Nov 24, 2021
1 parent 2c1174c commit bd009ec
Showing 1 changed file with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.squareup.tape2.ObjectQueue
import com.squareup.tape2.QueueFile
import org.owntracks.android.model.messages.MessageBase
import org.owntracks.android.model.messages.MessageEncrypted
import org.owntracks.android.model.messages.MessageUnknown
import org.owntracks.android.support.Parser
import timber.log.Timber
import java.io.File
Expand All @@ -12,11 +13,11 @@ import java.io.OutputStream
import java.util.concurrent.LinkedBlockingDeque

class BlockingDequeThatAlsoSometimesPersistsThingsToDiskMaybe(
capacity: Int,
path: File,
parser: Parser
capacity: Int,
path: File,
parser: Parser
) :
LinkedBlockingDeque<MessageBase>(capacity) {
LinkedBlockingDeque<MessageBase>(capacity) {
private val parallelDiskQueueHead: ObjectQueue<MessageBase>
private val parallelDiskQueue: ObjectQueue<MessageBase>

Expand All @@ -29,27 +30,32 @@ class BlockingDequeThatAlsoSometimesPersistsThingsToDiskMaybe(
val headQueueFile = diskBackedQueueOrNull(headSlotFile)

val messageBaseConverter = object : ObjectQueue.Converter<MessageBase> {
override fun from(source: ByteArray): MessageBase = parser.fromUnencryptedJson(source)
override fun from(source: ByteArray): MessageBase = try {
parser.fromUnencryptedJson(source)
} catch (exception: Exception) {
Timber.w("Unable to recover message from queue: ${source.toString(Charsets.UTF_8)}")
MessageUnknown
}

override fun toStream(value: MessageBase, sink: OutputStream) {
sink.write(parser.toUnencryptedJsonBytes(value))
}
}

parallelDiskQueue = queueFile?.run { ObjectQueue.create(this, messageBaseConverter) }
?: ObjectQueue.createInMemory()
?: ObjectQueue.createInMemory()

parallelDiskQueueHead =
headQueueFile?.run { ObjectQueue.create(this, messageBaseConverter) }
?: ObjectQueue.createInMemory()
headQueueFile?.run { ObjectQueue.create(this, messageBaseConverter) }
?: ObjectQueue.createInMemory()

(parallelDiskQueueHead.asList() + parallelDiskQueue.asList())
.filter { it !is MessageEncrypted }
.forEach {
if (!offerLast(it)) {
Timber.w("On-disk queue contains message that won't fit into queue. Dropping: $it")
.filter { it !is MessageEncrypted }
.forEach {
if (!offerLast(it)) {
Timber.w("On-disk queue contains message that won't fit into queue. Dropping: $it")
}
}
}
resyncQueueToDisk()
}

Expand Down

0 comments on commit bd009ec

Please sign in to comment.