Skip to content

Commit

Permalink
fix(data post): add retry capability on upload file as intended PE-7598
Browse files Browse the repository at this point in the history
  • Loading branch information
fedellen committed Feb 6, 2025
1 parent 7bcf46b commit cfe312b
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 41 deletions.
91 changes: 63 additions & 28 deletions src/common/upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { AxiosError } from 'axios';
import { IAxiosRetryConfig } from 'axios-retry';
import { Buffer } from 'node:buffer';
import { Readable } from 'node:stream';
import { pLimit } from 'plimit-lit';
Expand All @@ -37,6 +39,8 @@ import {
TurboUploadFolderParams,
TurboUploadFolderResponse,
} from '../types.js';
import { defaultRetryConfig } from '../utils/axiosClient.js';
import { sleep } from '../utils/common.js';
import { TurboHTTPService } from './http.js';
import { TurboWinstonLogger } from './logger.js';

Expand All @@ -56,11 +60,12 @@ export class TurboUnauthenticatedUploadService
protected httpService: TurboHTTPService;
protected logger: TurboLogger;
protected token: TokenType;
protected retryConfig: IAxiosRetryConfig;

constructor({
url = defaultUploadServiceURL,
retryConfig,
logger = TurboWinstonLogger.default,
retryConfig = defaultRetryConfig(logger),
token = 'arweave',
}: TurboUnauthenticatedUploadServiceConfiguration) {
this.token = token;
Expand All @@ -70,6 +75,7 @@ export class TurboUnauthenticatedUploadService
retryConfig,
logger: this.logger,
});
this.retryConfig = retryConfig;
}

async uploadSignedDataItem({
Expand Down Expand Up @@ -118,36 +124,65 @@ export abstract class TurboAuthenticatedBaseUploadService
dataItemOpts,
}: TurboFileFactory &
TurboAbortSignal): Promise<TurboUploadDataItemResponse> {
const { dataItemStreamFactory, dataItemSizeFactory } =
await this.signer.signDataItem({
fileStreamFactory,
fileSizeFactory,
dataItemOpts,
});
const signedDataItem = dataItemStreamFactory();
this.logger.debug('Uploading signed data item...');
// TODO: add p-limit constraint or replace with separate upload class

const headers = {
'content-type': 'application/octet-stream',
'content-length': `${dataItemSizeFactory()}`,
};
if (dataItemOpts !== undefined && dataItemOpts.paidBy !== undefined) {
const paidBy = Array.isArray(dataItemOpts.paidBy)
? dataItemOpts.paidBy
: [dataItemOpts.paidBy];

if (dataItemOpts.paidBy.length > 0) {
headers['x-paid-by'] = paidBy;
let retries = 0;
const maxRetries = this.retryConfig.retries ?? 3;
const retryDelay =
this.retryConfig.retryDelay ??
((retryNumber: number) => retryNumber * 1000);
let lastError = 'Unknown Error'; // Store the last error for throwing

while (retries < maxRetries) {
try {
const { dataItemStreamFactory, dataItemSizeFactory } =
await this.signer.signDataItem({
fileStreamFactory,
fileSizeFactory,
dataItemOpts,
});
this.logger.debug('Uploading signed data item...');
// TODO: add p-limit constraint or replace with separate upload class

const headers = {
'content-type': 'application/octet-stream',
'content-length': `${dataItemSizeFactory()}`,
};
if (dataItemOpts !== undefined && dataItemOpts.paidBy !== undefined) {
const paidBy = Array.isArray(dataItemOpts.paidBy)
? dataItemOpts.paidBy
: [dataItemOpts.paidBy];

if (dataItemOpts.paidBy.length > 0) {
headers['x-paid-by'] = paidBy;
}
}
const data = await this.httpService.post<TurboUploadDataItemResponse>({
endpoint: `/tx/${this.token}`,
signal,
data: dataItemStreamFactory(),
headers,
});
return data;
} catch (error) {
lastError =
error instanceof AxiosError && error.code !== undefined
? error.code
: error instanceof Error && error.message !== undefined
? error.message
: `${error}`; // Store the last encountered error
this.logger.debug(
`Upload failed, attempt ${retries + 1}/${maxRetries}`,
{ message: lastError },
error,
);
retries++;
await sleep(retryDelay(retries, error));
}
}

return this.httpService.post<TurboUploadDataItemResponse>({
endpoint: `/tx/${this.token}`,
signal,
data: signedDataItem,
headers,
});
// After all retries, throw the last error for catching
throw new Error(
`Failed to upload file after ${maxRetries} attempts: ${lastError}`,
);
}

protected async generateManifest({
Expand Down
31 changes: 18 additions & 13 deletions src/utils/axiosClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,27 @@ export interface AxiosInstanceParameters {
logger?: TurboLogger;
}

export const defaultRetryConfig: (logger?: TurboLogger) => IAxiosRetryConfig = (
logger = TurboWinstonLogger.default,
) => ({
retryDelay: axiosRetry.exponentialDelay,
retries: 5,
retryCondition: (error) => {
return (
!(error instanceof CanceledError) &&
axiosRetry.isIdempotentRequestError(error) &&
axiosRetry.isNetworkError(error)
);
},
onRetry: (retryCount, error) => {
logger.debug(`Request failed, ${error}. Retry attempt #${retryCount}...`);
},
});

export const createAxiosInstance = ({
logger = TurboWinstonLogger.default,
axiosConfig = {},
retryConfig = {
retryDelay: axiosRetry.exponentialDelay,
retries: 3,
retryCondition: (error) => {
return (
!(error instanceof CanceledError) &&
axiosRetry.isNetworkOrIdempotentRequestError(error)
);
},
onRetry: (retryCount, error) => {
logger.debug(`Request failed, ${error}. Retry attempt #${retryCount}...`);
},
},
retryConfig = defaultRetryConfig(logger),
}: AxiosInstanceParameters = {}): AxiosInstance => {
const axiosInstance = axios.create({
...axiosConfig,
Expand Down

0 comments on commit cfe312b

Please sign in to comment.