Skip to content

Commit

Permalink
Removed SyncMessenger from DSPDeviceSinkEngine. Part of #2159
Browse files Browse the repository at this point in the history
  • Loading branch information
f4exb committed Aug 27, 2024
1 parent 2b26f15 commit d553834
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 114 deletions.
201 changes: 97 additions & 104 deletions sdrbase/dsp/dspdevicesinkengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,15 @@ DSPDeviceSinkEngine::DSPDeviceSinkEngine(uint32_t uid, QObject* parent) :
m_centerFrequency(0),
m_realElseComplex(false)
{
setState(StIdle);
connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection);
connect(&m_syncMessenger, SIGNAL(messageSent()), this, SLOT(handleSynchronousMessages()), Qt::QueuedConnection);

moveToThread(this);
}

DSPDeviceSinkEngine::~DSPDeviceSinkEngine()
{
stop();
wait();
qDebug("DSPDeviceSinkEngine::~DSPDeviceSinkEngine");
}

void DSPDeviceSinkEngine::setState(State state)
Expand Down Expand Up @@ -86,32 +85,32 @@ void DSPDeviceSinkEngine::stop()
bool DSPDeviceSinkEngine::initGeneration()
{
qDebug() << "DSPDeviceSinkEngine::initGeneration";
DSPGenerationInit cmd;

return m_syncMessenger.sendWait(cmd) == StReady;
auto *cmd = new DSPGenerationInit();
getInputMessageQueue()->push(cmd);
return true;
}

bool DSPDeviceSinkEngine::startGeneration()
{
qDebug() << "DSPDeviceSinkEngine::startGeneration";
DSPGenerationStart cmd;

return m_syncMessenger.sendWait(cmd) == StRunning;
auto *cmd = new DSPGenerationStart();
getInputMessageQueue()->push(cmd);
return true;
}

void DSPDeviceSinkEngine::stopGeneration()
{
qDebug() << "DSPDeviceSinkEngine::stopGeneration";
DSPGenerationStop cmd;
m_syncMessenger.storeMessage(cmd);
handleSynchronousMessages();
DSPGenerationStop *cmd = new DSPGenerationStop();
getInputMessageQueue()->push(cmd);
}

void DSPDeviceSinkEngine::setSink(DeviceSampleSink* sink)
{
qDebug() << "DSPDeviceSinkEngine::setSink";
DSPSetSink cmd(sink);
m_syncMessenger.sendWait(cmd);
m_deviceSampleSink = sink;
auto *cmd = new DSPSetSink(sink);
getInputMessageQueue()->push(cmd);
}

void DSPDeviceSinkEngine::setSinkSequence(int sequence)
Expand All @@ -123,45 +122,40 @@ void DSPDeviceSinkEngine::setSinkSequence(int sequence)
void DSPDeviceSinkEngine::addChannelSource(BasebandSampleSource* source)
{
qDebug() << "DSPDeviceSinkEngine::addChannelSource: " << source->getSourceName().toStdString().c_str();
DSPAddBasebandSampleSource cmd(source);
m_syncMessenger.sendWait(cmd);
DSPAddBasebandSampleSource *cmd = new DSPAddBasebandSampleSource(source);
getInputMessageQueue()->push(cmd);
}

void DSPDeviceSinkEngine::removeChannelSource(BasebandSampleSource* source)
{
qDebug() << "DSPDeviceSinkEngine::removeChannelSource: " << source->getSourceName().toStdString().c_str();
DSPRemoveBasebandSampleSource cmd(source);
m_syncMessenger.sendWait(cmd);
auto *cmd = new DSPRemoveBasebandSampleSource(source);
getInputMessageQueue()->push(cmd);
}

void DSPDeviceSinkEngine::addSpectrumSink(BasebandSampleSink* spectrumSink)
{
qDebug() << "DSPDeviceSinkEngine::addSpectrumSink: " << spectrumSink->getSinkName().toStdString().c_str();
DSPAddSpectrumSink cmd(spectrumSink);
m_syncMessenger.sendWait(cmd);
m_spectrumSink = spectrumSink;
}

void DSPDeviceSinkEngine::removeSpectrumSink(BasebandSampleSink* spectrumSink)
{
qDebug() << "DSPDeviceSinkEngine::removeSpectrumSink: " << spectrumSink->getSinkName().toStdString().c_str();
DSPRemoveSpectrumSink cmd(spectrumSink);
m_syncMessenger.sendWait(cmd);
auto *cmd = new DSPRemoveSpectrumSink(spectrumSink);
getInputMessageQueue()->push(cmd);
}

QString DSPDeviceSinkEngine::errorMessage()
{
qDebug() << "DSPDeviceSinkEngine::errorMessage";
DSPGetErrorMessage cmd;
m_syncMessenger.sendWait(cmd);
return cmd.getErrorMessage();
return m_errorMessage;
}

