diff --git a/x-pack/packages/ml/aiops_utils/src/stream_factory.ts b/x-pack/packages/ml/aiops_utils/src/stream_factory.ts
index 16bc2abe19306..fa455a04c23f1 100644
--- a/x-pack/packages/ml/aiops_utils/src/stream_factory.ts
+++ b/x-pack/packages/ml/aiops_utils/src/stream_factory.ts
@@ -5,6 +5,7 @@
  * 2.0.
  */
 
+import crypto from 'crypto';
 import { Stream } from 'stream';
 import * as zlib from 'zlib';
 
@@ -42,7 +43,8 @@ interface StreamFactoryReturnType {
  */
 export function streamFactory(
   headers: Headers,
-  logger: Logger
+  logger: Logger,
+  flushFix?: boolean
 ): StreamFactoryReturnType;
 /**
  * Sets up a response stream with support for gzip compression depending on provided
@@ -53,7 +55,8 @@ export function streamFactory(
   headers: Headers,
-  logger: Logger
+  logger: Logger,
+  flushFix: boolean = false
 ): StreamFactoryReturnType {
   let streamType: StreamType;
   const isCompressed = acceptCompression(headers);
@@ -82,7 +85,14 @@ export function streamFactory(
     }
 
     try {
-      const line = typeof d !== 'string' ? `${JSON.stringify(d)}${DELIMITER}` : d;
+      const line =
+        streamType === 'ndjson'
+          ? `${JSON.stringify({
+              ...d,
+              // This is a temporary fix for response streaming with proxy configurations that buffer responses up to 4KB in size.
+              ...(flushFix ? { flushPayload: crypto.randomBytes(4096).toString('hex') } : {}),
+            })}${DELIMITER}`
+          : d;
       stream.write(line);
     } catch (e) {
       logger.error(`Could not serialize or stream data chunk: ${e.toString()}`);
diff --git a/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts b/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts
index 299157dccce66..83bcac0bfa70e 100644
--- a/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts
+++ b/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts
@@ -81,7 +81,8 @@ export const defineExplainLogRateSpikesRoute = (
 
       const { end, push, responseWithHeaders } = streamFactory(
         request.headers,
-        logger
+        logger,
+        true
       );
 
       function endWithUpdatedLoadingState() {
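
For context, the idea behind the new `flushFix` flag is that some proxy configurations buffer a response until roughly 4KB has accumulated, so each NDJSON line gets an extra throwaway `flushPayload` of random bytes to push every chunk past that threshold and force an immediate flush to the client. Below is a minimal standalone sketch of just that padding step; the helper name `toNdjsonLine` and the sample payload are illustrative and not part of the actual `streamFactory` API.

import crypto from 'crypto';

const DELIMITER = '\n';

// Hypothetical helper mirroring the padding logic in the diff above: when
// `flushFix` is enabled, the serialized NDJSON chunk carries a `flushPayload`
// of 4096 random bytes hex-encoded (~8KB of text), which comfortably exceeds
// a 4KB proxy buffer.
function toNdjsonLine(d: object, flushFix: boolean): string {
  return `${JSON.stringify({
    ...d,
    ...(flushFix ? { flushPayload: crypto.randomBytes(4096).toString('hex') } : {}),
  })}${DELIMITER}`;
}

// Example: a small progress update becomes a chunk large enough to flush.
const line = toNdjsonLine({ type: 'progress', loaded: 0.5 }, true);
console.log(line.length > 4096); // true

The trade-off of this approach is extra bandwidth per chunk, which is why the flag is opt-in: only the explain_log_rate_spikes route passes `true`, while other callers of `streamFactory` keep the previous behavior.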