Skip to content

Commit

Permalink
oboe: automatically stop and close a stream when disconnected
Browse files Browse the repository at this point in the history
WARNING: API change
Modifies OboeStreamCallback onError methods.

Adds protection against stopping or closing a stream at the same time
that the error callback is being processed.

Fix issue #1
  • Loading branch information
Phil Burk committed Nov 23, 2017
1 parent 618bda0 commit cfd1a01
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 49 deletions.
9 changes: 9 additions & 0 deletions include/oboe/OboeStreamBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,15 @@ class OboeStreamBuilder : public OboeStreamBase {
return this;
}

/**
* Specifies an object to handle data or error related callbacks from the underlying API.
*
* When an error callback occurs, the associated stream will be stopped
* and closed in a separate thread.
*
* @param streamCallback
* @return
*/
OboeStreamBuilder *setCallback(OboeStreamCallback *streamCallback) {
mStreamCallback = streamCallback;
return this;
Expand Down
32 changes: 26 additions & 6 deletions include/oboe/OboeStreamCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,42 @@ class OboeStream;
class OboeStreamCallback {
public:
virtual ~OboeStreamCallback() = default;

/**
* A buffer is ready for processing.
*
* @param input buffer containing recorded audio, may be NULL
* @param output fill this with audio data to be played, may be NULL
* @param number of frames of input or output
* @param oboeStream pointer to the associated stream
* @param audioData buffer containing input data or a place to put output data
* @param numFrames number of frames to be processed
* @return OBOE_CALLBACK_RESULT_CONTINUE or OBOE_CALLBACK_RESULT_STOP
*/
virtual oboe_data_callback_result_t onAudioReady(
OboeStream *audioStream,
OboeStream *oboeStream,
void *audioData,
int32_t numFrames) = 0;

virtual void onError(OboeStream *audioStream, oboe_result_t error) {}
/**
* This will be called when an error occurs on a stream or when the stream is discomnnected.
* The underlying stream will already be stopped by Oboe but not yet closed.
* So the stream can be queried.
*
* @param oboeStream pointer to the associated stream
* @param error
*/
virtual void onErrorBeforeClose(OboeStream *oboeStream, oboe_result_t error) {}

};
/**
* This will be called when an error occurs on a stream or when the stream is disconnected.
* The underlying stream will already be stopped AND closed by Oboe.
* So the underlyng stream cannot be referenced.
*
* This callback could be used to reopen a new stream on another device.
*
* @param oboeStream pointer to the associated stream
* @param error
*/
virtual void onErrorAfterClose(OboeStream *oboeStream, oboe_result_t error) {}

};

#endif //OBOE_OBOE_STREAM_CALLBACK_H
164 changes: 132 additions & 32 deletions src/aaudio/OboeStreamAAudio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
* limitations under the License.
*/

#include <assert.h>
#include <stdint.h>

#include "oboe/OboeUtilities.h"
#include "common/OboeDebug.h"
#include "aaudio/AAudioLoader.h"
#include "aaudio/OboeStreamAAudio.h"
#include "common/OboeDebug.h"
#include "oboe/OboeUtilities.h"

AAudioLoader *OboeStreamAAudio::mLibLoader = nullptr;

Expand Down Expand Up @@ -64,6 +65,14 @@ static aaudio_data_callback_result_t oboe_aaudio_data_callback_proc(
}
}

static void oboe_aaudio_error_thread_proc(OboeStreamAAudio *oboeStream,
AAudioStream *stream,
oboe_result_t error) {
if (oboeStream != NULL) {
oboeStream->onErrorInThread(stream, error);
}
}

