Skip to content

Commit

Permalink
Merge pull request #811 from Portkey-AI/feat/log-restructuring
Browse files Browse the repository at this point in the history
Feat: add new data points in request options
  • Loading branch information
VisargD authored Dec 16, 2024
2 parents 5aedf36 + 80248e9 commit 202fe23
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 76 deletions.
168 changes: 105 additions & 63 deletions src/handlers/handlerUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,10 @@ export async function tryPost(

const requestOptions = c.get('requestOptions') ?? [];

let mappedResponse: Response, retryCount: number | undefined;
let mappedResponse: Response,
retryCount: number | undefined,
createdAt: Date,
originalResponseJson: Record<string, any> | undefined;

let cacheKey: string | undefined;
let { cacheMode, cacheMaxAge, cacheStatus } = getCacheOptions(
Expand All @@ -372,16 +375,17 @@ export async function tryPost(
isResponseAlreadyMapped: boolean = false
) {
if (!isResponseAlreadyMapped) {
({ response: mappedResponse } = await responseHandler(
response,
isStreamingMode,
provider,
responseTransformer,
url,
isCacheHit,
params,
strictOpenAiCompliance
));
({ response: mappedResponse, originalResponseJson } =
await responseHandler(
response,
isStreamingMode,
provider,
responseTransformer,
url,
isCacheHit,
params,
strictOpenAiCompliance
));
}

updateResponseHeaders(
Expand All @@ -401,7 +405,18 @@ export async function tryPost(
requestURL: url,
rubeusURL: fn,
},
transformedRequest: {
body: transformedRequestBody,
headers: fetchOptions.headers,
},
requestParams: transformedRequestBody,
finalUntransformedRequest: {
body: params,
},
originalResponse: {
body: originalResponseJson,
},
createdAt,
response: mappedResponse.clone(),
cacheStatus: cacheStatus,
lastUsedOptionIndex: currentIndex,
Expand All @@ -424,15 +439,18 @@ export async function tryPost(
}

// BeforeHooksHandler
brhResponse = await beforeRequestHookHandler(c, hookSpan.id);
({ response: brhResponse, createdAt } = await beforeRequestHookHandler(
c,
hookSpan.id
));

if (!!brhResponse) {
// If before requestHandler returns a response, return it
return createResponse(brhResponse, undefined, false);
return createResponse(brhResponse, undefined, false, false);
}

// Cache Handler
({ cacheResponse, cacheStatus, cacheKey } = await cacheHandler(
({ cacheResponse, cacheStatus, cacheKey, createdAt } = await cacheHandler(
c,
providerOption,
requestHeaders,
Expand All @@ -455,19 +473,20 @@ export async function tryPost(
}

// Request Handler (Including retries, recursion and hooks)
[mappedResponse, retryCount] = await recursiveAfterRequestHookHandler(
c,
url,
fetchOptions,
providerOption,
isStreamingMode,
params,
0,
fn,
requestHeaders,
hookSpan.id,
strictOpenAiCompliance
);
({ mappedResponse, retryCount, createdAt, originalResponseJson } =
await recursiveAfterRequestHookHandler(
c,
url,
fetchOptions,
providerOption,
isStreamingMode,
params,
0,
fn,
requestHeaders,
hookSpan.id,
strictOpenAiCompliance
));

return createResponse(mappedResponse, undefined, false, true);
}
Expand Down Expand Up @@ -1007,34 +1026,46 @@ export async function recursiveAfterRequestHookHandler(
requestHeaders: Record<string, string>,
hookSpanId: string,
strictOpenAiCompliance: boolean
): Promise<[Response, number]> {
let response, retryCount;
): Promise<{
mappedResponse: Response;
retryCount: number;
createdAt: Date;
originalResponseJson?: Record<string, any>;
}> {
let response, retryCount, createdAt, executionTime;
const requestTimeout =
Number(requestHeaders[HEADER_KEYS.REQUEST_TIMEOUT]) ||
providerOption.requestTimeout ||
null;

const { retry } = providerOption;

[response, retryCount] = await retryRequest(
({
response,
attempt: retryCount,
createdAt,
} = await retryRequest(
url,
options,
retry?.attempts || 0,
retry?.onStatusCodes || [],
requestTimeout || null
);
));

const { response: mappedResponse, responseJson: mappedResponseJson } =
await responseHandler(
response,
isStreamingMode,
providerOption,
fn,
url,
false,
gatewayParams,
strictOpenAiCompliance
);
const {
response: mappedResponse,
responseJson: mappedResponseJson,
originalResponseJson,
} = await responseHandler(
response,
isStreamingMode,
providerOption,
fn,
url,
false,
gatewayParams,
strictOpenAiCompliance
);

const arhResponse = await afterRequestHookHandler(
c,
Expand Down Expand Up @@ -1072,7 +1103,12 @@ export async function recursiveAfterRequestHookHandler(
lastAttempt = -1; // All retry attempts exhausted without success.
}

return [arhResponse, lastAttempt];
return {
mappedResponse: arhResponse,
retryCount: lastAttempt,
createdAt,
originalResponseJson,
};
}

/**
Expand Down Expand Up @@ -1104,6 +1140,7 @@ async function cacheHandler(
hookSpanId: string,
fn: endpointStrings
) {
const start = new Date();
const [getFromCacheFunction, cacheIdentifier] = [
c.get('getFromCache'),
c.get('cacheIdentifier'),
Expand Down Expand Up @@ -1149,7 +1186,6 @@ async function cacheHandler(
},
});
}

return {
cacheResponse: !!cacheResponse
? new Response(responseBody, {
Expand All @@ -1159,6 +1195,7 @@ async function cacheHandler(
: undefined,
cacheStatus,
cacheKey,
createdAt: start,
};
}

Expand All @@ -1167,6 +1204,7 @@ export async function beforeRequestHookHandler(
hookSpanId: string
): Promise<any> {
try {
const start = new Date();
const hooksManager = c.get('hooksManager');
const hooksResult = await hooksManager.executeHooks(
hookSpanId,
Expand All @@ -1175,29 +1213,33 @@ export async function beforeRequestHookHandler(
);

if (hooksResult.shouldDeny) {
return new Response(
JSON.stringify({
error: {
message:
'The guardrail checks defined in the config failed. You can find more information in the `hook_results` object.',
type: 'hooks_failed',
param: null,
code: null,
},
hook_results: {
before_request_hooks: hooksResult.results,
after_request_hooks: [],
},
}),
{
status: 446,
headers: { 'content-type': 'application/json' },
}
);
return {
response: new Response(
JSON.stringify({
error: {
message:
'The guardrail checks defined in the config failed. You can find more information in the `hook_results` object.',
type: 'hooks_failed',
param: null,
code: null,
},
hook_results: {
before_request_hooks: hooksResult.results,
after_request_hooks: [],
},
}),
{
status: 446,
headers: { 'content-type': 'application/json' },
}
),
createdAt: start,
};
}
} catch (err) {
console.log(err);
return { error: err };
// TODO: Handle this error!!!
}
return {};
}
7 changes: 6 additions & 1 deletion src/handlers/responseHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ export async function responseHandler(
isCacheHit: boolean = false,
gatewayRequest: Params,
strictOpenAiCompliance: boolean
): Promise<{ response: Response; responseJson: any }> {
): Promise<{
response: Response;
responseJson: Record<string, any> | null;
originalResponseJson?: Record<string, any>;
}> {
let responseTransformerFunction: Function | undefined;
let providerOption: Options | undefined;
const responseContentType = response.headers?.get('content-type');
Expand Down Expand Up @@ -146,6 +150,7 @@ export async function responseHandler(
return {
response: nonStreamingResponse.response,
responseJson: nonStreamingResponse.json,
originalResponseJson: nonStreamingResponse.originalResponseBodyJson,
};
}

Expand Down
21 changes: 12 additions & 9 deletions src/handlers/retryHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,14 @@ export const retryRequest = async (
retryCount: number,
statusCodesToRetry: number[],
timeout: number | null
): Promise<[Response, number | undefined]> => {
let lastError: any | undefined;
): Promise<{
response: Response;
attempt: number | undefined;
createdAt: Date;
}> => {
let lastResponse: Response | undefined;
let lastAttempt: number | undefined;
const start = new Date();
try {
await retry(
async (bail: any, attempt: number) => {
Expand All @@ -79,11 +83,7 @@ export const retryRequest = async (
errorObj.headers = Object.fromEntries(response.headers);
throw errorObj;
} else if (response.status >= 200 && response.status <= 204) {
// console.log(
// `Returned in Retry Attempt ${attempt}. Status:`,
// response.ok,
// response.status
// );
// do nothing
} else {
// All error codes that aren't retried need to be propogated up
const errorObj: any = new Error(await response.clone().text());
Expand All @@ -94,7 +94,6 @@ export const retryRequest = async (
}
lastResponse = response;
} catch (error: any) {
lastError = error;
if (attempt >= retryCount + 1) {
bail(error);
return;
Expand Down Expand Up @@ -138,5 +137,9 @@ export const retryRequest = async (
`Tried ${lastAttempt ?? 1} time(s) but failed. Error: ${JSON.stringify(error)}`
);
}
return [lastResponse as Response, lastAttempt];
return {
response: lastResponse as Response,
attempt: lastAttempt,
createdAt: start,
};
};
13 changes: 10 additions & 3 deletions src/handlers/streamHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,11 @@ export async function handleNonStreamingMode(
response: Response,
responseTransformer: Function | undefined,
strictOpenAiCompliance: boolean
) {
): Promise<{
response: Response;
json: Record<string, any>;
originalResponseBodyJson?: Record<string, any>;
}> {
// 408 is thrown whenever a request takes more than request_timeout to respond.
// In that case, response thrown by gateway is already in OpenAI format.
// So no need to transform it again.
Expand All @@ -227,7 +231,8 @@ export async function handleNonStreamingMode(
return { response, json: await response.clone().json() };
}

let responseBodyJson = await response.json();
const originalResponseBodyJson: Record<string, any> = await response.json();
let responseBodyJson = originalResponseBodyJson;
if (responseTransformer) {
responseBodyJson = responseTransformer(
responseBodyJson,
Expand All @@ -239,7 +244,9 @@ export async function handleNonStreamingMode(

return {
response: new Response(JSON.stringify(responseBodyJson), response),
json: responseBodyJson,
json: responseBodyJson as Record<string, any>,
// Send original response if transformer exists
...(responseTransformer && { originalResponseBodyJson }),
};
}

Expand Down

0 comments on commit 202fe23

Please sign in to comment.