Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delegate control to process queue. #313

Merged
merged 23 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8efe6f0
Reduce LoadProgress object type.
i-zolotarenko Nov 9, 2023
f7225e6
Create microtask for process queue.
i-zolotarenko Nov 9, 2023
7408c87
Add event dispatcher class.
i-zolotarenko Nov 13, 2023
e019870
Create Request class.
i-zolotarenko Nov 13, 2023
eb1ba54
Move Request file to separate file.
i-zolotarenko Nov 13, 2023
51fc027
Change style of bittorrent-tracker declarations event handlers types.
i-zolotarenko Nov 13, 2023
bab7d15
Create P2PTrackerClient class to encapsulate peer connection logic.
i-zolotarenko Nov 14, 2023
0e4e204
Add TODO task.
i-zolotarenko Nov 14, 2023
3247e01
Merge with remote
i-zolotarenko Nov 14, 2023
92021d3
Add requests error handling.
i-zolotarenko Nov 15, 2023
3b4dbdd
Fix types errors.
i-zolotarenko Nov 16, 2023
5d5865b
P2P announce segments on http requests state change.
i-zolotarenko Nov 16, 2023
0d8c3bc
Fix bugs.
i-zolotarenko Nov 16, 2023
6475ce8
Make broadcastAnnouncement method as function expression.
i-zolotarenko Nov 16, 2023
e354c72
Refactor Request code. Create separate flows for each type of abort.
i-zolotarenko Nov 27, 2023
c608079
Remove event subscriptions to request instance.
i-zolotarenko Nov 27, 2023
15f8358
Fix type errors.
i-zolotarenko Nov 27, 2023
bb4f57a
Fix bugs.
i-zolotarenko Nov 27, 2023
2275bd0
Fix lint warnings.
i-zolotarenko Nov 27, 2023
fec7ce4
Move P2P tracker client to separate file.
i-zolotarenko Nov 27, 2023
1a08f15
Fix more bugs. Rewrite loggers.
i-zolotarenko Nov 28, 2023
1ea58f0
Remove unused code.
i-zolotarenko Nov 28, 2023
349d9fc
Use function declaration instead of expression.
i-zolotarenko Nov 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { LoadProgress } from "./request-container";
import { LoadProgress } from "./request";