// 'C' wrapper for the error callback method
static void oboe_aaudio_error_callback_proc(
AAudioStream *stream,
Expand All @@ -72,7 +81,9 @@ static void oboe_aaudio_error_callback_proc(

OboeStreamAAudio *oboeStream = (OboeStreamAAudio *)userData;
if (oboeStream != NULL) {
oboeStream->callOnError(stream, error);
// Handle error on a separate thread
std::thread t(oboe_aaudio_error_thread_proc, oboeStream, stream, error);
t.detach();
}
}

Expand Down Expand Up @@ -105,15 +116,19 @@ oboe_result_t OboeStreamAAudio::open() {
mLibLoader->builder_setSharingMode(aaudioBuilder, mSharingMode);
mLibLoader->builder_setPerformanceMode(aaudioBuilder, mPerformanceMode);

// TODO get more parameters from the builder
// TODO get more parameters from the builder?

if (mStreamCallback != nullptr) {
mLibLoader->builder_setDataCallback(aaudioBuilder, oboe_aaudio_data_callback_proc, this);
mLibLoader->builder_setErrorCallback(aaudioBuilder, oboe_aaudio_error_callback_proc, this);
mLibLoader->builder_setFramesPerDataCallback(aaudioBuilder, getFramesPerCallback());
}
mLibLoader->builder_setErrorCallback(aaudioBuilder, oboe_aaudio_error_callback_proc, this);

result = mLibLoader->builder_openStream(aaudioBuilder, &mAAudioStream);
{
AAudioStream *stream = nullptr;
result = mLibLoader->builder_openStream(aaudioBuilder, &stream);
mAAudioStream.store(stream);
}
if (result != AAUDIO_OK) {
goto error2;
}
Expand All @@ -128,23 +143,33 @@ oboe_result_t OboeStreamAAudio::open() {
}
mSharingMode = mLibLoader->stream_getSharingMode(mAAudioStream);
mPerformanceMode = mLibLoader->stream_getPerformanceMode(mAAudioStream);
mBufferCapacityInFrames = getBufferCapacityInFrames();
mBufferCapacityInFrames = mLibLoader->stream_getBufferCapacity(mAAudioStream);

LOGD("OboeStreamAAudio.open() app format = %d", (int) mFormat);
LOGD("OboeStreamAAudio.open() native format = %d", (int) mNativeFormat);
LOGD("OboeStreamAAudio.open() sample rate = %d", (int) mSampleRate);
LOGD("OboeStreamAAudio.open() capacity = %d", (int) mBufferCapacityInFrames);

error2:
mLibLoader->builder_delete(aaudioBuilder);
LOGD("OboeStreamAAudio.open: AAudioStream_Open() returned %s, mAAudioStream = %p",
mLibLoader->convertResultToText(result), mAAudioStream);
mLibLoader->convertResultToText(result), mAAudioStream.load());
return result;
}

oboe_result_t OboeStreamAAudio::close()
{
oboe_result_t result = mLibLoader->stream_close(mAAudioStream);
mAAudioStream = nullptr;
// The main reason we have this mutex if to prevent a collision between a call
// by the application to stop a stream at the same time that an onError callback
// is being executed because of a disconnect. The close will delete the stream,
// which could otherwise cause the requestStop() to crash.
std::lock_guard<std::mutex> lock(mLock);
oboe_result_t result = OBOE_OK;
// This will delete the AAudio stream object so we need to null out the pointer.
AAudioStream *stream = mAAudioStream.exchange(nullptr);
if (stream != nullptr) {
result = mLibLoader->stream_close(stream);
}
return result;
}

Expand All @@ -157,8 +182,18 @@ aaudio_data_callback_result_t OboeStreamAAudio::callOnAudioReady(AAudioStream *s
numFrames);
}

