Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new srt_epoll_wait2 function with trigger mode available #626

Closed
wants to merge 23 commits into from

Conversation

thomasjammet
Copy link
Contributor

@thomasjammet thomasjammet commented Mar 26, 2019

Add a new CEPoll::wait optimized for performance and with edge trigger mode available.

This new version of CEpoll::wait() implements both edge & level trigger modes of epoll. The edge trigger mode allow to wait only for the changes of the event states listened.
In addition it divides the events in SRT_EPOLL_IN, SRT_EPOLL_OUT and SRT_EPOLL_ERR as epoll does and now return the total number of events concerned.

Usage example :

#define MAXEVENTS  1024
::SRT_EPOLL_EVENT events[MAXEVENTS];
int epoll = ::srt_epoll_create();
int result(0);
for (;;)
{
    result = srt_epoll_uwait(_epoll, &events[0], MAXEVENTS, -1, true);
    if (result>MAXEVENTS) // inform that there are still events waiting to be picked up
        result = MAXEVENTS;

    for(int i=0;i<result;++i)
    {
        SRT_EPOLL_EVENT& event(events[i]);
        // SRTSOCKET => any combination of SRT_EPOLL_IN, SRT_EPOLL_OUT, SRT_EPOLL_ERR
        printf("%d => %u\n", event.fd, event.events);
    }
}

@ethouris
Copy link
Collaborator

Please explain in details what you have changed in the current functionality, what the new functionality brings, and the API usage for the new API function.

Copy link
Collaborator

@maxsharabayko maxsharabayko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit tests for the new functions. Check out the existing epoll tests.
A description of the changes, like @ethouris has already asked about, is also required.

srtcore/srt.h Outdated Show resolved Hide resolved
srtcore/epoll.cpp Outdated Show resolved Hide resolved
srtcore/epoll.cpp Outdated Show resolved Hide resolved
@maxsharabayko maxsharabayko added the [core] Area: Changes in SRT library core label Mar 27, 2019
…uments which are using spaces)

- srt.h: small correction of the operator&(int, SRT_EPOLL_OPT) return condition, more readable
@ethouris
Copy link
Collaborator

ethouris commented Mar 27, 2019

Let me enumerate some things that need fixing so that any review can be reliable:

  1. It's completely unclear to me as to why you need two separate fields - *Set and *Wait. Stating that you have replaced the previous implementation with sets for single purpose, there are two of them: subscribers and eventsinks. Stated that you have grabbed them under one container, which collects under one socket key the subscriber and eventsink, I don't get the purpose of the *Set container. Might be that the name has been selected, which barely describes the purpose. At least I need some reasonable description of the internals, enough if provided as comments. It would be also nice if you avoid std::pair where a structure of two fields with reasonable names would help better understanding the code.

  2. Please note that the library itself must still keep C++03 compatibility. Maybe this will be changed in the future, but for now it's still in force.

  3. Please align to the convention used in this code, in particular:

  • Open brace in a new row
  • Only space indentation
  • Indent in new functions is 4 spaces. The old UDT code was using 3 spaces, so functions with large modifications better be reindented as a whole with 4 spaces.
  • Spaces need to be used around infix operators and after keywords.

And please also avoid assignment in conditional expressions. This doesn't make the code any better, and it worsens readability.

srtcore/core.h Outdated Show resolved Hide resolved
srtcore/epoll.cpp Outdated Show resolved Hide resolved
srtcore/epoll.cpp Outdated Show resolved Hide resolved
srtcore/epoll.h Show resolved Hide resolved
srtcore/epoll.h Outdated Show resolved Hide resolved
- add an exception in epoll_wait2 if the fdsSet is NULL and timeout is infinite
- change "trigger" to "edge" in the variable names (it is the epoll edge trigger mode, as opposed to level trigger mode)
- moved epoll_wait2 code to CUDT::epoll_wait because exception must not be thrown there
- correction of a warning in srtcore/api.cpp
@thomasjammet
Copy link
Contributor Author

Please add unit tests for the new functions. Check out the existing epoll tests.
A description of the changes, like @ethouris has already asked about, is also required.

You will find the unit tests in the commit 6ae572f

srtcore/epoll.cpp Outdated Show resolved Hide resolved
srtcore/srt.h Outdated Show resolved Hide resolved
@maxsharabayko maxsharabayko added this to the v.1.3.3 milestone Mar 28, 2019
srtcore/epoll.cpp Outdated Show resolved Hide resolved
Copy link
Collaborator

@maxsharabayko maxsharabayko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API-functions.md states:

In the edge-triggered mode the function only returns socket states which have changed since last call.

So I expect the following test to succeed when the epoll state of the socket is changed from false to true and back. See stage 3. Or are those changes intentionally not signaled?

HandleEpollEvent2EdgeTrue2False (click to expand)

TEST(CEPoll, HandleEpollEvent2EdgeTrue2False)
{
    ASSERT_EQ(srt_startup(), 0);

    SRTSOCKET client_sock = srt_socket(AF_INET, SOCK_DGRAM, 0);
    EXPECT_NE(client_sock, SRT_ERROR);

    CEPoll epoll;
    const int epoll_id = epoll.create();
    ASSERT_GE(epoll_id, 0);

    // 1. Add socket to epoll
    const int epoll_out = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
    ASSERT_NE(epoll.add_usock(epoll_id, client_sock, &epoll_out), SRT_ERROR);

    SRT_EPOLL_EVENT fds[1024];
    int result = SRT_ERROR;
    // 2. The state is not signaled, so the edge-type triggering should not succeed,
    // and should return by timeout instead.
    bool exception_happened = false;
    try
    {
        result = epoll.uwait(epoll_id, fds, 1024, 150, true);
    }
    catch (const CUDTException &e)
    {
        exception_happened = true;
        EXPECT_EQ(e.getErrorCode(), MJ_AGAIN * 1000 + MN_XMTIMEOUT);
    }
    ASSERT_EQ(exception_happened, true);

    // 3. Update event on the socket.
    set<int> epoll_ids = { epoll_id };
    epoll.update_events(client_sock, epoll_ids, SRT_EPOLL_OUT, true);
    epoll.update_events(client_sock, epoll_ids, SRT_EPOLL_OUT, false);

    // 4. The state is changed, so we should catch it with wait.
    result = epoll.uwait(epoll_id, fds, 1024, 150, true);
    ASSERT_EQ(result, 1);
    ASSERT_EQ(fds[0].events, SRT_EPOLL_OUT);

    try
    {
        EXPECT_EQ(epoll.remove_usock(epoll_id, client_sock), 0);
    }
    catch (CUDTException &ex)
    {
        cerr << ex.getErrorMessage() << endl;
        EXPECT_EQ(0, 1);
    }

    try
    {
        EXPECT_EQ(epoll.release(epoll_id), 0);
    }
    catch (CUDTException &ex)
    {
        cerr << ex.getErrorMessage() << endl;
        EXPECT_EQ(0, 1);
    }

    EXPECT_EQ(srt_cleanup(), 0);
}

Copy link
Collaborator

@maxsharabayko maxsharabayko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following test fails on stage 6, that does not look well together with the behavior on state 2. Please comment.

TEST: HandleEpollEvent2Edge (click to expand)

TEST(CEPoll, HandleEpollEvent2Edge)
{
    ASSERT_EQ(srt_startup(), 0);

    SRTSOCKET client_sock = srt_socket(AF_INET, SOCK_DGRAM, 0);
    EXPECT_NE(client_sock, SRT_ERROR);

    CEPoll epoll;
    const int epoll_id = epoll.create();
    ASSERT_GE(epoll_id, 0);

    // 1. Add socket to epoll
    const int epoll_out = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
    ASSERT_NE(epoll.add_usock(epoll_id, client_sock, &epoll_out), SRT_ERROR);

    SRT_EPOLL_EVENT fds[1024];
    int result = SRT_ERROR;
    // 2. The state is not signaled, so the edge-type triggering should not succeed,
    // and should return by timeout instead.
    bool exception_happened = false;
    try
    {
        result = epoll.uwait(epoll_id, fds, 1024, 150, true);
    }
    catch (const CUDTException &e)
    {
        exception_happened = true;
        EXPECT_EQ(e.getErrorCode(), MJ_AGAIN * 1000 + MN_XMTIMEOUT);
    }
    ASSERT_EQ(exception_happened, true);

    // 3. Update event on the socket.
    set<int> epoll_ids = { epoll_id };
    epoll.update_events(client_sock, epoll_ids, SRT_EPOLL_OUT, true);

    // 4. The state is changed, so we should catch it with wait.
    result = epoll.uwait(epoll_id, fds, 1024, 150, true);
    ASSERT_EQ(result, 1);
    ASSERT_EQ(fds[0].events, SRT_EPOLL_OUT);

    // 5. Add the socket back to epoll, because it was deleted by edge-triggering.
    ASSERT_NE(epoll.add_usock(epoll_id, client_sock, &epoll_out), SRT_ERROR);

    // 6. Now the state remains signaled, so the edge-type triggering should not succeed,
    // and return by timeout.
    exception_happened = false;
    try
    {
        result = epoll.uwait(epoll_id, fds, 1024, 150, true);
    }
    catch (const CUDTException &e)
    {
        exception_happened = true;
        EXPECT_EQ(e.getErrorCode(), MJ_AGAIN * 1000 + MN_XMTIMEOUT);
    }
    ASSERT_EQ(exception_happened, true);

    // Change state to false. Should trigger the edge-wait.
    epoll.update_events(client_sock, epoll_ids, SRT_EPOLL_OUT, false);

    result = epoll.uwait(epoll_id, fds, 1024, 150, true);
    ASSERT_EQ(result, 1);
    ASSERT_EQ(fds[0].events, SRT_EPOLL_OUT);

    try
    {
        EXPECT_EQ(epoll.remove_usock(epoll_id, client_sock), 0);
    }
    catch (CUDTException &ex)
    {
        cerr << ex.getErrorMessage() << endl;
        EXPECT_EQ(0, 1);
    }

    try
    {
        EXPECT_EQ(epoll.release(epoll_id), 0);
    }
    catch (CUDTException &ex)
    {
        cerr << ex.getErrorMessage() << endl;
        EXPECT_EQ(0, 1);
    }

    EXPECT_EQ(srt_cleanup(), 0);
}

