@@ -4,6 +4,9 @@ use std::path::PathBuf;
4
4
5
5
use futures:: { Async , AsyncSink , Poll , Sink , StartSend , Stream } ;
6
6
7
+ #[ cfg( feature = "unstable-futures" ) ]
8
+ use futures2:: { self , task} ;
9
+
7
10
use UnixDatagram ;
8
11
9
12
/// Encoding of frames via buffers.
@@ -72,14 +75,28 @@ impl<C: UnixDatagramCodec> Stream for UnixDatagramFramed<C> {
72
75
type Error = io:: Error ;
73
76
74
77
fn poll ( & mut self ) -> Poll < Option < C :: In > , io:: Error > {
75
- let ( n, addr) = try_nb ! ( self . socket. recv_from( & mut self . rd) ) ;
78
+ let ( n, addr) = try_ready ! ( self . socket. recv_from( & mut self . rd) ) ;
76
79
trace ! ( "received {} bytes, decoding" , n) ;
77
80
let frame = try!( self . codec . decode ( & addr, & self . rd [ ..n] ) ) ;
78
81
trace ! ( "frame decoded from buffer" ) ;
79
82
Ok ( Async :: Ready ( Some ( frame) ) )
80
83
}
81
84
}
82
85
86
+ #[ cfg( feature = "unstable-futures" ) ]
87
+ impl < C : UnixDatagramCodec > futures2:: Stream for UnixDatagramFramed < C > {
88
+ type Item = C :: In ;
89
+ type Error = io:: Error ;
90
+
91
+ fn poll_next ( & mut self , cx : & mut task:: Context ) -> futures2:: Poll < Option < C :: In > , io:: Error > {
92
+ let ( n, addr) = try_ready2 ! ( self . socket. recv_from2( cx, & mut self . rd) ) ;
93
+ trace ! ( "received {} bytes, decoding" , n) ;
94
+ let frame = try!( self . codec . decode ( & addr, & self . rd [ ..n] ) ) ;
95
+ trace ! ( "frame decoded from buffer" ) ;
96
+ Ok ( futures2:: Async :: Ready ( Some ( frame) ) )
97
+ }
98
+ }
99
+
83
100
impl < C : UnixDatagramCodec > Sink for UnixDatagramFramed < C > {
84
101
type SinkItem = C :: Out ;
85
102
type SinkError = io:: Error ;
@@ -104,7 +121,7 @@ impl<C: UnixDatagramCodec> Sink for UnixDatagramFramed<C> {
104
121
}
105
122
106
123
trace ! ( "writing; remaining={}" , self . wr. len( ) ) ;
107
- let n = try_nb ! ( self . socket. send_to( & self . wr, & self . out_addr) ) ;
124
+ let n = try_ready ! ( self . socket. send_to( & self . wr, & self . out_addr) ) ;
108
125
trace ! ( "written {}" , n) ;
109
126
let wrote_all = n == self . wr . len ( ) ;
110
127
self . wr . clear ( ) ;
@@ -124,6 +141,53 @@ impl<C: UnixDatagramCodec> Sink for UnixDatagramFramed<C> {
124
141
}
125
142
}
126
143
144
+ #[ cfg( feature = "unstable-futures" ) ]
145
+ impl < C : UnixDatagramCodec > futures2:: Sink for UnixDatagramFramed < C > {
146
+ type SinkItem = C :: Out ;
147
+ type SinkError = io:: Error ;
148
+
149
+ fn poll_ready ( & mut self , cx : & mut task:: Context ) -> futures2:: Poll < ( ) , io:: Error > {
150
+ if self . wr . len ( ) > 0 {
151
+ try!( self . poll_flush ( cx) ) ;
152
+ if self . wr . len ( ) > 0 {
153
+ return Ok ( futures2:: Async :: Pending ) ;
154
+ }
155
+ }
156
+ Ok ( ( ) . into ( ) )
157
+ }
158
+
159
+ fn start_send ( & mut self , item : C :: Out ) -> Result < ( ) , io:: Error > {
160
+ self . out_addr = try!( self . codec . encode ( item, & mut self . wr ) ) ;
161
+ Ok ( ( ) )
162
+ }
163
+
164
+ fn poll_flush ( & mut self , cx : & mut task:: Context ) -> futures2:: Poll < ( ) , io:: Error > {
165
+ trace ! ( "flushing framed transport" ) ;
166
+
167
+ if self . wr . is_empty ( ) {
168
+ return Ok ( futures2:: Async :: Ready ( ( ) ) ) ;
169
+ }
170
+
171
+ trace ! ( "writing; remaining={}" , self . wr. len( ) ) ;
172
+ let n = try_ready2 ! ( self . socket. send_to2( cx, & self . wr, & self . out_addr) ) ;
173
+ trace ! ( "written {}" , n) ;
174
+ let wrote_all = n == self . wr . len ( ) ;
175
+ self . wr . clear ( ) ;
176
+ if wrote_all {
177
+ Ok ( futures2:: Async :: Ready ( ( ) ) )
178
+ } else {
179
+ Err ( io:: Error :: new (
180
+ io:: ErrorKind :: Other ,
181
+ "failed to write entire datagram to socket" ,
182
+ ) )
183
+ }
184
+ }
185
+
186
+ fn poll_close ( & mut self , cx : & mut task:: Context ) -> futures2:: Poll < ( ) , io:: Error > {
187
+ self . poll_flush ( cx)
188
+ }
189
+ }
190
+
127
191
pub fn new < C : UnixDatagramCodec > ( socket : UnixDatagram , codec : C ) -> UnixDatagramFramed < C > {
128
192
UnixDatagramFramed {
129
193
socket : socket,
0 commit comments