void OboeStreamAAudio::callOnError(AAudioStream *stream, oboe_result_t error) {
mStreamCallback->onError( this, error);
void OboeStreamAAudio::onErrorInThread(AAudioStream *stream, oboe_result_t error) {
LOGD("onErrorInThread() - entering ===================================");
assert(stream == mAAudioStream.load());
requestStop();
if (mStreamCallback != nullptr) {
mStreamCallback->onErrorBeforeClose(this, error);
}
close();
if (mStreamCallback != nullptr) {
mStreamCallback->onErrorAfterClose(this, error);
}
LOGD("onErrorInThread() - exiting ===================================");
}

oboe_result_t OboeStreamAAudio::convertApplicationDataToNative(int32_t numFrames) {
Expand All @@ -180,81 +215,146 @@ oboe_result_t OboeStreamAAudio::convertApplicationDataToNative(int32_t numFrames

oboe_result_t OboeStreamAAudio::requestStart()
{
return mLibLoader->stream_requestStart(mAAudioStream);
std::lock_guard<std::mutex> lock(mLock);
AAudioStream *stream = mAAudioStream.load();
if (stream != nullptr) {
return mLibLoader->stream_requestStart(stream);
} else {
return OBOE_ERROR_NULL;
}
}

oboe_result_t OboeStreamAAudio::requestPause()
{
return mLibLoader->stream_requestPause(mAAudioStream);
std::lock_guard<std::mutex> lock(mLock);
AAudioStream *stream = mAAudioStream.load();
if (stream != nullptr) {
return mLibLoader->stream_requestPause(stream);
} else {
return OBOE_ERROR_NULL;
}
}

oboe_result_t OboeStreamAAudio::requestFlush() {
return mLibLoader->stream_requestFlush(mAAudioStream);
std::lock_guard<std::mutex> lock(mLock);
AAudioStream *stream = mAAudioStream.load();
if (stream != nullptr) {
return mLibLoader->stream_requestFlush(stream);
} else {
return OBOE_ERROR_NULL;
}
}

oboe_result_t OboeStreamAAudio::requestStop()
{
return mLibLoader->stream_requestStop(mAAudioStream);
std::lock_guard<std::mutex> lock(mLock);
AAudioStream *stream = mAAudioStream.load();
if (stream != nullptr) {
return mLibLoader->stream_requestStop(stream);
} else {
return OBOE_ERROR_NULL;
}
}

oboe_result_t OboeStreamAAudio::write(const void *buffer,
int32_t numFrames,
int64_t timeoutNanoseconds)
{
return mLibLoader->stream_write(mAAudioStream, buffer, numFrames, timeoutNanoseconds);
AAudioStream *stream = mAAudioStream.load();
if (stream != nullptr) {
return mLibLoader->stream_write(mAAudioStream, buffer, numFrames, timeoutNanoseconds);
} else {
return OBOE_ERROR_NULL;
}
}

oboe_result_t OboeStreamAAudio::waitForStateChange(oboe_stream_state_t currentState,
oboe_stream_state_t *nextState,
int64_t timeoutNanoseconds)
{
return mLibLoader->stream_waitForStateChange(mAAudioStream, currentState,
nextState, timeoutNanoseconds);
AAudioStream *stream = mAAudioStream.load();
if (stream != nullptr) {
return mLibLoader->stream_waitForStateChange(mAAudioStream, currentState,
nextState, timeoutNanoseconds);
} else {
return OBOE_ERROR_NULL;
}
}

oboe_result_t OboeStreamAAudio::setBufferSizeInFrames(int32_t requestedFrames)
{
if (requestedFrames > mBufferCapacityInFrames) {
requestedFrames = mBufferCapacityInFrames;
}
return mLibLoader->stream_setBufferSize(mAAudioStream, requestedFrames);
}

oboe_stream_state_t OboeStreamAAudio::getState()
{
if (mAAudioStream == nullptr) {
AAudioStream *stream = mAAudioStream.load();
if (stream != nullptr) {
return mLibLoader->stream_getState(stream);
} else {
return OBOE_STREAM_STATE_CLOSED;
}
return mLibLoader->stream_getState(mAAudioStream);
}

int32_t OboeStreamAAudio::getBufferSizeInFrames() const {
return mLibLoader->stream_getBufferSize(mAAudioStream);
}

int32_t OboeStreamAAudio::getBufferCapacityInFrames() const {
return mLibLoader->stream_getBufferCapacity(mAAudioStream);
AAudioStream *stream = mAAudioStream.load();
if (stream != nullptr) {
return mLibLoader->stream_getBufferSize(stream);
} else {
return OBOE_ERROR_NULL;
}
}

int32_t OboeStreamAAudio::getFramesPerBurst()
{
return mLibLoader->stream_getFramesPerBurst(mAAudioStream);
AAudioStream *stream = mAAudioStream.load();
if (stream != nullptr) {
return mLibLoader->stream_getFramesPerBurst(stream);
} else {
return OBOE_ERROR_NULL;
}
}

int64_t OboeStreamAAudio::getFramesRead()
{
return mLibLoader->stream_getFramesRead(mAAudioStream);
AAudioStream *stream = mAAudioStream.load();
if (stream != nullptr) {
return mLibLoader->stream_getFramesRead(stream);
} else {
return OBOE_ERROR_NULL;
}
}
int64_t OboeStreamAAudio::getFramesWritten()
{
return mLibLoader->stream_getFramesWritten(mAAudioStream);
AAudioStream *stream = mAAudioStream.load();
if (stream != nullptr) {
return mLibLoader->stream_getFramesWritten(stream);
} else {
return OBOE_ERROR_NULL;
}
}

int32_t OboeStreamAAudio::getXRunCount()
{
return mLibLoader->stream_getXRunCount(mAAudioStream);
AAudioStream *stream = mAAudioStream.load();
if (stream != nullptr) {
return mLibLoader->stream_getXRunCount(stream);
} else {
return OBOE_ERROR_NULL;
}
}

oboe_result_t OboeStreamAAudio::getTimestamp(clockid_t clockId,
int64_t *framePosition,
int64_t *timeNanoseconds) {
return mLibLoader->stream_getTimestamp(mAAudioStream, clockId,
framePosition, timeNanoseconds);
AAudioStream *stream = mAAudioStream.load();
if (stream != nullptr) {
return mLibLoader->stream_getTimestamp(stream, clockId,
framePosition, timeNanoseconds);
} else {
return OBOE_ERROR_NULL;
}
}
Loading

0 comments on commit cfd1a01

Please sign in to comment.