Skip to content

Commit

Permalink
Merge pull request #1 from MonaSolutions/epoll-wait2
Browse files Browse the repository at this point in the history
Merge last changes from epoll-wait2
  • Loading branch information
thomasjammet authored Mar 30, 2019
2 parents 72a9704 + 5560d43 commit 14c2a91
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 85 deletions.
60 changes: 58 additions & 2 deletions docs/API-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ SRT API Functions
* [srt_epoll_add_usock, srt_epoll_add_ssock, srt_epoll_update_usock, srt_epoll_update_ssock](#srt_epoll_add_usock-srt_epoll_add_ssock-srt_epoll_update_usock-srt_epoll_update_ssock)
* [srt_epoll_remove_usock, srt_epoll_remove_ssock](#srt_epoll_remove_usock-srt_epoll_remove_ssock)
* [srt_epoll_wait](#srt_epoll_wait)
* [srt_epoll_uwait](#srt_epoll_uwait)
* [srt_epoll_release](#srt_epoll_release)
- [**Logging control**](#Logging-control)
* [srt_setloglevel](#srt_setloglevel)
Expand Down Expand Up @@ -1048,8 +1049,8 @@ int srt_epoll_wait(int eid, SRTSOCKET* readfds, int* rnum, SRTSOCKET* writefds,
```

Blocks the call until any readiness state occurs in the epoll container.
Mind that the readiness states reported in epoll are **permanent, not
edge-triggered**.
Mind that the readiness states reported in epoll are **permanent (level-
triggered), not edge-triggered**.

Readiness can be on a socket in the container for the event type as per
subscription. The first readiness state causes this function to exit, but
Expand Down Expand Up @@ -1089,6 +1090,61 @@ of error has occurred on the socket.
This is reported only if `msTimeOut` was \>=0, otherwise the function waits
indefinitely.

### srt_epoll_uwait
```
int srt_epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, int edgeMode/* = false*/);
```

Blocks the call until any readiness state occurs in the epoll container.
Mind that the readiness states reported in epoll are **level-triggered
or edge-triggered** following the `edgeMode` argument and this function
only reports user socket (SRT socket) events.
In the **edge-triggered** mode the function only returns socket states
which have changed since last call. On the other hand **level-triggered**
mode will keep on returning while the state stays the same.

This function blocks until the timeout. If timeout is 0, it exits
immediately after checking. If timeout is -1, it blocks indefinitely
until a readiness state occurs.

* `eid`: epoll container
* `fdsSet` : A pointer to an array of `SRT_EPOLL_EVENT`
* `fdsSize` : The size of the fdsSet array
* `msTimeOut` : Timeout specified in milliseconds, or special values (0 or -1)
* `edgeMode` : The triggered mode (default is 0 for **level-triggered**)

The `SRT_MSGCTRL` structure:

```
typedef struct SRT_EPOLL_EVENT_
{
SRTSOCKET fd;
int events; // UDT_EPOLL_IN | UDT_EPOLL_OUT | UDT_EPOLL_ERR
} SRT_EPOLL_EVENT;
```

* `fd` : the user socket (SRT socket)
* `events` : any combination of `SRT_EPOLL_IN`, `SRT_EPOLL_OUT` and `SRT_EPOLL_ERR`

Note that when the `SRT_EPOLL_ERR` is set the error cannot be retrieved
with `srt_getlasterror` but it means that the socket is closed and the
socket state can be read using `srt_getsockstate`.

- Returns:

* The total amount of user socket (SRT socket) states changed. Note that this
can be larger than `fdsSize`, in this case and if `edgeMode` is true only `fdsSize`
events are returned in the `fdsSet` and the remaining events can be retrieved by
a new call to this function
* -1 in case of error

- Errors:

* `SRT_EINVPOLLID`: `eid` designates no valid EID object
* `SRT_ETIMEOUT`: Up to `msTimeOut` no sockets subscribed in `eid` were ready.
This is reported only if `msTimeOut` was \>=0, otherwise the function waits
indefinitely.

### srt_epoll_release
```
int srt_epoll_release(int eid);
Expand Down
53 changes: 14 additions & 39 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1382,13 +1382,14 @@ int CUDTUnited::epoll_wait(
return m_EPoll.wait(eid, readfds, writefds, msTimeOut, lrfds, lwfds);
}

int CUDTUnited::epoll_wait(
int CUDTUnited::epoll_uwait(
const int eid,
map<SRTSOCKET, int>& fds,
SRT_EPOLL_EVENT* fdsSet,
int fdsSize,
int64_t msTimeOut,
int pickup)
bool edgeMode)
{
return m_EPoll.wait(eid, fds, msTimeOut, pickup);
return m_EPoll.uwait(eid, fdsSet, fdsSize, msTimeOut, edgeMode);
}

int CUDTUnited::epoll_release(const int eid)
Expand Down Expand Up @@ -2600,42 +2601,16 @@ int CUDT::epoll_wait(
}
}

int CUDT::epoll_wait(
const int eid,
SRT_EPOLL_EVENT* fdsSet,
int CUDT::epoll_uwait(
const int eid,
SRT_EPOLL_EVENT* fdsSet,
int fdsSize,
int64_t msTimeOut,
int pickup
)
{
// if fdsSet is NULL and waiting time is infinite, then this would be a deadlock
if ((!fdsSet || !fdsSize) && (msTimeOut < 0)) {
s_UDTUnited.setError(new CUDTException(MJ_NOTSUP, MN_INVAL, 0));
return ERROR;
}

int64_t msTimeOut,
bool modeEdge)
{
try
{
// This API is an alternative format for epoll_wait, created for
// compatibility with other languages. Users need to pass in an array
// for holding the returned sockets, with the maximum array length
// stored in *rnum, etc., which will be updated with returned number
// of sockets.
map<SRTSOCKET, int> tmpFdsSet;
int total = s_UDTUnited.epoll_wait(eid, tmpFdsSet, msTimeOut, pickup);
if (total > 0)
{
total = 0;
for (map<SRTSOCKET, int>::const_iterator it = tmpFdsSet.begin(); it != tmpFdsSet.end(); ++it)
{
if (total >= fdsSize)
break;
SRT_EPOLL_EVENT& event = fdsSet[total++];
event.fd = it->first;
event.events = it->second;
}
}
return total;
return s_UDTUnited.epoll_uwait(eid, fdsSet, fdsSize, msTimeOut, modeEdge);
}
catch (CUDTException e)
{
Expand Down Expand Up @@ -3090,9 +3065,9 @@ int epoll_wait2(
return ret;
}

int epoll_wait2(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, bool edgeMode)
int epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, bool edgeMode)
{
return CUDT::epoll_wait(eid, fdsSet, fdsSize, msTimeOut, edgeMode ? fdsSize : 0);
return CUDT::epoll_uwait(eid, fdsSet, fdsSize, msTimeOut, edgeMode);
}

int epoll_release(int eid)
Expand Down
2 changes: 1 addition & 1 deletion srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ friend class CRendezvousQueue;
int epoll_update_usock(const int eid, const SRTSOCKET u, const int* events = NULL);
int epoll_update_ssock(const int eid, const SYSSOCKET s, const int* events = NULL);
int epoll_wait(const int eid, std::set<SRTSOCKET>* readfds, std::set<SRTSOCKET>* writefds, int64_t msTimeOut, std::set<SYSSOCKET>* lrfds = NULL, std::set<SYSSOCKET>* lwfds = NULL);
int epoll_wait(const int eid, std::map<SRTSOCKET, int>& fds, int64_t msTimeOut, int pickup = 0);
int epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, bool edgeMode = false);
int epoll_release(const int eid);

/// record the UDT exception.
Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class CUDT
static int epoll_update_usock(const int eid, const SRTSOCKET u, const int* events = NULL);
static int epoll_update_ssock(const int eid, const SYSSOCKET s, const int* events = NULL);
static int epoll_wait(const int eid, std::set<SRTSOCKET>* readfds, std::set<SRTSOCKET>* writefds, int64_t msTimeOut, std::set<SYSSOCKET>* lrfds = NULL, std::set<SYSSOCKET>* wrfds = NULL);
static int epoll_wait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, int pickup = 0);
static int epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, bool edgeMode = false);
static int epoll_release(const int eid);
static CUDTException& getlasterror();
static int perfmon(SRTSOCKET u, CPerfMon* perf, bool clear = true);
Expand Down
49 changes: 28 additions & 21 deletions srtcore/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,13 @@ int CEPoll::update_usock(const int eid, const SRTSOCKET& u, const int* events)
pair<int, int>& wait = p->second.m_sUDTSocksWait[u];
wait.first = evts; // watching
evts = wait.first & wait.second; // watching & state!
if(evts) {
if(evts)
{
p->second.m_sUDTSocksSet[u] = evts;
return 0;
}
} else
}
}
else
p->second.m_sUDTSocksWait.erase(u);
p->second.m_sUDTSocksSet.erase(u);
return 0;
Expand Down Expand Up @@ -311,10 +313,14 @@ int CEPoll::update_ssock(const int eid, const SYSSOCKET& s, const int* events)
return 0;
}

int CEPoll::wait(const int eid, map<SRTSOCKET, int>& fdsSet, int64_t msTimeOut, int pickup)
int CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, bool edgeMode)
{
int64_t entertime = CTimer::getTime();
while(true)
// if fdsSet is NULL and waiting time is infinite, then this would be a deadlock
if ((!fdsSet || !fdsSize) && (msTimeOut < 0))
throw CUDTException(MJ_NOTSUP, MN_INVAL);

while (true)
{
{
CGuard pg(m_EPollLock);
Expand All @@ -323,26 +329,27 @@ int CEPoll::wait(const int eid, map<SRTSOCKET, int>& fdsSet, int64_t msTimeOut,
throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
if (p->second.m_sUDTSocksWait.empty() && p->second.m_sLocals.empty() && (msTimeOut < 0))
throw CUDTException(MJ_NOTSUP, MN_INVAL);

fdsSet = p->second.m_sUDTSocksSet;
if(pickup>=0)
{ // pickup events (edge mode)
if (pickup < (int)fdsSet.size())
{
map<SRTSOCKET, int>::iterator next = p->second.m_sUDTSocksSet.begin();
std::advance(next, pickup);
p->second.m_sUDTSocksSet.erase(p->second.m_sUDTSocksSet.begin(), next);
}
else
p->second.m_sUDTSocksSet.clear();
int total = p->second.m_sUDTSocksSet.size();
if (total)
{
int size = 0;
map<SRTSOCKET, int>::iterator it = p->second.m_sUDTSocksSet.begin();
while (size < fdsSize && it != p->second.m_sUDTSocksSet.end())
{
SRT_EPOLL_EVENT& event = fdsSet[size++];
event.fd = it->first;
event.events = it->second;
if (edgeMode)
p->second.m_sUDTSocksSet.erase(it++);
else
++it;
}
return total;
}
}

if(!fdsSet.empty())
return fdsSet.size();

if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * int64_t(1000)))
throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);
throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);

CTimer::waitForEvent();
}
Expand Down
11 changes: 6 additions & 5 deletions srtcore/epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,15 @@ friend class CRendezvousQueue;

