Skip to content

Commit

Permalink
test: enhance test stability
Browse files Browse the repository at this point in the history
  • Loading branch information
tmkx committed Jan 30, 2024
1 parent 0c69744 commit 0029131
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 23 deletions.
32 changes: 19 additions & 13 deletions packages/messaging/src/core/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { setTimeout as sleep } from 'node:timers/promises';
import { expect, it, vi } from 'vitest';
import { Messaging, createMessaging, fromMessagePort } from '../index';
import { withResolvers } from '../utils';

function expectMessagingIsNotLeaked(messaging: Messaging) {
// @ts-expect-error
Expand All @@ -9,7 +10,7 @@ function expectMessagingIsNotLeaked(messaging: Messaging) {
expect(messaging.ongoingStreamObservers).toHaveLength(0);
}

it.concurrent('should on/off listener', async () => {
it('should on/off listener', async () => {
const { port1, port2 } = new MessageChannel();
const listenerFn = vi.fn();

Expand All @@ -26,7 +27,7 @@ it.concurrent('should on/off listener', async () => {
expectMessagingIsNotLeaked(receiver);
});

it.concurrent('should support request', async () => {
it('should support request', async () => {
const { port1, port2 } = new MessageChannel();

const receiver = createMessaging(fromMessagePort(port1), {
Expand All @@ -48,7 +49,7 @@ it.concurrent('should support request', async () => {
expectMessagingIsNotLeaked(sender);
});

it.concurrent('should support stream', async () => {
it('should support stream', async () => {
const { port1, port2 } = new MessageChannel();

const receiver = createMessaging(fromMessagePort(port1), {
Expand Down Expand Up @@ -99,9 +100,10 @@ it.concurrent('should support stream', async () => {
expectMessagingIsNotLeaked(sender);
});

it.concurrent('should support abort stream', async () => {
it('should support abort stream', async () => {
const { port1, port2 } = new MessageChannel();

const cleanResolver = withResolvers<void>();
const cleanupFn = vi.fn();
const receiver = createMessaging(fromMessagePort(port1), {
async onStream(message, subscriber) {
Expand All @@ -112,6 +114,7 @@ it.concurrent('should support abort stream', async () => {
return () => {
clearInterval(timer);
cleanupFn();
cleanResolver.resolve();
};
}
default:
Expand All @@ -126,28 +129,31 @@ it.concurrent('should support abort stream', async () => {
new Promise<unknown[]>((resolve, reject) => {
const result: unknown[] = [];
const unsubscribe = sender.stream(
{ name: 'hello', interval: 100 },
{ name: 'hello', interval: 10 },
{
next: (value) => result.push(value),
next: (value) => {
result.push(value);
if (result.length === 2) {
unsubscribe();
resolve(result);
}
},
error: (reason) => reject(reason),
complete: completeFn,
}
);
setTimeout(() => {
unsubscribe();
resolve(result);
}, 250);
})
).resolves.toEqual([0, 1]);
await sleep(10);
await cleanResolver.promise;
expect(cleanupFn).toBeCalled();
expect(completeFn).not.toBeCalled();

await sleep();
expectMessagingIsNotLeaked(receiver);
expectMessagingIsNotLeaked(sender);
});

it.concurrent('should support relay request', async () => {
it('should support relay request', async () => {
const { port1, port2 } = new MessageChannel();
const { port1: port3, port2: port4 } = new MessageChannel();

Expand Down Expand Up @@ -178,7 +184,7 @@ it.concurrent('should support relay request', async () => {
expectMessagingIsNotLeaked(sender);
});

it.concurrent('should support relay stream', async () => {
it('should support relay stream', async () => {
const { port1, port2 } = new MessageChannel();
const { port1: port3, port2: port4 } = new MessageChannel();

Expand Down
25 changes: 16 additions & 9 deletions packages/messaging/src/core/__tests__/trpc.spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { setTimeout as sleep } from 'node:timers/promises';
import { createTRPCClient } from '@trpc/client';
import { initTRPC } from '@trpc/server';
import { afterEach, describe, expect, it, vi } from 'vitest';
import { z } from 'zod';
import { applyMessagingHandler, messagingLink } from '../trpc';
import { Messaging, fromMessagePort } from '../index';
import { observable } from '@trpc/server/observable';
import { sleep } from '@webx-kit/test-utils/shared';
import { withResolvers } from '../utils';

function expectMessagingIsNotLeaked(messaging: Messaging) {
Expand Down Expand Up @@ -142,10 +142,10 @@ describe('Basic', () => {
const onDataFn = vi.fn();
const onErrorFn = vi.fn();
const onStoppedFn = vi.fn();
const { promise, resolve } = withResolvers();
const { promise, resolve } = withResolvers<void>();
client.streamBasic.subscribe(undefined, {
onStarted: onStartedFn,
onComplete: () => resolve(null),
onComplete: resolve,
onData: onDataFn,
onError: onErrorFn,
onStopped: onStoppedFn,
Expand Down Expand Up @@ -175,7 +175,7 @@ describe('Basic', () => {
expect(onDataFn.mock.calls).toEqual([[1]]);
expect(onCompleteFn).not.toBeCalled();
expect(onStoppedFn).not.toBeCalled();
await sleep(30);
await sleep();
});

it('should support unsubscribe stream', async () => {
Expand All @@ -184,25 +184,32 @@ describe('Basic', () => {
const onDataFn = vi.fn();
const onErrorFn = vi.fn();
const onStoppedFn = vi.fn();
const unsubscribeResolver = withResolvers<void>();
const unsubscribe = client.streamInterval.subscribe(
{ from: 666, interval: 100 },
{ from: 666, interval: 10 },
{
onStarted: onStartedFn,
onComplete: onCompleteFn,
onData: onDataFn,
onData(value) {
onDataFn(value);
if (onDataFn.mock.calls.length === 2) {
unsubscribe.unsubscribe();
unsubscribeResolver.resolve();
}
},
onError: onErrorFn,
onStopped: onStoppedFn,
}
);
await sleep(250);
await unsubscribeResolver.promise;
expect(streamCleanupFn).not.toBeCalled();
unsubscribe.unsubscribe();
await sleep(30);
await sleep();
expect(onStartedFn).toBeCalledTimes(1);
expect(onCompleteFn).not.toBeCalled();
expect(onDataFn.mock.calls).toEqual([[666], [667]]);
expect(onErrorFn).not.toBeCalled();
expect(onStoppedFn).not.toBeCalled();
expect(streamCleanupFn).toBeCalledTimes(1);
await sleep();
});
});
2 changes: 1 addition & 1 deletion packages/test-utils/src/shared.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IncomingMessage, RequestListener, ServerResponse, createServer as createHttpServer } from 'node:http';
import { createServer as createNetServer } from 'node:net';

export function sleep(ms: number) {
export function sleep(ms = 0) {
return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

Expand Down

0 comments on commit 0029131

Please sign in to comment.