export class BandwidthApproximator {
private readonly loadings: LoadProgress[] = [];
Expand Down
19 changes: 12 additions & 7 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,28 @@ import {
SegmentBase,
CoreEventHandlers,
} from "./types";
import * as Utils from "./utils/utils";
import * as StreamUtils from "./utils/stream";
import { LinkedMap } from "./linked-map";
import { BandwidthApproximator } from "./bandwidth-approximator";
import { EngineCallbacks } from "./request-container";
import { EngineCallbacks } from "./request";
import { SegmentsMemoryStorage } from "./segments-storage";

export class Core<TStream extends Stream = Stream> {
private manifestResponseUrl?: string;
private readonly streams = new Map<string, StreamWithSegments<TStream>>();
private readonly settings: Settings = {
simultaneousHttpDownloads: 2,
simultaneousHttpDownloads: 1,
simultaneousP2PDownloads: 3,
highDemandTimeWindow: 15,
httpDownloadTimeWindow: 45,
p2pDownloadTimeWindow: 45,
cachedSegmentExpiration: 120 * 1000,
cachedSegmentsCount: 50,
webRtcMaxMessageSize: 64 * 1024 - 1,
p2pSegmentDownloadTimeout: 5000,
p2pLoaderDestroyTimeout: 30 * 1000,
p2pSegmentFirstBytesTimeoutMs: 1000,
p2pSegmentDownloadTimeoutMs: 5000,
p2pLoaderDestroyTimeoutMs: 30 * 1000,
httpDownloadTimeoutMs: 5000,
};
private readonly bandwidthApproximator = new BandwidthApproximator();
private segmentStorage?: SegmentsMemoryStorage;
Expand All @@ -40,7 +42,7 @@ export class Core<TStream extends Stream = Stream> {
}

hasSegment(segmentLocalId: string): boolean {
const segment = Utils.getSegmentFromStreamsMap(
const segment = StreamUtils.getSegmentFromStreamsMap(
this.streams,
segmentLocalId
);
Expand Down Expand Up @@ -116,7 +118,10 @@ export class Core<TStream extends Stream = Stream> {
throw new Error("Manifest response url is undefined");
}

const segment = Utils.getSegmentFromStreamsMap(this.streams, segmentId);
const segment = StreamUtils.getSegmentFromStreamsMap(
this.streams,
segmentId
);
if (!segment) {
throw new Error(`Not found segment with id: ${segmentId}`);
}
Expand Down
47 changes: 18 additions & 29 deletions packages/p2p-media-loader-core/src/declarations.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ declare module "bittorrent-tracker" {
getAnnounceOpts?: () => object;
});

on<E extends TrackerEvent>(event: E, handler: TrackerEventHandler<E>): void;
on<E extends keyof TrackerClientEvents>(
event: E,
handler: TrackerClientEvents[E]
): void;

start(): void;

Expand All @@ -20,39 +23,25 @@ declare module "bittorrent-tracker" {
destroy(): void;
}

export type TrackerEvent = "update" | "peer" | "warning" | "error";

export type TrackerEventHandler<E extends TrackerEvent> = E extends "update"
? (data: object) => void
: E extends "peer"
? (peer: PeerConnection) => void
: E extends "warning"
? (warning: unknown) => void
: E extends "error"
? (error: unknown) => void
: never;

type PeerEvent = "connect" | "data" | "close" | "error";

export type PeerConnectionEventHandler<E extends PeerEvent> =
E extends "connect"
? () => void
: E extends "data"
? (data: ArrayBuffer) => void
: E extends "close"
? () => void
: E extends "error"
? (error: { code: string }) => void
: never;
export type TrackerClientEvents = {
update: (data: object) => void;
peer: (peer: PeerConnection) => void;
warning: (warning: unknown) => void;
error: (error: unknown) => void;
};

export type PeerEvents = {
connect: () => void;
data: (data: Uint8Array) => void;
close: () => void;
error: (error: { code: string }) => void;
};

export type PeerConnection = {
id: string;
initiator: boolean;
_channel: RTCDataChannel;
on<E extends PeerEvent>(
event: E,
handler: PeerConnectionEventHandler<E>
): void;
on<E extends keyof PeerEvents>(event: E, handler: PeerEvents[E]): void;
send(data: string | ArrayBuffer): void;
write(data: string | ArrayBuffer): void;
destroy(): void;
Expand Down
16 changes: 0 additions & 16 deletions packages/p2p-media-loader-core/src/errors.ts

This file was deleted.

31 changes: 31 additions & 0 deletions packages/p2p-media-loader-core/src/event-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
export class EventDispatcher<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
T extends { [key: string]: (...args: any[]) => void | Promise<void> },
K extends keyof T = keyof T
> {
private readonly listeners = new Map<keyof T, Set<T[K]>>();

subscribe(eventType: K, ...listeners: T[K][]) {
let eventListeners = this.listeners.get(eventType);
if (!eventListeners) {
eventListeners = new Set();
this.listeners.set(eventType, eventListeners);
}
for (const listener of listeners) eventListeners.add(listener);
}

unsubscribe(eventType: K, listener: T[K]) {
const eventListeners = this.listeners.get(eventType);
if (!eventListeners) return;
eventListeners.delete(listener);
if (!eventListeners.size) this.listeners.delete(eventType);
}

dispatch(eventType: K, ...args: Parameters<T[K]>) {
const eventListeners = this.listeners.get(eventType);
if (!eventListeners) return;
for (const listener of eventListeners) {
listener(...args);
}
}
}
132 changes: 41 additions & 91 deletions packages/p2p-media-loader-core/src/http-loader.ts
Original file line number Diff line number Diff line change
@@ -1,111 +1,61 @@
import { RequestAbortError, FetchError } from "./errors";
import { Segment } from "./types";
import { HttpRequest, LoadProgress } from "./request-container";
import { Settings } from "./types";
import { Request, RequestError, HttpRequestErrorType } from "./request";

export function getHttpSegmentRequest(segment: Segment): Readonly<HttpRequest> {
const { promise, abortController, progress } = fetchSegmentData(segment);
return {
type: "http",
promise,
progress,
abort: () => abortController.abort(),
};
}

function fetchSegmentData(segment: Segment) {
export async function fulfillHttpSegmentRequest(
request: Request,
settings: Pick<Settings, "httpDownloadTimeoutMs">
) {
const headers = new Headers();
const { url, byteRange, localId: segmentId } = segment;
const { segment } = request;
const { url, byteRange } = segment;

if (byteRange) {
const { start, end } = byteRange;
const byteRangeString = `bytes=${start}-${end}`;
headers.set("Range", byteRangeString);
}
const abortController = new AbortController();

const progress: LoadProgress = {
canBeTracked: false,
totalBytes: 0,
loadedBytes: 0,
percent: 0,
startTimestamp: performance.now(),
};
const loadSegmentData = async () => {
try {
const response = await window.fetch(url, {
headers,
signal: abortController.signal,
});

if (response.ok) {
return await getDataPromiseAndMonitorProgress(response, progress);
}
throw new FetchError(
response.statusText ?? `Network response was not for ${segmentId}`,
response.status,
response
);
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
throw new RequestAbortError(`Segment fetch was aborted ${segmentId}`);
}
throw error;
const abortController = new AbortController();
const requestControls = request.start(
{ type: "http" },
{
abort: () => abortController.abort("abort"),
fullLoadingTimeoutMs: settings.httpDownloadTimeoutMs,
}
};

return {
promise: loadSegmentData(),
abortController,
progress,
};
}

async function getDataPromiseAndMonitorProgress(
response: Response,
progress: LoadProgress
): Promise<ArrayBuffer> {
const totalBytesString = response.headers.get("Content-Length");
if (!response.body) {
return response.arrayBuffer().then((data) => {
progress.loadedBytes = data.byteLength;
progress.totalBytes = data.byteLength;
progress.lastLoadedChunkTimestamp = performance.now();
progress.percent = 100;
return data;
);
try {
const fetchResponse = await window.fetch(url, {
headers,
signal: abortController.signal,
});
}
requestControls.firstBytesReceived();

if (totalBytesString) {
progress.totalBytes = +totalBytesString;
progress.canBeTracked = true;
}
if (!fetchResponse.ok) {
throw new RequestError("fetch-error", fetchResponse.statusText);
}

const reader = response.body.getReader();
if (!fetchResponse.body) return;
const totalBytesString = fetchResponse.headers.get("Content-Length");
if (totalBytesString) request.setTotalBytes(+totalBytesString);

progress.startTimestamp = performance.now();
const chunks: Uint8Array[] = [];
for await (const chunk of readStream(reader)) {
chunks.push(chunk);
progress.loadedBytes += chunk.length;
progress.lastLoadedChunkTimestamp = performance.now();
if (progress.canBeTracked) {
progress.percent = (progress.loadedBytes / progress.totalBytes) * 100;
const reader = fetchResponse.body.getReader();
for await (const chunk of readStream(reader)) {
requestControls.addLoadedChunk(chunk);
}
}
requestControls.completeOnSuccess();
} catch (error) {
if (error instanceof Error) {
if (error.name !== "abort") return;

if (!progress.canBeTracked) {
progress.totalBytes = progress.loadedBytes;
progress.percent = 100;
}
const resultBuffer = new ArrayBuffer(progress.loadedBytes);
const view = new Uint8Array(resultBuffer);

let offset = 0;
for (const chunk of chunks) {
view.set(chunk, offset);
offset += chunk.length;
const httpLoaderError: RequestError<HttpRequestErrorType> = !(
error instanceof RequestError
)
? new RequestError("fetch-error", error.message)
: error;
console.log("HTTP ERROR");
i-zolotarenko marked this conversation as resolved.
Show resolved Hide resolved
requestControls.cancelOnError(httpLoaderError);
}
}
return resultBuffer;
}

async function* readStream(
Expand Down
Loading