int wait(const int eid, std::set<SRTSOCKET>* readfds, std::set<SRTSOCKET>* writefds, int64_t msTimeOut, std::set<SYSSOCKET>* lrfds, std::set<SYSSOCKET>* lwfds);

/// wait for EPoll events or timeout optimized with an explicit ERR event and the edge mode option.
/// wait for EPoll events or timeout optimized with explicit EPOLL_ERR event and the edge mode option.
/// @param [in] eid EPoll ID.
/// @param [out] fds sets (UDT_EPOLL_IN | UDT_EPOLL_OUT | UDT_EPOLL_ERR).
/// @param [out] fdsSet array of user socket events (UDT_EPOLL_IN | UDT_EPOLL_OUT | UDT_EPOLL_ERR).
/// @param [int] fdsSize of fds array
/// @param [in] msTimeOut timeout threshold, in milliseconds.
/// @param [in] pickup events to get a edge trigger mode (0 means level trigger mode).
/// @return number of sockets available for IO.
/// @param [bool] edgeMode if true the events returned in fdsSet are then erased
/// @return total of available events in the epoll system (can be greater than fdsSize)

int wait(const int eid, std::map<SRTSOCKET, int>& fdsSet, int64_t msTimeOut, int pickup = 0);
int uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, bool edgeMode = false);

