Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add openSharedStream() to prevent deletion while executing onError callbacks #821

Merged
merged 4 commits into from
May 1, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions apps/OboeTester/app/src/main/cpp/NativeAudioContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ int ActivityContext::callbackSize = 0;

oboe::AudioStream * ActivityContext::getOutputStream() {
for (auto entry : mOboeStreams) {
oboe::AudioStream *oboeStream = entry.second;
oboe::AudioStream *oboeStream = entry.second.get();
if (oboeStream->getDirection() == oboe::Direction::Output) {
return oboeStream;
}
Expand All @@ -69,7 +69,7 @@ oboe::AudioStream * ActivityContext::getOutputStream() {

oboe::AudioStream * ActivityContext::getInputStream() {
for (auto entry : mOboeStreams) {
oboe::AudioStream *oboeStream = entry.second;
oboe::AudioStream *oboeStream = entry.second.get();
if (oboeStream != nullptr) {
if (oboeStream->getDirection() == oboe::Direction::Input) {
return oboeStream;
Expand All @@ -80,6 +80,7 @@ oboe::AudioStream * ActivityContext::getInputStream() {
}

void ActivityContext::freeStreamIndex(int32_t streamIndex) {
mOboeStreams[streamIndex].reset();
mOboeStreams.erase(streamIndex);
}

Expand All @@ -88,13 +89,14 @@ int32_t ActivityContext::allocateStreamIndex() {
}

void ActivityContext::close(int32_t streamIndex) {
LOGD("ActivityContext::%s() called for stream %d ", __func__, streamIndex);
stopBlockingIOThread();
oboe::AudioStream *oboeStream = getStream(streamIndex);
LOGD("ActivityContext::%s() close stream %p ", __func__, oboeStream);
if (oboeStream != nullptr) {
oboeStream->close();
freeStreamIndex(streamIndex);
LOGD("ActivityContext::%s() delete stream %d ", __func__, streamIndex);
delete oboeStream;
freeStreamIndex(streamIndex);
}
}

Expand All @@ -110,7 +112,7 @@ oboe::Result ActivityContext::pause() {
oboe::Result result = oboe::Result::OK;
stopBlockingIOThread();
for (auto entry : mOboeStreams) {
oboe::AudioStream *oboeStream = entry.second;
oboe::AudioStream *oboeStream = entry.second.get();
result = oboeStream->requestPause();
printScheduler();
}
Expand All @@ -123,8 +125,8 @@ oboe::Result ActivityContext::stopAllStreams() {
LOGD("ActivityContext::stopAllStreams() called");
for (auto entry : mOboeStreams) {
LOGD("ActivityContext::stopAllStreams() handle = %d, stream %p",
entry.first, entry.second);
oboe::AudioStream *oboeStream = entry.second;
entry.first, entry.second.get());
oboe::AudioStream *oboeStream = entry.second.get();
result = oboeStream->requestStop();
printScheduler();
}
Expand Down Expand Up @@ -204,26 +206,24 @@ int ActivityContext::open(jint nativeApi,
AAudioExtensions::getInstance().setMMapEnabled(isMMap);

// Open a stream based on the builder settings.
oboe::AudioStream *oboeStream = nullptr;
oboe::Result result = builder.openStream(&oboeStream);
std::shared_ptr<oboe::AudioStream> oboeStream;
oboe::Result result = builder.openSharedStream(oboeStream);
LOGD("ActivityContext::open() builder.openStream() returned %d, oboeStream = %p",
result, oboeStream);
result, oboeStream.get());
AAudioExtensions::getInstance().setMMapEnabled(oldMMapEnabled);
if (result != oboe::Result::OK) {
delete oboeStream;
oboeStream = nullptr;
freeStreamIndex(streamIndex);
streamIndex = -1;
} else {
mOboeStreams[streamIndex] = oboeStream;
mOboeStreams[streamIndex] = oboeStream; // save shared_ptr

mChannelCount = oboeStream->getChannelCount(); // FIXME store per stream
mFramesPerBurst = oboeStream->getFramesPerBurst();
mSampleRate = oboeStream->getSampleRate();

createRecording();

finishOpen(isInput, oboeStream);
finishOpen(isInput, oboeStream.get());
}


Expand Down Expand Up @@ -606,6 +606,7 @@ void ActivityGlitches::finishOpen(bool isInput, oboe::AudioStream *oboeStream) {

// =================================================================== ActivityTestDisconnect
void ActivityTestDisconnect::close(int32_t streamIndex) {
LOGD("ActivityTestDisconnect::%s() called for stream %d ", __func__, streamIndex);
ActivityContext::close(streamIndex);
mSinkFloat.reset();
}
Expand Down
4 changes: 2 additions & 2 deletions apps/OboeTester/app/src/main/cpp/NativeAudioContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class ActivityContext {
oboe::AudioStream *getStream(int32_t streamIndex) {
auto it = mOboeStreams.find(streamIndex);
if (it != mOboeStreams.end()) {
return it->second;
return it->second.get();
} else {
return nullptr;
}
Expand Down Expand Up @@ -329,7 +329,7 @@ class ActivityContext {
std::unique_ptr<MultiChannelRecording> mRecording{};

int32_t mNextStreamHandle = 0;
std::unordered_map<int32_t, oboe::AudioStream *> mOboeStreams;
std::unordered_map<int32_t, std::shared_ptr<oboe::AudioStream>> mOboeStreams;
int32_t mFramesPerBurst = 0; // TODO per stream
int32_t mChannelCount = 0; // TODO per stream
int32_t mSampleRate = 0; // TODO per stream
Expand Down
2 changes: 2 additions & 0 deletions apps/OboeTester/app/src/main/cpp/OboeStreamCallbackProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ void OboeStreamCallbackProxy::onErrorBeforeClose(oboe::AudioStream *audioStream,
if (mCallback != nullptr) {
mCallback->onErrorBeforeClose(audioStream, error);
}
usleep(2000 * 1000); // FIXME - sleep to provoke a race condition
philburk marked this conversation as resolved.
Show resolved Hide resolved
LOGD("OboeStreamCallbackProxy::%s(%p, %d) returning after sleep", __func__, audioStream, error);
}

void OboeStreamCallbackProxy::onErrorAfterClose(oboe::AudioStream *audioStream, oboe::Result error) {
Expand Down
21 changes: 19 additions & 2 deletions include/oboe/AudioStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,22 @@ class AudioStream : public AudioStreamBase {
ResultWithValue<int32_t> waitForAvailableFrames(int32_t numFrames,
int64_t timeoutNanoseconds);

/*
philburk marked this conversation as resolved.
Show resolved Hide resolved
* INTERNAL USE ONLY
* Set a weak_ptr to this stream from the shared_ptr so that we can
* later use a shared_ptr in the error callback.
*/
void setWeakThis(std::shared_ptr<oboe::AudioStream> &sharedStream) {
mWeakThis = sharedStream;
}

/*
* Make a shared_ptr that will prevent this stream from being deleted.
*/
std::shared_ptr<oboe::AudioStream> lockWeakThis() {
return mWeakThis.lock();
}

protected:

/**
Expand Down Expand Up @@ -497,17 +513,18 @@ class AudioStream : public AudioStreamBase {

std::mutex mLock; // for synchronizing start/stop/close


private:
int mPreviousScheduler = -1;

std::atomic<bool> mDataCallbackEnabled{false};
std::atomic<bool> mErrorCallbackCalled{false};


std::weak_ptr<AudioStream> mWeakThis;
};

/**
* This struct is a stateless functor which closes a audiostream prior to its deletion.
* This struct is a stateless functor which closes an AudioStream prior to its deletion.
* This means it can be used to safely delete a smart pointer referring to an open stream.
*/
struct StreamDeleterFunctor {
Expand Down
3 changes: 3 additions & 0 deletions include/oboe/AudioStreamBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace oboe {
// This depends on AudioStream, so we use forward declaration, it will close and delete the stream
struct StreamDeleterFunctor;
using ManagedStream = std::unique_ptr<AudioStream, StreamDeleterFunctor>;

/**
* Factory class for an audio Stream.
*/
Expand Down Expand Up @@ -399,6 +400,8 @@ class AudioStreamBuilder : public AudioStreamBase {
*/
Result openManagedStream(ManagedStream &stream);

Result openSharedStream(std::shared_ptr<oboe::AudioStream> &sharedStream);

private:

/**
Expand Down
20 changes: 20 additions & 0 deletions src/aaudio/AudioStreamAAudio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ static void oboe_aaudio_error_thread_proc(AudioStreamAAudio *oboeStream,
LOGD("%s() - exiting <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<", __func__);
}

// This runs in its own thread.
// Only one of these threads will be launched from internalErrorCallback().
// Prevents deletion of the stream if the app is using AudioStreamBuilder::openSharedStream()
static void oboe_aaudio_error_thread_proc_shared(std::shared_ptr<AudioStream> sharedStream,
Result error) {
LOGD("%s() - entering >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>", __func__);
AudioStreamAAudio *oboeStream = reinterpret_cast<AudioStreamAAudio*>(sharedStream.get());
oboe_aaudio_error_thread_proc(oboeStream, error);
LOGD("%s() - exiting <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<", __func__);
}

namespace oboe {

/*
Expand Down Expand Up @@ -101,12 +112,21 @@ void AudioStreamAAudio::internalErrorCallback(
void *userData,
aaudio_result_t error) {
AudioStreamAAudio *oboeStream = reinterpret_cast<AudioStreamAAudio*>(userData);

// Prevents deletion of the stream if the app is using AudioStreamBuilder::openSharedStream()
std::shared_ptr<AudioStream> sharedStream = oboeStream->lockWeakThis();

// These checks should be enough because we assume that the stream close()
// will join() any active callback threads and will not allow new callbacks.
if (oboeStream->wasErrorCallbackCalled()) { // block extra error callbacks
LOGE("%s() multiple error callbacks called!", __func__);
} else if (stream != oboeStream->getUnderlyingStream()) {
LOGD("%s() stream already closed", __func__); // can happen if there are bugs
} else if (sharedStream) {
// Handle error on a separate thread using shared pointer.
std::thread t(oboe_aaudio_error_thread_proc_shared, sharedStream,
static_cast<Result>(error));
t.detach();
} else {
// Handle error on a separate thread.
std::thread t(oboe_aaudio_error_thread_proc, oboeStream,
Expand Down
10 changes: 10 additions & 0 deletions src/common/AudioStreamBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,14 @@ Result AudioStreamBuilder::openManagedStream(oboe::ManagedStream &stream) {
return result;
}

Result AudioStreamBuilder::openSharedStream(std::shared_ptr<oboe::AudioStream> &sharedStream) {
sharedStream.reset();
AudioStream *streamptr;
auto result = openStream(&streamptr);
sharedStream.reset(streamptr);
// Save a weak_ptr in the stream for use with callbacks.
streamptr->setWeakThis(sharedStream);
return result;
}

} // namespace oboe
philburk marked this conversation as resolved.
Show resolved Hide resolved