Copy link
Collaborator

@maxsharabayko maxsharabayko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've missed that that there is a separate PR #608 on SRTO_IPV6ONLY. Then this PR is blocked untill PR #608 is merged. Can you please move the added description of SRTO_IPV6ONLY in API.md to PR #608 instead?

@thomasjammet
Copy link
Contributor Author

Please tell in the description of this PR that a new socket option SRTO_IPV6ONLY is added. Might be better to move this change to a separate PR.

I've missed that that there is a separate PR #608 on SRTO_IPV6ONLY. Then this PR is blocked untill PR #608 is merged. Can you please move the added description of SRTO_IPV6ONLY in API.md to PR #608 instead?

Ok I have updated API.md in the PR #608

small correction of SRTO_IPV6ONLY option description, reported from PR Haivision#608
@maxsharabayko maxsharabayko self-requested a review May 29, 2019 09:37
@maxsharabayko maxsharabayko modified the milestones: v.1.3.3, v.1.3.4 May 29, 2019
@ethouris
Copy link
Collaborator

ethouris commented Aug 5, 2019

Let me ask a question: is then the "edge-triggered mode" such a mode that starts waiting in some given mode for every socket (some may be ready, some others not ready), this state is persistent between the calls, and the function exits only if a status on any of the socket is different than the previous persistent state?

Example: I state sockets are not ready, so I subscribe two sockets: 10 and 11

[10] no
[11] no

Then I do this wait for edge, I get readiness on 10

[10] yes
[11] no

I handle this case and enter waiting function again with this state. Then:

  • if my handling of [10] has turned it into a non-ready state, does the function exit immediately because it's different than in the state structure?
  • how do I get which exactly socket has changed the state, as the change might be to yes or to no?

@thomasjammet
Copy link
Contributor Author

Let me ask a question: is then the "edge-triggered mode" such a mode that starts waiting in some given mode for every socket (some may be ready, some others not ready), this state is persistent between the calls, and the function exits only if a status on any of the socket is different than the previous persistent state?

Yes exactly.

Example: I state sockets are not ready, so I subscribe two sockets: 10 and 11

[10] no
[11] no

Then I do this wait for edge, I get readiness on 10

[10] yes
[11] no

I handle this case and enter waiting function again with this state. Then:

* if my handling of [10] has turned it into a non-ready state, does the function exit immediately because it's different than in the state structure?

If there is no event handled (SRT_EPOLL_EVENT_.events == 0) the function does not return immediately, there is nothing to do, if there was an event it has been catched.

* how do I get which exactly socket has changed the state, as the change might be to yes or to no?

The SRTSOCKET is returned in the SRT_EPOLL_EVENT_ here in the fd property.
There is an example of implementation that you can find in MonaServer2 in the file IOSRTSocket.cpp which demonstrates how to retrieve the SRTSOCKET and use it to find a custom mapped instance related to it.

@ethouris
Copy link
Collaborator

ethouris commented Aug 6, 2019

Ok, maybe different way.

