C++ catching/finding connection and stream errors & continuous/infinite speech recognition #87

Closed
tumusudheer opened this issue Jan 14, 2020 · 14 comments
Labels
type: question Request for information or clarification. Not an issue.

Comments

@tumusudheer

Hi,

I'm building a C++ example in which a gRPC server receives a continuous stream of audio in 100-millisecond chunks (over gRPC bidirectional streaming) from a client (real-time audio from a microphone). My server code sends the audio to Google Cloud Speech to get the transcribed text. I'm using the streaming_transcribe.cc example. I have three questions:

  1. How do I handle errors such as being unable to connect to Google Speech, or being unable to create a channel because of network connectivity issues or because the server is down? How do I catch such exceptions? Also, the stream sometimes errors out in the middle of streaming audio (Google closes the stream because the time limit is exceeded or because of long pauses). Where and how can I handle such errors?

  2. The client can keep sending audio for a long time (perhaps an hour). When the Google Speech stream is open for more than 5 minutes, I get the following error: Exceeded maximum allowed stream duration of 305 seconds. StreamingRecognize requests appear to be limited to 5 minutes, as mentioned here. The same page mentions endless streaming tutorials with code in Java and Python. An example of how to implement this in C++ for https://github.com/GoogleCloudPlatform/cpp-samples/blob/master/speech/api/streaming_transcribe.cc would be very helpful, even just pseudocode.
    The examples in the endless streaming tutorials close the current stream and open a new one after about 290 seconds. But in streaming_transcribe.cc, the stream is passed to a thread that writes audio bytes to it, while the main thread reads responses from it. How do I close the stream and create a new one in the MicrophoneThreadMain (https://github.com/GoogleCloudPlatform/cpp-samples/blob/master/speech/api/streaming_transcribe.cc#L36) function?
    A similar thread has been posted here, but it was not that helpful for me.

  3. How do we implement backoff and reconnect to the Google Speech server in case of errors (https://cloud.google.com/speech-to-text/sla)? If I know how to detect that the connection has errors (asked in [1]), we can implement this ourselves, but an example would be great.

Any help is much appreciated. Thank you very much.

@yoshi-automation yoshi-automation added the triage me I really want to be triaged. label Jan 14, 2020
@tumusudheer
Author

Hi,

Any help is much appreciated for [1] and [2].

Thank you

@yoshi-automation yoshi-automation added the 🚨 This issue needs some love. label Jan 19, 2020
@coryan coryan added type: question Request for information or clarification. Not an issue. and removed 🚨 This issue needs some love. triage me I really want to be triaged. labels Jan 24, 2020
@coryan
Contributor

coryan commented Jan 24, 2020

I'm building a C++ example in which a gRPC server receives a continuous stream of audio in 100-millisecond chunks (over gRPC bidirectional streaming) from a client (real-time audio from a microphone). My server code sends the audio to Google Cloud Speech to get the transcribed text. I'm using the streaming_transcribe.cc example. I have three questions:

  1. How do I handle errors such as being unable to connect to Google Speech, or being unable to create a channel because of network connectivity issues or because the server is down? How do I catch such exceptions?

I do not think creating the channel (this line):

auto channel = grpc::CreateChannel("speech.googleapis.com", creds);

can actually fail (well, it could be out of memory or something, but it does not try to connect or anything). Only once you try to use the channel you will see failures.

Also, the stream sometimes errors out in the middle of streaming audio (Google closes the stream because the time limit is exceeded or because of long pauses). Where and how can I handle such errors?

Error detection happens when you call Finish():

grpc::Status status = streamer->Finish();

At that point you get a grpc::Status and the status code tells you what action to take. The status code semantics are documented in:

https://github.com/grpc/grpc/blob/master/doc/statuscodes.md

Though sometimes the precise semantics of what constitutes a retriable error depend on the service.
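As a rough illustration of acting on the status code, the sketch below maps a code to a next step. The `Code` enum is a local stand-in that mirrors the numeric values documented for gRPC status codes (for example CANCELLED = 1, OUT_OF_RANGE = 11, UNAVAILABLE = 14) so that the snippet builds without gRPC; real code would switch on status.error_code() instead. `Action` and `Classify` are hypothetical names, and the choice of which codes to retry is an assumption that depends on the service. In this thread, code 11 accompanied the "Exceeded maximum allowed stream duration" message, and code 1 is what Finish() reported after a deliberate TryCancel().

```cpp
// Local stand-in mirroring the numeric values of grpc::StatusCode, so the
// sketch builds without gRPC. Real code would use grpc::StatusCode directly.
enum class Code {
  kOk = 0,
  kCancelled = 1,
  kDeadlineExceeded = 4,
  kOutOfRange = 11,
  kInternal = 13,
  kUnavailable = 14,
};

// Hypothetical set of next steps for the caller.
enum class Action { kDone, kRestartStream, kRetryWithBackoff, kGiveUp };

// `cancelled_by_us` is true when the caller cancelled the stream on purpose
// (for example to stay under the stream duration limit). In that case a
// CANCELLED status is the expected outcome, not a failure.
Action Classify(Code code, bool cancelled_by_us) {
  switch (code) {
    case Code::kOk:
      return Action::kDone;
    case Code::kCancelled:
      return cancelled_by_us ? Action::kRestartStream : Action::kGiveUp;
    case Code::kOutOfRange:
      // Seen in this thread when the stream duration limit was exceeded.
      return Action::kRestartStream;
    case Code::kUnavailable:
    case Code::kDeadlineExceeded:
      // Assumed transient here: try again after a delay.
      return Action::kRetryWithBackoff;
    default:
      return Action::kGiveUp;
  }
}
```

A recoverability check that only accepts code 11 will treat the CANCELLED status that follows an intentional TryCancel() as fatal, which is why the sketch takes the extra flag.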

  2. The client can keep sending audio for a long time (perhaps an hour). When the Google Speech stream is open for more than 5 minutes, I get the following error: Exceeded maximum allowed stream duration of 305 seconds.

Okay.

StreamingRecognize requests appear to be limited to 5 minutes, as mentioned here.

Correct.

The same page mentions endless streaming tutorials with code in Java and Python.

An example of how to implement this in C++ for https://github.com/GoogleCloudPlatform/cpp-samples/blob/master/speech/api/streaming_transcribe.cc would be very helpful, even just pseudocode.

I am guessing it would be something like:

// Begin a stream.
auto last_reset = std::chrono::steady_clock::now();
auto context = std::make_unique<grpc::ClientContext>();
auto streamer = speech->StreamingRecognize(context.get());

auto reset_stream = [&context, &streamer, &last_reset] {
    // Reset the stream
    context->TryCancel();
    auto status = streamer->Finish();
    if (not IsRecoverable(status)) {
      throw GrpcStatusToException(status);
    }
    context = std::make_unique<grpc::ClientContext>();
    streamer = speech->StreamingRecognize(context.get());
    last_reset = std::chrono::steady_clock::now();
};

while (still have input audio) {
  using namespace std::chrono_literals;
  auto const elapsed_time = std::chrono::steady_clock::now() - last_reset;
  if (elapsed_time > 4m) {
    reset_stream();
  }
  StreamingRecognizeRequest request = GetSomeAudioFromYourSource();
  streamer->Write(request);

  // Read responses.
  StreamingRecognizeResponse response;
  while (streamer->Read(&response)) {  // Returns false when no more to read.
    // Dump the transcript of all the results.
  }
  grpc::Status status = streamer->Finish();
  if (IsRecoverable(status)) {
    reset_stream();
    continue;
  }
  throw GrpcStatusToException(status);
}

The examples in the endless streaming tutorials close the current stream and open a new one after about 290 seconds. But in streaming_transcribe.cc, the stream is passed to a thread that writes audio bytes to it, while the main thread reads responses from it. How do I close the stream and create a new one in the MicrophoneThreadMain (https://github.com/GoogleCloudPlatform/cpp-samples/blob/master/speech/api/streaming_transcribe.cc#L36) function?

Calling ClientContext::TryCancel() and then Finish() will close the stream:

https://grpc.github.io/grpc/cpp/classgrpc__impl_1_1_client_context.html#a31fa8dbb85f66a18ad5041109154d9d2

A similar thread has been posted here, but it was not that helpful for me.

  3. How do we implement backoff and reconnect to the Google Speech server in case of errors (https://cloud.google.com/speech-to-text/sla)? If I know how to detect that the connection has errors (asked in [1]), we can implement this ourselves, but an example would be great.

See above, but feel free to ask for clarification if I was too obscure.
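For the backoff part specifically, a minimal sketch could look like the following. It is plain standard C++ with no gRPC dependency; `BackoffDelay` and `RetryWithBackoff` are hypothetical helper names, and the doubling policy, the cap, and the absence of random jitter are simplifying assumptions. In real code the `try_once` callback would create a fresh ClientContext, open a new stream, send the config request, and report whether that worked.

```cpp
#include <algorithm>
#include <chrono>
#include <functional>
#include <thread>

// Delay before retry number `attempt` (0-based): `initial` doubled `attempt`
// times, capped at `max`.
std::chrono::milliseconds BackoffDelay(int attempt,
                                       std::chrono::milliseconds initial,
                                       std::chrono::milliseconds max) {
  auto delay = initial;
  for (int i = 0; i < attempt && delay < max; ++i) delay *= 2;
  return std::min(delay, max);
}

// Calls `try_once` until it returns true or `max_attempts` calls were made,
// sleeping between failed attempts. Returns true if any attempt succeeded.
bool RetryWithBackoff(const std::function<bool()>& try_once, int max_attempts,
                      std::chrono::milliseconds initial,
                      std::chrono::milliseconds max) {
  for (int attempt = 0; attempt < max_attempts; ++attempt) {
    if (try_once()) return true;
    // No point in sleeping after the final failed attempt.
    if (attempt + 1 < max_attempts) {
      std::this_thread::sleep_for(BackoffDelay(attempt, initial, max));
    }
  }
  return false;
}
```

Production code usually adds some randomness to each delay so that many clients do not reconnect at the same instant; that is left out here to keep the sketch short.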

@tumusudheer
Author

Hi @coryan

Thank you very much for the response and example code. It is really helpful. Based on your suggestions, I modified my code, which is available here. In case the link is not accessible, I've also copied it below.

The code (GRPC server) does the following

  1. server takes the audio bytes here and gives the transcription as response to the grpc client

  2. As given in the example, Writing audio to the google stream happens in a separate thread, while the main thread is reading response from google speech service.

I did the resetStream as you mentioned in your response, but I'm getting the following error:

resetting stream because of timer 
&&&& NOTE::: RESETING STREAM at 26972
 GRPC error detected 1  message:Cancelled
throw some exception here 
pure virtual method called
terminate called without an active exception
Aborted (core dumped)

I've declared the streamer and channel as global variables, hopefully that is fine.

Also I've one more question:
If the main thread hits the time limit (~3 minutes), we would like to reset the stream as in the Java and Python examples.

  1. When the main thread is resetting the stream, how does the thread that writes audio to the stream behave, given that it is continuously sending audio?

  2. While the reset is happening, what happens to the audio bytes that are being streamed at that moment? Because the stream was reset, the main thread will not be able to read responses for that last audio.



#include <iostream>
#include <memory>
#include <string>
#include <fstream>
#include <chrono>
#include <iterator>
#include <thread>
#include <grpcpp/grpcpp.h>

#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif
#include "google/cloud/speech/v1/cloud_speech.grpc.pb.h"

using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerReader;
using grpc::ServerReaderWriter;
using grpc::ServerWriter;
using audiostream::HelloRequest;
using audiostream::HelloReply;
using audiostream::Streamer;
using audiostream::SpeechChunk;
using audiostream::TranscriptChunk;
using audiostream::SpeechRecognitionAlternative;

using google::cloud::speech::v1::Speech;
using google::cloud::speech::v1::StreamingRecognizeRequest;
using google::cloud::speech::v1::StreamingRecognizeResponse;
using google::cloud::speech::v1::RecognitionConfig;
using google::cloud::speech::v1::WordInfo;
using google::cloud::speech::v1::SpeechContext;
using google::protobuf::Duration;

// Create a Speech Stub connected to the speech service.
auto creds = grpc::GoogleDefaultCredentials();
auto channel = grpc::CreateChannel("speech.googleapis.com", creds);
std::unique_ptr<Speech::Stub> speech(Speech::NewStub(channel));

auto context = std::make_unique<grpc::ClientContext>();
auto streamer = speech->StreamingRecognize(context.get());

auto global_start = std::chrono::high_resolution_clock::now();
auto last_reset = std::chrono::high_resolution_clock::now();
int flag = 1;
int STREAMING_LIMIT = 29000; // ~29 seconds for testing purpose, can change it to < 3 min

void add_google_speech_results(audiostream::SpeechRecognitionAlternative* alt, google::cloud::speech::v1::SpeechRecognitionAlternative google_speech_alternative)
{
  alt->set_transcript(google_speech_alternative.transcript());//"bytes");
  alt->set_confidence(google_speech_alternative.confidence());//0.9);
}

// Dump the samples to a raw file
void WriteToFile(const std::string& strFilename, std::vector<std::string>& audio_chunks)
{
  std::ofstream output_file(strFilename);
  std::ostream_iterator<std::string> output_iterator(output_file);
  std::copy(audio_chunks.begin(), audio_chunks.end(), output_iterator);
}

void add_google_hints(SpeechContext* speech_context, std::vector<std::string>& hints_words)
{
  for(std::string hints_word : hints_words)
  {
    speech_context->add_phrases(hints_word);
  }
  std::cout << " #### Number of hints added: " << speech_context->phrases_size() << "\n";
}

void setGoogleConfigSettings(RecognitionConfig* config)
{
  config->set_language_code("en");
  config->set_sample_rate_hertz(16000);  // Default sample rate.
  config->set_encoding(RecognitionConfig::LINEAR16);
  config->set_enable_word_time_offsets(true);
  config->set_profanity_filter(true);
  config->set_enable_automatic_punctuation(true);
  
  std::vector<std::string> hints_strings;
  hints_strings.push_back("hello");  
  add_google_hints(config->add_speech_contexts(), hints_strings);
  std::cout << " #### Number of speech contexts added: " << config->speech_contexts_size() << "\n";
  // for(std::string hints_string : hints_strings)
  // {
  //   add_google_hints(config->add_speech_contexts(), hints_string);
  // }
  
}

bool IsRecoverable(grpc::Status& grpc_status)
{
  std::cout << " GRPC error detected "<< grpc_status.error_code() << "\tmessage:" << grpc_status.error_message() << "\n";
  if(grpc_status.error_code() == grpc::StatusCode::OUT_OF_RANGE) // code 11: Google reports "Exceeded maximum allowed stream duration of 305 seconds"
  {
    return true;
  }

  return false;
}

auto reset_stream = [&context, &streamer, &last_reset]
{
    std::cout << "&&&& NOTE::: RESETING STREAM at " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-global_start).count() <<"\n";
    // Reset the stream
    context->TryCancel();
    auto status = streamer->Finish();
    if (not IsRecoverable(status)) {
      //TODO throw GrpcStatusToException(status);
      std::cout << "throw some exception here \n";
      //return;
    }
    context = std::make_unique<grpc::ClientContext>();
    streamer = speech->StreamingRecognize(context.get());
    last_reset = std::chrono::high_resolution_clock::now();
};

// Write the audio in 100 ms chunks at a time, simulating audio content arriving from a microphone.
static int MicrophoneThreadMain(
    grpc::ClientReaderWriterInterface<StreamingRecognizeRequest,
                                      StreamingRecognizeResponse>* streamer, ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream,
                                      std::vector<std::string>* audio_vector)
{
  flag = 1;
  SpeechChunk speech_chunk;
  StreamingRecognizeRequest request;
  while (stream->Read(&speech_chunk)) {
    std::string audio_content = speech_chunk.audio_content();
    //std::cout << "aduio content " << audio_content << "\n";
    request.set_audio_content(audio_content);
    //std::cout << " received " << audio_content.length() << " \n"; 
    //std::cout << "Sending " << audio_content.length() / 1024 << "k bytes." << std::endl;
    streamer->Write(request);
    if(flag == 1) 
    {
      global_start = std::chrono::high_resolution_clock::now();
      std::cout << "####### global_start timer updated ... " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count()<< "\n";
      flag = 2;
    }
    audio_vector->push_back(speech_chunk.audio_content());
  }
  std::cout << "closing stream.....\n";
  streamer->WritesDone();
  std::cout << "closed stream after writes done .....\n";
  return 0;
}
// Logic and data behind the server's behavior.
class GreeterServiceImpl final : public Streamer::Service 
{
  Status SayHello(ServerContext* context, const HelloRequest* request,
                  HelloReply* reply) override 
  {
    std::string prefix("Hello ");
    reply->set_message(prefix + request->name());
    std::cout << "hello world received >>>> \n";
    return Status::OK;
  }

  Status StreamAudio(ServerContext* context, ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream) override {
      std::cout << "####### Started receiving stream ... " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count()<< "\n";

      StreamingRecognizeRequest request;
      auto* streaming_config = request.mutable_streaming_config();
      setGoogleConfigSettings(streaming_config->mutable_config());
      streaming_config->set_interim_results(true);
      
      // Write the first request, containing the config only.
      streamer->Write(request);

      std::vector<std::string> audio_bytes;
      
      // The microphone thread writes the audio content.
      std::thread microphone_thread(&MicrophoneThreadMain, streamer.get(), stream, &audio_bytes);
      std::cout << "Reading responses....\n";
      
      // Read responses.
      StreamingRecognizeResponse response;
      auto full_start = std::chrono::high_resolution_clock::now();
      auto start = std::chrono::high_resolution_clock::now();
      while (streamer->Read(&response)) // Returns false when no more to read.
      {
        
        //std::cout << "### resp =======> " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-global_start).count() << " ms \n";
        auto resp_diff_with_global = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-global_start).count();
        // Dump the transcript of all the results.
        for (int r = 0; r < response.results_size(); ++r) 
        {
          const auto& result = response.results(r);
          TranscriptChunk transcript;
          transcript.set_is_final(result.is_final());
          transcript.set_stability(result.stability());
          //std::cout << "Result stability: " << result.stability() << std::endl;
          float stability_number = result.stability()*1.0;
          for (int a = 0; a < result.alternatives_size(); ++a) 
          {
            const auto& alternative = result.alternatives(a);
            add_google_speech_results(transcript.add_alternatives(), alternative);
            auto words = alternative.words();
            // std::cout << " ######" <<  stability_number << "\t" <<alternative.confidence() << "\t"
            //           << alternative.transcript() << " ..... " << words.size() << std::endl;
            if(stability_number >= 0.1)
            {
              std::cout << " ######" <<  stability_number <<  " is_final " << result.is_final() <<"\t" << alternative.transcript() << "\t" << resp_diff_with_global << " ms\n"; 
            }
            for(int i = 0; i < words.size(); i++) 
            {
              // Duration t1 = words[i].start_time();
              // Duration t2 = words[i].end_time();
              const auto& word = alternative.words(i);
              float start_secs = word.start_time().seconds() * 1000.0 + word.start_time().nanos()*1.0/1000000;
              float end_secs = word.end_time().seconds() * 1000.0 + word.end_time().nanos()*1.0/1000000;

              std::cout << word.word()  << " "<< start_secs  << " ms -  " << end_secs << " ms" << "\n";

            }
          }
          
          if(result.is_final())
          {
            std::cout << "final -----> " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-global_start).count() << " ms \n";
            start = std::chrono::high_resolution_clock::now();
          }
          
          stream->Write(transcript);

          if(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-last_reset).count() >= STREAMING_LIMIT)
          {
            std::cout << " resetting stream because of timer \n";
            reset_stream();
          }
        }
      }
      grpc::Status status = streamer->Finish();
      if(IsRecoverable(status))
      {
        std::cout << " resetting stream because of streamer finish caused some issues here \n";
        reset_stream();
      }

      microphone_thread.join();
      if (!status.ok()) {
        // Report the RPC failure.
        std::cerr << status.error_message() << std::endl;
        return Status::OK;
      }
      std::cout << " Writing to a file ... \n";
      WriteToFile("output.raw", audio_bytes);
      return Status::OK;
  }
};

void RunServer() {
  std::string server_address("0.0.0.0:50051");
  GreeterServiceImpl service;

  ServerBuilder builder;
  // Listen on the given address without any authentication mechanism.
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  // Register "service" as the instance through which we'll communicate with
  // clients. In this case it corresponds to an *synchronous* service.
  builder.RegisterService(&service);
  // Finally assemble the server.
  std::unique_ptr<Server> server(builder.BuildAndStart());
  std::cout << "Server listening on " << server_address << std::endl;

  // Wait for the server to shutdown. Note that some other thread must be
  // responsible for shutting down the server for this call to ever return.
  server->Wait();
}

int main(int argc, char** argv) {
  RunServer();

  return 0;
}

@coryan
Contributor

coryan commented Jan 25, 2020

Thank you very much for the response and example code. It is really helpful.

No problem.

Based on your suggestions, I modified my code, which is available here. In case the link is not accessible, I've also copied it below.

Ack. I do not think I will be able to debug your code for you, but maybe I can give you some hints.

The code (GRPC server) does the following

  1. server takes the audio bytes here and gives the transcription as response to the grpc client
  2. As given in the example, Writing audio to the google stream happens in a separate thread,

I think you need to synchronize both threads, right? And you are copying a pointer to the streamer object in your MicrophoneThreadMain(). You need to:

  • keep both threads synchronized and sharing the same data
  • or avoid using a separate thread altogether, maybe doing some asynchronous calls
  • or avoid using the speech-to-text stream in two threads, passing the data from the microphone thread to the recognition thread via a queue or something (that is what I would do, without knowing more about your application).

while the main thread is reading response from google speech service.

I did the resetStream as you mentioned in your response, but I'm getting the following error:

resetting stream because of timer 
&&&& NOTE::: RESETING STREAM at 26972
 GRPC error detected 1  message:Cancelled
throw some exception here 
pure virtual method called
terminate called without an active exception
Aborted (core dumped)

I am guessing you still had at least one operation pending on the stream at the time? My code sketch did not cover synchronization between the thread reading from the speech-to-text stream and the thread writing to it. I will sketch one possible approach below, but this is really up to you I would think.

I've declared the streamer and channel as global variables, hopefully that is fine.

I would start thinking of wrapping all this state into a class by now, but this is, of course, up to you.

Also I've one more question:
If the main thread hits the time limit (~3 minutes), we would like to reset the stream as in the Java and Python examples.

  1. When the main thread is resetting the stream, how does the thread that writes audio to the stream behave, given that it is continuously sending audio?

You need to synchronize manually with the thread sending the audio, or maybe through a queue or something. For example, you could do:

// Assume the Microphone thread will add requests to this queue, you need a mutex
// and condition variable to synchronize them, I am sure you can find examples of this
// on the Internet. Code is mostly C++17, can be adapted for C++11.
std::mutex mu;
std::condition_variable cv;
bool shutdown_queue = false;
std::deque<StreamingRecognizeRequest> requests;

// Blocks until a request is available or the queue is shut down.
auto get_next = [&mu, &cv, &requests, &shutdown_queue]() -> std::optional<StreamingRecognizeRequest> {
  std::unique_lock lk(mu);
  cv.wait(lk, [&] { return shutdown_queue or not requests.empty(); });
  if (shutdown_queue) return {};
  auto value = std::move(requests.front());  // oldest request first (FIFO)
  requests.pop_front();
  return value;
};

for (auto v = get_next(); v.has_value(); v = get_next()) {
  using namespace std::chrono_literals;
  auto const elapsed_time = std::chrono::steady_clock::now() - last_reset;
  if (elapsed_time > 4m) {
    reset_stream();
  }
  streamer->Write(*std::move(v));

  // Read responses.
  StreamingRecognizeResponse response;
  while (streamer->Read(&response)) {  // Returns false when no more to read.
    // Dump the transcript of all the results.
  }
  grpc::Status status = streamer->Finish();
  if (IsRecoverable(status)) {
    reset_stream();
    continue;
  }
  throw GrpcStatusToException(status);
}

  2. While the reset is happening, what happens to the audio bytes that are being streamed at that moment? Because the stream was reset, the main thread will not be able to read responses for that last audio.

Right. I did not cover any of this in my pseudo-code. Maybe the new pseudo-ish code will get you going.
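One way to avoid losing the audio that is in flight during a reset, and as far as I understand roughly what the Java and Python endless streaming tutorials do, is to remember the chunks that no final result has covered yet and to resend them on the new stream. The sketch below is only that bookkeeping, in standard C++. `ReplayBuffer` and its methods are hypothetical names, it assumes fixed-duration chunks (100 ms in this thread), and it assumes each final result tells you how far into the current stream it reaches (I believe the v1 response has a result_end_time field for this, but treat that as an assumption).

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Keeps audio chunks that were sent but are not yet covered by a final
// result, so they can be resent after the stream is restarted.
class ReplayBuffer {
 public:
  explicit ReplayBuffer(std::chrono::milliseconds chunk_duration)
      : chunk_duration_(chunk_duration) {
    assert(chunk_duration_.count() > 0);
  }

  // Call after writing a chunk to the current stream.
  void OnChunkSent(std::string audio) { pending_.push_back(std::move(audio)); }

  // Call for each final result. `end_time` is measured from the start of the
  // current stream. Chunks that end at or before it are dropped.
  void OnFinalResult(std::chrono::milliseconds end_time) {
    if (end_time.count() <= 0) return;
    const auto covered = static_cast<std::size_t>(end_time / chunk_duration_);
    while (dropped_ < covered && !pending_.empty()) {
      pending_.pop_front();
      ++dropped_;
    }
  }

  // Call right after opening a new stream. Returns the chunks to resend;
  // they stay pending because they are now the first chunks of the new stream.
  std::vector<std::string> OnStreamRestarted() {
    dropped_ = 0;  // stream-relative offsets start again from zero
    return std::vector<std::string>(pending_.begin(), pending_.end());
  }

  std::size_t pending() const { return pending_.size(); }

 private:
  std::chrono::milliseconds chunk_duration_;
  std::deque<std::string> pending_;
  std::size_t dropped_ = 0;  // chunks of the current stream already dropped
};
```

After a reset, the writer would send the config request, then every chunk returned by OnStreamRestarted(), and only then continue with new audio. The replayed audio may be transcribed twice around the boundary, so the caller has to decide how to merge overlapping results.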

@tumusudheer
Author

Hi @coryan,

Thank you very much for taking the time to provide an example with synchronization. I'm sorry about posting big chunks of code. I thought it would give you some idea of where I'm making mistakes; I did not mean to give the impression that you have to debug it.

Basically, I'm writing gRPC server code that takes a continuous stream of bytes from a client (it could be a mobile app the user talks to, or a small gRPC client program that reads data from a WAV file and streams it to my server). The server code reads the bytes, sends them to the Google Speech service, and returns the Google ASR response to the client.

I tried to avoid sharing the speech-to-text stream object between two threads, and did the following:
The main thread reads StreamingRecognizeRequest objects from a queue, sends them to the Google Speech service, and reads the responses, while the microphone thread reads audio from the gRPC stream, creates a StreamingRecognizeRequest with the audio bytes, and pushes it to the queue.
Example code:

static int MicrophoneThreadMain(
    ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream,
                                       std::mutex* mut, std::condition_variable* data_cond, std::queue<StreamingRecognizeRequest>* data_queue)
{
  flag = 1;
  SpeechChunk speech_chunk;
  StreamingRecognizeRequest request;
  while (stream->Read(&speech_chunk)) {
    std::string audio_content = speech_chunk.audio_content();
    request.set_audio_content(audio_content);
    std::lock_guard<std::mutex> lk(*mut);
    data_queue->push(request);
    data_cond->notify_one();  
  }
  std::cout << "Finished adding data to the queue from stream.....\n";
  return 0;
}

int get_next_audio_request_with_data(std::mutex* mut, std::condition_variable* data_cond, std::queue<StreamingRecognizeRequest>* data_queue, StreamingRecognizeRequest& output_request)
{
  std::unique_lock<std::mutex> lk(*mut);
  data_cond->wait(lk,[&]{return !data_queue->empty();});
  if(data_queue->empty())
  {
    return 1;
  }
  StreamingRecognizeRequest r = data_queue->front();
  data_queue->pop();
  output_request = r;
  return 0;
}

Status StreamAudio(ServerContext* context, ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream) override 
  {
    std::mutex mut;
    std::condition_variable data_cond;
    bool shutdown_queue=false;
    std::queue<StreamingRecognizeRequest> queue_with_requests;
    std::vector<std::string> audio_bytes;

    // The microphone thread writes the audio content.
    std::thread microphone_thread(&MicrophoneThreadMain, stream, &audio_bytes, &mut, &data_cond, &queue_with_requests);
   auto creds = grpc::GoogleDefaultCredentials();
    auto channel = grpc::CreateChannel("speech.googleapis.com", creds);
    std::unique_ptr<Speech::Stub> speech(Speech::NewStub(channel));
    // Parse command line arguments.
    StreamingRecognizeRequest request;
    auto* streaming_config = request.mutable_streaming_config();
    setGoogleConfigSettings(streaming_config->mutable_config());
    
    // Begin a stream.
    grpc::ClientContext client_context;
    auto streamer = speech->StreamingRecognize(&client_context);
    // Write the first request, containing the config only.
    streaming_config->set_interim_results(true);
    bool initial_write_success = streamer->Write(request);
    std::cout << " initial write success ...: " <<  initial_write_success << "\n";
    if(!initial_write_success)
    {
      return Status(StatusCode::UNAVAILABLE, "Something is wrong..., not able to succssfully opened google stream");//Status::OK;
    }
    
    while(true)
    {
      StreamingRecognizeRequest request;
      int fetch_next_chunk = get_next_audio_request_with_data(&mut, &data_cond, &queue_with_requests, request);
      if(fetch_next_chunk == 1)
      {
        break;
      }
      std::cout << " length... \t" << request.audio_content().length()  << "\n";
      //std::cout << "writing the request to google\n";
      streamer->Write(request);
      //std::cout << "reding response from google\n";
      StreamingRecognizeResponse response;
      while (streamer->Read(&response)) // Returns false when no more to read.
      {
         // process response here;
      }
   }
}

streamer->WritesDone();
grpc::Status status = streamer->Finish();
microphone_thread.join();

With the above code I'm able to write to the stream, but I'm not getting any responses from Google.

We do not want to use the Google Speech async API. So I went back to trying my old approach, where the main thread sends the audio to Google Speech while an additional thread reads the responses from Google Speech and processes them.

auto creds = grpc::GoogleDefaultCredentials();
auto channel = grpc::CreateChannel("speech.googleapis.com", creds);
std::unique_ptr<Speech::Stub> speech(Speech::NewStub(channel));
auto client_context = std::make_unique<grpc::ClientContext>();
auto streamer = speech->StreamingRecognize(client_context.get());

auto global_start = std::chrono::high_resolution_clock::now();
//auto end = std::chrono::high_resolution_clock::now();
auto last_reset = std::chrono::high_resolution_clock::now();
int flag = 1;
int STREAMING_LIMIT = 20000; // 20 seconds (in ms) for testing; the service limit is ~5 minutes

void reset_speech_stream(std::unique_ptr<grpc::ClientReaderWriterInterface<StreamingRecognizeRequest,
                                      StreamingRecognizeResponse>>& streamer)
{ 
  //std::unique_lock<std::mutex> lk(mut);
  //lk.lock();
  client_context->TryCancel();
  auto status = streamer->Finish();
  if (not IsRecoverable(status)) {
      //throw GrpcStatusToException(status);
    std::cout << " throw some exception here\n";
  }
  client_context = std::make_unique<grpc::ClientContext>();
  streamer = speech->StreamingRecognize(client_context.get());

  // Parse command line arguments.
  StreamingRecognizeRequest request;
  auto* streaming_config = request.mutable_streaming_config();
  setGoogleConfigSettings(streaming_config->mutable_config());
  streaming_config->set_interim_results(true);
  streamer->Write(request);
  last_reset = std::chrono::high_resolution_clock::now();
  //lk.unlock();
} 

static int MicrophoneThreadMain(
    grpc::ClientReaderWriterInterface<StreamingRecognizeRequest,
                                      StreamingRecognizeResponse>* streamer, ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream,
                                      )
{
  StreamingRecognizeResponse response;

  while (streamer->Read(&response)) // Returns false when no more to read.
  { 
    // process response
  }
}

Status StreamAudio(ServerContext* context, ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream) override 
{
      StreamingRecognizeRequest request;
      auto* streaming_config = request.mutable_streaming_config();
      setGoogleConfigSettings(streaming_config->mutable_config());
      streaming_config->set_interim_results(true);
      streamer->Write(request);  // write initial request with params
      std::unique_ptr<grpc::ClientReaderWriterInterface<StreamingRecognizeRequest,
                                      StreamingRecognizeResponse>> streamer = speech->StreamingRecognize(client_context.get());

      std::vector<std::string> audio_bytes;
      // The microphone thread writes the audio content.
      std::thread microphone_thread(&MicrophoneThreadMain, streamer.get(), stream, &audio_bytes);

      flag = 1;
      SpeechChunk speech_chunk;
      StreamingRecognizeRequest stream_request;
      while (stream->Read(&speech_chunk)) 
      {
        if(flag == 1) // test to see if reinitializing streamer works ?
        {
            std::cout << "............. resetting stream...\n";
            reset_speech_stream(streamer);
             flag = 2;
        }
        std::string audio_content = speech_chunk.audio_content();
        stream_request.set_audio_content(audio_content);

        streamer->Write(stream_request);
        audio_bytes.push_back(speech_chunk.audio_content());
      }
      std::cout << "closing stream.....\n";
      streamer->WritesDone();
      std::cout << "closed stream after writes done .....\n";
      grpc::Status status = streamer->Finish();      
      microphone_thread.join();
      if (!status.ok()) {
        // Report the RPC failure.
        std::cerr << status.error_message() << std::endl;
        return Status::OK;
      }
  }

I just want to check whether reinitializing the stream at the beginning of the code works. If it does, I will then try resetting it every 4.5 minutes. But I'm not able to get it working: I can send the audio data to Google, but the thread that processes the responses is not printing anything.

I believe the cause is this: speech->StreamingRecognize(client_context.get()) returns a unique pointer, and the main thread reinitializes that pointer when it calls reset_speech_stream(), but the other thread, which processes the responses, is still reading from the old pointer. How can I reinitialize the unique pointer and make sure the thread reads responses from the new stream, so that it is able to process them?
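That diagnosis looks plausible: `streamer.get()` copies the raw pointer once, when the thread is created, so later assignments to the `unique_ptr` are invisible to the thread and leave it with a pointer to a destroyed object. A minimal sketch of one way around this, assuming a small mutex-guarded holder that every thread asks for the current stream. `FakeStream` and `StreamHolder` are hypothetical names used only for illustration; the real code would hold the gRPC stream instead.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for the gRPC stream; it only carries a label.
struct FakeStream {
  std::string label;
};

// Owns the current stream. Threads ask the holder for the stream each time
// instead of keeping a raw pointer taken from unique_ptr::get().
class StreamHolder {
 public:
  explicit StreamHolder(std::shared_ptr<FakeStream> initial)
      : current_(std::move(initial)) {
    assert(current_ != nullptr);
  }

  // Returns a shared_ptr copy, so the stream stays alive while it is in use
  // even if another thread replaces it in the meantime.
  std::shared_ptr<FakeStream> Get() const {
    std::lock_guard<std::mutex> lock(mu_);
    return current_;
  }

  // Installs a new stream and returns the previous one so the caller can
  // finish it.
  std::shared_ptr<FakeStream> Replace(std::shared_ptr<FakeStream> next) {
    std::lock_guard<std::mutex> lock(mu_);
    std::swap(current_, next);
    return next;
  }

 private:
  mutable std::mutex mu_;
  std::shared_ptr<FakeStream> current_;
};

// What a reader thread would do on every loop iteration.
std::string CurrentLabel(const StreamHolder& holder) {
  return holder.Get()->label;
}
```

The thread would then receive a pointer or reference to the holder rather than to the stream itself.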

Also, I'm a beginner in C++ and don't know much about synchronization. Your previous example with synchronization helped me understand a lot, so I wrote a small stub that resets the stream under a mutex:

// Create a Speech Stub connected to the speech service.
auto creds = grpc::GoogleDefaultCredentials();
auto channel = grpc::CreateChannel("speech.googleapis.com", creds);
std::unique_ptr<Speech::Stub> speech(Speech::NewStub(channel));
auto client_context = std::make_unique<grpc::ClientContext>();
auto streamer = speech->StreamingRecognize(client_context.get());

auto global_start = std::chrono::high_resolution_clock::now();
//auto end = std::chrono::high_resolution_clock::now();
auto last_reset = std::chrono::high_resolution_clock::now();
int flag = 1;
int STREAMING_LIMIT = 20000; // ~ 5 minutes
std::mutex mut;
void reset_speech_stream(std::unique_ptr<grpc::ClientReaderWriterInterface<StreamingRecognizeRequest,
                                      StreamingRecognizeResponse>>& streamer)
{ 
 std::unique_lock<std::mutex> lk(mut);
  lk.lock();
  client_context->TryCancel();
  auto status = streamer->Finish();
  if (not IsRecoverable(status)) {
      //throw GrpcStatusToException(status);
    std::cout << " throw some exception here\n";
  }
  client_context = std::make_unique<grpc::ClientContext>();
  streamer = speech->StreamingRecognize(client_context.get());

  // Build the initial request, which carries only the configuration.
  StreamingRecognizeRequest request;
  auto* streaming_config = request.mutable_streaming_config();
  setGoogleConfigSettings(streaming_config->mutable_config());
  streaming_config->set_interim_results(true);
  streamer->Write(request);
  last_reset = std::chrono::high_resolution_clock::now();
  lk.unlock();
} 

static int MicrophoneThreadMain(
    grpc::ClientReaderWriterInterface<StreamingRecognizeRequest,
                                      StreamingRecognizeResponse>* streamer,
    ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream,
    std::vector<std::string>* audio_bytes)
{
  StreamingRecognizeResponse response;

  /* This thread need to wait while stream is getting reset */
  while (streamer->Read(&response)) // Returns false when no more to read.
  { 
    // process response
  }
  
Status StreamAudio(ServerContext* context, ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream) override 
{
      StreamingRecognizeRequest request;
      auto* streaming_config = request.mutable_streaming_config();
      setGoogleConfigSettings(streaming_config->mutable_config());
      streaming_config->set_interim_results(true);
      streamer->Write(request);  // write initial request with params
      std::unique_ptr<grpc::ClientReaderWriterInterface<StreamingRecognizeRequest,
                                      StreamingRecognizeResponse>> streamer = speech->StreamingRecognize(client_context.get());

      std::vector<std::string> audio_bytes;
      // In this version the extra thread reads the responses (the name is left over from the sample).
      std::thread microphone_thread(&MicrophoneThreadMain, streamer.get(), stream, &audio_bytes);

      flag = 1;
      SpeechChunk speech_chunk;
      StreamingRecognizeRequest stream_request;
      while (stream->Read(&speech_chunk)) 
      {
        if(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-last_reset).count() >= STREAMING_LIMIT)
        {
            std::cout << "............. resetting stream...\n";
            reset_speech_stream(streamer);
             flag = 2;
        }
        std::string audio_content = speech_chunk.audio_content();
        stream_request.set_audio_content(audio_content);

        streamer->Write(stream_request);
        audio_bytes.push_back(speech_chunk.audio_content());
      }
      std::cout << "closing stream.....\n";
      streamer->WritesDone();
      std::cout << "closed stream after writes done .....\n";
      grpc::Status status = streamer->Finish();      
      microphone_thread.join();
      if (!status.ok()) {
        // Report the RPC failure.
        std::cerr << status.error_message() << std::endl;
        return Status::OK;
      }
  }
}

I believe that since the main thread calls the reset method, it waits while the stream is resetting. Can you help me with what I need to do to make the thread that reads the responses wait while the stream is being reset? Also, do I need to change anything in the reset_speech_stream() method to make the synchronization work?
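One thing in reset_speech_stream() that stands out regardless of the overall design: the `std::unique_lock<std::mutex>` constructor already locks the mutex, so the following `lk.lock()` asks the same lock object to lock a second time. The C++ standard specifies that this throws `std::system_error`. The small sketch below shows that behavior and the usual alternative of letting the guard's destructor unlock; the function names are hypothetical.

```cpp
#include <cassert>
#include <mutex>
#include <system_error>

// Returns true if calling lock() on an already-owning unique_lock throws.
bool SecondLockThrows() {
  std::mutex mut;
  std::unique_lock<std::mutex> lk(mut);  // the constructor already locks `mut`
  assert(lk.owns_lock());
  try {
    lk.lock();  // a second lock through the same object is rejected
  } catch (const std::system_error&) {
    return true;
  }
  return false;
}

// The usual pattern: construct the guard and let its destructor unlock.
int GuardedIncrement(std::mutex& mut, int& counter) {
  std::lock_guard<std::mutex> guard(mut);
  return ++counter;
}  // `guard` releases the mutex here, no explicit unlock() needed
```

Under that reading, dropping the explicit `lk.lock()` and `lk.unlock()` calls would be the first change to try.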

Also, I went through the examples from the tutorial you mentioned. The Python example never reinitializes the stream, yet it works for continuous speech no matter how long, and it writes to and reads from the stream in the same thread (and it works :)). The Java example reinitializes the stream, but it uses some built-in gRPC splitCall method to associate a new thread that reads the responses. It would be very helpful if Google could provide an example of how to make the C++ sample work with infinite continuous speech, which is what I'm trying to achieve here.

It took me a couple of days to try different stuff. Thank you once again for your help.

@tumusudheer
Author

Hi @coryan ,

I got something working today for infinite streaming by sharing the speech stream between the threads and using synchronization:

Method 1

The main thread processes the Google Speech responses and an additional thread writes audio to the Google Speech stream. The main thread restarts the stream on a timer every 4.5 minutes.

Issues I'm facing here are:

  1. loss of audio while the stream is resetting.

Method 2

The main thread writes audio to the Google Speech stream and an additional thread processes the Google Speech responses. The main thread also restarts the stream on a timer every 4.5 minutes.

Issues I'm facing here are:

  1. loss of audio while the stream is resetting.
  2. After the stream has been reset once or twice, it keeps getting reset and I'm not getting any responses.

Can you suggest which method is more efficient? From the Java and JS examples, it seems I have to resend some audio after the stream is reset. Given that, I think Method 2 is better, because the main thread both writes the data to Google Speech and restarts the stream. However, I'm having the issue that after a couple of resets the stream keeps getting reset without giving any responses.

It would be great if you could suggest which method is better, and also share any pseudocode on how to resend the audio after a reset, as the code in the examples is a little complicated.
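As a rough illustration of the resend idea (a simplified sketch, not a port of the Java or JS samples): keep every chunk written to the current stream until a final result covers it, and after a reset write the still-uncovered chunks to the new stream first. `ReplayBuffer` and its methods are hypothetical names. The sketch assumes fixed-length chunks (100 ms in this thread) and that each final result reports how far into the current stream it reaches; I believe the Speech API exposes that as `result_end_time`, but treat that as an assumption to verify.

```cpp
#include <cassert>
#include <chrono>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: remembers audio written to the current stream that is
// not yet covered by a final result, so it can be written again after a reset.
class ReplayBuffer {
 public:
  explicit ReplayBuffer(std::chrono::milliseconds chunk_duration)
      : chunk_duration_(chunk_duration) {
    assert(chunk_duration_.count() > 0);
  }

  // Call for every chunk written to the current stream.
  void OnChunkSent(std::string audio) { pending_.push_back(std::move(audio)); }

  // Call when a final result arrives. `result_end` is the end of the
  // recognized audio, measured from the start of the current stream.
  void OnFinalResult(std::chrono::milliseconds result_end) {
    while (!pending_.empty() && acked_ + chunk_duration_ <= result_end) {
      pending_.pop_front();
      acked_ += chunk_duration_;
    }
  }

  // Call right after opening a new stream: returns the chunks to write first.
  // They stay pending, and the offset restarts because the new stream's
  // clock starts at zero.
  std::vector<std::string> ChunksToReplay() {
    acked_ = std::chrono::milliseconds(0);
    return std::vector<std::string>(pending_.begin(), pending_.end());
  }

 private:
  std::chrono::milliseconds chunk_duration_;
  std::chrono::milliseconds acked_{0};
  std::deque<std::string> pending_;
};
```

With this shape, the thread that resets the stream would call `ChunksToReplay()` and write those chunks before resuming with live audio, which is why having the writer and the reset on the same thread (Method 2) keeps the ordering simple.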

Method 1 code


// Create a Speech Stub connected to the speech service.
auto creds = grpc::GoogleDefaultCredentials();
auto channel = grpc::CreateChannel("speech.googleapis.com", creds);
std::unique_ptr<Speech::Stub> speech(Speech::NewStub(channel));

auto context = std::make_unique<grpc::ClientContext>();
auto streamer = speech->StreamingRecognize(context.get());

auto global_start = std::chrono::high_resolution_clock::now();
auto last_reset = std::chrono::high_resolution_clock::now();
int flag = 1;
int STREAMING_LIMIT = 10000; // ~5 minutes
std::mutex mut;

bool IsRecoverable(const grpc::Status& grpc_status)
{
  std::cout << " GRPC error detected "<< grpc_status.error_code() << "\tmessage:" << grpc_status.error_message() << "\n";
  // Code 11 is OUT_OF_RANGE: returned when the maximum allowed stream duration of 305 seconds is exceeded.
  return grpc_status.error_code() == grpc::StatusCode::OUT_OF_RANGE;
}

auto reset_stream = [&context, &streamer, &last_reset]
{
    const std::lock_guard<std::mutex> s_lock(mut);
    // Reset the stream
    context->TryCancel();
    auto status = streamer->Finish();
    if (not IsRecoverable(status)) {
      //TODO throw GrpcStatusToException(status);
      std::cout << "throw some exception here \n";
      //return;
    }
    context = std::make_unique<grpc::ClientContext>();
    streamer = speech->StreamingRecognize(context.get());
    StreamingRecognizeRequest request;
    auto* streaming_config = request.mutable_streaming_config();
    setGoogleConfigSettings(streaming_config->mutable_config());
    streaming_config->set_interim_results(true);
    streamer->Write(request);
    last_reset = std::chrono::high_resolution_clock::now();
};

// Write the audio in 100 ms chunks at a time, simulating audio content arriving from a microphone.
static int MicrophoneThreadMain(ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream,
                                      std::vector<std::string>* audio_vector)
{
  flag = 1;
  SpeechChunk speech_chunk;
  StreamingRecognizeRequest request;
  
  while (stream->Read(&speech_chunk)) {
    std::string audio_content = speech_chunk.audio_content();
    request.set_audio_content(audio_content);
    std::unique_lock<std::mutex> lk(mut);
    streamer.get()->Write(request);
    lk.unlock();
    audio_vector->push_back(speech_chunk.audio_content());
  }

  std::unique_lock<std::mutex> lk(mut);
  streamer.get()->WritesDone();
  lk.unlock();
 
  return 0;
}

 Status StreamAudio(ServerContext* context, ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream) override
 {
      StreamingRecognizeRequest request;
      auto* streaming_config = request.mutable_streaming_config();
      setGoogleConfigSettings(streaming_config->mutable_config());
      streaming_config->set_interim_results(true);
      // Write the first request, containing the config only.
      streamer->Write(request);
      std::vector<std::string> audio_bytes;
      // The microphone thread writes the audio content.
      std::thread microphone_thread(&MicrophoneThreadMain, stream, &audio_bytes);
      // Read responses.
      StreamingRecognizeResponse response;
      while (streamer->Read(&response)) // Returns false when no more to read.
      {
         // process response and write final transcript to grpc response stream
          stream->Write(transcript);
          if(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-last_reset).count() >= STREAMING_LIMIT)
          {
            std::cout << " resetting stream because of timer \n";
            reset_stream();
          }
      }
      grpc::Status status = streamer->Finish();
      if(IsRecoverable(status))
      {
        std::cout << " resetting stream because of streamer finish caused some issues here \n";
        reset_stream();
      }

      microphone_thread.join();
      if (!status.ok()) {
        // Report the RPC failure.
        std::cerr << status.error_message() << std::endl;
        return Status::OK;
      }
      std::cout << " Writing to a file ... \n";
      WriteToFile("output.raw", audio_bytes);
      return Status::OK;
  }

Method 2 code


// Create a Speech Stub connected to the speech service.
auto creds = grpc::GoogleDefaultCredentials();
auto channel = grpc::CreateChannel("speech.googleapis.com", creds);
std::unique_ptr<Speech::Stub> speech(Speech::NewStub(channel));

auto context = std::make_unique<grpc::ClientContext>();
auto streamer = speech->StreamingRecognize(context.get());

auto global_start = std::chrono::high_resolution_clock::now();
auto last_reset = std::chrono::high_resolution_clock::now();
int flag = 1;
int STREAMING_LIMIT = 10000; // ~5 minutes
std::mutex mut;

bool IsRecoverable(const grpc::Status& grpc_status)
{
  std::cout << " GRPC error detected "<< grpc_status.error_code() << "\tmessage:" << grpc_status.error_message() << "\n";
  // Code 11 is OUT_OF_RANGE: returned when the maximum allowed stream duration of 305 seconds is exceeded.
  return grpc_status.error_code() == grpc::StatusCode::OUT_OF_RANGE;
}

auto reset_stream = [&context, &streamer, &last_reset]
{
    const std::lock_guard<std::mutex> s_lock(mut);
    // Reset the stream
    context->TryCancel();
    auto status = streamer->Finish();
    if (not IsRecoverable(status)) {
      //TODO throw GrpcStatusToException(status);
      std::cout << "throw some exception here \n";
      //return;
    }
    context = std::make_unique<grpc::ClientContext>();
    streamer = speech->StreamingRecognize(context.get());
    StreamingRecognizeRequest request;
    auto* streaming_config = request.mutable_streaming_config();
    setGoogleConfigSettings(streaming_config->mutable_config());
    streaming_config->set_interim_results(true);
    streamer->Write(request);
    last_reset = std::chrono::high_resolution_clock::now();
};

static int MicrophoneThreadMain(ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream)
{
  // Read responses.
  auto start = std::chrono::high_resolution_clock::now();
  while (true) // Returns false when no more to read.
  {
    StreamingRecognizeResponse response;
    std::unique_lock<std::mutex> lk(mut);
    bool message_received = streamer.get()->Read(&response);
    lk.unlock();
    if(!message_received)
    {
      break;
    }
    // process responses 
    stream->Write(transcript); 
  }
}

// Logic and data behind the server's behavior.
class GreeterServiceImpl final : public Streamer::Service 

  Status StreamAudio(ServerContext* context, ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream) override 
  {
    StreamingRecognizeRequest request;
    auto* streaming_config = request.mutable_streaming_config();
    setGoogleConfigSettings(streaming_config->mutable_config());
    streaming_config->set_interim_results(true);
    streamer->Write(request);
    std::vector<std::string> audio_bytes;
    // In this method the extra thread reads the responses; the main thread writes the audio.
    std::thread microphone_thread(&MicrophoneThreadMain, stream);
    flag = 1;
   
    SpeechChunk speech_chunk;
    StreamingRecognizeRequest data_request;
    while (stream->Read(&speech_chunk)) {
      std::string audio_content = speech_chunk.audio_content();
      data_request.set_audio_content(audio_content);
      streamer->Write(data_request);
      audio_bytes.push_back(speech_chunk.audio_content());
      if(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-last_reset).count() >= STREAMING_LIMIT)
      {
        std::cout << " resetting stream because of timer \n";
        reset_stream();
      }
    }
    streamer->WritesDone();
    WriteToFile("output.raw", audio_bytes);
    grpc::Status status = streamer->Finish();
    if(IsRecoverable(status))
    {
      std::cout << " resetting stream because of streamer finish caused some issues here \n";
      reset_stream();
    }
    microphone_thread.join();
    if (!status.ok()) {
      // Report the RPC failure.
      std::cerr << status.error_message() << std::endl;
      return Status::OK;
    }
    return Status::OK;
  }
};

@coryan
Contributor

coryan commented Jan 29, 2020

I think you are quickly exceeding my knowledge of how this service works.

I got something working today for infinite streaming by sharing the speech to stream between the threads and using synchronization:

Awesome!

Method 1

Main threads process Google speech responses and an additional thread writes audio to google speech stream. But the main threads restarts the stream based on a timer for every 4.5 minutes.

Issues I'm facing here are:

  1. loss of audio while the stream is resetting.

What does this mean? That part of the audio is not processed? Or that you drop audio packets?

Method 2

Main threads writes audio to google speech stream and additional thread process google speech responses. And the main threads restarts the stream based on a timer for every 4.5 minutes.

Issues I'm facing here are:

  1. loss of audio while the stream is resetting.
  2. After one or two times stream got reset, stream keep getting reset and I'm not getting any responses.

I see.

Can you suggest me which method is more efficient.

Unfortunately, not. I do not have enough experience with this service to say one way or the other. Maybe @SurferJeffAtGoogle or @tmatsuo can chime in.

From the examples Java and JS, it seems I have to resend some audio after stream got reset.

One thing I noticed is that the Java example does not cancel the stream, instead it sends a message to close it gracefully:

https://github.com/GoogleCloudPlatform/java-docs-samples/blob/cad876ad205e03a19abafcd16083b2790d84db95/speech/cloud-client/src/main/java/com/example/speech/InfiniteStreamRecognize.java#L223

Maybe we should do the same in C++? I think that is a WritesDone():

https://grpc.github.io/grpc/cpp/classgrpc__impl_1_1_client_writer.html

Based on this scenario I'm thinking method 2 is better because the main thread both writing the data to google speech and also restarting the stream , but I'm having the issue that after a couple of resets, the stream keep getting reset without giving any responses.

It would be great if you can suggest me which method is better

No idea, but Method 2 seems easier to follow.

and also any pseudo code on how to resend the audio after reset as the code in the examples are little complicated.

Method 2 code


// Create a Speech Stub connected to the speech service.
auto creds = grpc::GoogleDefaultCredentials();
auto channel = grpc::CreateChannel("speech.googleapis.com", creds);
std::unique_ptr<Speech::Stub> speech(Speech::NewStub(channel));

auto context = std::make_unique<grpc::ClientContext>();
auto streamer = speech->StreamingRecognize(context.get());

auto global_start = std::chrono::high_resolution_clock::now();
auto last_reset = std::chrono::high_resolution_clock::now();

nit: in C++ the high resolution clock is not guaranteed to be steady (it can go back in time, and skip forward), though it typically is. I understand those things matter for audio (again, no expert here), so you may want to insert a static assert if you are assuming that this clock is in fact steady.

https://en.cppreference.com/w/cpp/chrono/high_resolution_clock

int flag = 1;
int STREAMING_LIMIT = 10000; // ~5 minutes

is it? You are comparing this against std::chrono::milliseconds(...).count(), so it is really more like 10 seconds, but maybe I read the code wrong. Why not just say "std::chrono::minutes(5)" if that is what you want?
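A sketch of that suggestion: keep the limit as a `std::chrono` duration and compare durations directly, so the unit is part of the type rather than a comment. `kStreamingLimit` and `ShouldReset` are hypothetical names, and the 4 minute 50 second value is only an example of a margin below the roughly 5 minute limit.

```cpp
#include <cassert>
#include <chrono>

using Clock = std::chrono::steady_clock;

// 4 minutes 50 seconds, a little under the service's limit of about 5 minutes.
constexpr std::chrono::seconds kStreamingLimit =
    std::chrono::minutes(4) + std::chrono::seconds(50);

// True once the current stream has been open for at least kStreamingLimit.
bool ShouldReset(Clock::time_point last_reset, Clock::time_point now) {
  assert(now >= last_reset);
  return now - last_reset >= kStreamingLimit;
}
```

The call site then reads `if (ShouldReset(last_reset, Clock::now()))`, with no `duration_cast` or `.count()` involved.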

std::mutex mut;

bool IsRecoverable(grpc::Status& grpc_status)
{
std::cout << " GRPC error detected "<< grpc_status.error_code() << "\tmessage:" << grpc_status.error_message() << "\n";
if(grpc_status.error_code() == 11) // google throwing exceeded maximum allowed duration of 305 seconds
{
return true;
}
return false;
}

auto reset_stream = [&context, &streamer, &last_reset]
{
const std::lock_guard<std::mutex> s_lock(mut);
// Reset the stream
context->TryCancel();

Maybe we should do a WritesDone() here. But we need to wait until the Read() call returns false before calling Finish():

https://grpc.github.io/grpc/cpp/classgrpc__impl_1_1internal_1_1_client_streaming_interface.html#a010a28c3d4428fae26c5e7f6866e502c

auto status = streamer->Finish();
if (not IsRecoverable(status)) {
  //TODO throw GrpcStatusToException(status);
  std::cout << "throw some exception here \n";
  //return;
}
context = std::make_unique<grpc::ClientContext>();
streamer = speech->StreamingRecognize(context.get());
StreamingRecognizeRequest request;
auto* streaming_config = request.mutable_streaming_config();
setGoogleConfigSettings(streaming_config->mutable_config());
streaming_config->set_interim_results(true);
streamer->Write(request);
last_reset = std::chrono::high_resolution_clock::now();

};

static int MicrophoneThreadMain(ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream)
{
// Read responses.
auto start = std::chrono::high_resolution_clock::now();
while (true) // Returns false when no more to read.
{
StreamingRecognizeResponse response;
std::unique_lock<std::mutex> lk(mut);
bool message_received = streamer.get()->Read(&response);
lk.unlock();
if(!message_received)
{
break;
}
// process responses
stream->Write(transcript);
}
}

// Logic and data behind the server's behavior.
class GreeterServiceImpl final : public Streamer::Service

Status StreamAudio(ServerContext* context, ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream) override
{
StreamingRecognizeRequest request;
auto* streaming_config = request.mutable_streaming_config();
setGoogleConfigSettings(streaming_config->mutable_config());
streaming_config->set_interim_results(true);
streamer->Write(request);

You have no locks around streamer in this function, is there a reason for it?

std::vector<std::string> audio_bytes;
// The microphone thread writes the audio content.
std::thread microphone_thread(&MicrophoneThreadMain, stream);
flag = 1;

SpeechChunk speech_chunk;
StreamingRecognizeRequest data_request;
while (stream->Read(&speech_chunk)) {
  std::string audio_content = speech_chunk.audio_content();
  data_request.set_audio_content(audio_content);
  streamer->Write(data_request);
  audio_bytes.push_back(speech_chunk.audio_content());
  if(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-last_reset).count() >= STREAMING_LIMIT)
  {
    std::cout << " resetting stream because of timer \n";
    reset_stream();
  }
}
streamer->WritesDone();
WriteToFile("output.raw", audio_bytes);
grpc::Status status = streamer->Finish();
if(IsRecoverable(status))
{
  std::cout << " resetting stream because of streamer finish caused some issues here \n";
  reset_stream();
}
microphone_thread.join();
if (!status.ok()) {
  // Report the RPC failure.
  std::cerr << status.error_message() << std::endl;
  return Status::OK;
}
return Status::OK;

}
};

@coryan
Contributor

coryan commented Jan 29, 2020

Also, sorry I cannot be of more help. I do hear that having an example of infinite streaming for speech + C++ would be useful. I have added this to our backlog:

googleapis/google-cloud-cpp#3356

Though candidly it will take us a while to get to it.

@tumusudheer
Author

Hi @coryan ,

What does this mean? That part of the audio is not processed? Or that you drop audio packets?

Yes, whatever audio packets are received while the stream is being reset are not processed. The Java and JS examples keep track of when the last final response was received and resend the audio from that time up to the time of the reset (or something like that). The logic in those examples seems complicated to me, so it would be great if I could get pseudocode for what to do with the audio packets that arrive while the stream is being reset.

Maybe we should do the same in C++? I think that is a WritesDone():

I will try with this today.

nit: in C++ the high resolution clock is not guaranteed to be steady (it can go back in time, and skip forward), though it typically is. I understand those things matter for audio (again, no expert here), so you may want to insert a static assert if you are assuming that this clock is in fact steady.

Oh, thank you very much for the suggestions. I can use the steady clock. I'm not sure how to use asserts in C++, but I'll figure it out.
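For reference, a `static_assert` is checked at compile time, so a wrong assumption about the clock stops the build instead of showing up at run time. A minimal sketch follows; `StreamClock` and `ElapsedMs` are hypothetical names. The alias picks `high_resolution_clock` only when it is steady and falls back to `steady_clock` otherwise.

```cpp
#include <cassert>
#include <chrono>
#include <type_traits>

// Hypothetical alias: prefer high_resolution_clock only when it is steady,
// otherwise fall back to steady_clock.
using StreamClock =
    std::conditional<std::chrono::high_resolution_clock::is_steady,
                     std::chrono::high_resolution_clock,
                     std::chrono::steady_clock>::type;

// Compile-time check: the build fails here if the clock could jump.
static_assert(StreamClock::is_steady, "StreamClock must be a steady clock");

// Milliseconds between two readings of the clock.
long long ElapsedMs(StreamClock::time_point since, StreamClock::time_point now) {
  assert(now >= since);
  return std::chrono::duration_cast<std::chrono::milliseconds>(now - since)
      .count();
}
```

Using `std::chrono::steady_clock` directly is the simpler option if the extra resolution is not needed.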

Maybe we should do a WritesDone() here. But we need to wait until the Read() call returns false before calling Finish():

I'll try with WritesDone().

static int MicrophoneThreadMain(ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream)
{
  // Read responses.
  auto start = std::chrono::high_resolution_clock::now();
  while (true) // Returns false when no more to read.
  {
    StreamingRecognizeResponse response;
    std::unique_lock<std::mutex> lk(mut);
    bool message_received = streamer.get()->Read(&response);
    lk.unlock();
    if(!message_received)
    {
      break;
    }
    // process responses 
    stream->Write(transcript); 
  }

So in Method 2, the additional thread waits for responses and processes them. I believe I should then call streamer->Finish() at the end of this while loop, since the loop runs until Read() returns false?
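The ordering discussed here (half-close with WritesDone(), keep reading until Read() returns false, then call Finish()) can be sketched as a small template. `FakeStream` is a hypothetical stand-in that only mimics the method names of the synchronous gRPC stream, and `CloseGracefully` is a hypothetical helper; with the real stream the return type would be `grpc::Status` rather than `bool`. The sketch runs on one thread; when a separate thread is already looping on Read(), the writer would instead call WritesDone(), wait for that loop to end, and only then call Finish().

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in with the same method names as the synchronous gRPC
// stream (WritesDone, Read, Finish); the status is reduced to a bool.
class FakeStream {
 public:
  explicit FakeStream(std::vector<std::string> responses) {
    for (auto& r : responses) pending_.push(std::move(r));
  }
  bool WritesDone() {
    writes_done_ = true;
    return true;
  }
  // Returns false once the server has nothing more to send.
  bool Read(std::string* out) {
    if (pending_.empty()) {
      drained_ = true;
      return false;
    }
    *out = std::move(pending_.front());
    pending_.pop();
    return true;
  }
  // Reports success only if writes were closed and all responses were read.
  bool Finish() { return writes_done_ && drained_; }

 private:
  std::queue<std::string> pending_;
  bool writes_done_ = false;
  bool drained_ = false;
};

// Half-close, drain the remaining responses, then collect the final status.
template <typename Stream, typename Response>
auto CloseGracefully(Stream& stream, std::vector<Response>* received)
    -> decltype(stream.Finish()) {
  assert(received != nullptr);
  stream.WritesDone();
  Response response;
  while (stream.Read(&response)) {
    received->push_back(response);
  }
  return stream.Finish();
}
```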

int STREAMING_LIMIT = 10000; // ~5 minutes

I was just testing with 10 seconds. I'll change it back to 290000, about 4 minutes and 50 seconds, to be safe (a little less than 5 minutes).

static int MicrophoneThreadMain(ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream)
{
  // Read responses.
  auto start = std::chrono::high_resolution_clock::now();
  while (true) // Returns false when no more to read.
  {
    StreamingRecognizeResponse response;
    std::unique_lock<std::mutex> lk(mut);
    bool message_received = streamer.get()->Read(&response);
    lk.unlock();
    if(!message_received)
    {
      break;
    }
    // process responses 
    stream->Write(transcript); 
  }
}

// Logic and data behind the server's behavior.
class GreeterServiceImpl final : public Streamer::Service 

  Status StreamAudio(ServerContext* context, ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream) override 
  {
    StreamingRecognizeRequest request;
    auto* streaming_config = request.mutable_streaming_config();
    setGoogleConfigSettings(streaming_config->mutable_config());
    streaming_config->set_interim_results(true);
    streamer->Write(request);
    std::vector<std::string> audio_bytes;
    // The microphone thread writes the audio content.
    std::thread microphone_thread(&MicrophoneThreadMain, stream);
    flag = 1;
   
    SpeechChunk speech_chunk;
    StreamingRecognizeRequest data_request;
    while (stream->Read(&speech_chunk)) {
      std::string audio_content = speech_chunk.audio_content();
      data_request.set_audio_content(audio_content);
      streamer->Write(data_request);
      audio_bytes.push_back(speech_chunk.audio_content());
      if(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-last_reset).count() >= STREAMING_LIMIT)
      {
        std::cout << " resetting stream because of timer \n";
        reset_stream();
      }
    }
    streamer->WritesDone();
    WriteToFile("output.raw", audio_bytes);
    grpc::Status status = streamer->Finish();
    if(IsRecoverable(status))
    {
      std::cout << " resetting stream because of streamer finish caused some issues here \n";
      reset_stream();
    }
    microphone_thread.join();
    if (!status.ok()) {
      // Report the RPC failure.
      std::cerr << status.error_message() << std::endl;
      return Status::OK;
    }
    return Status::OK;
  }
};

You have no locks around streamer in this function, is there a reason for it?

Since the main thread (not MicrophoneThreadMain) does the reset based on the timer, I thought it doesn't need locks around the streamer. Or do I need a separate thread that only does the reset based on the timer?

Whichever thread does the reset, I'm not using locks around the streamer in that thread. Let me know if that is wrong.

Thank you very much.

@tumusudheer
Author

Also, sorry I cannot be of more help.

Thank you very much for your help. You have been very helpful to me in this work. I think I'm getting closer to having a working version.

I do hear that having an example of infinite streaming for speech + C++ would be useful. I have added this to our backlog:

googleapis/google-cloud-cpp#3356

Though candidly it will take us a while to get to it.

Great, that would be very helpful. Meanwhile, if there is any pseudocode on how to do it along the lines of streaming_transcribe.cc, that would be very helpful for me.

Thank you very much.

@tumusudheer
Author

Hi @coryan ,

I got infinite streaming working with the method where the main thread writes audio to Google Speech while the additional thread reads the responses. I have a quick question: what is the best way to reset the stream?

Method 1

auto reset_stream = [&context, &streamer, &last_reset]
{
    const std::lock_guard<std::mutex> s_lock(mut);
    // Reset the stream
    context->TryCancel();
    auto status = streamer->Finish();
    if (not IsRecoverable(status)) {
      //TODO throw GrpcStatusToException(status);
    
    }
    context = std::make_unique<grpc::ClientContext>();
    streamer = speech->StreamingRecognize(context.get());
    StreamingRecognizeRequest request;
    auto* streaming_config = request.mutable_streaming_config();
    setGoogleConfigSettings(streaming_config->mutable_config());
    streaming_config->set_interim_results(true);
    // Write the first request, containing the config only.
    streamer->Write(request);
    last_reset = std::chrono::high_resolution_clock::now();
    std::cout << "reset stream is done  at " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()-global_start).count() <<"\n";
};

Or, as you suggested in your recent post, use WritesDone():

Method 2

auto reset_stream = [&context, &streamer, &last_reset]
{
    std::unique_lock<std::mutex> s_lock(mut);
    // Reset the stream
    streamer->WritesDone();
    condVar.wait(s_lock,[]{return stream_read_finished;});
    stream_read_finished = false;
    auto status = streamer->Finish();
    if (not IsRecoverable(status)) {
      //TODO throw GrpcStatusToException(status);
    }
    context->TryCancel();
    context = std::make_unique<grpc::ClientContext>();
    streamer = speech->StreamingRecognize(context.get());
    StreamingRecognizeRequest request;
    auto* streaming_config = request.mutable_streaming_config();
    setGoogleConfigSettings(streaming_config->mutable_config());
    streaming_config->set_interim_results(true);
    streamer->Write(request);
    last_reset = std::chrono::high_resolution_clock::now();
    s_lock.unlock();
};

In the second method I'm using streamer->WritesDone(). I believe that after WritesDone() the streamer may still deliver responses, so I added a condition variable to wait until the reads from the streamer are done. For Method 2 above, my thread that processes the responses is as follows:

static int MicrophoneThreadMain(ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream)
{
  // Read responses.
  while (!stream_write_finished) // this variable becomes true when grpc received no more data
  {
    StreamingRecognizeResponse response;
    std::unique_lock<std::mutex> lk(mut);
    bool message_received = streamer.get()->Read(&response);
    if(!message_received)
    {
      stream_read_finished = true;
      condVar.notify_all();
      lk.unlock();
      continue;
    }
    lk.unlock();
   // process responses here
  }
  std::cout << " |||||||||||||||| REACHED END OF READING \n";
}

For method 1 The thread code looks as follows:

static int MicrophoneThreadMain(ServerReaderWriter<TranscriptChunk, SpeechChunk>* stream)
{
  // Read responses.
  while (!stream_write_finished) // this variable becomes true when grpc received no more data
  {
    StreamingRecognizeResponse response;
    std::unique_lock<std::mutex> lk(mut);
    bool message_received = streamer.get()->Read(&response);
    lk.unlock();
    if(!message_received)
    {
      continue;
    }
    // process responses here
  }
  std::cout << " |||||||||||||||| REACHED END OF READING \n";
  return 0;  // the function is declared to return int
}

If you could suggest which is the better way to reset the stream, method 1 or method 2, that would be great. Also, in both reset methods, the first line, which grabs the lock, sometimes takes about 10 seconds, probably because of a bug in my synchronization. I'll debug this issue.
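
One possible explanation for the slow lock acquisition, inferred only from the code shown above and not confirmed: both reader loops hold mut while Read() blocks, so the reset lambda cannot take the lock until the next response arrives. The sketch below shows one way to avoid that, assuming a single writer thread and a single reader thread. FakeStream and ResettableSession are hypothetical stand-ins, not gRPC types. The reader copies a shared_ptr to the current stream under the lock, calls the blocking Read() without the lock, and signals the writer once the half-closed stream is drained.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical stand-in for the gRPC stream (NOT a gRPC type). Read() blocks
// until a message is queued and returns false once the stream is half-closed
// and drained.
class FakeStream {
 public:
  void Write(int message) {
    std::lock_guard<std::mutex> lk(mu_);
    assert(!writes_done_ && "Write() after WritesDone()");
    queue_.push(message);
    cv_.notify_one();
  }
  void WritesDone() {
    std::lock_guard<std::mutex> lk(mu_);
    writes_done_ = true;
    cv_.notify_one();
  }
  bool Read(int* message) {
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [this] { return writes_done_ || !queue_.empty(); });
    if (queue_.empty()) return false;
    *message = queue_.front();
    queue_.pop();
    return true;
  }
 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<int> queue_;
  bool writes_done_ = false;
};

class ResettableSession {
 public:
  // Send(), Reset() and Shutdown() are meant for the writer thread only.
  void Send(int message) { Current()->Write(message); }
  // Half-close the stream, wait for the reader to drain it, then swap it.
  void Reset() {
    Current()->WritesDone();
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [this] { return drained_; });
    drained_ = false;
    stream_ = std::make_shared<FakeStream>();
    ++generation_;
    cv_.notify_all();
  }
  // End the session; ReadLoop() returns after draining the last stream.
  void Shutdown() {
    { std::lock_guard<std::mutex> lk(mu_); stopping_ = true; }
    Current()->WritesDone();
  }
  // Reader thread: returns the number of messages received.
  int ReadLoop() {
    int received = 0;
    for (;;) {
      std::shared_ptr<FakeStream> stream;
      unsigned generation;
      {
        std::lock_guard<std::mutex> lk(mu_);  // held only to copy the pointer
        stream = stream_;
        generation = generation_;
      }
      int message;
      while (stream->Read(&message)) ++received;  // blocks WITHOUT holding mu_
      std::unique_lock<std::mutex> lk(mu_);
      drained_ = true;
      cv_.notify_all();
      cv_.wait(lk, [&] { return stopping_ || generation_ != generation; });
      if (generation_ == generation) return received;  // woken by Shutdown()
    }
  }
 private:
  std::shared_ptr<FakeStream> Current() {
    std::lock_guard<std::mutex> lk(mu_);
    return stream_;
  }
  std::mutex mu_;
  std::condition_variable cv_;
  std::shared_ptr<FakeStream> stream_ = std::make_shared<FakeStream>();
  bool drained_ = false;
  bool stopping_ = false;
  unsigned generation_ = 0;
};

// Usage: two messages, a reset, one more message, then shutdown.
int RunDemo() {
  ResettableSession session;
  int received = 0;
  std::thread reader([&] { received = session.ReadLoop(); });
  session.Send(1);
  session.Send(2);
  session.Reset();
  session.Send(3);
  session.Shutdown();
  reader.join();
  return received;
}
```

With the real library, the point where Read() returns false is presumably also where Finish() would be called to collect the status, and the swap would create a new grpc::ClientContext together with the new stream.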

I've also tried to move the code into a class:

class ASR_Google
{

public:
    ASR_Google(void);
    ~ASR_Google(void);
    int Initialize();
    void SendData(std::string& audio_bytes);
    std::unique_ptr<Speech::Stub> speech;
    std::unique_ptr<grpc::ClientReaderWriter<google::cloud::speech::v1::StreamingRecognizeRequest, google::cloud::speech::v1::StreamingRecognizeResponse>> google_streamer;

};

The .cpp file, ASR_Google.cpp:

void ASR_Google::SendData(std::string& audio_bytes)
{
    audio_request.set_audio_content(audio_bytes);
    google_streamer->Write(audio_request);
}
    
int ASR_Google::Initialize()
{
    // Step 1: Create a Speech Stub connected to the speech service.
    
    auto creds = grpc::GoogleDefaultCredentials();
    auto channel = grpc::CreateChannel("speech.googleapis.com", creds);
    speech = Speech::NewStub(channel);

    // Build the first request, which carries only the streaming configuration.
    StreamingRecognizeRequest request;
    auto* streaming_config = request.mutable_streaming_config();
    SetGoogleConfigSettings(streaming_config->mutable_config());

    // Begin a stream.
    grpc::ClientContext client_context;
    google_streamer = speech->StreamingRecognize(&client_context);
    // Write the first request, containing the config only.
    streaming_config->set_interim_results(true);
    google_streamer->Write(request);
    return 0;
}

Main method code

int main()
{
    ASR_Google asr_google;
    asr_google.Initialize();
    while (data)  // pseudocode: while more audio is available
    {
        asr_google.SendData(data);
    }
}

But the code fails at google_streamer->Write(audio_request). After debugging, I found that the problem is with the unique_ptr google_streamer.

What is the best way to declare this streamer object as std::unique_ptr<grpc::ClientReaderWriter<google::cloud::speech::v1::StreamingRecognizeRequest, google::cloud::speech::v1::StreamingRecognizeResponse>> google_streamer; in the class and initialize it in the Initialize() method? I could be wrong, but I think it has to be a class member because it needs to be reset every 4.5 minutes.
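
A possible explanation for the failing Write(), inferred from the Initialize() code above rather than confirmed against the running program: client_context is a local variable, so it is destroyed when Initialize() returns while google_streamer is still using it. gRPC expects the ClientContext to outlive the call it was used to start. The sketch below illustrates the ownership pattern only. FakeContext, FakeStream and AsrSession are hypothetical stand-ins, not gRPC types, and FakeStream::Write() simply reports whether its context still exists.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-in for grpc::ClientContext (NOT a gRPC type).
class FakeContext {
 public:
  FakeContext() : alive_(std::make_shared<bool>(true)) {}
  ~FakeContext() { *alive_ = false; }
  FakeContext(const FakeContext&) = delete;
  FakeContext& operator=(const FakeContext&) = delete;
  std::shared_ptr<bool> alive() const { return alive_; }

 private:
  std::shared_ptr<bool> alive_;
};

// Hypothetical stand-in for the ClientReaderWriter (NOT a gRPC type).
class FakeStream {
 public:
  explicit FakeStream(const FakeContext& context)
      : context_alive_(context.alive()) {}
  // Succeeds only while the context the stream was opened with still exists.
  bool Write(const std::string&) { return *context_alive_; }

 private:
  std::shared_ptr<bool> context_alive_;
};

class AsrSession {
 public:
  // Mirrors the Initialize() in the question: the context is a local.
  void InitializeWithLocalContext() {
    FakeContext local_context;
    stream_ = std::make_unique<FakeStream>(local_context);
  }  // local_context is destroyed here, but stream_ lives on.

  // Alternative: the context is a member and lives as long as the stream.
  // Calling this again performs a reset with a fresh context and stream.
  void InitializeWithMemberContext() {
    stream_.reset();  // drop the old stream before its context
    context_ = std::make_unique<FakeContext>();
    stream_ = std::make_unique<FakeStream>(*context_);
  }

  bool SendData(const std::string& audio_bytes) {
    assert(stream_ && "call one of the Initialize methods first");
    return stream_->Write(audio_bytes);
  }

 private:
  // Declared before stream_ so that it is destroyed after stream_.
  std::unique_ptr<FakeContext> context_;
  std::unique_ptr<FakeStream> stream_;
};
```

With the real library, the members would presumably be a std::unique_ptr<grpc::ClientContext> next to google_streamer. A ClientContext cannot be reused across calls, so each reset needs a new one.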

Thank you very much.

@karan-k111098

I found the above conversation very helpful. In my implementation, I have a parent program written in C. Whenever a transcribe request is made to the parent program, it forks and launches this C++ application, passing a named pipe as an argument. Audio is sent through the pipe to the C++ program, which forwards it to Google. When inactivity is detected, the C++ program sends the finish() request to get the final response. Until then, the parent program waits in waitpid(). After the C++ program exits, the parent program picks up the PID and does the rest of the processing.

My question is, is this design okay or should I change it?

@coryan
Contributor

coryan commented Jan 20, 2021

My question is, is this design okay or should I change it?

This is a difficult question to answer. Since you are using both C and C++, the idea of forking a process seems fine. Whether a named pipe has the necessary capacity and does not get full, deadlocking your two programs, is a different question. Blocking on waitpid() without reading from the pipe or allowing for timeouts seems dangerous to me. I would consider pselect() to wait for I/O, signals (SIGCHLD in this case), and a timeout, all at the same time.
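
The pselect() idea can be sketched roughly as follows. This is a minimal illustration, not code from this thread: PrepareSigchld, WaitForEvent and WaitResult are hypothetical names, and the sketch assumes the parent wants to wake up when a descriptor becomes readable, when the child exits, or after a timeout. It is written in C++ to match the rest of the thread but uses only POSIX calls, so the same structure should carry over to a C parent.

```cpp
#include <cassert>
#include <cerrno>
#include <signal.h>
#include <sys/select.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

enum class WaitResult { kChildExited, kReadable, kTimeout, kError };

// An empty handler is enough: its only job is to make SIGCHLD interrupt
// pselect() instead of being ignored by default.
void OnSigchld(int) {}

// Installs the handler and blocks SIGCHLD. Call once, before fork(), so a
// child that exits early cannot slip past the wait. The previous signal mask
// is stored in *original_mask.
bool PrepareSigchld(sigset_t* original_mask) {
  struct sigaction action {};
  action.sa_handler = OnSigchld;
  sigemptyset(&action.sa_mask);
  if (sigaction(SIGCHLD, &action, nullptr) != 0) return false;
  sigset_t block;
  sigemptyset(&block);
  sigaddset(&block, SIGCHLD);
  return sigprocmask(SIG_BLOCK, &block, original_mask) == 0;
}

// Waits until `fd` is readable, the child `pid` has exited, or `timeout_sec`
// seconds pass without either event.
WaitResult WaitForEvent(int fd, pid_t pid, const sigset_t& original_mask,
                        int timeout_sec, int* exit_status) {
  assert(fd >= 0 && fd < FD_SETSIZE);
  sigset_t wait_mask = original_mask;
  sigdelset(&wait_mask, SIGCHLD);  // SIGCHLD is deliverable only in pselect()
  for (;;) {
    // The child may have exited before this call or during an earlier wait.
    pid_t reaped = waitpid(pid, exit_status, WNOHANG);
    if (reaped == pid) return WaitResult::kChildExited;
    if (reaped < 0) return WaitResult::kError;

    fd_set readable;
    FD_ZERO(&readable);
    FD_SET(fd, &readable);
    timespec timeout{};
    timeout.tv_sec = timeout_sec;
    // pselect() swaps in wait_mask atomically for the duration of the wait.
    int ready = pselect(fd + 1, &readable, nullptr, nullptr, &timeout,
                        &wait_mask);
    if (ready > 0) return WaitResult::kReadable;
    if (ready == 0) return WaitResult::kTimeout;
    if (errno != EINTR) return WaitResult::kError;
    // Interrupted by a signal: loop to check whether our child exited.
    // In this simplified version the timeout restarts from scratch.
  }
}
```

A caller would invoke PrepareSigchld() before fork(), then call WaitForEvent() in a loop, servicing the descriptor on kReadable and collecting the exit status on kChildExited.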

@coryan
Contributor

coryan commented Jun 9, 2022

Seems like the questions posed here are answered. Closing.

@coryan coryan closed this as completed Jun 9, 2022