QString DSPDeviceSinkEngine::sinkDeviceDescription()
{
qDebug() << "DSPDeviceSinkEngine::sinkDeviceDescription";
DSPGetSinkDeviceDescription cmd;
m_syncMessenger.sendWait(cmd);
return cmd.getDeviceDescription();
return m_deviceDescription;
}

void DSPDeviceSinkEngine::workSampleFifo()
Expand Down Expand Up @@ -399,15 +393,13 @@ DSPDeviceSinkEngine::State DSPDeviceSinkEngine::gotoError(const QString& errorMe
return StError;
}

void DSPDeviceSinkEngine::handleSetSink(DeviceSampleSink* sink)
void DSPDeviceSinkEngine::handleSetSink(DeviceSampleSink*)
{
m_deviceSampleSink = sink;

if (!m_deviceSampleSink) { // Early leave
return;
}

qDebug("DSPDeviceSinkEngine::handleSetSink: set %s", qPrintable(sink->getDeviceDescription()));
qDebug("DSPDeviceSinkEngine::handleSetSink: set %s", qPrintable(m_deviceSampleSink->getDeviceDescription()));

QObject::connect(
m_deviceSampleSink->getSampleFifo(),
Expand All @@ -416,7 +408,6 @@ void DSPDeviceSinkEngine::handleSetSink(DeviceSampleSink* sink)
&DSPDeviceSinkEngine::handleData,
Qt::QueuedConnection
);

}

void DSPDeviceSinkEngine::handleData()
Expand All @@ -426,126 +417,128 @@ void DSPDeviceSinkEngine::handleData()
}
}