Do I understand it correctly that "edge triggered" only means that upon returning from the waiting function the actual ready sockets are reported in the SRT_EPOLL_EVENT structure, but the readiness state is already cleared?

If that's the case, your function is unfortunately not universal enough. In the branch where I'm working on redundancy, I found last time a case that I need to report a second link to be attached to the group. This is not reported by usual write-readiness on the connecting group because the group is considered connected when at least one link is active. As I need this event anyway, I introduced a new event type. The problem is that in my waiting function that is used in the receiving function for groups I need to get this event, but this one I need to be "edge triggered" (gets cleared just after at least one waiting function has reported it), whereas all connected sockets from the group I need to get the current (not triggered) readiness state.

In my version of the waiting function I simply clear the event if this was this special one, whereas normal events are unchanged (can only be updated). Not the best solution, but works.

Therefore for that case I need that whether this is to be edge-triggered or persistent state I need to decide during subscription, not during waiting.

Moreover, if I'm not mistaken, the same rule is used in libevent and kevent systems.

@ethouris
Copy link
Collaborator

ethouris commented Aug 6, 2019

One more question to the code: in CEPoll::update_events, why in case of clearing are you using the insert-and-delete check instead of simply searching for the element and update+possibly-remove it?

I mean: won't this code do functionally exactly the same thing, and even slightly faster?

            map<SRTSOCKET, int>::iterator it = p->second.m_sUDTSocksSet.find(uid);
            if (it != p->second.m_sUDTSocksSet.end())
            {
                it->second &= ~changes;
                if (!it->second)  // all bits are now set to 0 => remove SocksSet!
                    p->second.m_sUDTSocksSet.erase(it);
            }

int CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut, bool edgeMode)
{
int64_t entertime = CTimer::getTime();
if (!fdsSet && fdsSize)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that zero size is allowed if you pass NULL fdsSet? What about negative size, stating that the type is int? Is nonempty array officially allowed and what sense does it make?

map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
if (p->second.m_sUDTSocksWait.empty() && p->second.m_sLocals.empty() && (msTimeOut < 0))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provided that the uwait version doesn't exit on checking readiness on system sockets, shouldn't the m_sLocals be even required to be empty?

@ethouris
Copy link
Collaborator

ethouris commented Aug 6, 2019

As I analyze this code, guys, one more idea.

Instead of the m_sUDTSockSet field, you should use a list<pair<SRTSOCKET, int>> with events. And whenever an item is inserted into this list (just normally at the end, ordering doesn't matter here), it must be updated in the m_SUDTSocksWait field, which will contain, as a field, a list iterator. This iterator will point to the state element in m_sUDTSockSet, if any exists, otherwise it will point to m_sUDTSockSet.end(). What you gain from it:

  1. In CEPoll::uwait you'll iterate over a list, not over a set, which is more efficient (set is described as less efficient iteration and more efficient searching).
  2. In all places where any search for an element by key uid (SRT socket ID) happens, it happens in both m_sUDTSocksWait and m_sUDTSockSet. This happens in update_usock and update_events. So a possibility to search for a socket in m_sUDTSockSet if you already have it found in ...Wait and get access to the item's node by the list iterator, you have it found faster.
  3. There is not possible state that the socket key exists in ...Set, but not in ...Wait, but it may exist only in ...Wait. So it must simply ensure that whenever you need to remove a socket from ...Set, the iterator field in ...Wait must be updated to "end" to mark that it doesn't have a corresponding object in ...Set.

By connecting it with the need of specifying edge option in the subscription, it needs first of all to define the value type for m_sUDTSocksWait map as more-less:

struct Wait
{
    int watch;
    int state;
    list< pair<SRTSOCKET, int> >::iterator setit;
    bool edge;
};

map<SRTSOCKET, Wait> m_sUDTSocksWait;
list< pair<SRTSOCKET, int> > m_sUDTSocksSet;

This way, when you have the state update into readiness, you add the socket-event pair to the list and sets its iterator to Wait::setit. When the change was about removing readiness, you simply erase the object from the list and set setit to list's end().

[EDIT]: It turns out that the m_sUDTSocksSet field should contain additionally the pointer to the Wait structure. With this the iterator to be updated when removing the event notice item can be updated here.

Please tell me what you think. I'm not sure about this what I write because I may still miss something in my analysis.

@ethouris
Copy link
Collaborator

ethouris commented Aug 8, 2019

Alright, @MathieuPOUX please review when you can. I have made an alternative implementation basing on the above description: my branch dev-epoll-uwait.

Differences in API:

  1. The edge-triggered is specified in the subscribing function. Persistent is default, use e.g. srt_epoll_add_usock_edge in order to subscribe for edge-triggered event.
  2. The waiting function therefore doesn't specify edge-trigger. The edge-trigger functionality is also added now to both wait and uwait, and these functions now differ only in the way how they report the changes.

Implementation:

  1. The object representing the state, possibly with getting it cleared due to edge trigger, is now a list.
  2. Whenever there's a need that the object be created, a new item is added to the list and its iterator is remembered in the watch object. The state of no attached notice object is represented by the list's end() iterator.
  3. When the state is cleared, the corresponding object is removed from the list and its associated iterator field reset to end().
  4. For edge trigger there's another bitset used in the watch container's item. After wait report, only those state fields are removed that were subscribed as edge, otherwise they stay there. This check is done in both waiting functions.

It is expected also performance improvements (tho I could not fully test it yet):

  1. Iteration on a list is better performance than on a set
  2. The previous implementation was using search for a subscribed socket in the watch map and in the notice set, so twice. This implementation searches only in the watch map, and the associated notice object is accessed directly from there.

The notice object has also a back pointer to its watch item. This is considered safe enough, as there cannot exist a notice object without a watch object, while there can only exist a watch object without associated notice object. The consistency is ensured through forbidding external access to the containers directly, and updating only through internal functions that guard consistency.

@maxsharabayko
Copy link
Collaborator

  • A patch to remove unused UDT macro definitions.
From eeb4b96b830ad1d2a50b7415ac1f61f3f05d7be4 Mon Sep 17 00:00:00 2001
From: Maxim Sharabayko <maxlovic@gmail.com>
Date: Tue, 13 Aug 2019 12:37:11 +0200
Subject: [PATCH] [core] Removed unused macro definitions

---
 srtcore/udt.h | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/srtcore/udt.h b/srtcore/udt.h
index dc41e5d..1e694db 100644
--- a/srtcore/udt.h
+++ b/srtcore/udt.h
@@ -115,10 +115,6 @@ modified by
 // This facility is used only for select() function.
 // This is considered obsolete and the epoll() functionality rather should be used.
 typedef std::set<SRTSOCKET> ud_set;
-#define UD_CLR(u, uset) ((uset)->erase(u))
-#define UD_ISSET(u, uset) ((uset)->find(u) != (uset)->end())
-#define UD_SET(u, uset) ((uset)->insert(u))
-#define UD_ZERO(uset) ((uset)->clear())
 #endif
 
 ////////////////////////////////////////////////////////////////////////////////
-- 
2.21.0.windows.1


@thomasjammet
Copy link
Contributor Author

Alright, @MathieuPOUX please review when you can. I have made an alternative implementation basing on the above description: my branch dev-epoll-uwait.

Differences in API:

1. The edge-triggered is specified in the subscribing function. Persistent is default, use e.g. `srt_epoll_add_usock_edge` in order to subscribe for edge-triggered event.

2. The waiting function therefore doesn't specify edge-trigger. The edge-trigger functionality is also added now to both `wait` and `uwait`, and these functions now differ only in the way how they report the changes.

Implementation:

1. The object representing the state, possibly with getting it cleared due to edge trigger, is now a list.

2. Whenever there's a need that the object be created, a new item is added to the list and its iterator is remembered in the watch object. The state of no attached notice object is represented by the list's end() iterator.

3. When the state is cleared, the corresponding object is removed from the list and its associated iterator field reset to end().

4. For edge trigger there's another bitset used in the watch container's item. After wait report, only those state fields are removed that were subscribed as edge, otherwise they stay there. This check is done in both waiting functions.

It is expected also performance improvements (tho I could not fully test it yet):

1. Iteration on a list is better performance than on a set

2. The previous implementation was using search for a subscribed socket in the watch map and in the notice set, so twice. This implementation searches only in the watch map, and the associated notice object is accessed directly from there.

The notice object has also a back pointer to its watch item. This is considered safe enough, as there cannot exist a notice object without a watch object, while there can only exist a watch object without associated notice object. The consistency is ensured through forbidding external access to the containers directly, and updating only through internal functions that guard consistency.

I think the dev-epoll-uwait version is correct now, I don't know if you have to apply the patch mentioned by Max above.

@ethouris
Copy link
Collaborator

Moved to #832

@ethouris
Copy link
Collaborator

Closed as the replacement PR has been merged.

@ethouris ethouris closed this Sep 13, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
[core] Area: Changes in SRT library core Priority: High Type: Enhancement Indicates new feature requests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants