C++ catching/finding connection and stream errors & continuous/infinite speech recognition #87
Hi, any help with [1] and [2] is much appreciated. Thank you!
I do not think creating the channel (this line):
can actually fail (well, it could run out of memory or something, but it does not try to connect or anything). Only once you try to use the channel will you see failures.
Error detection happens when you call
At that point you get a gRPC status (the codes are listed in https://github.com/grpc/grpc/blob/master/doc/statuscodes.md), though sometimes the precise semantics of what constitutes a retriable error depend on the service.
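To make the status check concrete, here is a sketch of a hypothetical IsRecoverable() helper (the same name the pseudo-code below uses as a placeholder). So that it compiles without gRPC, it switches on a local enum that mirrors the numeric values in statuscodes.md; with gRPC you would switch on status.error_code(), which returns a grpc::StatusCode. Which codes count as recoverable is an assumption, in particular treating OUT_OF_RANGE as the stream-duration limit, so check the service documentation.

```cpp
// Mirrors the numeric values of gRPC status codes (statuscodes.md) so this
// sketch compiles without gRPC. With gRPC, switch on status.error_code().
enum class StatusCode {
  kOk = 0,
  kCancelled = 1,
  kUnknown = 2,
  kInvalidArgument = 3,
  kDeadlineExceeded = 4,
  kNotFound = 5,
  kAlreadyExists = 6,
  kPermissionDenied = 7,
  kResourceExhausted = 8,
  kFailedPrecondition = 9,
  kAborted = 10,
  kOutOfRange = 11,
  kUnimplemented = 12,
  kInternal = 13,
  kUnavailable = 14,
  kDataLoss = 15,
  kUnauthenticated = 16,
};

// Hypothetical policy: should we open a new stream after this status?
// The exact set of codes is an assumption; it depends on the service.
bool IsRecoverable(StatusCode code) {
  switch (code) {
    case StatusCode::kOk:                // stream ended normally
    case StatusCode::kCancelled:         // we called TryCancel() ourselves
    case StatusCode::kDeadlineExceeded:
    case StatusCode::kAborted:
    case StatusCode::kOutOfRange:        // assumed: stream duration limit
    case StatusCode::kUnavailable:       // network problem or server down
      return true;
    default:
      return false;  // e.g. bad config or credentials: retrying won't help
  }
}
```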
Okay.
Correct.
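I am guessing it would be something like:

```cpp
#include <chrono>
#include <memory>
#include <grpcpp/grpcpp.h>

// Begin a stream. `speech` is the stub, as in streaming_transcribe.cc.
// IsRecoverable(), GrpcStatusToException(), HaveMoreAudio() and
// GetSomeAudioFromYourSource() are placeholders for your own code.
auto last_reset = std::chrono::steady_clock::now();
auto context = std::make_unique<grpc::ClientContext>();
auto streamer = speech->StreamingRecognize(context.get());
// Open a fresh stream; each stream needs its own ClientContext, and its
// first request must carry the streaming_config again.
auto open_stream = [&speech, &context, &streamer, &last_reset] {
  context = std::make_unique<grpc::ClientContext>();
  streamer = speech->StreamingRecognize(context.get());
  last_reset = std::chrono::steady_clock::now();
};
auto reset_stream = [&context, &streamer, &open_stream] {
  // Cancel the stream, then collect its status. Finish() on a cancelled
  // stream reports CANCELLED, so IsRecoverable() must accept that code.
  context->TryCancel();
  auto status = streamer->Finish();
  if (not IsRecoverable(status)) {
    throw GrpcStatusToException(status);
  }
  open_stream();
};
while (HaveMoreAudio()) {
  using namespace std::chrono_literals;
  auto const elapsed_time = std::chrono::steady_clock::now() - last_reset;
  if (elapsed_time > 4min) {
    reset_stream();
  }
  StreamingRecognizeRequest request = GetSomeAudioFromYourSource();
  streamer->Write(request);
  // Read responses. Read() blocks until a response arrives or the stream ends.
  StreamingRecognizeResponse response;
  while (streamer->Read(&response)) {  // Returns false when no more to read.
    // Dump the transcript of all the results.
  }
  grpc::Status status = streamer->Finish();
  if (IsRecoverable(status)) {
    open_stream();  // Already finished: do not call Finish() a second time.
    continue;
  }
  throw GrpcStatusToException(status);
}
```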
See above, but feel free to ask for clarification if I was too obscure.
Hi @coryan, thank you very much for the response and example code; it is really helpful. Based on what you mentioned, I modified the code mentioned here: (in case it is not accessible, I've also copied it below). The code (a gRPC server) does the following:
I implemented resetStream as you described in your response, but I'm getting the following error:
I have declared the streamer and channel as global variables; hopefully that is fine. I also have one more question:
2. While the reset is happening, what happens to the audio bytes that are being streamed at the time of the reset? Because the stream was reset, the main thread will not be able to read responses for that last audio.
No problem.
Ack. I do not think I will be able to debug your code for you, but maybe I can give you some hints.
I think you need to synchronize both threads, right? And you are copying a pointer to the
I am guessing you still had at least one operation pending on the stream at the time? My code sketch did not cover synchronization between the thread reading from the speech-to-text stream and the thread writing to it. I will sketch one possible approach below, but this is really up to you I would think.
I would start thinking of wrapping all this state into a class by now, but this is, of course, up to you.
You need to synchronize manually with the thread sending the audio, or maybe through a queue or something. For example, you could do:

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>

// Assume the Microphone thread will add requests to this queue; you need a mutex
// and condition variable to synchronize them, I am sure you can find examples of this
// on the Internet. Code is mostly C++17, can be adapted for C++11.
std::mutex mu;
std::condition_variable cv;
bool shutdown_queue = false;
std::deque<StreamingRecognizeRequest> requests;
auto get_next = [&]() -> std::optional<StreamingRecognizeRequest> {
  std::unique_lock lk(mu);
  cv.wait(lk, [&] { return shutdown_queue or not requests.empty(); });
  // We only wake up with an empty queue when shutting down.
  if (requests.empty()) return std::nullopt;
  auto value = std::move(requests.front());  // FIFO: oldest audio first.
  requests.pop_front();
  return value;
};
for (auto v = get_next(); v.has_value(); v = get_next()) {
  using namespace std::chrono_literals;
  auto const elapsed_time = std::chrono::steady_clock::now() - last_reset;
  if (elapsed_time > 4min) {
    reset_stream();
  }
  streamer->Write(*v);
  // Read responses.
  StreamingRecognizeResponse response;
  while (streamer->Read(&response)) {  // Returns false when no more to read.
    // Dump the transcript of all the results.
  }
  grpc::Status status = streamer->Finish();
  if (IsRecoverable(status)) {
    // The stream is already finished, so do not call Finish() again;
    // open_stream() is a placeholder that creates a new context and stream.
    open_stream();
    continue;
  }
  throw GrpcStatusToException(status);
}
```
Right. I did not cover any of this in my pseudo-code. Maybe the new pseudo-ish code will get you going.
Hi @coryan, thank you very much for taking the time to provide an example with synchronization. I'm sorry about posting big chunks of code; I thought it would give you some idea of where I'm making mistakes, and I did not mean to give the impression that you have to debug it. Basically, I'm writing gRPC server code that takes a continuous stream of bytes from a client (it could be a mobile app that the user talks to, or a small gRPC client program that reads data from a WAV file and streams it to my server). The server code reads the bytes, sends them to the Google Speech service, and returns the Google ASR responses to the client. I tried to avoid sharing the speech-to-text stream object between two threads and tried to do the following:
I can write to the stream with the above code, but I'm not getting any responses from Google. We do not want to use the Google Speech async request. So I went back to my old approach, where the main thread sends the requests to Google Speech while an additional thread reads and processes the responses.
I just want to check whether reinitializing the stream at the beginning of the code works. If it does, I will try doing it every 4.5 minutes. But I'm not able to get it working: I can send the audio data to Google, but the thread that processes the responses is not printing anything. I believe since Also, I'm a beginner in C++ and do not know much about synchronization. Your previous example with synchronization helped me a lot to understand it. So I wrote a small stub to reset the stream with a mutex, like this:
I believe the main thread is calling the reset method, so it would wait while the stream is resetting? Can you help me with what I need to do to make the thread that reads responses wait while the stream is being reset? Also, do I need to change anything in the reset_speech_stream() method to make the synchronization work? I was also going through the examples from the tutorial you mentioned: the Python example never reinitializes the stream, but it works for continuous speech no matter how long, and it writes to and reads from the stream in the same thread (and it works :)). The Java example reinitializes the stream, but it uses a built-in splitCall method to associate a new thread to read responses. It would be very helpful if Google could provide an example of how to make the C++ example work with infinite continuous speech, which is what I'm trying to achieve here. It took me a couple of days to try different things. Thank you once again for your help.
Hi @coryan, I got something working today for infinite streaming by sharing the speech stream between the threads and using synchronization:
Method 1: The main thread processes Google Speech responses and an additional thread writes audio to the Google Speech stream. The main thread restarts the stream on a timer every 4.5 minutes. The issues I'm facing here are:
Method 2: The main thread writes audio to the Google Speech stream and an additional thread processes Google Speech responses. The main thread restarts the stream on a timer every 4.5 minutes. The issues I'm facing here are:
Can you suggest which method is more efficient? From the Java and JS examples It would be great if you could suggest which method is better, and also give some pseudocode on how to resend the audio after a reset, as the code in the examples is a little complicated.
Method 1 code
Method 2 code
I think you are quickly exceeding my knowledge of how this service works.
Awesome!
What does this mean? That part of the audio is not processed? Or that you drop audio packets?
I see.
Unfortunately, no. I do not have enough experience with this service to say one way or the other. Maybe @SurferJeffAtGoogle or @tmatsuo can chime in.
One thing I noticed is that the Java example does not cancel the stream; instead, it sends a message to close it gracefully: Maybe we should do the same in C++? I think that is WritesDone() on the writer: https://grpc.github.io/grpc/cpp/classgrpc__impl_1_1_client_writer.html
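As far as I understand, the graceful close works like this: half-close the stream with WritesDone(), keep calling Read() until it returns false (final results can still arrive), and only then call Finish(). The sketch below is a template so it compiles without gRPC. CloseGracefully and FakeStream are hypothetical names, and FakeStream is only a stand-in for grpc::ClientReaderWriter. In the two-thread design, the writer thread would call only WritesDone(), and the reader thread would do the draining and call Finish().

```cpp
#include <string>
#include <vector>

// Half-close, drain the remaining responses, then collect the final status.
// Works with any type exposing WritesDone(), Read(Response*) and Finish().
template <typename Response, typename Stream, typename OnResponse>
auto CloseGracefully(Stream& stream, OnResponse&& on_response) {
  stream.WritesDone();  // Tell the server no more audio is coming.
  Response response;
  while (stream.Read(&response)) {  // Final results may still arrive here.
    on_response(response);
  }
  return stream.Finish();  // Called only after Read() returned false.
}

// Hypothetical stand-in for the gRPC stream, used only to exercise the sketch.
struct FakeStream {
  std::vector<std::string> pending;
  bool writes_done = false;
  bool WritesDone() {
    writes_done = true;
    return true;
  }
  bool Read(std::string* out) {
    if (pending.empty()) return false;
    *out = pending.front();
    pending.erase(pending.begin());
    return true;
  }
  int Finish() { return writes_done ? 0 : 1; }  // 0 plays the role of OK.
};
```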
No idea, but Method 2 seems easier to follow.
nit: in C++ the high resolution clock is not guaranteed to be steady (it can go back in time, and skip forward), though it typically is. I understand those things matter for audio (again, no expert here), so you may want to insert a static assert if you are assuming that this clock is in fact steady. https://en.cppreference.com/w/cpp/chrono/high_resolution_clock
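A static_assert is checked at compile time. Rather than asserting on high_resolution_clock (on some standard libraries, for example libstdc++, it is an alias of system_clock, so such an assert would fail to compile), this sketch simply uses steady_clock, which the standard guarantees never goes backwards. StreamTimer is a hypothetical helper for measuring how long the current stream has been open.

```cpp
#include <chrono>

// Compile-time check that the clock we rely on is monotonic.
static_assert(std::chrono::steady_clock::is_steady,
              "stream timers assume a monotonic clock");

// Measures how long the current recognition stream has been open.
class StreamTimer {
 public:
  StreamTimer() : start_(std::chrono::steady_clock::now()) {}

  // Call whenever a new stream is opened.
  void Reset() { start_ = std::chrono::steady_clock::now(); }

  std::chrono::steady_clock::duration Elapsed() const {
    return std::chrono::steady_clock::now() - start_;
  }

 private:
  std::chrono::steady_clock::time_point start_;
};
```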
is it? You are comparing this against std::chrono::milliseconds(...).count(), so it is really more like 10 seconds, but maybe I read the code wrong. Why not just say "std::chrono::minutes(5)" if that is what you want?
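Comparing std::chrono durations directly avoids .count() unit mix-ups: the library converts units, so 4min + 50s and 290000 ms compare equal. This is a sketch; kRestartAfter and ShouldRestart() are hypothetical names, and 290 seconds is the margin the endless-streaming tutorials mentioned in this issue use.

```cpp
#include <chrono>

using namespace std::chrono_literals;

// Restart a little before the 305-second limit the service reports.
constexpr std::chrono::milliseconds kRestartAfter = 4min + 50s;
static_assert(kRestartAfter == std::chrono::milliseconds(290000),
              "4 min 50 s is 290000 ms");

// Accepts any duration type; <chrono> handles the unit conversion,
// so there is no need to call .count() on either side.
template <typename Rep, typename Period>
bool ShouldRestart(std::chrono::duration<Rep, Period> elapsed) {
  return elapsed >= kRestartAfter;
}
```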
Maybe we should do a WritesDone() here. But we need to wait until the
You have no locks around streamer in this function, is there a reason for it?
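One way to make the swap safe is to keep the current stream behind a mutex and hand out shared_ptr copies. A reader or writer then keeps using the stream it grabbed, and the resetting thread installs the new one. This is a sketch: GuardedStream is hypothetical. The mutex protects only the pointer, not calls made on the stream. My understanding is that a gRPC sync stream tolerates one reading thread and one writing thread at the same time (as streaming_transcribe.cc does), but not two concurrent writers. Also, the ClientContext must outlive its stream, so in practice you would store both in one struct and swap that.

```cpp
#include <memory>
#include <mutex>
#include <utility>

// Holds the current stream so one thread can swap in a new stream while
// another grabs whichever stream is current. shared_ptr keeps the old
// stream alive until every thread using it has dropped its copy.
template <typename Stream>
class GuardedStream {
 public:
  explicit GuardedStream(std::shared_ptr<Stream> s) : stream_(std::move(s)) {}

  // Returns the stream that is current at the time of the call.
  std::shared_ptr<Stream> Get() const {
    std::lock_guard<std::mutex> lk(mu_);
    return stream_;
  }

  // Installs a new stream and returns the previous one; the caller is
  // responsible for closing it (e.g. WritesDone(), drain, Finish()).
  std::shared_ptr<Stream> Replace(std::shared_ptr<Stream> s) {
    std::lock_guard<std::mutex> lk(mu_);
    std::swap(stream_, s);
    return s;
  }

 private:
  mutable std::mutex mu_;
  std::shared_ptr<Stream> stream_;
};
```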
|
Also, sorry I cannot be of more help. I do hear that having an example of infinite streaming for speech + C++ would be useful. I have added this to our backlog: googleapis/google-cloud-cpp#3356 Though candidly it will take us a while to get to it.
Hi @coryan ,
Yes, whatever audio packets arrive while the stream is being reset are not processed. The Java and JS examples keep track of when the last final response was received and, after the reset, resend the audio from that point up to the time of the reset (or something like that). The logic in those examples seems complicated to me, so it would be great to get some pseudocode for handling the audio packets that arrive while the stream is being reset.
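The approach described for the Java and JS examples (keep the audio sent since the last final result, and re-send it first on the new stream) can be reduced to a small buffer. This is a simplified sketch, not a port of those tutorials. ReplayBuffer is a hypothetical class, and it makes two assumptions: every chunk has the same duration (e.g. 100 ms), and the end offset of a final result comes from the result's result_end_time, converted to milliseconds. Audio arriving during the reset itself can simply wait in a request queue, like the one sketched earlier in this thread.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Keeps audio sent since the last final result so it can be re-sent on a
// new stream after a reset. Offsets are milliseconds from the start of the
// current stream's audio.
class ReplayBuffer {
 public:
  // chunk_ms: duration of each audio chunk, e.g. 100 ms.
  explicit ReplayBuffer(std::int64_t chunk_ms) : chunk_ms_(chunk_ms) {}

  // Call after writing a chunk to the current stream.
  void Add(std::string chunk) { chunks_.push_back(std::move(chunk)); }

  // Call when a final result arrives; chunks that end at or before
  // result_end_ms have been transcribed and can be dropped.
  void MarkFinal(std::int64_t result_end_ms) {
    while (!chunks_.empty() && front_start_ms_ + chunk_ms_ <= result_end_ms) {
      chunks_.pop_front();
      front_start_ms_ += chunk_ms_;
    }
  }

  // Call after opening a new stream: returns the untranscribed audio to
  // write first. The kept chunks now start at offset 0 of the new stream.
  std::vector<std::string> ChunksToReplay() {
    front_start_ms_ = 0;
    return std::vector<std::string>(chunks_.begin(), chunks_.end());
  }

  std::size_t size() const { return chunks_.size(); }

 private:
  std::int64_t chunk_ms_;
  std::int64_t front_start_ms_ = 0;
  std::deque<std::string> chunks_;
};
```

Usage, roughly: Add() every chunk you write, call MarkFinal() for each final result, and after a reset write ChunksToReplay() to the new stream before any new audio.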
I will try with this today.
Thank you very much for the suggestions. I can use steady_clock. I'm not sure how to use asserts in C++, but I'll figure it out.
So in Method 2, the additional thread waits for responses and processes them. I believe I should then call streamer->Finish() at the end of this while loop, since the loop waits until Read() returns false?
I was just testing with 10 seconds. I'll change it back to 290000 ms (4 minutes and 50 seconds), a little less than 5 minutes, just to be safe.
Regarding "You have no locks around streamer in this function, is there a reason for it?": in whichever thread does the reset, I'm not using locks around the streamer. Let me know if that is wrong. Thank you very much.
Thank you very much for your help. You have been very helpful in this work. I think I'm getting closer to having a working version.
Great, that would be very helpful. Meanwhile, if there is some pseudocode on how to do it along the lines of streaming_transcribe.cc, that would be very helpful for me. Thank you very much.
Hi @coryan, I got infinite streaming working with the method where the main thread writes audio to Google Speech while the additional thread reads responses. I have a quick question:
Or, as you suggested in your recent post, should I use Method 2?
In the second method, I'm using
For Method 1, the thread code looks as follows:
If you can suggest the best way to do the reset part, whether Method 1 or Method 2, that would be great. Also, in both reset methods, the first line, where it grabs the lock, sometimes takes about 10 seconds; this could be because of a bug in my synchronization. I'll debug this issue. I've also tried to move the code into a class:
The CPP file ASR_Google.cpp
Main method code
But the code is erroring out at What is the best way to declare this streamer object as Thank you very much.
I found the above conversation very, very helpful. In my implementation, I have a parent program written in C. Whenever a transcription request is made to the parent program, it forks this C++ application, passing a named pipe as an argument; the pipe sends audio to the C++ program, which forwards it to Google. When inactivity is detected, the C++ program sends a finish() request to get the final response. Until then, the parent program waits with waitpid(). After the C++ program exits, the parent program picks up the PID and does the rest of the processing. My question is: is this design okay, or should I change it?
This is a difficult question to answer. Since you are using both C and C++, the idea of forking a process seems fine. Whether a named pipe has the necessary capacity and does not get full, deadlocking your two programs, is a different question. Blocking on
It seems the questions posed here have been answered. Closing.
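On the named-pipe concern, one way to reduce the risk of the pipe filling up is to have the C++ side drain it continuously in its own thread and hand each chunk to a queue (such as the request queue sketched earlier in this thread), rather than reading only when the gRPC stream is ready. This is a sketch of the reading side. ReadAudioChunks is a hypothetical helper, and the 3200-byte chunk size assumes 16 kHz, 16-bit mono LINEAR16 audio (16000 samples/s × 0.1 s × 2 bytes), matching the 100 ms chunks described in the original question.

```cpp
#include <cstddef>
#include <fstream>
#include <functional>
#include <ios>
#include <string>

// Reads audio from `path` (a named pipe, or a regular file for testing) in
// chunks of `chunk_bytes`, passing each chunk to `on_chunk` until the writer
// closes its end (EOF). Returns the total number of bytes read.
std::size_t ReadAudioChunks(std::string const& path, std::size_t chunk_bytes,
                            std::function<void(std::string)> const& on_chunk) {
  std::ifstream in(path, std::ios::binary);  // Blocks until a writer opens a FIFO.
  std::size_t total = 0;
  std::string buffer(chunk_bytes, '\0');
  while (in) {
    in.read(&buffer[0], static_cast<std::streamsize>(buffer.size()));
    auto const n = static_cast<std::size_t>(in.gcount());
    if (n == 0) break;               // EOF: the writer closed the pipe.
    on_chunk(buffer.substr(0, n));   // The last chunk may be shorter.
    total += n;
  }
  return total;
}
```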
Hi,
I'm building a C++-based example in which a gRPC server receives continuous audio in 100-millisecond chunks (using gRPC bidirectional streaming) from a client (real-time audio from a microphone). My server code sends the audio to Google Cloud Speech to convert speech to text. I'm using this example, streaming_transcribe.cc. I have three questions:
[1] How do I handle errors such as being unable to connect to Google Speech, or being unable to create a channel because of network connectivity issues or the server being down? How do I catch such exceptions? Also, the stream can error out in the middle of streaming audio (Google closes the stream because the time limit is exceeded or because of long pauses); where and how can I handle such errors?
[2] The client can keep sending audio for a long time (maybe an hour). When the Google Speech stream has been open for more than 5 minutes, I get the following error:
Exceeded maximum allowed stream duration of 305 seconds.
StreamingRecognize requests seem to have a 5-minute limit, as mentioned here. The same page mentions a few endless streaming tutorials with code in Java and Python. An example of how to implement this in C++ in https://github.com/GoogleCloudPlatform/cpp-samples/blob/master/speech/api/streaming_transcribe.cc, or at least pseudocode, would be very helpful. The endless streaming tutorials close the current stream and open a new one after about 290 seconds. But in streaming_transcribe.cc, the stream is passed to a thread that writes audio bytes to it, while the main thread reads responses from it. How do I close and create a new stream in the MicrophoneThreadMain (https://github.com/GoogleCloudPlatform/cpp-samples/blob/master/speech/api/streaming_transcribe.cc#L36) function?
A similar thread has been posted here, but it was not that helpful for me.
[3] How do we implement backoff and reconnect to the Google stream (speech server) in case of errors (https://cloud.google.com/speech-to-text/sla)? If I knew how to catch or detect connection errors (asked in [1]), we could implement this, but an example would be great.
Any help is much appreciated. Thank you very much.
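For [3], one common pattern is truncated exponential backoff with "full jitter". Each failure doubles a cap (up to a maximum), the wait is a random delay between zero and the current cap, and the cap resets after a stream succeeds. This is a sketch: the Backoff class and the example limits are hypothetical choices, not values taken from the Speech SLA. Whether to retry at all would still come from a check on the final status, such as an IsRecoverable() helper.

```cpp
#include <algorithm>
#include <chrono>
#include <random>

// Truncated exponential backoff with full jitter: each call returns a random
// delay in [0, cap], then doubles the cap up to max_delay.
class Backoff {
 public:
  Backoff(std::chrono::milliseconds initial, std::chrono::milliseconds max_delay)
      : initial_(initial), max_(max_delay), current_(initial) {}

  std::chrono::milliseconds NextDelay() {
    std::uniform_int_distribution<long long> dist(0, current_.count());
    std::chrono::milliseconds delay(dist(gen_));
    current_ = std::min<std::chrono::milliseconds>(max_, current_ * 2);
    return delay;
  }

  // Call once a stream has run successfully.
  void Reset() { current_ = initial_; }

 private:
  std::chrono::milliseconds initial_;
  std::chrono::milliseconds max_;
  std::chrono::milliseconds current_;
  std::mt19937_64 gen_{std::random_device{}()};
};
```

Usage, roughly: after a recoverable failure, call std::this_thread::sleep_for(backoff.NextDelay()) and then open a new stream; after a stream has run successfully, call backoff.Reset().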