-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathaio.pyx
307 lines (270 loc) · 8.98 KB
/
aio.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# Copyright (c) 2012 Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Interface to aio I/O event notification facility.
"""
# Read setup.py for build details.
# TODO: document methods and classes.
# TODO: comment on why things were done the way they were.
# TODO: we still aren't actually getting the data out of the objects.
# What works:
#   the basic infrastructure is put together; it compiles and loads in Python.
# What doesn't work:
#   not functional for any real work at this point.
from libc.stdio cimport const_char
from libc cimport errno
from posix.unistd cimport off_t
from cpython cimport mem
import time #FIXME use native time calls in the future
# strerror() is not provided by the libc.errno cimport, so declare it here.
# It is declared by <string.h> in C, so name that header rather than
# <errno.h>; otherwise the generated C relies on another include pulling in
# the prototype.
cdef extern from "string.h":
    char *strerror(int)
cdef extern from "Python.h":
    # Opaque CPython types; this module only ever holds pointers to them.
    ctypedef struct PyObject
    ctypedef struct PyThreadState

    # Thread-state save/restore pair from the CPython C API.
    PyThreadState *PyEval_SaveThread()
    void PyEval_RestoreThread(PyThreadState*)
cdef extern from "sys/time.h" nogil:
    # Timeout structure passed by pointer to aiowait().
    struct timeval:
        long tv_sec
        long tv_usec
cdef extern from "sys/aio.h" nogil:
    # Result record of one asynchronous read or write.
    ctypedef struct aio_result_t:
        ssize_t aio_return  # return value of read or write
        int aio_errno       # errno generated by the IO

    # Constants taken from the header, declared together as one anonymous
    # enum; they are re-exported as Python names at the end of the module.
    enum:
        AIOREAD
        AIOWRITE
        AIOWAIT
        AIOCANCEL
        AIONOTIFY
        AIOINIT
        AIOSTART
        AIOLIO
        AIOSUSPEND
        AIOERROR
        AIOLIOWAIT
        AIOAREAD
        AIOAWRITE
        AIOFSYNC
        AIOWAITN
        AIORESERVED1
        AIORESERVED2
        AIORESERVED3
cdef extern from "sys/asynch.h" nogil:
    enum:
        AIO_INPROGRESS  # we can use this as a marker

    # Queue an asynchronous read of `bufs` bytes into `bufp`; the outcome is
    # reported through `resultp`.
    # TODO throw exception
    int aioread(int fildes, char* bufp, int bufs,
                off_t offset, int whence, aio_result_t* resultp)

    # Queue an asynchronous write of `bufs` bytes from `bufp`; the outcome is
    # reported through `resultp`.
    # TODO throw exception
    int aiowrite(int fildes, const_char* bufp, int bufs,
                 off_t offset, int whence, aio_result_t* resultp)

    # Hand back the result record of a finished request.
    # TODO throw exception
    aio_result_t *aiowait(timeval *)
# User-defined structure: per-object bookkeeping that travels with every
# asynchronous request issued by an `aio` instance.
# NOTE(review): it is declared `extern`, so the C definition is presumably
# supplied by the build (see setup.py) -- confirm.
cdef extern struct _state_t:
    # Must remain the FIRST member: wait() casts the aio_result_t* returned
    # by aiowait() straight to a reactor_state_t*.
    aio_result_t aiores #leave this here so we can cast!!!
    # Back-pointer to the owning Python object; stored without an extra
    # reference by aio.__cinit__.
    PyObject* this #convenience so we can get to the reader or writer
    # One of the module-level status values (AIO_READY, AIO_READING, ...).
    int status
    # File descriptor the requests are issued against.
    int fd
# Short alias used throughout the module.
ctypedef _state_t reactor_state_t
cdef class aio(object):
    """
    Wrap one file descriptor for asynchronous reads and writes and handle
    the low level memory management.

    read() and write() are two-phase.  The first call queues the request
    with aioread()/aiowrite() and raises IOError(EAGAIN).  Once the
    module-level wait() has reported the object, calling the same method
    again returns the outcome of the request.

    NOTE(review): the C-level state block and the read buffer are released
    in __dealloc__ with no attempt to cancel a request that is still in
    flight, and wait() reaches this object through an uncounted pointer.
    Callers must keep the object alive until its request has completed.
    """
    cdef reactor_state_t* fdio
    # reader support
    cdef char* _buffer
    cdef int len_buffer   # number of bytes currently allocated in _buffer
    cdef bint _ready
    cdef char* data
    cdef int _whence
    cdef off_t _offset    # off_t so that large-file offsets are not truncated
    # writer support: the bytes object whose memory an in-flight aiowrite()
    # is reading from; held so it cannot be freed underneath the request.
    cdef object _pending_write

    def __cinit__(self):
        self.fdio = <reactor_state_t*>mem.PyMem_Malloc(
            sizeof(reactor_state_t)
        )
        if self.fdio is NULL:
            raise MemoryError()
        # this is so we can look up self in wait()
        self.fdio.this = <PyObject *>self
        # TODO ^figure out if we need to reference count that
        # Give every field a defined value so that a method called before
        # __init__ fails cleanly instead of reading uninitialised memory.
        self.fdio.fd = -1
        self.fdio.status = AIO_READY
        self.fdio.aiores.aio_return = 0
        self.fdio.aiores.aio_errno = 0
        # reader support
        self._whence = 0
        self._offset = 0
        self.len_buffer = 0
        self._buffer = NULL
        self.data = NULL
        self._pending_write = None

    def __dealloc__(self):
        if self.fdio is not NULL:
            mem.PyMem_Free(self.fdio)
            self.fdio = NULL
        # reader support
        if self._buffer is not NULL:
            mem.PyMem_Free(self._buffer)
            self._buffer = NULL

    def __init__(self, int fd):
        """
        @type fd: C{int}
        @param fd: File descriptor all requests are issued against.
        """
        self.fdio.fd = fd
        self.fdio.status = AIO_READY
        # The original pointed `data` at the buffer of a temporary str(),
        # which dangles as soon as __init__ returns; keep it NULL instead.
        self.data = NULL

    def fileno(self):
        """Return the wrapped file descriptor."""
        return self.fdio.fd

    def seek(self, off_t offset, int whence=0):
        """
        seek(offset[, whence]) -> None.  Choose the position of the next
        request.

        Nothing is done to the descriptor here: the two values are stored
        and handed to the next aioread()/aiowrite() call.  Argument offset
        is a byte count.  Optional argument whence defaults to 0 (offset
        from start of file, offset should be >= 0); other values are 1
        (relative to current position, positive or negative), and 2
        (relative to end of file, usually negative).
        """
        self._offset = offset
        self._whence = whence

    def tell(self):
        """
        Return the offset last stored by seek().  It is not advanced when a
        request completes.
        """
        return self._offset

    cdef int _dispatch_write(self, char* data, Py_ssize_t length) except -1:
        # Queue `length` bytes starting at `data`; raise IOError on failure.
        cdef int bufs = <int>length
        cdef int rc
        cdef int err
        if bufs < 0 or bufs != length:
            raise OverflowError("write is too large for one aiowrite() call")
        # Arm the marker so an unfinished request can be told apart from a
        # finished one when the result is inspected later.
        self.fdio.aiores.aio_return = AIO_INPROGRESS
        rc = aiowrite(
            self.fdio.fd,
            data,
            bufs,
            self._offset,
            self._whence,
            &self.fdio.aiores
        )
        if rc == -1:
            err = errno.errno
            raise IOError(err, strerror(err))
        # set a marker so wait() can tell what kind of request finished
        self.fdio.status = AIO_WRITING
        return rc

    cdef int write_dispatch(self, char* data) except -1:
        # Kept for compatibility: queues a NUL-terminated C string.
        return self._dispatch_write(data, len(data))

    def read(self, int size):
        """
        Queue a read of up to C{size} bytes, or collect a finished one.

        @type size: C{int}
        @param size: Maximum number of bytes to read; only used when a new
            request is queued.
        @rtype: C{bytes}
        @return: The bytes read, once wait() has reported this object.
        @raise IOError: With C{EAGAIN} when the request has just been queued
            or is still in progress; with the request's own errno when the
            read failed.
        @raise ValueError: If C{size} is negative.
        """
        cdef ssize_t result
        cdef int err
        cdef char* grown
        if size < 0:
            raise ValueError("size must be non-negative")
        if self.fdio.status == READING_READY:
            result = self.fdio.aiores.aio_return
            if result != AIO_INPROGRESS:
                # mark as ready for the next operation, success or not
                self.fdio.status = AIO_READY
                if result == -1:
                    err = self.fdio.aiores.aio_errno
                    raise IOError(err, strerror(err))
                # Slice by length: the data is binary, not NUL-terminated.
                return self._buffer[:result]
        elif self.fdio.status == AIO_READY:
            # Make sure the buffer can hold this particular request.
            if self._buffer is NULL or self.len_buffer < size:
                grown = <char *>mem.PyMem_Realloc(
                    self._buffer, size if size > 0 else 1
                )
                if grown is NULL:
                    raise MemoryError()
                self._buffer = grown
                self.len_buffer = size
            self.fdio.aiores.aio_return = AIO_INPROGRESS
            if aioread(
                self.fdio.fd,
                self._buffer,
                size,
                self._offset,
                self._whence,
                &self.fdio.aiores
            ) == -1:
                err = errno.errno
                raise IOError(err, strerror(err))
            # set a marker so wait() can tell what kind of request finished
            self.fdio.status = AIO_READING
        # should probably raise something else
        raise IOError(errno.EAGAIN, strerror(errno.EAGAIN))

    def write(self, bytes data not None):
        """
        Queue a write of C{data}, or collect the result of a finished one.

        @type data: C{bytes}
        @param data: Payload to write; only used when a new request is
            queued.  A reference is held until the result is collected.
        @rtype: C{int}
        @return: The number of bytes written, once wait() has reported this
            object.
        @raise IOError: With C{EAGAIN} when the request has just been queued
            or is still in progress; with the request's own errno when the
            write failed.
        """
        cdef ssize_t result
        cdef int err
        if self.fdio.status == WRITING_READY:
            result = self.fdio.aiores.aio_return
            if result != AIO_INPROGRESS:
                self.fdio.status = AIO_READY
                self._pending_write = None
                if result == -1:
                    err = self.fdio.aiores.aio_errno
                    raise IOError(err, strerror(err))
                return result
        elif self.fdio.status == AIO_READY:
            self._dispatch_write(data, len(data))
            # The request now reads from data's memory: keep it alive.
            self._pending_write = data
        raise IOError(errno.EAGAIN, strerror(errno.EAGAIN))
def wait(int seconds, int useconds=0):
    """
    Collect finished asynchronous requests, wrap aiowait(3C).

    Completions are dequeued one at a time until the time limit has
    passed or aiowait() reports a failure (for example because no
    requests are outstanding).

    @type seconds: C{int}
    @param seconds: Whole seconds to keep collecting.
    @type useconds: C{int}
    @param useconds: Extra microseconds added to C{seconds}.
    @rtype: C{tuple}
    @return: C{(reading, writing)}: the L{aio} objects whose read,
        respectively write, request finished.  A failing aiowait() ends the
        collection early; it does not raise.
    """
    cdef reactor_state_t* result
    cdef aio_result_t* aioptr
    cdef timeval val
    cdef double deadline
    cdef double remaining
    # collect results
    reading = []
    writing = []
    deadline = time.time() + seconds + useconds / 1000000.0
    # we can only dequeue one item at a time
    while True:
        # Only wait for what is left, so the total never overshoots.
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        val.tv_sec = <long>remaining
        val.tv_usec = <long>((remaining - val.tv_sec) * 1000000.0)
        # de-queue one item
        aioptr = aiowait(&val)
        if aioptr == <aio_result_t*>(-1):
            # aiowait() failed: bad timeout address, interrupted by a
            # signal, or no outstanding requests.  errno is only meaningful
            # on this path; let the reactor deal with it.
            break
        if aioptr is NULL:
            # nothing finished within the time limit
            continue
        # cast the aio_result_t to reactor_state_t (aiores is its first
        # member)
        result = <reactor_state_t*>aioptr
        # push/pull data from the original reader/writer object
        if result.this is NULL:
            continue
        x = <object>result.this
        if result.status == AIO_READING:
            reading.append(x)
        if result.status == AIO_WRITING:
            writing.append(x)
        # to help the read/write methods
        result.status = result.status & AIO_READY
    return (reading, writing)
# enumerated values mapped to python values
AIO_READ = AIOREAD
AIO_WRITE = AIOWRITE
AIO_WAIT = AIOWAIT
AIO_CANCEL = AIOCANCEL
AIO_NOTIFY = AIONOTIFY
AIO_INIT = AIOINIT
AIO_START = AIOSTART
AIO_LIO = AIOLIO
AIO_SUSPEND = AIOSUSPEND
AIO_ERROR = AIOERROR
AIO_LIOWAIT = AIOLIOWAIT
AIO_AREAD = AIOAREAD
AIO_AWRITE = AIOAWRITE
AIO_FSYNC = AIOFSYNC
AIO_WAITN = AIOWAITN
# These three were listed in __all__ but never defined, which made
# `from aio import *` fail.
AIO_RESERVED1 = AIORESERVED1
AIO_RESERVED2 = AIORESERVED2
AIO_RESERVED3 = AIORESERVED3
# user defined: request states stored in reactor_state_t.status
AIO_READING = AIORESERVED1
AIO_WRITING = AIORESERVED2
AIO_READY = AIORESERVED3
# State of a request after wait() has and-ed its status with AIO_READY.
# NOTE(review): the reserved constants are presumably plain numbers rather
# than bit flags, so `&` may make a *_READY value equal to its in-progress
# counterpart -- verify against sys/aio.h before relying on the distinction.
READING_READY = AIO_READING & AIO_READY
WRITING_READY = AIO_WRITING & AIO_READY
__all__ = [
    'aio', 'wait',
    'AIO_READ', 'AIO_WRITE', 'AIO_WAIT', 'AIO_CANCEL', 'AIO_NOTIFY',
    'AIO_INIT', 'AIO_START', 'AIO_LIO', 'AIO_SUSPEND', 'AIO_ERROR',
    'AIO_LIOWAIT', 'AIO_AREAD', 'AIO_AWRITE', 'AIO_FSYNC', 'AIO_WAITN',
    'AIO_RESERVED1', 'AIO_RESERVED2', 'AIO_RESERVED3',
    'AIO_READING', 'AIO_WRITING', 'AIO_READY',
    'READING_READY', 'WRITING_READY',
]