/// close and release an EPoll.
/// @param [in] eid EPoll ID.
Expand Down
6 changes: 3 additions & 3 deletions srtcore/srt.h
Original file line number Diff line number Diff line change
Expand Up @@ -629,10 +629,10 @@ SRT_API int srt_epoll_wait(int eid, SRTSOCKET* readfds, int* rnum, SRTSOCKET* wr
SYSSOCKET* lrfds, int* lrnum, SYSSOCKET* lwfds, int* lwnum);
typedef struct SRT_EPOLL_EVENT_
{
SRTSOCKET fd;
int events; // UDT_EPOLL_IN | UDT_EPOLL_OUT | UDT_EPOLL_ERR
SRTSOCKET fd;
int events; // UDT_EPOLL_IN | UDT_EPOLL_OUT | UDT_EPOLL_ERR
} SRT_EPOLL_EVENT;
SRT_API int srt_epoll_wait2(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, bool edgeMode=false);
SRT_API int srt_epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, int edgeMode/*=false*/);
SRT_API int srt_epoll_release(int eid);

// Logging control
Expand Down
6 changes: 3 additions & 3 deletions srtcore/srt_c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,14 @@ int srt_epoll_wait(
lrfds, lrnum, lwfds, lwnum);
}

int srt_epoll_wait2(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, bool edgeMode)
int srt_epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, int edgeMode)
{
return UDT::epoll_wait2(
return UDT::epoll_uwait(
eid,
fdsSet,
fdsSize,
msTimeOut,
edgeMode);
edgeMode != 0);
}


