Skip to content

Commit

Permalink
Merge branch 'develop' into feat.amplitudeSkipTraits
Browse files Browse the repository at this point in the history
  • Loading branch information
ItsSudip authored Mar 18, 2024
2 parents 2fdbb7a + 2ad1239 commit bd32ace
Show file tree
Hide file tree
Showing 27 changed files with 2,810 additions and 431 deletions.
9 changes: 9 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
"rudder-transformer-cdk": "^1.4.11",
"set-value": "^4.1.0",
"sha256": "^0.2.0",
"sqlstring": "^2.3.3",
"stacktrace-parser": "^0.1.10",
"statsd-client": "^0.4.7",
"truncate-utf8-bytes": "^1.0.2",
Expand Down
3 changes: 2 additions & 1 deletion src/controllers/userTransform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ export class UserTransformController {
'(User transform - router:/customTransform ):: Request to transformer',
JSON.stringify(ctx.request.body),
);
const requestSize = Number(ctx.request.get('content-length'));
const events = ctx.request.body as ProcessorTransformationRequest[];
const processedRespone: UserTransformationServiceResponse =
await UserTransformService.transformRoutine(events, ctx.state.features);
await UserTransformService.transformRoutine(events, ctx.state.features, requestSize);
ctx.body = processedRespone.transformedEvents;
ControllerUtility.postProcess(ctx, processedRespone.retryStatus);
logger.debug(
Expand Down
14 changes: 9 additions & 5 deletions src/services/userTransform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
RetryRequestError,
extractStackTraceUptoLastSubstringMatch,
} from '../util/utils';
import { getMetadata, isNonFuncObject } from '../v0/util';
import { getMetadata, getTransformationMetadata, isNonFuncObject } from '../v0/util';
import { SUPPORTED_FUNC_NAMES } from '../util/ivmFactory';
import logger from '../logger';
import stats from '../util/stats';
Expand All @@ -28,6 +28,7 @@ export class UserTransformService {
public static async transformRoutine(
events: ProcessorTransformationRequest[],
features: FeatureFlags = {},
requestSize = 0,
): Promise<UserTransformationServiceResponse> {
let retryStatus = 200;
const groupedEvents: NonNullable<unknown> = groupBy(
Expand Down Expand Up @@ -162,16 +163,19 @@ export class UserTransformService {
),
);
stats.counter('user_transform_errors', eventsToProcess.length, {
transformationId: eventsToProcess[0]?.metadata?.transformationId,
workspaceId: eventsToProcess[0]?.metadata?.workspaceId,
status,
...metaTags,
...getTransformationMetadata(eventsToProcess[0]?.metadata),
});
} finally {
stats.timing('user_transform_request_latency', userFuncStartTime, {
workspaceId: eventsToProcess[0]?.metadata?.workspaceId,
transformationId: eventsToProcess[0]?.metadata?.transformationId,
...metaTags,
...getTransformationMetadata(eventsToProcess[0]?.metadata),
});

stats.histogram('user_transform_batch_size', requestSize, {
...metaTags,
...getTransformationMetadata(eventsToProcess[0]?.metadata),
});
}

Expand Down
20 changes: 20 additions & 0 deletions src/util/prometheus.js
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,10 @@ class Prometheus {
name: 'tp_batch_size',
help: 'Size of batch of events for tracking plan validation',
type: 'histogram',
buckets: [
1024, 102400, 524288, 1048576, 10485760, 20971520, 52428800, 104857600, 209715200,
524288000,
],
labelNames: [
'sourceType',
'destinationType',
Expand Down Expand Up @@ -670,6 +674,22 @@ class Prometheus {
'k8_namespace',
],
},
{
name: 'user_transform_batch_size',
help: 'user_transform_batch_size',
type: 'histogram',
labelNames: [
'workspaceId',
'transformationId',
'sourceType',
'destinationType',
'k8_namespace',
],
buckets: [
1024, 102400, 524288, 1048576, 10485760, 20971520, 52428800, 104857600, 209715200,
524288000,
], // 1KB, 100KB, 0.5MB, 1MB, 10MB, 20MB, 50MB, 100MB, 200MB, 500MB
},
{
name: 'source_transform_request_latency',
help: 'source_transform_request_latency',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const { get, set } = require('lodash');
const sha256 = require('sha256');
const { NetworkError, NetworkInstrumentationError } = require('@rudderstack/integrations-lib');
const SqlString = require('sqlstring');
const { prepareProxyRequest, handleHttpRequest } = require('../../../adapters/network');
const { isHttpStatusSuccess, getAuthErrCategoryFromStCode } = require('../../util/index');
const { CONVERSION_ACTION_ID_CACHE_TTL } = require('./config');
Expand Down Expand Up @@ -29,8 +30,12 @@ const ERROR_MSG_PATH = 'response[0].error.message';
const getConversionActionId = async (method, headers, params) => {
const conversionActionIdKey = sha256(params.event + params.customerId).toString();
return conversionActionIdCache.get(conversionActionIdKey, async () => {
const queryString = SqlString.format(
'SELECT conversion_action.id FROM conversion_action WHERE conversion_action.name = ?',
[params.event],
);
const data = {
query: `SELECT conversion_action.id FROM conversion_action WHERE conversion_action.name = '${params.event}'`,
query: queryString,
};
const requestBody = {
url: `${BASE_ENDPOINT}/${params.customerId}/googleAds:searchStream`,
Expand Down Expand Up @@ -117,7 +122,7 @@ const responseHandler = (responseParams) => {
// Ref - https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
if (partialFailureError && partialFailureError.code !== 0) {
throw new NetworkError(
`[Google Ads Offline Conversions]:: partialFailureError - ${JSON.stringify(
`[Google Adwords Enhanced Conversions]:: partialFailureError - ${JSON.stringify(
partialFailureError,
)}`,
400,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const sha256 = require('sha256');
const SqlString = require('sqlstring');
const { get, set, cloneDeep } = require('lodash');
const {
AbortedError,
Expand Down Expand Up @@ -53,8 +54,12 @@ const validateDestinationConfig = ({ Config }) => {
const getConversionActionId = async (headers, params) => {
const conversionActionIdKey = sha256(params.event + params.customerId).toString();
return conversionActionIdCache.get(conversionActionIdKey, async () => {
const queryString = SqlString.format(
'SELECT conversion_action.id FROM conversion_action WHERE conversion_action.name = ?',
[params.event],
);
const data = {
query: `SELECT conversion_action.id FROM conversion_action WHERE conversion_action.name = '${params.event}'`,
query: queryString,
};
const endpoint = SEARCH_STREAM.replace(':customerId', params.customerId);
const requestOptions = {
Expand Down
13 changes: 13 additions & 0 deletions src/v0/destinations/pardot/networkHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ const getStatus = (code) => {
const pardotRespHandler = (destResponse, stageMsg) => {
const { status, response } = destResponse;
const respAttributes = response['@attributes'];

  // to handle errors like service unavailable, wrong URL, or no response
if (!respAttributes) {
throw new NetworkError(
`${JSON.stringify(response)} ${stageMsg}`,
status,
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status),
},
response,
);
}

const { stat, err_code: errorCode } = respAttributes;

if (isHttpStatusSuccess(status) && stat !== 'fail') {
Expand Down
51 changes: 24 additions & 27 deletions src/v1/destinations/algolia/networkHandler.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable no-restricted-syntax */
const { TransformerProxyError } = require('../../../v0/util/errorTypes');
const { prepareProxyRequest, proxyRequest } = require('../../../adapters/network');
const { isHttpStatusSuccess, getAuthErrCategoryFromStCode } = require('../../../v0/util/index');
const { isHttpStatusSuccess } = require('../../../v0/util/index');

const {
processAxiosResponse,
Expand All @@ -13,12 +13,6 @@ const responseHandler = (responseParams) => {
const { destinationResponse, rudderJobMetadata } = responseParams;
const message = `[ALGOLIA Response V1 Handler] - Request Processed Successfully`;
const responseWithIndividualEvents = [];
// response:
// {status: 200, message: 'OK'}
// {response:'[ENOTFOUND] :: DNS lookup failed', status: 400}
// destinationResponse = {
// response: {"status": 422, "message": "EventType must be one of \"click\", \"conversion\" or \"view\""}, status: 422
// }
const { response, status } = destinationResponse;

if (isHttpStatusSuccess(status)) {
Expand All @@ -41,35 +35,38 @@ const responseHandler = (responseParams) => {

// in case of non 2xx status sending 500 for every event, populate response and update dontBatch to true
const errorMessage = response?.error?.message || response?.message || 'unknown error format';
let serverStatus = 400;
for (const metadata of rudderJobMetadata) {
// handling case if dontBatch is true, and again we got invalid from destination
if (metadata.dontBatch && status === 422) {
responseWithIndividualEvents.push({
statusCode: 400,
metadata,
error: errorMessage,
});
} else {
serverStatus = 500;
metadata.dontBatch = true;
responseWithIndividualEvents.push({
statusCode: 500,
metadata,
error: errorMessage,
});
}
metadata.dontBatch = true;
responseWithIndividualEvents.push({
statusCode: 500,
metadata,
error: errorMessage,
});
}

// At least one event in the batch is invalid.
if (status === 422) {
// sending back 500 for retry
throw new TransformerProxyError(
`ALGOLIA: Error transformer proxy v1 during ALGOLIA response transformation`,
500,
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(500),
},
destinationResponse,
'',
responseWithIndividualEvents,
);
}

// sending back 500 for retry
throw new TransformerProxyError(
`ALGOLIA: Error transformer proxy v1 during ALGOLIA response transformation`,
serverStatus,
status,
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status),
},
destinationResponse,
getAuthErrCategoryFromStCode(status),
'',
responseWithIndividualEvents,
);
};
Expand Down
Loading

0 comments on commit bd32ace

Please sign in to comment.