Skip to content

Commit

Permalink
[ML] Explain Log Rate Spikes: Fix stream flushing. (elastic#140506)
Browse files Browse the repository at this point in the history
This is a temporary fix for response streaming. The current cloud environment buffers each stream with chunks up to 4KB. To force trigger flushing, we send along a 4KB dummy payload to trigger an update. This fixes a stale loading bar for Explain Log Rate Spikes. Once the cloud environment's proxy has been updated to support flushing below the 4KB threshold, we can remove this fix again.
  • Loading branch information
walterra authored Sep 15, 2022
1 parent 95086f4 commit e3664b1
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
16 changes: 13 additions & 3 deletions x-pack/packages/ml/aiops_utils/src/stream_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* 2.0.
*/

import crypto from 'crypto';
import { Stream } from 'stream';
import * as zlib from 'zlib';

Expand Down Expand Up @@ -42,7 +43,8 @@ interface StreamFactoryReturnType<T = unknown> {
*/
export function streamFactory<T = string>(
headers: Headers,
logger: Logger
logger: Logger,
flushFix?: boolean
): StreamFactoryReturnType<T>;
/**
* Sets up a response stream with support for gzip compression depending on provided
Expand All @@ -53,7 +55,8 @@ export function streamFactory<T = string>(
*/
export function streamFactory<T = unknown>(
headers: Headers,
logger: Logger
logger: Logger,
flushFix: boolean = false
): StreamFactoryReturnType<T> {
let streamType: StreamType;
const isCompressed = acceptCompression(headers);
Expand Down Expand Up @@ -82,7 +85,14 @@ export function streamFactory<T = unknown>(
}

try {
const line = typeof d !== 'string' ? `${JSON.stringify(d)}${DELIMITER}` : d;
const line =
streamType === 'ndjson'
? `${JSON.stringify({
...d,
// This is a temporary fix for response streaming with proxy configurations that buffer responses up to 4KB in size.
...(flushFix ? { flushPayload: crypto.randomBytes(4096).toString('hex') } : {}),
})}${DELIMITER}`
: d;
stream.write(line);
} catch (e) {
logger.error(`Could not serialize or stream data chunk: ${e.toString()}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ export const defineExplainLogRateSpikesRoute = (

const { end, push, responseWithHeaders } = streamFactory<AiopsExplainLogRateSpikesApiAction>(
request.headers,
logger
logger,
true
);

function endWithUpdatedLoadingState() {
Expand Down

0 comments on commit e3664b1

Please sign in to comment.