Skip to content

Commit

Permalink
[Flight] Split Streaming from Relay Implemenation (#18260)
Browse files Browse the repository at this point in the history
* Add ReactFlightServerConfig intermediate

This just forwards to the stream version of Flight which is itself forked
between Node and W3C streams.

The dom-relay goes directly to the Relay config though which allows it to
avoid the stream part of Flight.

* Separate streaming protocol into the Stream config

* Split streaming parts into the ReactFlightServerConfigStream

This decouples it so that the Relay implementation doesn't have to encode
the JSON to strings. Instead it can be fed the values as JSON objects and
do its own encoding.

* Split FlightClient into a basic part and a stream part

Same split as the server.

* Expose lower level async hooks to Relay

This requires an external helper file that we'll wire up internally.
  • Loading branch information
sebmarkbage authored Mar 10, 2020
1 parent 160505b commit 99d7371
Show file tree
Hide file tree
Showing 26 changed files with 615 additions and 269 deletions.
2 changes: 1 addition & 1 deletion packages/react-client/flight.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
* @flow
*/

export * from './src/ReactFlightClient';
export * from './src/ReactFlightClientStream';
141 changes: 28 additions & 113 deletions packages/react-client/src/ReactFlightClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,17 @@
* @flow
*/

import type {Source, StringDecoder} from './ReactFlightClientHostConfig';

import {
supportsBinaryStreams,
createStringDecoder,
readPartialStringChunk,
readFinalStringChunk,
} from './ReactFlightClientHostConfig';

export type ReactModelRoot<T> = {|
model: T,
|};

type JSONValue =
export type JSONValue =
| number
| null
| boolean
| string
| {[key: string]: JSONValue, ...};
| {[key: string]: JSONValue}
| Array<JSONValue>;

const PENDING = 0;
const RESOLVED = 1;
Expand All @@ -48,39 +40,23 @@ type ErroredChunk = {|
|};
type Chunk = PendingChunk | ResolvedChunk | ErroredChunk;

type OpaqueResponseWithoutDecoder = {
source: Source,
export type Response = {
partialRow: string,
modelRoot: ReactModelRoot<any>,
chunks: Map<number, Chunk>,
fromJSON: (key: string, value: JSONValue) => any,
...
};

type OpaqueResponse = OpaqueResponseWithoutDecoder & {
stringDecoder: StringDecoder,
...
};

export function createResponse(source: Source): OpaqueResponse {
export function createResponse(): Response {
let modelRoot: ReactModelRoot<any> = ({}: any);
let rootChunk: Chunk = createPendingChunk();
definePendingProperty(modelRoot, 'model', rootChunk);
let chunks: Map<number, Chunk> = new Map();
chunks.set(0, rootChunk);

let response: OpaqueResponse = (({
source,
let response = {
partialRow: '',
modelRoot,
chunks: chunks,
fromJSON: function(key, value) {
return parseFromJSON(response, this, key, value);
},
}: OpaqueResponseWithoutDecoder): any);
if (supportsBinaryStreams) {
response.stringDecoder = createStringDecoder();
}
};
return response;
}

Expand Down Expand Up @@ -138,10 +114,7 @@ function resolveChunk(chunk: Chunk, value: mixed): void {

// Report that any missing chunks in the model is now going to throw this
// error upon read. Also notify any pending promises.
export function reportGlobalError(
response: OpaqueResponse,
error: Error,
): void {
export function reportGlobalError(response: Response, error: Error): void {
response.chunks.forEach(chunk => {
// If this chunk was already resolved or errored, it won't
// trigger an error but if it wasn't then we need to
Expand All @@ -168,8 +141,8 @@ function definePendingProperty(
});
}

function parseFromJSON(
response: OpaqueResponse,
export function parseModelFromJSON(
response: Response,
targetObj: Object,
key: string,
value: JSONValue,
Expand All @@ -195,12 +168,11 @@ function parseFromJSON(
return value;
}

function resolveJSONRow(
response: OpaqueResponse,
export function resolveModelChunk<T>(
response: Response,
id: number,
json: string,
model: T,
): void {
let model = JSON.parse(json, response.fromJSON);
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
Expand All @@ -210,88 +182,31 @@ function resolveJSONRow(
}
}

function processFullRow(response: OpaqueResponse, row: string): void {
if (row === '') {
return;
}
let tag = row[0];
switch (tag) {
case 'J': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
resolveJSONRow(response, id, json);
return;
}
case 'E': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
let errorInfo = JSON.parse(json);
let error = new Error(errorInfo.message);
error.stack = errorInfo.stack;
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
chunks.set(id, createErrorChunk(error));
} else {
triggerErrorOnChunk(chunk, error);
}
return;
}
default: {
// Assume this is the root model.
resolveJSONRow(response, 0, row);
return;
}
}
}

export function processStringChunk(
response: OpaqueResponse,
chunk: string,
offset: number,
): void {
let linebreak = chunk.indexOf('\n', offset);
while (linebreak > -1) {
let fullrow = response.partialRow + chunk.substring(offset, linebreak);
processFullRow(response, fullrow);
response.partialRow = '';
offset = linebreak + 1;
linebreak = chunk.indexOf('\n', offset);
}
response.partialRow += chunk.substring(offset);
}

export function processBinaryChunk(
response: OpaqueResponse,
chunk: Uint8Array,
export function resolveErrorChunk(
response: Response,
id: number,
message: string,
stack: string,
): void {
if (!supportsBinaryStreams) {
throw new Error("This environment don't support binary chunks.");
}
let stringDecoder = response.stringDecoder;
let linebreak = chunk.indexOf(10); // newline
while (linebreak > -1) {
let fullrow =
response.partialRow +
readFinalStringChunk(stringDecoder, chunk.subarray(0, linebreak));
processFullRow(response, fullrow);
response.partialRow = '';
chunk = chunk.subarray(linebreak + 1);
linebreak = chunk.indexOf(10); // newline
let error = new Error(message);
error.stack = stack;
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
chunks.set(id, createErrorChunk(error));
} else {
triggerErrorOnChunk(chunk, error);
}
response.partialRow += readPartialStringChunk(stringDecoder, chunk);
}

export function complete(response: OpaqueResponse): void {
export function close(response: Response): void {
// In case there are any remaining unresolved chunks, they won't
// be resolved now. So we need to issue an error to those.
// Ideally we should be able to early bail out if we kept a
// ref count of pending chunks.
reportGlobalError(response, new Error('Connection closed.'));
}

export function getModelRoot<T>(response: OpaqueResponse): ReactModelRoot<T> {
export function getModelRoot<T>(response: Response): ReactModelRoot<T> {
return response.modelRoot;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
* @flow
*/

export type Source = Promise<Response> | ReadableStream | XMLHttpRequest;

export type StringDecoder = TextDecoder;

export const supportsBinaryStreams = true;
Expand Down
116 changes: 116 additions & 0 deletions packages/react-client/src/ReactFlightClientStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @flow
*/

import type {Response as ResponseBase, JSONValue} from './ReactFlightClient';

import type {StringDecoder} from './ReactFlightClientHostConfig';

import {
createResponse as createResponseImpl,
resolveModelChunk,
resolveErrorChunk,
parseModelFromJSON,
} from './ReactFlightClient';

import {
supportsBinaryStreams,
createStringDecoder,
readPartialStringChunk,
readFinalStringChunk,
} from './ReactFlightClientHostConfig';

export type ReactModelRoot<T> = {|
model: T,
|};

type Response = ResponseBase & {
fromJSON: (key: string, value: JSONValue) => any,
stringDecoder: StringDecoder,
};

export function createResponse(): Response {
let response: Response = (createResponseImpl(): any);
response.fromJSON = function(key: string, value: JSONValue) {
return parseModelFromJSON(response, this, key, value);
};
if (supportsBinaryStreams) {
response.stringDecoder = createStringDecoder();
}
return response;
}

function processFullRow(response: Response, row: string): void {
if (row === '') {
return;
}
let tag = row[0];
switch (tag) {
case 'J': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
let model = JSON.parse(json, response.fromJSON);
resolveModelChunk(response, id, model);
return;
}
case 'E': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
let errorInfo = JSON.parse(json);
resolveErrorChunk(response, id, errorInfo.message, errorInfo.stack);
return;
}
default: {
// Assume this is the root model.
let model = JSON.parse(row, response.fromJSON);
resolveModelChunk(response, 0, model);
return;
}
}
}

export function processStringChunk(
response: Response,
chunk: string,
offset: number,
): void {
let linebreak = chunk.indexOf('\n', offset);
while (linebreak > -1) {
let fullrow = response.partialRow + chunk.substring(offset, linebreak);
processFullRow(response, fullrow);
response.partialRow = '';
offset = linebreak + 1;
linebreak = chunk.indexOf('\n', offset);
}
response.partialRow += chunk.substring(offset);
}

export function processBinaryChunk(
response: Response,
chunk: Uint8Array,
): void {
if (!supportsBinaryStreams) {
throw new Error("This environment don't support binary chunks.");
}
let stringDecoder = response.stringDecoder;
let linebreak = chunk.indexOf(10); // newline
while (linebreak > -1) {
let fullrow =
response.partialRow +
readFinalStringChunk(stringDecoder, chunk.subarray(0, linebreak));
processFullRow(response, fullrow);
response.partialRow = '';
chunk = chunk.subarray(linebreak + 1);
linebreak = chunk.indexOf(10); // newline
}
response.partialRow += readPartialStringChunk(stringDecoder, chunk);
}

export {reportGlobalError, close, getModelRoot} from './ReactFlightClient';
Loading

0 comments on commit 99d7371

Please sign in to comment.