diff --git a/CMakeLists.txt b/CMakeLists.txt index 9facab690..975f797f0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -467,6 +467,16 @@ if ( ENABLE_CXX11 ) ) endif() + add_executable(sendfile + ${CMAKE_SOURCE_DIR}/apps/legacy/sendfile.cpp + ) + + add_executable(recvfile + ${CMAKE_SOURCE_DIR}/apps/legacy/recvfile.cpp + ) + target_link_libraries(sendfile ${TARGET_srt} ${DEPENDS_srt}) + target_link_libraries(recvfile ${TARGET_srt} ${DEPENDS_srt}) + # This is recommended by cmake, but it doesn't work anyway. # What is needed is that this below CMAKE_INSTALL_RPATH (yes, relative) # is added as is. diff --git a/apps/legacy/recvfile.cpp b/apps/legacy/recvfile.cpp new file mode 100644 index 000000000..a9fcbab61 --- /dev/null +++ b/apps/legacy/recvfile.cpp @@ -0,0 +1,117 @@ +#ifndef WIN32 + #include + #include +#else + #include + #include +#endif +#include +#include +#include +#include +#include + +using namespace std; + +int main(int argc, char* argv[]) +{ + if ((argc != 5) || (0 == atoi(argv[2]))) + { + cout << "usage: recvfile server_ip server_port remote_filename local_filename" << endl; + return -1; + } + + // use this function to initialize the UDT library + srt_startup(); + + srt_setloglevel(logging::LogLevel::debug); + + struct addrinfo hints, *peer; + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_flags = AI_PASSIVE; + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_DGRAM; + + SRTSOCKET fhandle = srt_socket(hints.ai_family, hints.ai_socktype, hints.ai_protocol); + // SRT requires that third argument is always SOCK_DGRAM. The Stream API is set by an option, + // although there's also lots of other options to be set, for which there's a convenience option, + // SRTO_TRANSTYPE. + SRT_TRANSTYPE tt = SRTT_FILE; + srt_setsockopt(fhandle, 0, SRTO_TRANSTYPE, &tt, sizeof tt); + + if (0 != getaddrinfo(argv[1], argv[2], &hints, &peer)) + { + cout << "incorrect server/peer address. " << argv[1] << ":" << argv[2] << endl; + return -1; + } + + // connect to the server, implict bind + if (SRT_ERROR == srt_connect(fhandle, peer->ai_addr, peer->ai_addrlen)) + { + cout << "connect: " << srt_getlasterror_str() << endl; + return -1; + } + + freeaddrinfo(peer); + + + // send name information of the requested file + int len = strlen(argv[3]); + + if (SRT_ERROR == srt_send(fhandle, (char*)&len, sizeof(int))) + { + cout << "send: " << srt_getlasterror_str() << endl; + return -1; + } + + if (SRT_ERROR == srt_send(fhandle, argv[3], len)) + { + cout << "send: " << srt_getlasterror_str() << endl; + return -1; + } + + // get size information + int64_t size; + + if (SRT_ERROR == srt_recv(fhandle, (char*)&size, sizeof(int64_t))) + { + cout << "send: " << srt_getlasterror_str() << endl; + return -1; + } + + if (size < 0) + { + cout << "no such file " << argv[3] << " on the server\n"; + return -1; + } + + // receive the file + //fstream ofs(argv[4], ios::out | ios::binary | ios::trunc); + int64_t recvsize; + int64_t offset = 0; + + SRT_TRACEBSTATS trace; + srt_bstats(fhandle, &trace, true); + + if (SRT_ERROR == (recvsize = srt_recvfile(fhandle, argv[4], &offset, size, SRT_DEFAULT_RECVFILE_BLOCK))) + { + cout << "recvfile: " << srt_getlasterror_str() << endl; + return -1; + } + + srt_bstats(fhandle, &trace, true); + + cout << "speed = " << trace.mbpsRecvRate << "Mbits/sec" << endl; + int losspercent = 100*trace.pktRcvLossTotal/trace.pktRecv; + cout << "loss = " << trace.pktRcvLossTotal << "pkt (" << losspercent << "%)\n"; + + srt_close(fhandle); + + //ofs.close(); + + // use this function to release the UDT library + srt_cleanup(); + + return 0; +} diff --git a/apps/legacy/sendfile.cpp b/apps/legacy/sendfile.cpp new file mode 100644 index 000000000..e396be86c --- /dev/null +++ b/apps/legacy/sendfile.cpp @@ -0,0 +1,185 @@ +#ifndef WIN32 + #include + #include +#else + #include + #include +#endif +#include +#include +#include +#include +#include + +using namespace std; + +#ifndef WIN32 +void* sendfile(void*); +#else +DWORD WINAPI sendfile(LPVOID); +#endif + +int main(int argc, char* argv[]) +{ + //usage: sendfile [server_port] + if ((2 < argc) || ((2 == argc) && (0 == atoi(argv[1])))) + { + cout << "usage: sendfile [server_port]" << endl; + return 0; + } + + // use this function to initialize the UDT library + srt_startup(); + + srt_setloglevel(logging::LogLevel::debug); + + addrinfo hints; + addrinfo* res; + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_flags = AI_PASSIVE; + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_DGRAM; + + string service("9000"); + if (2 == argc) + service = argv[1]; + + if (0 != getaddrinfo(NULL, service.c_str(), &hints, &res)) + { + cout << "illegal port number or port is busy.\n" << endl; + return 0; + } + + SRTSOCKET serv = srt_socket(res->ai_family, res->ai_socktype, res->ai_protocol); + + // SRT requires that third argument is always SOCK_DGRAM. The Stream API is set by an option, + // although there's also lots of other options to be set, for which there's a convenience option, + // SRTO_TRANSTYPE. + SRT_TRANSTYPE tt = SRTT_FILE; + srt_setsockopt(serv, 0, SRTO_TRANSTYPE, &tt, sizeof tt); + + // Windows UDP issue + // For better performance, modify HKLM\System\CurrentControlSet\Services\Afd\Parameters\FastSendDatagramThreshold +#ifdef WIN32 + int mss = 1052; + srt_setsockopt(serv, 0, SRTO_MSS, &mss, sizeof(int)); +#endif + + //int64_t maxbw = 5000000; + //srt_setsockopt(serv, 0, SRTO_MAXBW, &maxbw, sizeof maxbw); + + if (SRT_ERROR == srt_bind(serv, res->ai_addr, res->ai_addrlen)) + { + cout << "bind: " << srt_getlasterror_str() << endl; + return 0; + } + + freeaddrinfo(res); + + cout << "server is ready at port: " << service << endl; + + srt_listen(serv, 10); + + sockaddr_storage clientaddr; + int addrlen = sizeof(clientaddr); + + SRTSOCKET fhandle; + + while (true) + { + if (SRT_INVALID_SOCK == (fhandle = srt_accept(serv, (sockaddr*)&clientaddr, &addrlen))) + { + cout << "accept: " << srt_getlasterror_str() << endl; + return 0; + } + + char clienthost[NI_MAXHOST]; + char clientservice[NI_MAXSERV]; + getnameinfo((sockaddr *)&clientaddr, addrlen, clienthost, sizeof(clienthost), clientservice, sizeof(clientservice), NI_NUMERICHOST|NI_NUMERICSERV); + cout << "new connection: " << clienthost << ":" << clientservice << endl; + + #ifndef WIN32 + pthread_t filethread; + pthread_create(&filethread, NULL, sendfile, new SRTSOCKET(fhandle)); + pthread_detach(filethread); + #else + CreateThread(NULL, 0, sendfile, new SRTSOCKET(fhandle), 0, NULL); + #endif + } + + srt_close(serv); + + // use this function to release the UDT library + srt_cleanup(); + + return 0; +} + +#ifndef WIN32 +void* sendfile(void* usocket) +#else +DWORD WINAPI sendfile(LPVOID usocket) +#endif +{ + SRTSOCKET fhandle = *(SRTSOCKET*)usocket; + delete (SRTSOCKET*)usocket; + + // aquiring file name information from client + char file[1024]; + int len; + + if (SRT_ERROR == srt_recv(fhandle, (char*)&len, sizeof(int))) + { + cout << "recv: " << srt_getlasterror_str() << endl; + return 0; + } + + if (SRT_ERROR == srt_recv(fhandle, file, len)) + { + cout << "recv: " << srt_getlasterror_str() << endl; + return 0; + } + file[len] = '\0'; + + // open the file (only to check the size) + fstream ifs(file, ios::in | ios::binary); + + ifs.seekg(0, ios::end); + int64_t size = ifs.tellg(); + //ifs.seekg(0, ios::beg); + ifs.close(); + + // send file size information + if (SRT_ERROR == srt_send(fhandle, (char*)&size, sizeof(int64_t))) + { + cout << "send: " << srt_getlasterror_str() << endl; + return 0; + } + + SRT_TRACEBSTATS trace; + srt_bstats(fhandle, &trace, true); + + // send the file + int64_t offset = 0; + if (SRT_ERROR == srt_sendfile(fhandle, file, &offset, size, SRT_DEFAULT_SENDFILE_BLOCK)) + { + cout << "sendfile: " << srt_getlasterror_str() << endl; + return 0; + } + + srt_bstats(fhandle, &trace, true); + cout << "speed = " << trace.mbpsSendRate << "Mbits/sec" << endl; + int losspercent = 100*trace.pktSndLossTotal/trace.pktSent; + cout << "loss = " << trace.pktSndLossTotal << "pkt (" << losspercent << "%)\n"; + + srt_close(fhandle); + + //ifs.close(); + + #ifndef WIN32 + return NULL; + #else + return 0; + #endif +} diff --git a/apps/srt-file-transmit.cpp b/apps/srt-file-transmit.cpp index 327eba7dc..6020d8881 100644 --- a/apps/srt-file-transmit.cpp +++ b/apps/srt-file-transmit.cpp @@ -213,7 +213,7 @@ bool DoUpload(UriParser& ut, string path, string filename) size_t shift = 0; while (n > 0) { - int st = srt_send(ss, buf.data()+shift, n, 0); + int st = srt_send(ss, buf.data()+shift, n); Verb() << "Upload: " << n << " --> " << st << (!shift ? string() : "+" + Sprint(shift)); if (st == SRT_ERROR) { @@ -311,7 +311,7 @@ bool DoDownload(UriParser& us, string directory, string filename) for (;;) { - int n = srt_recv(ss, buf.data(), ::g_buffer_size, 0); + int n = srt_recv(ss, buf.data(), ::g_buffer_size); if (n == SRT_ERROR) { cerr << "Download: SRT error: " << srt_getlasterror_str() << endl; diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 4a5309501..a81c8630d 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -252,16 +252,8 @@ int CUDTUnited::cleanup() return 0; } -SRTSOCKET CUDTUnited::newSocket(int af, int type) -{ - // XXX Type will be soon removed from here and moved - // to the depreacted API 'srt_socket()'. The new function - // 'srt_socket_new' will simply require only the 'af' parameter. - // SRT has actually never been supporting "socket type" from UDT and the - // file transfer will be implemented using a different internal - // setup. Possibly a configuration object might be a good idea. - if (type != SOCK_DGRAM) - throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); +SRTSOCKET CUDTUnited::newSocket(int af, int) +{ CUDTSocket* ns = NULL; @@ -805,7 +797,7 @@ int CUDTUnited::connect(const SRTSOCKET u, const sockaddr* name, int namelen, in { if (!s->m_pUDT->m_bRendezvous) { - s->m_pUDT->open(); + s->m_pUDT->open(); // XXX here use the AF_* family value from 'name' updateMux(s); // <<---- updateMux // -> C(Snd|Rcv)Queue::init // -> pthread_create(...C(Snd|Rcv)Queue::worker...) @@ -1875,14 +1867,14 @@ int CUDT::cleanup() return s_UDTUnited.cleanup(); } -SRTSOCKET CUDT::socket(int af, int type, int) +SRTSOCKET CUDT::socket(int af, int, int) { if (!s_UDTUnited.m_bGCStatus) s_UDTUnited.startup(); try { - return s_UDTUnited.newSocket(af, type); + return s_UDTUnited.newSocket(af, 0); } catch (CUDTException& e) { @@ -2220,22 +2212,28 @@ int CUDT::sendmsg( } } -int CUDT::recvmsg(SRTSOCKET u, char* buf, int len) +int CUDT::sendmsg2( + SRTSOCKET u, const char* buf, int len, ref_t r_m) { try { CUDT* udt = s_UDTUnited.lookup(u); - return udt->recvmsg(buf, len); + return udt->sendmsg2(buf, len, r_m); } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } + catch (bad_alloc&) + { + s_UDTUnited.setError(new CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0)); + return ERROR; + } catch (std::exception& ee) { LOGC(mglog.Fatal) - << "recvmsg: UNEXPECTED EXCEPTION: " + << "sendmsg: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what(); s_UDTUnited.setError(new CUDTException(MJ_UNKNOWN, MN_NONE, 0)); return ERROR; @@ -2264,6 +2262,27 @@ int CUDT::recvmsg(SRTSOCKET u, char* buf, int len, uint64_t& srctime) } } +int CUDT::recvmsg2(SRTSOCKET u, char* buf, int len, ref_t r_m) +{ + try + { + CUDT* udt = s_UDTUnited.lookup(u); + return udt->recvmsg2(buf, len, r_m); + } + catch (CUDTException e) + { + s_UDTUnited.setError(new CUDTException(e)); + return ERROR; + } + catch (std::exception& ee) + { + LOGC(mglog.Fatal) + << "recvmsg: UNEXPECTED EXCEPTION: " + << typeid(ee).name() << ": " << ee.what(); + s_UDTUnited.setError(new CUDTException(MJ_UNKNOWN, MN_NONE, 0)); + return ERROR; + } +} int64_t CUDT::sendfile( SRTSOCKET u, fstream& ifs, int64_t& offset, int64_t size, int block) { @@ -2792,18 +2811,11 @@ int sendmsg( return CUDT::sendmsg(u, buf, len, ttl, inorder, srctime); } -// This version is available ADDITIONALLY to that without srctime int recvmsg(SRTSOCKET u, char* buf, int len, uint64_t& srctime) { return CUDT::recvmsg(u, buf, len, srctime); } - -int recvmsg(SRTSOCKET u, char* buf, int len) -{ - return CUDT::recvmsg(u, buf, len); -} - int64_t sendfile( SRTSOCKET u, fstream& ifs, @@ -3106,11 +3118,11 @@ void setlogflags(int flags) srt_logger_config.flags = flags; } -UDT_API bool setstreamid(SRTSOCKET u, const std::string& sid) +SRT_API bool setstreamid(SRTSOCKET u, const std::string& sid) { return CUDT::setstreamid(u, sid); } -UDT_API std::string getstreamid(SRTSOCKET u) +SRT_API std::string getstreamid(SRTSOCKET u) { return CUDT::getstreamid(u); } diff --git a/srtcore/api.h b/srtcore/api.h index f7c3ddaa8..ca2f2fc0b 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -152,10 +152,10 @@ friend class CRendezvousQueue; /// Create a new UDT socket. /// @param [in] af IP version, IPv4 (AF_INET) or IPv6 (AF_INET6). - /// @param [in] type socket type, SOCK_STREAM or SOCK_DGRAM + /// @param [in] type (ignored) /// @return The new UDT socket ID, or INVALID_SOCK. - SRTSOCKET newSocket(int af, int type); + SRTSOCKET newSocket(int af, int ); /// Create a new UDT connection. /// @param [in] listen the listening UDT socket; diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index 0d215a712..41358aff4 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -150,7 +150,7 @@ CSndBuffer::~CSndBuffer() pthread_mutex_destroy(&m_BufLock); } -void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order, uint64_t srctime) +void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order, uint64_t srctime, ref_t r_msgno) { int size = len / m_iMSS; if ((len % m_iMSS) != 0) @@ -173,6 +173,7 @@ void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order, uint6 << (inorder ? "" : " NOT") << " in order"; Block* s = m_pLastBlock; + r_msgno = m_iNextMsgNo; for (int i = 0; i < size; ++ i) { int pktlen = len - i * m_iMSS; @@ -940,7 +941,7 @@ bool CRcvBuffer::getRcvFirstMsg(ref_t tsbpdtime, ref_t passack, // - tsbpdtime: real time when the packet is ready to play (whether ready to play or not) // - passack: false (the report concerns a packet with an exactly next sequence) // - skipseqno == -1: no packets to skip towards the first RTP - // - ppkt: that exactly packet is reported (for debugging purposes) + // - ppkt: that exactly packet that is reported (for debugging purposes) // - @return: whether the reported packet is ready to play /* Check the acknowledged packets */ @@ -1493,23 +1494,25 @@ void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& mut int CRcvBuffer::readMsg(char* data, int len) { - uint64_t tsbpdtime; - return readMsg(data, len, tsbpdtime); + SRT_MSGCTRL dummy = srt_msgctrl_default; + return readMsg(data, len, Ref(dummy)); } -int CRcvBuffer::readMsg(char* data, int len, uint64_t& tsbpdtime) +int CRcvBuffer::readMsg(char* data, int len, ref_t r_msgctl) { + SRT_MSGCTRL& msgctl = r_msgctl; int p, q; bool passack; bool empty = true; + uint64_t& r_tsbpdtime = msgctl.srctime; if (m_bTsbPdMode) { passack = false; int seq = 0; - if (getRcvReadyMsg(Ref(tsbpdtime), Ref(seq))) + if (getRcvReadyMsg(Ref(r_tsbpdtime), Ref(seq))) { empty = false; @@ -1520,12 +1523,12 @@ int CRcvBuffer::readMsg(char* data, int len, uint64_t& tsbpdtime) #ifdef SRT_DEBUG_TSBPD_OUTJITTER uint64_t now = CTimer::getTime(); - if ((now - tsbpdtime)/10 < 10) - m_ulPdHisto[0][(now - tsbpdtime)/10]++; - else if ((now - tsbpdtime)/100 < 10) - m_ulPdHisto[1][(now - tsbpdtime)/100]++; - else if ((now - tsbpdtime)/1000 < 10) - m_ulPdHisto[2][(now - tsbpdtime)/1000]++; + if ((now - r_tsbpdtime)/10 < 10) + m_ulPdHisto[0][(now - r_tsbpdtime)/10]++; + else if ((now - r_tsbpdtime)/100 < 10) + m_ulPdHisto[1][(now - r_tsbpdtime)/100]++; + else if ((now - r_tsbpdtime)/1000 < 10) + m_ulPdHisto[2][(now - r_tsbpdtime)/1000]++; else m_ulPdHisto[3][1]++; #endif /* SRT_DEBUG_TSBPD_OUTJITTER */ @@ -1533,8 +1536,8 @@ int CRcvBuffer::readMsg(char* data, int len, uint64_t& tsbpdtime) } else { - tsbpdtime = 0; - if (scanMsg(p, q, passack)) + r_tsbpdtime = 0; + if (scanMsg(Ref(p), Ref(q), Ref(passack))) empty = false; } @@ -1542,6 +1545,15 @@ int CRcvBuffer::readMsg(char* data, int len, uint64_t& tsbpdtime) if (empty) return 0; + // This should happen just once. By 'empty' condition + // we have a guarantee that m_pUnit[p] exists and is valid. + CPacket& pkt1 = m_pUnit[p]->m_Packet; + + // This returns the sequence number and message number to + // the API caller. + msgctl.pktseq = pkt1.getSeqNo(); + msgctl.msgno = pkt1.getMsgSeq(); + int rs = len; while (p != (q + 1) % m_iSize) { @@ -1603,7 +1615,7 @@ int CRcvBuffer::readMsg(char* data, int len, uint64_t& tsbpdtime) } -bool CRcvBuffer::scanMsg(int& p, int& q, bool& passack) +bool CRcvBuffer::scanMsg(ref_t p, ref_t q, ref_t passack) { // empty buffer if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0)) @@ -1614,7 +1626,8 @@ bool CRcvBuffer::scanMsg(int& p, int& q, bool& passack) //skip all bad msgs at the beginning while (m_iStartPos != m_iLastAckPos) { - if (NULL == m_pUnit[m_iStartPos]) + // Roll up to the first valid unit + if (!m_pUnit[m_iStartPos]) { if (++ m_iStartPos == m_iSize) m_iStartPos = 0; @@ -1704,7 +1717,7 @@ bool CRcvBuffer::scanMsg(int& p, int& q, bool& passack) for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i < n; ++ i) { - if ((NULL != m_pUnit[q]) && (CUnit::GOOD == m_pUnit[q]->m_iFlag)) + if (m_pUnit[q] && m_pUnit[q]->m_iFlag == CUnit::GOOD) { // Equivalent pseudocode: // PacketBoundary bound = m_pUnit[q]->m_Packet.getMsgBoundary(); diff --git a/srtcore/buffer.h b/srtcore/buffer.h index 612dd13af..4b56a2342 100644 --- a/srtcore/buffer.h +++ b/srtcore/buffer.h @@ -89,7 +89,7 @@ class CSndBuffer /// @param [in] ttl time to live in milliseconds /// @param [in] order if the block should be delivered in order, for DGRAM only - void addBuffer(const char* data, int len, int ttl = -1, bool order = false, uint64_t srctime = 0); + void addBuffer(const char* data, int len, int ttl, bool order, uint64_t srctime, ref_t r_msgno); /// Read a block of data from file and insert it into the sending list. /// @param [in] ifs input file stream. @@ -311,7 +311,7 @@ class CRcvBuffer /// @param [out] tsbpdtime localtime-based (uSec) packet time stamp including buffering delay /// @return actuall size of data read. - int readMsg(char* data, int len, uint64_t& tsbpdtime); + int readMsg(char* data, int len, ref_t mctrl); /// Query how many messages are available now. /// @param [out] tsbpdtime localtime-based (uSec) packet time stamp including buffering delay @@ -356,7 +356,7 @@ class CRcvBuffer /// Add packet timestamp for drift caclculation and compensation /// @param [in] timestamp packet time stamp - void addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& mutex_to_lock); + void addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& lock); #ifdef SRT_DEBUG_TSBPD_DRIFT void printDriftHistogram(int64_t iDrift); @@ -413,7 +413,7 @@ class CRcvBuffer void countBytes(int pkts, int bytes, bool acked = false); private: - bool scanMsg(int& start, int& end, bool& passack); + bool scanMsg(ref_t start, ref_t end, ref_t passack); private: CUnit** m_pUnit; // pointer to the protocol buffer diff --git a/srtcore/common.h b/srtcore/common.h index c2d6723e6..0658d1439 100644 --- a/srtcore/common.h +++ b/srtcore/common.h @@ -162,10 +162,6 @@ std::string ConnectStatusStr(EConnectStatus est); const int64_t BW_INFINITE = 30000000/8; //Infinite=> 30Mbps -const int DEFAULT_LIVE_LATENCY = 120; // (mSec) - -const size_t DEFAULT_MPEG_UNIT_SIZE = 188; -const size_t DEFAULT_LIVE_PAYLOAD_SIZE = 7*DEFAULT_MPEG_UNIT_SIZE; // 1316 enum ETransmissionEvent diff --git a/srtcore/core.cpp b/srtcore/core.cpp index b5c72c6fe..a238763ef 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -243,12 +243,12 @@ CUDT::CUDT() m_bDataSender = false; //Sender only if true: does not recv data m_bTwoWayData = false; m_bOPT_TsbPd = true; //Enable TsbPd on sender - m_iOPT_TsbPdDelay = DEFAULT_LIVE_LATENCY; //Receiver TsbPd delay (mSec) + m_iOPT_TsbPdDelay = SRT_LIVE_DEF_LATENCY_MS; m_iOPT_PeerTsbPdDelay = 0; //Peer's TsbPd delay as receiver (here is its minimum value, if used) m_bOPT_TLPktDrop = true; m_bTLPktDrop = true; //Too-late Packet Drop m_bMessageAPI = true; - m_zOPT_ExpPayloadSize = DEFAULT_LIVE_PAYLOAD_SIZE; + m_zOPT_ExpPayloadSize = SRT_LIVE_DEF_PLSIZE; //Runtime m_bRcvNakReport = true; //Receiver's Periodic NAK Reports m_llInputBW = 0; // Application provided input bandwidth (internal input rate sampling == 0) @@ -675,7 +675,7 @@ void CUDT::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen) if (m_bConnected) throw CUDTException(MJ_NOTSUP, MN_ISCONNECTED, 0); - if (*(int*)optval > int(CPacket::SRT_MAX_PAYLOAD_SIZE)) + if (*(int*)optval > SRT_LIVE_MAX_PLSIZE) { LOGC(mglog.Error) << "SRTO_PAYLOADSIZE: value exceeds SRT_LIVE_MAX_PLSIZE, maximum payload per MTU."; throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); @@ -700,12 +700,12 @@ void CUDT::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen) // - smoother: live // - extraction method: message (reading call extracts one message) m_bOPT_TsbPd = true; - m_iOPT_TsbPdDelay = DEFAULT_LIVE_LATENCY; + m_iOPT_TsbPdDelay = SRT_LIVE_DEF_LATENCY_MS; m_iOPT_PeerTsbPdDelay = 0; m_bOPT_TLPktDrop = true; m_bMessageAPI = true; m_bRcvNakReport = true; - m_zOPT_ExpPayloadSize = DEFAULT_LIVE_PAYLOAD_SIZE; + m_zOPT_ExpPayloadSize = SRT_LIVE_DEF_PLSIZE; m_Smoother.select("live"); break; @@ -756,6 +756,11 @@ void CUDT::getOpt(SRT_SOCKOPT optName, void* optval, int& optlen) optlen = sizeof(bool); break; + case SRTO_ISN: + *(int*)optval = m_iISN; + optlen = sizeof(int); + break; + case SRTO_FC: *(int*)optval = m_iFlightFlagSize; optlen = sizeof(int); @@ -4285,6 +4290,10 @@ void CUDT::close() m_bOpened = false; } +/* + Old, mostly original UDT based version of CUDT::send. + Left for historical reasons. + int CUDT::send(const char* data, int len) { // throw an exception if not connected @@ -4311,7 +4320,7 @@ int CUDT::send(const char* data, int len) m_ullLastRspAckTime_tk = currtime_tk; m_iReXmitCount = 1; } - if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) + if (sndBuffersLeft() <= 0) { if (!m_bSynSending) throw CUDTException(MJ_AGAIN, MN_WRAVAIL, 0); @@ -4322,7 +4331,7 @@ int CUDT::send(const char* data, int len) CGuard sendblock_lock(m_SendBlockLock); if (m_iSndTimeOut < 0) { - while (stillConnected() && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth) + while (stillConnected() && (sndBuffersLeft() <= 0) && m_bPeerHealth) pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); } else @@ -4333,7 +4342,7 @@ int CUDT::send(const char* data, int len) locktime.tv_sec = exptime / 1000000; locktime.tv_nsec = (exptime % 1000000) * 1000; - while (stillConnected() && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth && (CTimer::getTime() < exptime)) + while (stillConnected() && (sndBuffersLeft() <= 0) && m_bPeerHealth && (CTimer::getTime() < exptime)) pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime); } } @@ -4351,7 +4360,7 @@ int CUDT::send(const char* data, int len) } } - if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) + if (sndBuffersLeft() <= 0) { if (m_iSndTimeOut >= 0) throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0); @@ -4359,7 +4368,7 @@ int CUDT::send(const char* data, int len) return 0; } - int size = min(len, sndSpaceLeft()); + int size = min(len, sndBuffersLeft() * m_iMaxSRTPayloadSize); // record total time used for sending if (m_pSndBuffer->getCurrBufSize() == 0) @@ -4371,7 +4380,7 @@ int CUDT::send(const char* data, int len) // insert this socket to snd list if it is not on the list yet m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); - if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) + if (sndBuffersLeft() <= 0) { // write is not available any more s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false); @@ -4379,16 +4388,18 @@ int CUDT::send(const char* data, int len) return size; } +*/ -int CUDT::recv(char* data, int len) +int CUDT::receiveBuffer(char* data, int len) { - // throw an exception if not connected - if (!m_bConnected || !m_Smoother.ready()) - throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); + if (!m_Smoother->checkTransArgs(Smoother::STA_BUFFER, Smoother::STAD_RECV, data, len, -1, false)) + throw CUDTException(MJ_NOTSUP, MN_INVALBUFFERAPI, 0); + + CGuard recvguard(m_RecvLock); if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady()) { - if (!m_bMessageAPI && m_bShutdown) + if (m_bShutdown) { // For stream API, return 0 as a sign of EOF for transmission. // That's a bit controversial because theoretically the @@ -4414,14 +4425,6 @@ int CUDT::recv(char* data, int len) throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); } - if (len <= 0) - return 0; - - if (!m_Smoother->checkTransArgs(Smoother::STA_BUFFER, Smoother::STAD_RECV, data, len, -1, false)) - throw CUDTException(MJ_NOTSUP, MN_INVALBUFFERAPI, 0); - - - CGuard recvguard(m_RecvLock); if (!m_pRcvBuffer->isRcvDataReady()) { @@ -4501,8 +4504,84 @@ int CUDT::recv(char* data, int len) return res; } + +void CUDT::checkNeedDrop(ref_t bCongestion) +{ + if (!m_bPeerTLPktDrop) + return; + + if (!m_bMessageAPI) + { + LOGC(dlog.Error) << "The SRTO_TLPKTDROP flag can only be used with message API."; + throw CUDTException(MJ_NOTSUP, MN_INVALBUFFERAPI, 0); + } + + int bytes, timespan_ms; + // (returns buffer size in buffer units, ignored) + m_pSndBuffer->getCurrBufSize(Ref(bytes), Ref(timespan_ms)); + + // high threshold (msec) at tsbpd_delay plus sender/receiver reaction time (2 * 10ms) + // Minimum value must accomodate an I-Frame (~8 x average frame size) + // >>need picture rate or app to set min treshold + // >>using 1 sec for worse case 1 frame using all bit budget. + // picture rate would be useful in auto SRT setting for min latency + // XXX Make SRT_TLPKTDROP_MINTHRESHOLD_MS option-configurable + int threshold_ms = std::max(m_iPeerTsbPdDelay_ms, +SRT_TLPKTDROP_MINTHRESHOLD_MS) + (2*COMM_SYN_INTERVAL_US/1000); + if (timespan_ms > threshold_ms) + { + // protect packet retransmission + CGuard::enterCS(m_AckLock); + int dbytes; + int dpkts = m_pSndBuffer->dropLateData(dbytes, CTimer::getTime() - (threshold_ms * 1000)); + if (dpkts > 0) + { + m_iTraceSndDrop += dpkts; + m_iSndDropTotal += dpkts; + m_ullTraceSndBytesDrop += dbytes; + m_ullSndBytesDropTotal += dbytes; + + int32_t realack = m_iSndLastDataAck; // needed for log only + int32_t fakeack = CSeqNo::incseq(m_iSndLastDataAck, dpkts); + + m_iSndLastAck = fakeack; + m_iSndLastDataAck = fakeack; + + int32_t minlastack = CSeqNo::decseq(m_iSndLastDataAck); + m_pSndLossList->remove(minlastack); + /* If we dropped packets not yet sent, advance current position */ + // THIS MEANS: m_iSndCurrSeqNo = MAX(m_iSndCurrSeqNo, m_iSndLastDataAck-1) + if (CSeqNo::seqcmp(m_iSndCurrSeqNo, minlastack) < 0) + { + m_iSndCurrSeqNo = minlastack; + } + LOGC(dlog.Debug).form("drop,now %lluus,%d-%d seqs,%d pkts,%d bytes,%d ms", + (unsigned long long)CTimer::getTime(), + realack, m_iSndCurrSeqNo, + dpkts, dbytes, timespan_ms); + } + bCongestion = true; + CGuard::leaveCS(m_AckLock); + } + else if (timespan_ms > (m_iPeerTsbPdDelay_ms/2)) + { + LOGC(mglog.Debug).form("cong, NOW: %lluus, BYTES %d, TMSPAN %dms", (unsigned long long)CTimer::getTime(), bytes, timespan_ms); + bCongestion = true; + } +} + + int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder, uint64_t srctime) { + SRT_MSGCTRL mctrl = srt_msgctrl_default; + mctrl.msgttl = msttl; + mctrl.inorder = inorder; + mctrl.srctime = srctime; + return this->sendmsg2(data, len, Ref(mctrl)); +} + +int CUDT::sendmsg2(const char* data, int len, ref_t r_mctrl) +{ + SRT_MSGCTRL& mctrl = r_mctrl; bool bCongestion = false; // throw an exception if not connected @@ -4512,15 +4591,54 @@ int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder, uint64_t s throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); if (len <= 0) + { + LOGC(dlog.Error) << "INVALID: Data size for sending declared with length: " << len; return 0; + } + + int msttl = mctrl.msgttl; + bool inorder = mctrl.inorder; // Sendmsg isn't restricted to the smoother type, however the smoother // may want to have something to say here. - if (!m_Smoother->checkTransArgs(Smoother::STA_MESSAGE, Smoother::STAD_SEND, data, len, msttl, inorder)) - throw CUDTException(MJ_NOTSUP, MN_INVALMSGAPI, 0); + // NOTE: Smoother is also allowed to throw CUDTException() by itself! + { + Smoother::TransAPI api = Smoother::STA_MESSAGE; + CodeMinor mn = MN_INVALMSGAPI; + if ( !m_bMessageAPI ) + { + api = Smoother::STA_BUFFER; + mn = MN_INVALBUFFERAPI; + } + + if (!m_Smoother->checkTransArgs(api, Smoother::STAD_SEND, data, len, msttl, inorder)) + throw CUDTException(MJ_NOTSUP, mn, 0); + } - if (len > int(m_iSndBufSize * m_iMaxSRTPayloadSize)) + // NOTE: the length restrictions differ in STREAM API and in MESSAGE API: + + // - STREAM API: + // At least 1 byte free sending buffer space is needed + // (in practice, one unit buffer of 1456 bytes). + // This function will send as much as possible, and return + // how much was actually sent. + + // - MESSAGE API: + // At least so many bytes free in the sending buffer is needed, + // as the length of the data, otherwise this function will block + // or return MJ_AGAIN until this condition is satisfied. The EXACTLY + // such number of data will be then written out, and this function + // will effectively return either -1 (error) or the value of 'len'. + // This call will be also rejected from upside when trying to send + // out a message of lengh that exceeds the total size of sending + // buffer (configurable by SRTO_SNDBUF). + + if (m_bMessageAPI && len > int(m_iSndBufSize * m_iMaxSRTPayloadSize)) + { + LOGC(dlog.Error) << "Message length (" << len << ") exceeds the size of sending buffer: " + << (m_iSndBufSize * m_iMaxSRTPayloadSize) << ". Use SRTO_SNDBUF if needed."; throw CUDTException(MJ_NOTSUP, MN_XSIZE, 0); + } CGuard sendguard(m_SendLock); @@ -4534,61 +4652,17 @@ int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder, uint64_t s m_iReXmitCount = 1; } - if (m_bPeerTLPktDrop) - { - int bytes, timespan; - m_pSndBuffer->getCurrBufSize(Ref(bytes), Ref(timespan)); + checkNeedDrop(Ref(bCongestion)); - // high threshold (msec) at tsbpd_delay plus sender/receiver reaction time (2 * 10ms) - // Minimum value must accomodate an I-Frame (~8 x average frame size) - // >>need picture rate or app to set min treshold - // >>using 1 sec for worse case 1 frame using all bit budget. - // picture rate would be useful in auto SRT setting for min latency - // XXX Make SRT_TLPKTDROP_MINTHRESHOLD_MS option-configurable - int threshold_ms = std::max(m_iPeerTsbPdDelay_ms, +SRT_TLPKTDROP_MINTHRESHOLD_MS) + (2*COMM_SYN_INTERVAL_US/1000); - if (timespan > threshold_ms) - { - // protect packet retransmission - CGuard::enterCS(m_AckLock); - int dbytes; - int dpkts = m_pSndBuffer->dropLateData(dbytes, CTimer::getTime() - (threshold_ms * 1000)); - if (dpkts > 0) - { - m_iTraceSndDrop += dpkts; - m_iSndDropTotal += dpkts; - m_ullTraceSndBytesDrop += dbytes; - m_ullSndBytesDropTotal += dbytes; - - int32_t realack = m_iSndLastDataAck; // needed for log only - int32_t fakeack = CSeqNo::incseq(m_iSndLastDataAck, dpkts); - - m_iSndLastAck = fakeack; - m_iSndLastDataAck = fakeack; - - int32_t minlastack = CSeqNo::decseq(m_iSndLastDataAck); - m_pSndLossList->remove(minlastack); - /* If we dropped packets not yet sent, advance current position */ - // THIS MEANS: m_iSndCurrSeqNo = MAX(m_iSndCurrSeqNo, m_iSndLastDataAck-1) - if (CSeqNo::seqcmp(m_iSndCurrSeqNo, minlastack) < 0) - { - m_iSndCurrSeqNo = minlastack; - } - LOGC(dlog.Debug).form("drop,now %lluus,%d-%d seqs,%d pkts,%d bytes,%d ms", - (unsigned long long)CTimer::getTime(), - realack, m_iSndCurrSeqNo, - dpkts, dbytes, timespan); - } - bCongestion = true; - CGuard::leaveCS(m_AckLock); - } - else if (timespan > (m_iPeerTsbPdDelay_ms/2)) - { - LOGC(mglog.Debug).form("cong, NOW: %lluus, BYTES %d, TMSPAN %dms", (unsigned long long)CTimer::getTime(), bytes, timespan); - bCongestion = true; - } + int minlen = 1; // Minimum sender buffer space required for STREAM API + if (m_bMessageAPI) + { + // For MESSAGE API the minimum outgoing buffer space required is + // the size that can carry over the whole message as passed here. + minlen = (len+m_iMaxSRTPayloadSize-1)/m_iMaxSRTPayloadSize; } - if (len > sndSpaceLeft()) + if (sndBuffersLeft() < minlen) { //>>We should not get here if SRT_ENABLE_TLPKTDROP // XXX Check if this needs to be removed, or put to an 'else' condition for m_bTLPktDrop. @@ -4602,7 +4676,9 @@ int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder, uint64_t s if (m_iSndTimeOut < 0) { - while (stillConnected() && len > sndSpaceLeft()) + while (stillConnected() + && sndBuffersLeft() < minlen + && m_bPeerHealth) pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); } else @@ -4613,7 +4689,10 @@ int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder, uint64_t s locktime.tv_sec = exptime / 1000000; locktime.tv_nsec = (exptime % 1000000) * 1000; - while (stillConnected() && len > sndSpaceLeft() && exptime > CTimer::getTime()) + while (stillConnected() + && sndBuffersLeft() < minlen + && m_bPeerHealth + && exptime > CTimer::getTime()) pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime); } } @@ -4623,6 +4702,11 @@ int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder, uint64_t s throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); else if (!m_bConnected) throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); + else if (!m_bPeerHealth) + { + m_bPeerHealth = true; + throw CUDTException(MJ_PEERERROR); + } } /* * The code below is to return ETIMEOUT when blocking mode could not get free buffer in time. @@ -4630,14 +4714,27 @@ int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder, uint64_t s * we test twice if this code is outside the else section. * This fix move it in the else (blocking-mode) section */ - if (len > sndSpaceLeft()) + if (sndBuffersLeft() < minlen) { if (m_iSndTimeOut >= 0) throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0); - // XXX Not sure if this was intended: - // The 'len' exceeds the bytes left in the send buffer... - // ... so we do nothing and return success??? + // XXX This looks very weird here, however most likely + // this will happen only in the following case, when + // the above loop has been interrupted, which happens when: + // 1. The buffers left gets enough for minlen - but this is excluded + // in the first condition here. + // 2. In the case of sending timeout, the above loop was interrupted + // due to reaching timeout, but this is excluded by the second + // condition here + // 3. The 'stillConnected()' or m_bPeerHealth condition is false, of which: + // - broken/closing status is checked and responded with CONNECTION/CONNLOST + // - not connected status is checked and responded with CONNECTION/NOCONN + // - m_bPeerHealth condition is checked and responded with PEERERROR + // + // ERGO: never happens? + LOGC(mglog.Fatal) << "IPE: sendmsg: the loop exited, while not enough size, still connected, peer healthy. Impossible."; + return 0; } } @@ -4646,24 +4743,34 @@ int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder, uint64_t s if (m_pSndBuffer->getCurrBufSize() == 0) m_llSndDurationCounter = CTimer::getTime(); + int size = len; + if (!m_bMessageAPI) + { + // For STREAM API it's allowed to send less bytes than the given buffer. + // Just return how many bytes were actually scheduled for writing. + // XXX May be reasonable to add a flag that requires that the function + // not return until the buffer is sent completely. + size = min(len, sndBuffersLeft() * m_iMaxSRTPayloadSize); + } + // insert the user buffer into the sending list #ifdef SRT_ENABLE_CBRTIMESTAMP - if (srctime == 0) + if (mctrl.srctime == 0) { uint64_t currtime_tk; CTimer::rdtsc(currtime_tk); m_ullSndLastCbrTime_tk = max(currtime_tk, m_ullSndLastCbrTime_tk + m_ullInterval_tk); - srctime = m_ullSndLastCbrTime_tk / m_ullCPUFrequency; + mctrl.srctime = m_ullSndLastCbrTime_tk / m_ullCPUFrequency; } #endif - m_pSndBuffer->addBuffer(data, len, msttl, inorder, srctime); - LOGC(dlog.Debug) << CONID() << "sock:SENDING srctime: " << srctime << "us DATA SIZE: " << len; + m_pSndBuffer->addBuffer(data, size, mctrl.msgttl, mctrl.inorder, mctrl.srctime, Ref(mctrl.msgno)); + LOGC(dlog.Debug) << CONID() << "sock:SENDING srctime: " << mctrl.srctime << "us DATA SIZE: " << size; // insert this socket to the snd list if it is not on the list yet m_pSndQueue->m_pSndUList->update(this, CSndUList::rescheduleIf(bCongestion)); - if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) + if (sndBuffersLeft() < 1) // XXX Not sure if it should test if any space in the buffer, or as requried. { // write is not available any more s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false); @@ -4673,24 +4780,71 @@ int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder, uint64_t s if (bCongestion) throw CUDTException(MJ_AGAIN, MN_CONGESTION, 0); #endif /* SRT_ENABLE_ECN */ - return len; + return size; } -int CUDT::recvmsg(char* data, int len) +int CUDT::recv(char* data, int len) { - uint64_t srctime; - return(CUDT::recvmsg(data, len, srctime)); + if (!m_bConnected || !m_Smoother.ready()) + throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); + + if (len <= 0) + { + LOGC(dlog.Error) << "Length of '" << len << "' supplied to srt_recv."; + throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); + } + + if (m_bMessageAPI) + { + SRT_MSGCTRL mctrl = srt_msgctrl_default; + return receiveMessage(data, len, Ref(mctrl)); + } + + return receiveBuffer(data, len); } int CUDT::recvmsg(char* data, int len, uint64_t& srctime) { + if (!m_bConnected || !m_Smoother.ready()) + throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); + if (len <= 0) + { + LOGC(dlog.Error) << "Length of '" << len << "' supplied to srt_recvmsg."; + throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); + } + + if (m_bMessageAPI) + { + SRT_MSGCTRL mctrl = srt_msgctrl_default; + int ret = receiveMessage(data, len, Ref(mctrl)); + srctime = mctrl.srctime; + return ret; + } + + return receiveBuffer(data, len); +} + +int CUDT::recvmsg2(char* data, int len, ref_t mctrl) +{ if (!m_bConnected || !m_Smoother.ready()) throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); if (len <= 0) - return 0; + { + LOGC(dlog.Error) << "Length of '" << len << "' supplied to srt_recvmsg."; + throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); + } + if (m_bMessageAPI) + return receiveMessage(data, len, mctrl); + + return receiveBuffer(data, len); +} + +int CUDT::receiveMessage(char* data, int len, ref_t r_mctrl) +{ + SRT_MSGCTRL& mctrl = r_mctrl; // Recvmsg isn't restricted to the smoother type, it's the most // basic method of passing the data. You can retrieve data as // they come in, however you need to match the size of the buffer. @@ -4715,6 +4869,7 @@ int CUDT::recvmsg(char* data, int len, uint64_t& srctime) if (m_bBroken || m_bClosing) { int res = m_pRcvBuffer->readMsg(data, len); + mctrl.srctime = 0; /* Kick TsbPd thread to schedule next wakeup (if running) */ if (m_bTsbPd) @@ -4739,7 +4894,7 @@ int CUDT::recvmsg(char* data, int len, uint64_t& srctime) if (!m_bSynRecving) { - int res = m_pRcvBuffer->readMsg(data, len, srctime); + int res = m_pRcvBuffer->readMsg(data, len, r_mctrl); if (res == 0) { // read is not available any more @@ -4820,7 +4975,7 @@ int CUDT::recvmsg(char* data, int len, uint64_t& srctime) fputs(ptrn, stderr); // */ - res = m_pRcvBuffer->readMsg(data, len, srctime); + res = m_pRcvBuffer->readMsg(data, len, r_mctrl); if (m_bBroken || m_bClosing) { @@ -4871,97 +5026,112 @@ int CUDT::recvmsg(char* data, int len, uint64_t& srctime) int64_t CUDT::sendfile(fstream& ifs, int64_t& offset, int64_t size, int block) { - if (m_bBroken || m_bClosing) - throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); - else if (!m_bConnected || !m_Smoother.ready()) - throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); + if (m_bBroken || m_bClosing) + throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); + else if (!m_bConnected || !m_Smoother.ready()) + throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); - if (size <= 0) - return 0; + if (size <= 0 && size != -1) + return 0; - if (!m_Smoother->checkTransArgs(Smoother::STA_FILE, Smoother::STAD_SEND, 0, size, -1, false)) - throw CUDTException(MJ_NOTSUP, MN_INVALBUFFERAPI, 0); + if (!m_Smoother->checkTransArgs(Smoother::STA_FILE, Smoother::STAD_SEND, 0, size, -1, false)) + throw CUDTException(MJ_NOTSUP, MN_INVALBUFFERAPI, 0); - CGuard sendguard(m_SendLock); + CGuard sendguard(m_SendLock); - if (m_pSndBuffer->getCurrBufSize() == 0) - { - // delay the EXP timer to avoid mis-fired timeout - uint64_t currtime_tk; - CTimer::rdtsc(currtime_tk); - // (fix keepalive) m_ullLastRspTime_tk = currtime_tk; - m_ullLastRspAckTime_tk = currtime_tk; - m_iReXmitCount = 1; - } + if (m_pSndBuffer->getCurrBufSize() == 0) + { + // delay the EXP timer to avoid mis-fired timeout + uint64_t currtime_tk; + CTimer::rdtsc(currtime_tk); + // (fix keepalive) m_ullLastRspTime_tk = currtime_tk; + m_ullLastRspAckTime_tk = currtime_tk; + m_iReXmitCount = 1; + } - int64_t tosend = size; - int unitsize; + // positioning... + try + { + if (size == -1) + { + ifs.seekg(0, std::ios::end); + size = ifs.tellg(); + if (offset > size) + throw 0; // let it be caught below + } - // positioning... - try - { - ifs.seekg((streamoff)offset); - } - catch (...) - { - // XXX It would be nice to note that this is reported - // by exception only if explicitly requested by setting - // the exception flags in the stream. - throw CUDTException(MJ_FILESYSTEM, MN_SEEKGFAIL); - } + // This will also set the position back to the beginning + // in case when it was moved to the end for measuring the size. + // This will also fail if the offset exceeds size, so measuring + // the size can be skipped if not needed. + ifs.seekg((streamoff)offset); + if (!ifs.good()) + throw 0; + } + catch (...) + { + // XXX It would be nice to note that this is reported + // by exception only if explicitly requested by setting + // the exception flags in the stream. Here it's fixed so + // that when this isn't set, the exception is "thrown manually". + throw CUDTException(MJ_FILESYSTEM, MN_SEEKGFAIL); + } - // sending block by block - while (tosend > 0) - { - if (ifs.fail()) - throw CUDTException(MJ_FILESYSTEM, MN_WRITEFAIL); + int64_t tosend = size; + int unitsize; - if (ifs.eof()) - break; + // sending block by block + while (tosend > 0) + { + if (ifs.fail()) + throw CUDTException(MJ_FILESYSTEM, MN_WRITEFAIL); - unitsize = int((tosend >= block) ? block : tosend); + if (ifs.eof()) + break; - { - CGuard lk(m_SendBlockLock); + unitsize = int((tosend >= block) ? block : tosend); - while (stillConnected() && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth) - pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); - } + { + CGuard lk(m_SendBlockLock); - if (m_bBroken || m_bClosing) - throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); - else if (!m_bConnected) - throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); - else if (!m_bPeerHealth) - { - // reset peer health status, once this error returns, the app should handle the situation at the peer side - m_bPeerHealth = true; - throw CUDTException(MJ_PEERERROR); - } + while (stillConnected() && (sndBuffersLeft() <= 0) && m_bPeerHealth) + pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); + } - // record total time used for sending - if (m_pSndBuffer->getCurrBufSize() == 0) - m_llSndDurationCounter = CTimer::getTime(); + if (m_bBroken || m_bClosing) + throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); + else if (!m_bConnected) + throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); + else if (!m_bPeerHealth) + { + // reset peer health status, once this error returns, the app should handle the situation at the peer side + m_bPeerHealth = true; + throw CUDTException(MJ_PEERERROR); + } - int64_t sentsize = m_pSndBuffer->addBufferFromFile(ifs, unitsize); + // record total time used for sending + if (m_pSndBuffer->getCurrBufSize() == 0) + m_llSndDurationCounter = CTimer::getTime(); - if (sentsize > 0) - { - tosend -= sentsize; - offset += sentsize; - } + int64_t sentsize = m_pSndBuffer->addBufferFromFile(ifs, unitsize); - // insert this socket to snd list if it is not on the list yet - m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); - } + if (sentsize > 0) + { + tosend -= sentsize; + offset += sentsize; + } - if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) - { - // write is not available any more - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false); - } + // insert this socket to snd list if it is not on the list yet + m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); + } - return size - tosend; + if (sndBuffersLeft() <= 0) + { + // write is not available any more + s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false); + } + + return size - tosend; } int64_t CUDT::recvfile(fstream& ofs, int64_t& offset, int64_t size, int block) @@ -4981,25 +5151,59 @@ int64_t CUDT::recvfile(fstream& ofs, int64_t& offset, int64_t size, int block) if (!m_Smoother->checkTransArgs(Smoother::STA_FILE, Smoother::STAD_RECV, 0, size, -1, false)) throw CUDTException(MJ_NOTSUP, MN_INVALBUFFERAPI, 0); + if (m_bTsbPd) + { + LOGC(dlog.Error) << "Reading from file is incompatible with TSBPD mode and would cause a deadlock\n"; + throw CUDTException(MJ_NOTSUP, MN_INVALBUFFERAPI, 0); + } + CGuard recvguard(m_RecvLock); - int64_t torecv = size; - int unitsize = block; - int recvsize; + // Well, actually as this works over a FILE (fstream), not just a stream, + // the size can be measured anyway and predicted if setting the offset might + // have a chance to work or not. // positioning... try { - ofs.seekp((streamoff)offset); + if (offset > 0) + { + // Don't do anything around here if the offset == 0, as this + // is the default offset after opening. Whether this operation + // is performed correctly, it highly depends on how the file + // has been open. For example, if you want to overwrite parts + // of an existing file, the file must exist, and the ios::trunc + // flag must not be set. If the file is open for only ios::out, + // then the file will be truncated since the offset position on + // at the time when first written; if ios::in|ios::out, then + // it won't be truncated, just overwritten. + + // What is required here is that if offset is 0, don't try to + // change the offset because this might be impossible with + // the current flag set anyway. + + // Also check the status and CAUSE exception manually because + // you don't know, as well, whether the user has set exception + // flags. + + ofs.seekp((streamoff)offset); + if (!ofs.good()) + throw 0; // just to get caught :) + } } catch (...) { // XXX It would be nice to note that this is reported // by exception only if explicitly requested by setting - // the exception flags in the stream. + // the exception flags in the stream. For a case, when it's not, + // an additional explicit throwing happens when failbit is set. throw CUDTException(MJ_FILESYSTEM, MN_SEEKPFAIL); } + int64_t torecv = size; + int unitsize = block; + int recvsize; + // receiving... "recvfile" is always blocking while (torecv > 0) { @@ -5027,7 +5231,7 @@ int64_t CUDT::recvfile(fstream& ofs, int64_t& offset, int64_t size, int block) throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); } - unitsize = int((torecv >= block) ? block : torecv); + unitsize = int((torecv == -1 || torecv >= block) ? block : torecv); recvsize = m_pRcvBuffer->readBufferToFile(ofs, unitsize); if (recvsize > 0) @@ -5097,7 +5301,7 @@ void CUDT::sample(CPerfMon* perf, bool clear) if (pthread_mutex_trylock(&m_ConnectionLock) == 0) { perf->byteAvailSndBuf = (m_pSndBuffer == NULL) ? 0 - : (m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iMSS; + : sndBuffersLeft() * m_iMSS; perf->byteAvailRcvBuf = (m_pRcvBuffer == NULL) ? 0 : m_pRcvBuffer->getAvailBufSize() * m_iMSS; diff --git a/srtcore/core.h b/srtcore/core.h index 28013eaac..af6848610 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -189,9 +189,10 @@ class CUDT static int recv(SRTSOCKET u, char* buf, int len, int flags); static int sendmsg(SRTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false, uint64_t srctime = 0LL); static int recvmsg(SRTSOCKET u, char* buf, int len, uint64_t& srctime); - static int recvmsg(SRTSOCKET u, char* buf, int len); - static int64_t sendfile(SRTSOCKET u, std::fstream& ifs, int64_t& offset, int64_t size, int block = 364000); - static int64_t recvfile(SRTSOCKET u, std::fstream& ofs, int64_t& offset, int64_t size, int block = 7280000); + static int sendmsg2(SRTSOCKET u, const char* buf, int len, ref_t mctrl); + static int recvmsg2(SRTSOCKET u, char* buf, int len, ref_t mctrl); + static int64_t sendfile(SRTSOCKET u, std::fstream& ifs, int64_t& offset, int64_t size, int block = SRT_DEFAULT_SENDFILE_BLOCK); + static int64_t recvfile(SRTSOCKET u, std::fstream& ofs, int64_t& offset, int64_t size, int block = SRT_DEFAULT_RECVFILE_BLOCK); static int select(int nfds, ud_set* readfds, ud_set* writefds, ud_set* exceptfds, const timeval* timeout); static int selectEx(const std::vector& fds, std::vector* readfds, std::vector* writefds, std::vector* exceptfds, int64_t msTimeOut); static int epoll_create(); @@ -346,6 +347,8 @@ class CUDT void updateSrtRcvSettings(); void updateSrtSndSettings(); + void checkNeedDrop(ref_t bCongestion); + /// Connect to a UDT entity listening at address "peer", which has sent "hs" request. /// @param peer [in] The address of the listening UDT entity. /// @param hs [in/out] The handshake information sent by the peer side (in), negotiated value (out). @@ -361,7 +364,10 @@ class CUDT /// @param len [in] The size of the data block. /// @return Actual size of data sent. - int send(const char* data, int len); + int send(const char* data, int len) + { + return sendmsg(data, len, -1, false, 0); + } /// Request UDT to receive data to a memory block "data" with size of "len". /// @param data [out] data received. @@ -384,8 +390,14 @@ class CUDT /// @param len [in] size of the buffer. /// @return Actual size of data received. + int sendmsg2(const char* data, int len, ref_t m); + int recvmsg(char* data, int len, uint64_t& srctime); - int recvmsg(char* data, int len); + + int recvmsg2(char* data, int len, ref_t m); + + int receiveMessage(char* data, int len, ref_t m); + int receiveBuffer(char* data, int len); /// Request UDT to send out a file described as "fd", starting from "offset", with size of "size". /// @param ifs [in] The input file stream. diff --git a/srtcore/srt.h b/srtcore/srt.h index 0d1fab765..f8a66d59f 100644 --- a/srtcore/srt.h +++ b/srtcore/srt.h @@ -128,6 +128,7 @@ typedef enum SRT_SOCKOPT { SRTO_MSS = 0, // the Maximum Transfer Unit SRTO_SNDSYN = 1, // if sending is blocking SRTO_RCVSYN = 2, // if receiving is blocking + SRTO_ISN = 3, // Initial Sequence Number (valid only after srt_connect or srt_accept-ed sockets) SRTO_FC = 4, // Flight flag size (window size) SRTO_SNDBUF = 5, // maximum buffer in sending queue SRTO_RCVBUF = 6, // UDT receiving buffer size @@ -194,8 +195,8 @@ static const SRT_SOCKOPT SRTO_TSBPDMAXLAG SRT_ATR_DEPRECATED = (SRT_SOCKOPT)32; // This option is a derivative from UDT; the mechanism that uses it is now // known as Smoother and settable by SRTO_SMOOTHER, or more generally by -// SRTO_TRANSTYPE. The freed number will be reused for some other -// option. This option should have never been used anywhere, just for safety +// SRTO_TRANSTYPE. The freed number has been reused for a read-only option +// SRTO_ISN. This option should have never been used anywhere, just for safety // this is temporarily declared as deprecated. static const SRT_SOCKOPT SRTO_CC SRT_ATR_DEPRECATED = (SRT_SOCKOPT)3; @@ -220,6 +221,19 @@ typedef enum SRT_TRANSTYPE SRTT_INVALID } SRT_TRANSTYPE; +// These sizes should be used for Live mode. In Live mode you should not +// exceed the size that fits in a single MTU. + +// This is for MPEG TS and it's a default SRTO_PAYLOADSIZE for SRTT_LIVE. +static const int SRT_LIVE_DEF_PLSIZE = 1316; // = 188*7, recommended for MPEG TS + +// This is the maximum payload size for Live mode, should you have a different +// payload type than MPEG TS. +static const int SRT_LIVE_MAX_PLSIZE = 1456; // MTU(1500) - UDP.hdr(28) - SRT.hdr(16) + +// Latency for Live transmission: default is 120 +static const int SRT_LIVE_DEF_LATENCY_MS = 120; + struct CBytePerfMon { @@ -480,17 +494,6 @@ inline SRT_EPOLL_OPT operator|(SRT_EPOLL_OPT a1, SRT_EPOLL_OPT a2) typedef struct CPerfMon SRT_TRACEINFO; typedef struct CBytePerfMon SRT_TRACEBSTATS; -// This structure is only a kind-of wannabe. The only use of it is currently -// the 'srctime', however the functionality of application-supplied timestamps -// also doesn't work properly. Left for future until the problems are solved. -// This may prove useful as currently there's no way to tell the application -// that TLPKTDROP facility has dropped some data in favor of timely delivery. -typedef struct SRT_MsgCtrl_ { - int flags; - int boundary; //0:mid pkt, 1(01b):end of frame, 2(11b):complete frame, 3(10b): start of frame - uint64_t srctime; //source timestamp (usec), 0LL: use internal time -} SRT_MSGCTRL; - static const SRTSOCKET SRT_INVALID_SOCK = -1; static const int SRT_ERROR = -1; @@ -500,11 +503,13 @@ SRT_API extern int srt_cleanup(void); // socket operations SRT_API extern SRTSOCKET srt_socket(int af, int type, int protocol); +SRT_API extern SRTSOCKET srt_create_socket(); SRT_API extern int srt_bind(SRTSOCKET u, const struct sockaddr* name, int namelen); SRT_API extern int srt_bind_peerof(SRTSOCKET u, UDPSOCKET udpsock); SRT_API extern int srt_listen(SRTSOCKET u, int backlog); SRT_API extern SRTSOCKET srt_accept(SRTSOCKET u, struct sockaddr* addr, int* addrlen); SRT_API extern int srt_connect(SRTSOCKET u, const struct sockaddr* name, int namelen); +SRT_API extern int srt_connect_debug(SRTSOCKET u, const struct sockaddr* name, int namelen, int forced_isn); SRT_API extern int srt_rendezvous(SRTSOCKET u, const struct sockaddr* local_name, int local_namelen, const struct sockaddr* remote_name, int remote_namelen); SRT_API extern int srt_close(SRTSOCKET u); @@ -514,16 +519,63 @@ SRT_API extern int srt_getsockopt(SRTSOCKET u, int level /*ignored*/, SRT_SOCKOP SRT_API extern int srt_setsockopt(SRTSOCKET u, int level /*ignored*/, SRT_SOCKOPT optname, const void* optval, int optlen); SRT_API extern int srt_getsockflag(SRTSOCKET u, SRT_SOCKOPT opt, void* optval, int* optlen); SRT_API extern int srt_setsockflag(SRTSOCKET u, SRT_SOCKOPT opt, const void* optval, int optlen); -SRT_API extern int srt_send(SRTSOCKET u, const char* buf, int len, int flags); -SRT_API extern int srt_recv(SRTSOCKET u, char* buf, int len, int flags); -// The sendmsg/recvmsg and their 2 counterpart require MAXIMUM the size of SRT payload size (1316). -// Any data over that size will be ignored. +// XXX Note that the srctime functionality doesn't work yet and needs fixing. +typedef struct SRT_MsgCtrl_ +{ + int flags; // Left for future + int msgttl; // TTL for a message, default -1 (delivered always) + int inorder; // Whether a message is allowed to supersede partially lost one. Unused in stream and live mode. + int boundary; //0:mid pkt, 1(01b):end of frame, 2(11b):complete frame, 3(10b): start of frame + uint64_t srctime; // source timestamp (usec), 0LL: use internal time + int32_t pktseq; // sequence number of the first packet in received message (unused for sending) + int32_t msgno; // message number (output value for both sending and receiving) +} SRT_MSGCTRL; + +// You are free to use either of these two methods to set SRT_MSGCTRL object +// to default values: either call srt_msgctrl_init(&obj) or obj = srt_msgctrl_default. +SRT_API extern void srt_msgctrl_init(SRT_MSGCTRL* mctrl); +SRT_API extern const SRT_MSGCTRL srt_msgctrl_default; + +// The send/receive functions. +// These functions have different names due to different sets of parameters +// to be supplied. Not all of them are needed or make sense in all modes: + +// Plain: supply only the buffer and its size. +// Msg: supply additionally +// - TTL (message is not delivered when exceeded) and +// - INORDER (when false, the message is allowed to be delivered in different +// order than when it was sent, when the later message is earlier ready to +// deliver) +// Msg2: Supply extra parameters in SRT_MSGCTRL. When receiving, these +// parameters will be filled, as needed. NULL is acceptable, in which case +// the defaults are used. + +// NOTE: srt_send and srt_recv have the last "..." left to allow ignore a +// deprecated and unused "flags" parameter. After confirming that all +// compat applications that pass useless 0 there are fixed, this will be +// removed. + +// Sending +SRT_API extern int srt_send(SRTSOCKET u, const char* buf, int len); SRT_API extern int srt_sendmsg(SRTSOCKET u, const char* buf, int len, int ttl/* = -1*/, int inorder/* = false*/); -SRT_API extern int srt_recvmsg(SRTSOCKET u, char* buf, int len); SRT_API extern int srt_sendmsg2(SRTSOCKET u, const char* buf, int len, SRT_MSGCTRL *mctrl); + +// Receiving +SRT_API extern int srt_recv(SRTSOCKET u, char* buf, int len); + +// srt_recvmsg is actually an alias to srt_recv, it stays under the old name for compat reasons. +SRT_API extern int srt_recvmsg(SRTSOCKET u, char* buf, int len); SRT_API extern int srt_recvmsg2(SRTSOCKET u, char *buf, int len, SRT_MSGCTRL *mctrl); + +// Special send/receive functions for files only. +#define SRT_DEFAULT_SENDFILE_BLOCK 364000 +#define SRT_DEFAULT_RECVFILE_BLOCK 7280000 +SRT_API int64_t srt_sendfile(SRTSOCKET u, const char* path, int64_t* offset, int64_t size, int block); +SRT_API int64_t srt_recvfile(SRTSOCKET u, const char* path, int64_t* offset, int64_t size, int block); + + // last error detection SRT_API extern const char* srt_getlasterror_str(void); SRT_API extern int srt_getlasterror(int* errno_loc); diff --git a/srtcore/srt_c_api.cpp b/srtcore/srt_c_api.cpp index 6abdf265d..602c60eb0 100644 --- a/srtcore/srt_c_api.cpp +++ b/srtcore/srt_c_api.cpp @@ -23,18 +23,41 @@ written by *****************************************************************************/ #include +#include #if __APPLE__ #include "TargetConditionals.h" #endif #include "srt.h" #include "common.h" #include "core.h" +#include "utilities.h" + +using namespace std; + extern "C" { int srt_startup() { return CUDT::startup(); } int srt_cleanup() { return CUDT::cleanup(); } + SRTSOCKET srt_socket(int af, int type, int protocol) { return CUDT::socket(af, type, protocol); } +SRTSOCKET srt_create_socket() +{ + // XXX This must include rework around m_iIPVersion. This must be + // abandoned completely and all "IP VERSION" thing should rely on + // the exact specification in the 'sockaddr' objects passed to other functions, + // that is, the "current IP Version" remains undefined until any of + // srt_bind() or srt_connect() function is done. And when any of these + // functions are being called, the IP version is contained in the + // sockaddr object passed there. + + // Until this rework is done, srt_create_socket() will set the + // default AF_INET family. + + // Note that all arguments except the first one here are ignored. + return CUDT::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); +} + int srt_bind(SRTSOCKET u, const struct sockaddr * name, int namelen) { return CUDT::bind(u, name, namelen); } int srt_bind_peerof(SRTSOCKET u, UDPSOCKET udpsock) { return CUDT::bind(u, udpsock); } int srt_listen(SRTSOCKET u, int backlog) { return CUDT::listen(u, backlog); } @@ -94,31 +117,64 @@ int srt_getsockflag(SRTSOCKET u, SRT_SOCKOPT opt, void* optval, int* optlen) int srt_setsockflag(SRTSOCKET u, SRT_SOCKOPT opt, const void* optval, int optlen) { return CUDT::setsockopt(u, 0, opt, optval, optlen); } -int srt_send(SRTSOCKET u, const char * buf, int len, int flags) { return CUDT::send(u, buf, len, flags); } -int srt_recv(SRTSOCKET u, char * buf, int len, int flags) { return CUDT::recv(u, buf, len, flags); } +int srt_send(SRTSOCKET u, const char * buf, int len) { return CUDT::send(u, buf, len, 0); } +int srt_recv(SRTSOCKET u, char * buf, int len) { return CUDT::recv(u, buf, len, 0); } int srt_sendmsg(SRTSOCKET u, const char * buf, int len, int ttl, int inorder) { return CUDT::sendmsg(u, buf, len, ttl, 0!= inorder); } -int srt_recvmsg(SRTSOCKET u, char * buf, int len) { return CUDT::recvmsg(u, buf, len); } +int srt_recvmsg(SRTSOCKET u, char * buf, int len) { uint64_t ign_srctime; return CUDT::recvmsg(u, buf, len, ign_srctime); } +int64_t srt_sendfile(SRTSOCKET u, const char* path, int64_t* offset, int64_t size, int block) +{ + if (!path || !offset ) + { + return CUDT::setError(CUDTException(MJ_NOTSUP, MN_INVAL, 0)); + } + fstream ifs(path, ios::binary | ios::in); + if (!ifs) + { + return CUDT::setError(CUDTException(MJ_FILESYSTEM, MN_READFAIL, 0)); + } + int64_t ret = CUDT::sendfile(u, ifs, *offset, size, block); + ifs.close(); + return ret; +} + +int64_t srt_recvfile(SRTSOCKET u, const char* path, int64_t* offset, int64_t size, int block) +{ + if (!path || !offset ) + { + return CUDT::setError(CUDTException(MJ_NOTSUP, MN_INVAL, 0)); + } + fstream ofs(path, ios::binary | ios::out); + if (!ofs) + { + return CUDT::setError(CUDTException(MJ_FILESYSTEM, MN_WRAVAIL, 0)); + } + int64_t ret = CUDT::recvfile(u, ofs, *offset, size, block); + ofs.close(); + return ret; +} + +extern const SRT_MSGCTRL srt_msgctrl_default = { 0, -1, false, 0, 0, 0, 0 }; + +void srt_msgctrl_init(SRT_MSGCTRL* mctrl) +{ + *mctrl = srt_msgctrl_default; +} int srt_sendmsg2(SRTSOCKET u, const char * buf, int len, SRT_MSGCTRL *mctrl) { + // Allow NULL mctrl in the API, but not internally. if (mctrl) - return CUDT::sendmsg(u, buf, len, -1, true, mctrl->srctime); - else - return CUDT::sendmsg(u, buf, len); + return CUDT::sendmsg2(u, buf, len, Ref(*mctrl)); + SRT_MSGCTRL mignore = srt_msgctrl_default; + return CUDT::sendmsg2(u, buf, len, Ref(mignore)); } int srt_recvmsg2(SRTSOCKET u, char * buf, int len, SRT_MSGCTRL *mctrl) { - uint64_t srctime = 0; - int rc = CUDT::recvmsg(u, buf, len, srctime); - if (rc == UDT::ERROR) { - // error happen - return -1; - } - if (mctrl) - mctrl->srctime = srctime; - return rc; + return CUDT::recvmsg2(u, buf, len, Ref(*mctrl)); + SRT_MSGCTRL mignore = srt_msgctrl_default; + return CUDT::recvmsg2(u, buf, len, Ref(mignore)); } const char* srt_getlasterror_str() { return UDT::getlasterror().getErrorMessage(); }