-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTcpSpec.scala
164 lines (138 loc) · 4.81 KB
/
TcpSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package scalauv
import org.junit.Test
import org.junit.Assert.*
import LibUv.*
import scalanative.unsafe.*
import scalanative.unsigned.*
import scala.scalanative.libc.stdlib
import scala.scalanative.posix.netinet.*
import inOps.*
import scala.scalanative.posix.sys.socket
import scala.scalanative.posix.arpa.inet
final class TcpSpec {
import TcpSpec.*
@Test
def listen(): Unit = {
var runResult = 0
Zone {
val loop = stackalloc[Byte](uv_loop_size()).asInstanceOf[Loop]
uv_loop_init(loop).checkErrorThrowIO()
val port = 10000
val serverTcpHandle = TcpHandle.stackAllocate()
uv_tcp_init(loop, serverTcpHandle).checkErrorThrowIO()
val serverSocketAddress = stackalloc[in.sockaddr_in]()
serverSocketAddress.sin_family = socket.AF_INET.toUShort
serverSocketAddress.sin_port = inet.htons(port.toUShort)
serverSocketAddress.sin_addr.s_addr = in.INADDR_ANY
uv_tcp_bind(
serverTcpHandle,
serverSocketAddress.asInstanceOf[Ptr[socket.sockaddr]],
0.toUInt
)
.checkErrorThrowIO()
uv_listen(serverTcpHandle, 128, onNewConnection).checkErrorThrowIO()
val clientTcpHandle = TcpHandle.stackAllocate()
uv_tcp_init(loop, clientTcpHandle).checkErrorThrowIO()
val clientSocketAddress = stackalloc[in.sockaddr_in]()
clientSocketAddress.sin_family = socket.AF_INET.toUShort
clientSocketAddress.sin_port = inet.htons(port.toUShort)
Net.setLocalHostSocket4(clientSocketAddress)
val connectReq = ConnectReq.stackAllocate()
uv_tcp_connect(
connectReq,
clientTcpHandle,
clientSocketAddress.asInstanceOf[Ptr[socket.sockaddr]],
onConnect
).checkErrorThrowIO()
runResult = uv_run(loop, RunMode.DEFAULT).checkErrorThrowIO()
uv_loop_close(loop).checkErrorThrowIO()
}
assertEquals("runResult", 0, runResult)
assertEquals("failed", None, failed)
assertEquals("recievedData", text + text, receivedData.mkString)
}
}
object TcpSpec {
private val DoneMarker = '!'
private val text = "my country is the world, and my religion is to do good"
private var receivedData = Vector.empty[String]
private var failed = Option.empty[String]
def recordReceived(s: String): Unit = {
receivedData = receivedData :+ s
}
def setFailed(msg: String): Unit = {
failed = Some(msg)
}
def allocBuffer: AllocCallback = {
(handle: Handle, suggestedSize: CSize, buf: Buffer) =>
buf.mallocInit(suggestedSize)
}
def onNewConnection: ConnectionCallback = {
(handle: StreamHandle, status: ErrorCode) =>
val loop = uv_handle_get_loop(handle)
UvUtils.attemptCatch {
status.checkErrorThrowIO()
val clientTcpHandle = TcpHandle.malloc()
println("New connection")
uv_tcp_init(loop, clientTcpHandle)
.onFail(clientTcpHandle.free())
.checkErrorThrowIO()
UvUtils.onFail(uv_close(clientTcpHandle, onClose))
uv_handle_set_data(clientTcpHandle, handle.toPtr)
uv_accept(handle, clientTcpHandle).checkErrorThrowIO()
uv_read_start(clientTcpHandle, allocBuffer, onRead)
.checkErrorThrowIO()
()
} { exception =>
setFailed(exception.getMessage())
}
}
def onClose: CloseCallback = (_: Handle).free()
def onWrite: StreamWriteCallback = { (req: WriteReq, status: ErrorCode) =>
status.onFailMessage(setFailed)
val buf = Buffer.unsafeFromPtr(uv_req_get_data(req))
stdlib.free(buf.base)
buf.free()
req.free()
}
def onConnect: ConnectCallback = { (req: ConnectReq, status: ErrorCode) =>
status.onFailMessage(setFailed)
val stream = req.connectReqStreamHandle
def doWrite(text: String) = {
val writeReq = WriteReq.malloc()
val cText = mallocCString(text)
val buf = Buffer.malloc(cText, text.length.toCSize)
uv_req_set_data(writeReq, buf.toPtr)
uv_write(writeReq, stream, buf, 1.toUInt, onWrite).onFailMessage { s =>
stdlib.free(cText)
buf.free()
writeReq.free()
setFailed(s)
}
}
doWrite(text)
doWrite(text)
doWrite(DoneMarker.toString)
uv_close(stream, null)
()
}
def onRead: StreamReadCallback = {
(handle: StreamHandle, numRead: CSSize, buf: Buffer) =>
numRead.toInt match {
case ErrorCodes.EOF =>
uv_close(handle, onClose)
case code if code < 0 =>
uv_close(handle, onClose)
setFailed(UvUtils.errorMessage(code.toInt))
case _ =>
buf.length = numRead.toInt
val (text, done) = buf.asUtf8String.span(_ != DoneMarker)
recordReceived(text)
if done.nonEmpty then {
val listenHandle = Handle.unsafeFromPtr(uv_handle_get_data(handle))
uv_close(listenHandle, null)
}
}
stdlib.free(buf.base)
}
}