void DSPDeviceSinkEngine::handleSynchronousMessages()
bool DSPDeviceSinkEngine::handleMessage(const Message& message)
{
Message *message = m_syncMessenger.getMessage();
qDebug() << "DSPDeviceSinkEngine::handleSynchronousMessages: " << message->getIdentifier();
if (DSPSignalNotification::match(message))
{
const DSPSignalNotification& notif = (const DSPSignalNotification&) message;

// update DSP values

m_sampleRate = notif.getSampleRate();
m_centerFrequency = notif.getCenterFrequency();
m_realElseComplex = notif.getRealElseComplex();

qDebug() << "DSPDeviceSinkEngine::handleInputMessages: DSPSignalNotification:"
<< " m_sampleRate: " << m_sampleRate
<< " m_centerFrequency: " << m_centerFrequency
<< " m_realElseComplex" << m_realElseComplex;

// forward source changes to sources with immediate execution

for(BasebandSampleSources::const_iterator it = m_basebandSampleSources.begin(); it != m_basebandSampleSources.end(); it++)
{
auto *rep = new DSPSignalNotification(notif); // make a copy
qDebug() << "DSPDeviceSinkEngine::handleInputMessages: forward message to " << (*it)->getSourceName().toStdString().c_str();
(*it)->pushMessage(rep);
}

// forward changes to listeners on DSP output queue
if (m_deviceSampleSink)
{
MessageQueue *guiMessageQueue = m_deviceSampleSink->getMessageQueueToGUI();
qDebug("DSPDeviceSinkEngine::handleInputMessages: DSPSignalNotification: guiMessageQueue: %p", guiMessageQueue);

if (DSPGenerationInit::match(*message))
if (guiMessageQueue)
{
auto *rep = new DSPSignalNotification(notif); // make a copy for the output queue
guiMessageQueue->push(rep);
}
}

return true;
}
// From synchronous messages
if (DSPGenerationInit::match(message))
{
setState(gotoIdle());

if(m_state == StIdle) {
setState(gotoInit()); // State goes ready if init is performed
}

return true;
}
else if (DSPGenerationStart::match(*message))
else if (DSPGenerationStart::match(message))
{
if(m_state == StReady) {
setState(gotoRunning());
}

return true;
}
else if (DSPGenerationStop::match(*message))
else if (DSPGenerationStop::match(message))
{
setState(gotoIdle());
return true;
}
else if (DSPGetSinkDeviceDescription::match(*message))
{
((DSPGetSinkDeviceDescription*) message)->setDeviceDescription(m_deviceDescription);
}
else if (DSPGetErrorMessage::match(*message))
{
((DSPGetErrorMessage*) message)->setErrorMessage(m_errorMessage);
}
else if (DSPSetSink::match(*message)) {
handleSetSink(((DSPSetSink*) message)->getSampleSink());
}
else if (DSPAddSpectrumSink::match(*message))
{
m_spectrumSink = ((DSPAddSpectrumSink*) message)->getSampleSink();
else if (DSPSetSink::match(message))
{
const DSPSetSink& cmd = (const DSPSetSink&) message;
handleSetSink(cmd.getSampleSink());
return true;
}
else if (DSPRemoveSpectrumSink::match(*message))
else if (DSPRemoveSpectrumSink::match(message))
{
BasebandSampleSink* spectrumSink = ((DSPRemoveSpectrumSink*) message)->getSampleSink();
auto& cmd = (const DSPRemoveSpectrumSink&) message;
BasebandSampleSink* spectrumSink = cmd.getSampleSink();

if(m_state == StRunning) {
spectrumSink->stop();
}

m_spectrumSink = nullptr;
return true;
}
else if (DSPAddBasebandSampleSource::match(*message))
else if (DSPAddBasebandSampleSource::match(message))
{
BasebandSampleSource* source = ((DSPAddBasebandSampleSource*) message)->getSampleSource();
auto& cmd = (const DSPAddBasebandSampleSource&) message;
BasebandSampleSource* source = cmd.getSampleSource();
m_basebandSampleSources.push_back(source);
DSPSignalNotification *notif = new DSPSignalNotification(m_sampleRate, m_centerFrequency);
auto *notif = new DSPSignalNotification(m_sampleRate, m_centerFrequency);
source->pushMessage(notif);

if (m_state == StRunning)
{
if (m_state == StRunning) {
source->start();
}

return true;
}
else if (DSPRemoveBasebandSampleSource::match(*message))
else if (DSPRemoveBasebandSampleSource::match(message))
{
BasebandSampleSource* source = ((DSPRemoveBasebandSampleSource*) message)->getSampleSource();
auto& cmd = (const DSPRemoveBasebandSampleSource&) message;
BasebandSampleSource* source = cmd.getSampleSource();

if(m_state == StRunning) {
source->stop();
}

m_basebandSampleSources.remove(source);
return true;
}

m_syncMessenger.done(m_state);
return false;
}

void DSPDeviceSinkEngine::handleInputMessages()
{
Message* message;

while ((message = m_inputMessageQueue.pop()) != 0)
while ((message = m_inputMessageQueue.pop()) != nullptr)
{
qDebug("DSPDeviceSinkEngine::handleInputMessages: message: %s", message->getIdentifier());

if (DSPSignalNotification::match(*message))
{
DSPSignalNotification *notif = (DSPSignalNotification *) message;

// update DSP values

m_sampleRate = notif->getSampleRate();
m_centerFrequency = notif->getCenterFrequency();
m_realElseComplex = notif->getRealElseComplex();

qDebug() << "DSPDeviceSinkEngine::handleInputMessages: DSPSignalNotification:"
<< " m_sampleRate: " << m_sampleRate
<< " m_centerFrequency: " << m_centerFrequency
<< " m_realElseComplex" << m_realElseComplex;

// forward source changes to sources with immediate execution

for(BasebandSampleSources::const_iterator it = m_basebandSampleSources.begin(); it != m_basebandSampleSources.end(); it++)
{
DSPSignalNotification* rep = new DSPSignalNotification(*notif); // make a copy
qDebug() << "DSPDeviceSinkEngine::handleInputMessages: forward message to " << (*it)->getSourceName().toStdString().c_str();
(*it)->pushMessage(rep);
}

// forward changes to listeners on DSP output queue
if (m_deviceSampleSink)
{
MessageQueue *guiMessageQueue = m_deviceSampleSink->getMessageQueueToGUI();
qDebug("DSPDeviceSinkEngine::handleInputMessages: DSPSignalNotification: guiMessageQueue: %p", guiMessageQueue);

if (guiMessageQueue)
{
DSPSignalNotification* rep = new DSPSignalNotification(*notif); // make a copy for the output queue
guiMessageQueue->push(rep);
}
}

delete message;
}
if (handleMessage(*message)) {
delete message;
}
}
}
4 changes: 1 addition & 3 deletions sdrbase/dsp/dspdevicesinkengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

#include "dsp/dsptypes.h"
#include "util/messagequeue.h"
#include "util/syncmessenger.h"
#include "util/incrementalvector.h"
#include "export.h"

Expand Down Expand Up @@ -86,7 +85,6 @@ class SDRBASE_API DSPDeviceSinkEngine : public QThread {
uint32_t m_uid; //!< unique ID

MessageQueue m_inputMessageQueue; //<! Input message queue. Post here.
SyncMessenger m_syncMessenger; //!< Used to process messages synchronously with the thread

State m_state;

Expand Down Expand Up @@ -119,11 +117,11 @@ class SDRBASE_API DSPDeviceSinkEngine : public QThread {
void setState(State state);

void handleSetSink(DeviceSampleSink* sink); //!< Manage sink setting
bool handleMessage(const Message& cmd);

private slots:
void handleData(); //!< Handle data when samples have to be written to the sample FIFO
void handleInputMessages(); //!< Handle input message queue
void handleSynchronousMessages(); //!< Handle synchronous messages with the thread

signals:
void stateChanged();
Expand Down
Loading

0 comments on commit d553834

Please sign in to comment.