Skip to content

Commit

Permalink
fix: handle blob message data from websocket (#3732)
Browse files Browse the repository at this point in the history
Co-authored-by: Tim <hello@timsmart.co>
  • Loading branch information
sukovanej and tim-smart authored Oct 6, 2024
1 parent 88e85db commit 8e94585
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 27 deletions.
5 changes: 5 additions & 0 deletions .changeset/poor-seals-search.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/platform": patch
---

Fix: handle `Blob` message data from a websocket.
37 changes: 19 additions & 18 deletions packages/experimental/test/Socket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as SocketServer from "@effect/experimental/SocketServer/Node"
import * as NodeSocket from "@effect/platform-node/NodeSocket"
import * as Socket from "@effect/platform/Socket"
import { assert, describe, expect, it } from "@effect/vitest"
import { Chunk, Effect, Fiber, Queue, Stream } from "effect"
import { Chunk, Effect, Queue, Stream } from "effect"
import WS from "vitest-websocket-mock"

const makeServer = Effect.gen(function*(_) {
Expand Down Expand Up @@ -53,30 +53,31 @@ describe("Socket", () => {
)

it.effect("messages", () =>
Effect.gen(function*(_) {
const server = yield* _(makeServer)
const socket = yield* _(Socket.makeWebSocket(Effect.succeed(url)))
const messages = yield* _(Queue.unbounded<Uint8Array>())
const fiber = yield* _(Effect.fork(socket.run((_) => messages.offer(_))))
yield* _(
Effect.gen(function*(_) {
const write = yield* _(socket.writer)
yield* _(write(new TextEncoder().encode("Hello")))
yield* _(write(new TextEncoder().encode("World")))
}),
Effect.scoped
)
yield* _(Effect.promise(async () => {
Effect.gen(function*() {
const server = yield* makeServer
const socket = yield* Socket.makeWebSocket(Effect.succeed(url))
const messages = yield* Queue.unbounded<Uint8Array>()
const fiber = yield* Effect.fork(socket.run((_) => messages.offer(_)))
yield* Effect.gen(function*() {
const write = yield* socket.writer
yield* write(new TextEncoder().encode("Hello"))
yield* write(new TextEncoder().encode("World"))
}).pipe(Effect.scoped)
yield* Effect.promise(async () => {
await expect(server).toReceiveMessage(new TextEncoder().encode("Hello"))
await expect(server).toReceiveMessage(new TextEncoder().encode("World"))
}))
})

server.send("Right back at you!")
const message = yield* _(messages.take)
let message = yield* messages.take
expect(message).toEqual(new TextEncoder().encode("Right back at you!"))

server.send(new Blob(["A Blob message"]))
message = yield* messages.take
expect(message).toEqual(new TextEncoder().encode("A Blob message"))

server.close()
const exit = yield* _(Fiber.join(fiber), Effect.exit)
const exit = yield* fiber.await
expect(exit._tag).toEqual("Success")
}).pipe(
Effect.scoped,
Expand Down
19 changes: 10 additions & 9 deletions packages/platform/src/Socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,13 +407,15 @@ export const fromWebSocket = <R>(
let open = false

function onMessage(event: MessageEvent) {
const result = handler(
typeof event.data === "string"
? event.data
: event.data instanceof Uint8Array
? event.data
: new Uint8Array(event.data)
)
if (event.data instanceof Blob) {
return Effect.promise(() =>
event.data.arrayBuffer() as Promise<ArrayBuffer>
).pipe(
Effect.andThen((buffer) => handler(new Uint8Array(buffer))),
run
)
}
const result = handler(event.data)
if (Effect.isEffect(result)) {
run(result)
}
Expand Down Expand Up @@ -476,8 +478,7 @@ export const fromWebSocket = <R>(
})
}) :
Effect.try({
try: () =>
ws.send(chunk),
try: () => ws.send(chunk),
catch: (cause) => new SocketGenericError({ reason: "Write", cause })
})
),
Expand Down

0 comments on commit 8e94585

Please sign in to comment.