Expand Down
2 changes: 1 addition & 1 deletion srtcore/udt.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ UDT_API int epoll_wait(int eid, std::set<UDTSOCKET>* readfds, std::set<UDTSOCKET
std::set<SYSSOCKET>* lrfds = NULL, std::set<SYSSOCKET>* wrfds = NULL);
UDT_API int epoll_wait2(int eid, UDTSOCKET* readfds, int* rnum, UDTSOCKET* writefds, int* wnum, int64_t msTimeOut,
SYSSOCKET* lrfds = NULL, int* lrnum = NULL, SYSSOCKET* lwfds = NULL, int* lwnum = NULL);
UDT_API int epoll_wait2(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, bool triggerMode=false);
UDT_API int epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, bool edgeMode=false);
UDT_API int epoll_release(int eid);
UDT_API ERRORINFO& getlasterror();
UDT_API int getlasterror_code();
Expand Down
17 changes: 8 additions & 9 deletions test/test_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ TEST(CEPoll, InfiniteWait2)
const int epoll_id = srt_epoll_create();
ASSERT_GE(epoll_id, 0);

ASSERT_EQ(srt_epoll_wait2(epoll_id, nullptr, 0, -1), SRT_ERROR);
ASSERT_EQ(srt_epoll_uwait(epoll_id, nullptr, 0, -1, 0), SRT_ERROR);

try
{
Expand Down Expand Up @@ -90,7 +90,7 @@ TEST(CEPoll, WaitNoSocketsInEpoll2)

SRT_EPOLL_EVENT events[2];

ASSERT_EQ(srt_epoll_wait2(epoll_id, events, 2, -1), SRT_ERROR);
ASSERT_EQ(srt_epoll_uwait(epoll_id, events, 2, -1, 0), SRT_ERROR);

try
{
Expand Down Expand Up @@ -159,7 +159,7 @@ TEST(CEPoll, WaitEmptyCall2)
const int epoll_out = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
ASSERT_NE(srt_epoll_add_usock(epoll_id, client_sock, &epoll_out), SRT_ERROR);

ASSERT_EQ(srt_epoll_wait2(epoll_id, NULL, 0, -1), SRT_ERROR);
ASSERT_EQ(srt_epoll_uwait(epoll_id, NULL, 0, -1, 0), SRT_ERROR);

try
{
Expand Down Expand Up @@ -239,7 +239,7 @@ TEST(CEPoll, WaitAllSocketsInEpollReleased2)

SRT_EPOLL_EVENT events[2];

ASSERT_EQ(srt_epoll_wait2(epoll_id, events, 2, -1), SRT_ERROR);
ASSERT_EQ(srt_epoll_uwait(epoll_id, events, 2, -1, 0), SRT_ERROR);

try
{
Expand Down Expand Up @@ -369,12 +369,11 @@ TEST(CEPoll, HandleEpollEvent2)

epoll.update_events(client_sock, epoll_ids, SRT_EPOLL_ERR, true);

map<SRTSOCKET, int> events;
SRT_EPOLL_EVENT fds[1024];

int result = epoll.wait(epoll_id, events, -1);
ASSERT_EQ(result, 1);
map<SRTSOCKET, int>::iterator it = events.begin();
ASSERT_EQ(it->second, SRT_EPOLL_ERR);
int result = epoll.uwait(epoll_id, fds, 1024, -1, false);
ASSERT_EQ(result, 1);
ASSERT_EQ(fds[0].events, SRT_EPOLL_ERR);

try
{
Expand Down

0 comments on commit 14c2a91

Please sign in to comment.