Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[apps] Fixed srt-file-transit (issue #645) #658

Merged
merged 4 commits into from
Apr 23, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions apps/srt-file-transmit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ int parse_args(FileTransmitConfig &cfg, int argc, char** argv)
return 2;
}

cfg.chunk_size = stoul(Option<OutString>(params, "1316", o_chunk));
cfg.chunk_size = stoul(Option<OutString>(params, "1456", o_chunk));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I doubt this option makes any sense anymore.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it is very unlikely to be used, but still is was in the previous file transmit version, so I'd better keep it, at least to minimize the work on these apps.

cfg.skip_flushing = Option<OutBool>(params, false, o_no_flush);
cfg.bw_report = stoi(Option<OutString>(params, "0", o_bwreport));
cfg.stats_report = stoi(Option<OutString>(params, "0", o_statsrep));
Expand Down Expand Up @@ -584,24 +584,23 @@ bool DoDownload(UriParser& us, string directory, string filename,
if (connected)
{
vector<char> buf(cfg.chunk_size);
int n;

if(!ofile.is_open())
if (!ofile.is_open())
{
const char * fn = id.empty() ? filename.c_str() : id.c_str();
directory.append("/");
directory.append(fn);
ofile.open(directory.c_str(), ios::out | ios::trunc | ios::binary);

if(!ofile.is_open())
if (!ofile.is_open())
{
cerr << "Error opening file [" << directory << "]" << endl;
goto exit;
}
cerr << "Writing output to [" << directory << "]" << endl;
}

n = src->Read(cfg.chunk_size, buf, out_stats);
int n = src->Read(cfg.chunk_size, buf, out_stats);
if (n == SRT_ERROR)
{
cerr << "Download: SRT error: " << srt_getlasterror_str() << endl;
Expand Down
14 changes: 13 additions & 1 deletion apps/srt-live-transmit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,10 +712,22 @@ int main(int argc, char** argv)
{
std::shared_ptr<bytevector> pdata(
new bytevector(cfg.chunk_size));
if (!src->Read(cfg.chunk_size, *pdata, out_stats) || (*pdata).empty())

const int res = src->Read(cfg.chunk_size, *pdata, out_stats);

if (res == SRT_ERROR && src->uri.type() == UriParser::SRT)
{
if (srt_getlasterror(NULL) == SRT_EASYNCRCV)
break;

throw std::runtime_error(string("error: recvmsg: ") + string(srt_getlasterror_str()));
}

if (res == 0 || pdata->empty())
{
break;
}

dataqueue.push_back(pdata);
receivedBytes += (*pdata).size();
}
Expand Down
12 changes: 7 additions & 5 deletions apps/srt-multiplex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ struct MediumPair

if (!initial_portion.empty())
{
tar->Write(initial_portion);
if ( tar->Broken() )
tar->Write(initial_portion.data(), initial_portion.size());
if (tar->Broken())
{
applog.Note() << "OUTPUT BROKEN for loop: " << name;
return;
Expand All @@ -121,7 +121,9 @@ struct MediumPair
ostringstream sout;
alarm(1);
bytevector data;
src->Read(chunk, data);
const int read_res = src->Read(chunk, data);


alarm(0);
if (alarm_state)
{
Expand All @@ -138,8 +140,8 @@ struct MediumPair
applog.Note() << sout.str();
break;
}
tar->Write(data);
if ( tar->Broken() )
tar->Write(data.data(), data.size());
if (tar->Broken())
{
sout << " OUTPUT broken";
applog.Note() << sout.str();
Expand Down
5 changes: 2 additions & 3 deletions apps/transmitbase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Location
class Source: public Location
{
public:
virtual bool Read(size_t chunk, bytevector& data, std::ostream &out_stats = std::cout) = 0;
virtual int Read(size_t chunk, bytevector& data, std::ostream &out_stats = std::cout) = 0;
virtual bool IsOpen() = 0;
virtual bool End() = 0;
static std::unique_ptr<Source> Create(const std::string& url);
Expand All @@ -65,8 +65,7 @@ class Source: public Location
class Target: public Location
{
public:
virtual int Write(const char* data, size_t size, std::ostream &out_stats = std::cout) = 0;
virtual bool Write(const bytevector& portion) = 0;
virtual int Write(const char* data, size_t size, std::ostream &out_stats = std::cout) = 0;
virtual bool IsOpen() = 0;
virtual bool Broken() = 0;
virtual void Close() {}
Expand Down
87 changes: 30 additions & 57 deletions apps/transmitmedia.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class FileSource: public Source
throw std::runtime_error(path + ": Can't open file for reading");
}

bool Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override
int Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override
{
if (data.size() < chunk)
data.resize(chunk);
Expand All @@ -67,12 +67,12 @@ class FileSource: public Source
if ( nread < data.size() )
data.resize(nread);

if ( data.empty() )
if (data.empty())
{
return false;
return 0;
}

return true;
return (int) nread;
}

bool IsOpen() override { return bool(ifile); }
Expand All @@ -86,16 +86,10 @@ class FileTarget: public Target

FileTarget(const string& path): ofile(path, ios::out | ios::trunc | ios::binary) {}

bool Write(const bytevector& data) override
{
ofile.write(data.data(), data.size());
return !(ofile.bad());
}

int Write(const char* data, size_t size, ostream &SRT_ATR_UNUSED = cout) override
{
ofile.write(data, size);
return !(ofile.bad()) ? size : 0;
return !(ofile.bad()) ? (int) size : 0;
}

bool IsOpen() override { return !!ofile; }
Expand Down Expand Up @@ -629,35 +623,28 @@ SrtSource::SrtSource(string host, int port, const map<string,string>& par)
hostport_copy = os.str();
}

bool SrtSource::Read(size_t chunk, bytevector& data, ostream &out_stats)
int SrtSource::Read(size_t chunk, bytevector& data, ostream &out_stats)
{
static unsigned long counter = 1;

if (data.size() < chunk)
data.resize(chunk);

bool ready = true;
int stat;
do
const int stat = srt_recvmsg(m_sock, data.data(), (int) chunk);
if (stat == SRT_ERROR)
{
stat = srt_recvmsg(m_sock, data.data(), chunk);
if ( stat == SRT_ERROR )
// EAGAIN for SRT READING
if (srt_getlasterror(NULL) == SRT_EASYNCRCV)
{
// EAGAIN for SRT READING
if ( srt_getlasterror(NULL) == SRT_EASYNCRCV )
{
data.clear();
return false;
}
Error(UDT::getlasterror(), "recvmsg");
data.clear();
}
return stat;
}

if ( stat == 0 )
{
throw ReadEOF(hostport_copy);
}
if (stat == 0)
{
return stat;
}
while (!ready);

chunk = size_t(stat);
if ( chunk < data.size() )
Expand All @@ -682,7 +669,7 @@ bool SrtSource::Read(size_t chunk, bytevector& data, ostream &out_stats)

++counter;

return true;
return stat;
}

int SrtTarget::ConfigurePre(SRTSOCKET sock)
Expand All @@ -707,7 +694,7 @@ int SrtTarget::Write(const char* data, size_t size, ostream &out_stats)
{
static unsigned long counter = 1;

int stat = srt_sendmsg2(m_sock, data, size, nullptr);
int stat = srt_sendmsg2(m_sock, data, (int) size, nullptr);
if (stat == SRT_ERROR)
{
return stat;
Expand Down Expand Up @@ -735,10 +722,6 @@ int SrtTarget::Write(const char* data, size_t size, ostream &out_stats)
return stat;
}

bool SrtTarget::Write(const bytevector& data)
{
return -1 != Write(data.data(), data.size());
}

SrtModel::SrtModel(string host, int port, map<string,string> par)
{
Expand Down Expand Up @@ -840,25 +823,25 @@ class ConsoleSource: public Source
#endif
}

bool Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override
int Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override
{
if (data.size() < chunk)
data.resize(chunk);

bool st = cin.read(data.data(), chunk).good();
chunk = cin.gcount();
if ( chunk == 0 && !st )
if (chunk == 0 && !st)
{
data.clear();
return false;
return 0;
}

if ( chunk < data.size() )
if (chunk < data.size())
data.resize(chunk);
if ( data.empty() )
return false;
if (data.empty())
return 0;

return true;
return (int) chunk;
}

bool IsOpen() override { return cin.good(); }
Expand All @@ -882,12 +865,7 @@ class ConsoleTarget: public Target
int Write(const char* data, size_t len, ostream &SRT_ATR_UNUSED = cout) override
{
cout.write(data, len);
return len;
}

bool Write(const bytevector& data) override
{
return 0 != Write(data.data(), data.size());
return (int) len;
}

bool IsOpen() override { return cout.good(); }
Expand Down Expand Up @@ -1109,27 +1087,27 @@ class UdpSource: public Source, public UdpCommon
eof = false;
}

bool Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override
int Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override
{
if (data.size() < chunk)
data.resize(chunk);

sockaddr_in sa;
socklen_t si = sizeof(sockaddr_in);
int stat = recvfrom(m_sock, data.data(), chunk, 0, (sockaddr*)&sa, &si);
if ( stat < 1 )
if (stat < 1)
{
if (SysError() != EWOULDBLOCK)
eof = true;
data.clear();
return false;
return stat;
}

chunk = size_t(stat);
if ( chunk < data.size() )
data.resize(chunk);

return true;
return stat;
}

bool IsOpen() override { return m_sock != -1; }
Expand Down Expand Up @@ -1158,11 +1136,6 @@ class UdpTarget: public Target, public UdpCommon
return stat;
}

bool Write(const bytevector& data) override
{
return -1 != Write(data.data(), data.size());
}

bool IsOpen() override { return m_sock != -1; }
bool Broken() override { return false; }

Expand Down
3 changes: 1 addition & 2 deletions apps/transmitmedia.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class SrtSource: public Source, public SrtCommon
// Do nothing - create just to prepare for use
}

bool Read(size_t chunk, bytevector& data, ostream& out_stats = cout) override;
int Read(size_t chunk, bytevector& data, ostream& out_stats = cout) override;

/*
In this form this isn't needed.
Expand Down Expand Up @@ -135,7 +135,6 @@ class SrtTarget: public Target, public SrtCommon

int ConfigurePre(SRTSOCKET sock) override;
int Write(const char* data, size_t size, ostream &out_stats = cout) override;
bool Write(const bytevector& data) override;
bool IsOpen() override { return IsUsable(); }
bool Broken() override { return IsBroken(); }
void Close() override { return SrtCommon::Close(); }
Expand Down