-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathio-service.hpp
161 lines (148 loc) · 4.92 KB
/
io-service.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#pragma once
#include <coroutine>
#include <iostream>
#include <utility>
#include <queue>
#include <unordered_map>
#include <format>
#include <functional>
#include <list>
#include <sys/time.h>
#include <sys/timerfd.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <sys/types.h>
#include "task.hpp"
#include "unistd.h"
class io_service
{
public:
using task_t = task<int>;
using handle_t = std::coroutine_handle<typename task_t::promise_type>;
using schedule_t = std::function<void(handle_t)>;
enum class poll_op : uint64_t
{
/// Poll for read operations.
read = EPOLLIN,
/// Poll for write operations.
write = EPOLLOUT,
/// Poll for read and write operations.
read_write = EPOLLIN | EPOLLOUT
};
struct poll_info
{
int fd;
handle_t h;
};
explicit io_service(schedule_t schedule)
: m_schedule(schedule),
m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)),
m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC))
{
epoll_event e{};
e.data.ptr = const_cast<void *>(m_timer_ptr);
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_timer_fd, &e);
std::cout << "m_epoll_fd = " << m_epoll_fd << std::endl;
std::cout << "m_timer_fd = " << m_timer_fd << std::endl;
}
io_service(const io_service &) = delete;
io_service(io_service &&) = delete;
auto operator=(const io_service &) -> io_service & = delete;
auto operator=(io_service &&) -> io_service & = delete;
struct fd_awaiter
{
bool await_ready() { return false; }
void await_suspend(handle_t h)
{
h = get_root_coro(h);
epoll_event e{};
e.events = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLRDHUP;
poll_info *pi = new poll_info{};
pi->fd = fd;
pi->h = h;
e.data.ptr = pi;
//std::cout << "register fd = " << fd << " for " << static_cast<uint32_t>(op) << std::endl;
if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e))
{
std::cerr << "epoll ctl error on fd " << fd << "\n";
if (errno == EEXIST) {
std::cerr << "err = eexist" << std::endl;
}
}
coro = h;
original_state = h.promise().last_await_state;
h.promise().last_await_state = await_state::no_schedule;
}
void await_resume()
{
coro.promise().last_await_state = original_state;
}
int m_epoll_fd;
int fd;
poll_op op;
handle_t coro;
await_state original_state;
};
auto wait_stdin()
{
return fd_awaiter{m_epoll_fd, STDIN_FILENO, poll_op::read};
}
auto wait_read(int fd)
{
// std::cout << "wait read " << fd << std::endl;
return fd_awaiter{m_epoll_fd, fd, poll_op::read};
}
auto wait_write(int fd)
{
return fd_awaiter{m_epoll_fd, fd, poll_op::write};
}
auto wait_read_write(int fd)
{
return fd_awaiter{m_epoll_fd, fd, poll_op::read_write};
}
task_t co_check_io()
{
co_yield "co_check_io";
co_await std::suspend_always{}; // stop at beginning
while (1)
{
// std::cout << "before epoll wait" << std::endl;
auto event_count = epoll_wait(m_epoll_fd, m_events.data(), m_max_events, -1);
if (event_count > 0)
{
// std::cout << "event count = " << event_count << std::endl;
for (std::size_t i = 0; i < static_cast<std::size_t>(event_count); ++i)
{
epoll_event &event = m_events[i];
void *data_ptr = event.data.ptr;
if (data_ptr == m_timer_ptr)
{
// Process all events that have timed out.
}
else if (data_ptr != nullptr)
{
auto pi = static_cast<poll_info *>(data_ptr);
// std::cout << "process data_ptr " << pi << std::endl;
int fd = pi->fd;
handle_t h = pi->h;
delete pi;
epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
//std::cout << "unregister fd = " << fd << std::endl;
m_schedule(h);
}
}
}
co_await std::suspend_always{}; // yield to let other corotines to run
}
}
private:
int m_epoll_fd;
int m_timer_fd;
static constexpr const int m_timer_object{0};
static constexpr const void *m_timer_ptr = &m_timer_object;
static const constexpr std::size_t m_max_events = 16;
std::array<struct epoll_event, m_max_events> m_events{};
schedule_t m_schedule;
};