From 5ef1b550d5971f6eb7c5dc028ad6994e81b3feff Mon Sep 17 00:00:00 2001 From: Cristian Dominguez <6853656+cristiand391@users.noreply.github.com> Date: Wed, 30 Oct 2024 11:58:01 -0300 Subject: [PATCH] feat: add `data update bulk/resume` commands (#1098) * chore: refactor bulk ingest utils * feat: add `data update bulk/resume` * fix: update `data import bulk` help * test: add bulk update NUT * test: break up NUTs (#1099) * chore: unify bulk ingest logic * test: add bulk update NUTs to test matrix * fix: insert operation * fix: command-specific resume instructions * fix: command-specific stage title * fix: pass operation opt * test: fix update resume NUT on win * test: refactor/doc * chore: moar refactor/doc * chore: clean up msgs * feat: add column-delimiter flag to import/update bulk * chore: update command snapshot * chore: eslint rule inline * test: validate async command's cache files * chore: update msg [skip ci] * fix: edit help for new "data update bulk|resume" commands (#1106) * fix: remove `as string` * chore: use proper stop status * chore: share column-delimiter flag def * test: remove type assertions * feat: detect column delimiter * test: nut should detect column delimiter --------- Co-authored-by: Juliet Shackell <63259011+jshackell-sfdc@users.noreply.github.com> --- .github/workflows/test.yml | 11 + command-snapshot.json | 40 ++- messages/bulkIngest.md | 39 +++ messages/data.export.resume.md | 2 +- messages/data.import.bulk.md | 38 +-- messages/data.import.resume.md | 32 -- messages/data.update.bulk.md | 47 +++ messages/data.update.resume.md | 29 ++ package.json | 14 +- src/bulkDataRequestCache.ts | 69 +++++ src/bulkIngest.ts | 290 ++++++++++++++++++ src/bulkUtils.ts | 74 ++++- src/commands/data/import/bulk.ts | 146 ++------- src/commands/data/import/resume.ts | 83 +---- src/commands/data/update/bulk.ts | 80 +++++ src/commands/data/update/resume.ts | 60 ++++ ...ulkImportStages.ts => bulkIngestStages.ts} | 7 +- test/bulkUtils.test.ts | 12 +- test/commands/data/dataBulk.nut.ts | 2 +- test/commands/data/export/bulk.nut.ts | 31 +- test/commands/data/export/resume.nut.ts | 36 +-- test/commands/data/import/bulk.nut.ts | 29 +- test/commands/data/import/resume.nut.ts | 76 ++--- .../query.nut.ts} | 0 test/commands/data/tree/dataTree.nut.ts | 2 +- .../data/tree/dataTreeCommonChild.nut.ts | 2 +- test/commands/data/tree/dataTreeDeep.nut.ts | 2 +- .../data/tree/dataTreeDeepBeta.nut.ts | 2 +- .../data/tree/dataTreeMoreThan200.nut.ts | 2 +- .../data/tree/dataTreeSelfReferencing.nut.ts | 2 +- test/commands/data/update/bulk.nut.ts | 71 +++++ test/commands/data/update/resume.nut.ts | 83 +++++ test/commands/force/data/bulk/dataBulk.nut.ts | 2 +- test/test-files/csv/backquote.csv | 11 + test/test-files/csv/caret.csv | 11 + test/test-files/csv/comma.csv | 11 + test/test-files/csv/pipe.csv | 11 + test/test-files/csv/semicolon.csv | 11 + test/test-files/csv/tab.csv | 11 + test/testUtil.ts | 105 +++++++ 40 files changed, 1200 insertions(+), 386 deletions(-) create mode 100644 messages/bulkIngest.md create mode 100644 messages/data.update.bulk.md create mode 100644 messages/data.update.resume.md create mode 100644 src/bulkIngest.ts create mode 100644 src/commands/data/update/bulk.ts create mode 100644 src/commands/data/update/resume.ts rename src/ux/{bulkImportStages.ts => bulkIngestStages.ts} (94%) rename test/commands/data/{dataSoqlQuery.nut.ts => query/query.nut.ts} (100%) create mode 100644 test/commands/data/update/bulk.nut.ts create mode 100644 test/commands/data/update/resume.nut.ts create mode 
100644 test/test-files/csv/backquote.csv create mode 100644 test/test-files/csv/caret.csv create mode 100644 test/test-files/csv/comma.csv create mode 100644 test/test-files/csv/pipe.csv create mode 100644 test/test-files/csv/semicolon.csv create mode 100644 test/test-files/csv/tab.csv diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 04639c09..7f66c86c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -21,6 +21,17 @@ jobs: strategy: matrix: os: [ubuntu-latest, windows-latest] + command: + - 'yarn test:nuts:bulk:export' + - 'yarn test:nuts:bulk:import' + - 'yarn test:nuts:bulk:update' + - 'yarn test:nuts:data:bulk-upsert-delete' + - 'yarn test:nuts:data:create' + - 'yarn test:nuts:data:query' + - 'yarn test:nuts:data:record' + - 'yarn test:nuts:data:search' + - 'yarn test:nuts:data:tree' fail-fast: false with: os: ${{ matrix.os }} + command: ${{ matrix.command }} diff --git a/command-snapshot.json b/command-snapshot.json index fc20c2ce..cae8bf1d 100644 --- a/command-snapshot.json +++ b/command-snapshot.json @@ -142,7 +142,18 @@ "command": "data:import:bulk", "flagAliases": [], "flagChars": ["a", "f", "o", "s", "w"], - "flags": ["api-version", "async", "file", "flags-dir", "json", "line-ending", "sobject", "target-org", "wait"], + "flags": [ + "api-version", + "async", + "column-delimiter", + "file", + "flags-dir", + "json", + "line-ending", + "sobject", + "target-org", + "wait" + ], "plugin": "@salesforce/plugin-data" }, { @@ -235,6 +246,25 @@ "flags": ["api-version", "file", "flags-dir", "json", "query", "result-format", "target-org"], "plugin": "@salesforce/plugin-data" }, + { + "alias": [], + "command": "data:update:bulk", + "flagAliases": [], + "flagChars": ["a", "f", "o", "s", "w"], + "flags": [ + "api-version", + "async", + "column-delimiter", + "file", + "flags-dir", + "json", + "line-ending", + "sobject", + "target-org", + "wait" + ], + "plugin": "@salesforce/plugin-data" + }, { "alias": ["force:data:record:update"], "command": "data:update:record", @@ -255,6 +285,14 @@ ], "plugin": "@salesforce/plugin-data" }, + { + "alias": [], + "command": "data:update:resume", + "flagAliases": [], + "flagChars": ["i", "w"], + "flags": ["flags-dir", "job-id", "json", "use-most-recent", "wait"], + "plugin": "@salesforce/plugin-data" + }, { "alias": [], "command": "data:upsert:bulk", diff --git a/messages/bulkIngest.md b/messages/bulkIngest.md new file mode 100644 index 00000000..73908d38 --- /dev/null +++ b/messages/bulkIngest.md @@ -0,0 +1,39 @@ +# export.resume + +Run "sf %s --job-id %s" to resume the operation. + +# error.timeout + +The operation timed out after %s minutes. + +Run "sf %s --job-id %s" to resume it. + +# error.failedRecordDetails + +Job finished being processed but failed to process %s records. + +To review the details of this job, run this command: + +sf org open --target-org %s --path "/lightning/setup/AsyncApiJobStatus/page?address=%2F%s" + +# error.jobFailed + +Job failed to be processed due to: + +%s + +To review the details of this job, run this command: + +sf org open --target-org %s --path "/lightning/setup/AsyncApiJobStatus/page?address=%2F%s" + +# error.jobAborted + +Job has been aborted. + +To review the details of this job, run this command: + +sf org open --target-org %s --path "/lightning/setup/AsyncApiJobStatus/page?address=%2F%s" + +# flags.column-delimiter.summary + +Column delimiter used in the CSV file. Default is COMMA. 
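(Note: the messages above are shared by every bulk ingest command; `src/bulkIngest.ts`, added later in this patch, fills the `%s` placeholders with the command-specific resume command id and the job id. A minimal sketch of how the bundle is consumed, assuming it runs inside this plugin so the messages directory resolves; the job id and timeout value below are placeholders:

```ts
import { Messages } from '@salesforce/core';

// Commands register the plugin's messages directory before loading a bundle.
Messages.importMessagesDirectoryFromMetaUrl(import.meta.url);

// Shared bundle used by the bulk ingest commands (data import/update bulk and their resume commands).
const messages = Messages.loadMessages('@salesforce/plugin-data', 'bulkIngest');

// Placeholder values for illustration only.
const resumeCmdId = 'data update resume';
const jobId = '750xx000000005sAAA';

// -> Run "sf data update resume --job-id 750xx000000005sAAA" to resume the operation.
console.log(messages.getMessage('export.resume', [resumeCmdId, jobId]));

// Error templates are filled the same way, e.g. after a 10-minute timeout:
const timeoutError = messages.createError('error.timeout', [10, resumeCmdId, jobId]);
console.log(timeoutError.message);
```
)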
diff --git a/messages/data.export.resume.md b/messages/data.export.resume.md index fabdd30f..7d432a72 100644 --- a/messages/data.export.resume.md +++ b/messages/data.export.resume.md @@ -1,6 +1,6 @@ # summary -Resume a bulk export job that you previously started. +Resume a bulk export job that you previously started. Uses Bulk API 2.0. # description diff --git a/messages/data.import.bulk.md b/messages/data.import.bulk.md index 6f318d63..b2ccfc37 100644 --- a/messages/data.import.bulk.md +++ b/messages/data.import.bulk.md @@ -40,40 +40,8 @@ Time to wait for the command to finish, in minutes. # flags.line-ending.summary -Line ending used in the CSV file. Default value on Windows is `CRLF`; on macOS and Linux it's `LR`. +Line ending used in the CSV file. Default value on Windows is `CRLF`; on macOS and Linux it's `LF`. -# export.resume +# flags.column-delimiter.summary -Run "sf data import resume --job-id %s" to resume the operation. - -# error.timeout - -The operation timed out after %s minutes. - -Run "sf data import resume --job-id %s" to resume it. - -# error.failedRecordDetails - -Job finished being processed but failed to import %s records. - -To review the details of this job, run this command: - -sf org open --target-org %s --path "/lightning/setup/AsyncApiJobStatus/page?address=%2F%s" - -# error.jobFailed - -Job failed to be processed due to: - -%s - -To review the details of this job, run this command: - -sf org open --target-org %s --path "/lightning/setup/AsyncApiJobStatus/page?address=%2F%s" - -# error.jobAborted - -Job has been aborted. - -To review the details of this job, run this command: - -sf org open --target-org %s --path "/lightning/setup/AsyncApiJobStatus/page?address=%2F%s" +Column delimiter used in the CSV file. Default is COMMA. diff --git a/messages/data.import.resume.md b/messages/data.import.resume.md index 3f27911f..5c10f2b4 100644 --- a/messages/data.import.resume.md +++ b/messages/data.import.resume.md @@ -27,35 +27,3 @@ Job ID of the bulk import. # flags.wait.summary Time to wait for the command to finish, in minutes. - -# error.failedRecordDetails - -Job finished being processed but failed to import %s records. - -To review the details of this job, run this command: - -sf org open --target-org %s --path "/lightning/setup/AsyncApiJobStatus/page?address=%2F%s" - -# error.timeout - -The operation timed out after %s minutes. - -Try re-running "sf data import resume --job-id %s" with a bigger wait time. - -# error.jobFailed - -Job failed to be processed due to: - -%s - -To review the details of this job, run this command: - -sf org open --target-org %s --path "/lightning/setup/AsyncApiJobStatus/page?address=%2F%s" - -# error.jobAborted - -Job has been aborted. - -To review the details of this job, run this command: - -sf org open --target-org %s --path "/lightning/setup/AsyncApiJobStatus/page?address=%2F%s" diff --git a/messages/data.update.bulk.md b/messages/data.update.bulk.md new file mode 100644 index 00000000..fa2eabd9 --- /dev/null +++ b/messages/data.update.bulk.md @@ -0,0 +1,47 @@ +# summary + +Bulk update records to an org from a CSV file. Uses Bulk API 2.0. + +# description + +You can use this command to update millions of Salesforce object records based on a file in comma-separated values (CSV) format. + +All the records in the CSV file must be for the same Salesforce object. Specify the object with the `--sobject` flag. The first column of every line in the CSV file must be an ID of the record you want to update. 
The CSV file can contain only existing records; if a record in the file doesn't currently exist in the Salesforce object, the command fails. Consider using "sf data upsert bulk" if you also want to insert new records. + +Bulk updates can take a while, depending on how many records are in the CSV file. If the command times out, or you specified the --async flag, the command displays the job ID. To see the status and get the results of the job, run "sf data update resume" and pass the job ID to the --job-id flag. + +For information and examples about how to prepare your CSV files, see "Prepare Data to Ingest" in the "Bulk API 2.0 and Bulk API Developer Guide" (https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/datafiles_prepare_data.htm). + +# examples + +- Update Account records from a CSV-formatted file into an org with alias "my-scratch"; if the update doesn't complete in 10 minutes, the command ends and displays a job ID: + + <%= config.bin %> <%= command.id %> --file accounts.csv --sobject Account --wait 10 --target-org my-scratch + +- Update asynchronously and use the default org; the command immediately returns a job ID that you then pass to the "sf data update resume" command: + + <%= config.bin %> <%= command.id %> --file accounts.csv --sobject Account --async + +# flags.async.summary + +Don't wait for the command to complete. + +# flags.wait.summary + +Time to wait for the command to finish, in minutes. + +# flags.file.summary + +CSV file that contains the Salesforce object records you want to update. + +# flags.sobject.summary + +API name of the Salesforce object, either standard or custom, which you are updating. + +# flags.line-ending.summary + +Line ending used in the CSV file. Default value on Windows is `CRLF`; on macOS and Linux it's `LF`. + +# flags.column-delimiter.summary + +Column delimiter used in the CSV file. Default is COMMA. diff --git a/messages/data.update.resume.md b/messages/data.update.resume.md new file mode 100644 index 00000000..c3e7d3a6 --- /dev/null +++ b/messages/data.update.resume.md @@ -0,0 +1,29 @@ +# summary + +Resume a bulk update job that you previously started. Uses Bulk API 2.0. + +# description + +When the original "sf data update bulk" command either times out or is run with the --async flag, it displays a job ID. To see the status and get the results of the bulk update, run this command by either passing it the job ID or using the --use-most-recent flag to specify the most recent bulk update job. + +# examples + +- Resume a bulk update job of your default org using a job ID: + + <%= config.bin %> <%= command.id %> --job-id 750xx000000005sAAA + +- Resume the most recently run bulk update job for an org with alias "my-scratch": + + <%= config.bin %> <%= command.id %> --use-most-recent --target-org my-scratch + +# flags.use-most-recent.summary + +Use the job ID of the bulk update job that was most recently run. + +# flags.job-id.summary + +Job ID of the bulk update. + +# flags.wait.summary + +Time to wait for the command to finish, in minutes. diff --git a/package.json b/package.json index f031b578..d72a341a 100644 --- a/package.json +++ b/package.json @@ -67,7 +67,8 @@ "description": "Query records." }, "update": { - "description": "Update a single record." + "description": "Update many records.", + "external": true }, "upsert": { "description": "Upsert many records." 
@@ -103,8 +104,15 @@ "prepack": "sf-prepack", "prepare": "sf-install", "test": "wireit", - "test:nuts": "nyc mocha \"./test/**/*.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20", - "test:nuts:bulk": "nyc mocha \"./test/**/dataBulk.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20", + "test:nuts:bulk:import": "nyc mocha \"./test/commands/data/import/*.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20", + "test:nuts:bulk:export": "nyc mocha \"./test/commands/data/export/*.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20", + "test:nuts:bulk:update": "nyc mocha \"./test/commands/data/update/*.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20", + "test:nuts:data:tree": "nyc mocha \"./test/commands/data/tree/*.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20", + "test:nuts:data:query": "nyc mocha \"./test/commands/data/query/*.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20", + "test:nuts:data:record": "nyc mocha \"./test/commands/data/record/dataRecord.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20", + "test:nuts:data:search": "nyc mocha \"./test/commands/data/search.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20", + "test:nuts:data:create": "nyc mocha \"./test/commands/data/create/*.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20", + "test:nuts:data:bulk-upsert-delete": "nyc mocha \"./test/commands/data/dataBulk.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20", "test:only": "wireit", "version": "oclif readme" }, diff --git a/src/bulkDataRequestCache.ts b/src/bulkDataRequestCache.ts index 43ef6c8b..7f8a61b9 100644 --- a/src/bulkDataRequestCache.ts +++ b/src/bulkDataRequestCache.ts @@ -257,6 +257,75 @@ export class BulkImportRequestCache extends TTLConfig { + public static getDefaultOptions(): TTLConfig.Options { + return { + isGlobal: true, + isState: true, + filename: BulkUpdateRequestCache.getFileName(), + stateFolder: Global.SF_STATE_FOLDER, + ttl: Duration.days(7), + }; + } + + public static getFileName(): string { + return 'bulk-data-update-cache.json'; + } + + public static async unset(key: string): Promise { + const cache = await BulkImportRequestCache.create(); + cache.unset(key); + await cache.write(); + } + + /** + * Creates a new bulk data import cache entry for the given bulk request id. 
+ * + * @param bulkRequestId + * @param username + * @param apiVersion + */ + public async createCacheEntryForRequest(bulkRequestId: string, username: string, apiVersion: string): Promise { + this.set(bulkRequestId, { + jobId: bulkRequestId, + username, + apiVersion, + }); + await this.write(); + Logger.childFromRoot('BulkUpdateCache').debug(`bulk cache saved for ${bulkRequestId}`); + } + + public async resolveResumeOptionsFromCache(jobIdOrMostRecent: string | boolean): Promise { + if (typeof jobIdOrMostRecent === 'boolean') { + const key = this.getLatestKey(); + if (!key) { + throw messages.createError('error.missingCacheEntryError'); + } + // key definitely exists because it came from the cache + const entry = this.get(key); + + return { + jobInfo: { id: entry.jobId }, + options: { + connection: (await Org.create({ aliasOrUsername: entry.username })).getConnection(), + }, + }; + } else { + const entry = this.get(jobIdOrMostRecent); + if (!entry) { + throw messages.createError('error.bulkRequestIdNotFound', [jobIdOrMostRecent]); + } + + return { + jobInfo: { id: entry.jobId }, + options: { + connection: (await Org.create({ aliasOrUsername: entry.username })).getConnection(), + }, + }; + } + } +} + export class BulkExportRequestCache extends TTLConfig { public static getDefaultOptions(): TTLConfig.Options { return { diff --git a/src/bulkIngest.ts b/src/bulkIngest.ts new file mode 100644 index 00000000..4b7e3b5e --- /dev/null +++ b/src/bulkIngest.ts @@ -0,0 +1,290 @@ +/* + * Copyright (c) 2024, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +import * as fs from 'node:fs'; +import { platform } from 'node:os'; +import { Flags } from '@salesforce/sf-plugins-core'; +import { IngestJobV2, JobInfoV2 } from '@jsforce/jsforce-node/lib/api/bulk2.js'; +import { Connection, Messages } from '@salesforce/core'; +import { Schema } from '@jsforce/jsforce-node'; +import { Duration } from '@salesforce/kit'; +import { ensureString } from '@salesforce/ts-types'; +import { BulkIngestStages } from './ux/bulkIngestStages.js'; +import { BulkUpdateRequestCache, BulkImportRequestCache } from './bulkDataRequestCache.js'; +import { detectDelimiter } from './bulkUtils.js'; + +const messages = Messages.loadMessages('@salesforce/plugin-data', 'bulkIngest'); + +type BulkIngestInfo = { + jobId: string; + processedRecords?: number; + successfulRecords?: number; + failedRecords?: number; +}; + +type ResumeCommandIDs = 'data import resume' | 'data update resume'; + +/** + * Bulk API 2.0 ingest handler for `sf` bulk commands + * + * This function should be used exclusively by `sf data bulk` commands that: + * - do a bulk ingest operation + * - have a `resume` command + * + * It will create the specified bulk ingest job, set up the oclif/MSO stages and return the job info. + * */ +export async function bulkIngest(opts: { + resumeCmdId: ResumeCommandIDs; + stageTitle: string; + object: string; + operation: JobInfoV2['operation']; + lineEnding: JobInfoV2['lineEnding'] | undefined; + columnDelimiter: JobInfoV2['columnDelimiter'] | undefined; + conn: Connection; + cache: BulkUpdateRequestCache | BulkImportRequestCache; + async: boolean; + wait: Duration; + file: string; + jsonEnabled: boolean; + logFn: (message: string) => void; +}): Promise { + const { + conn, + operation, + object, + lineEnding = platform() === 'win32' ? 
'CRLF' : 'LF', + columnDelimiter, + file, + logFn, + } = opts; + + const timeout = opts.async ? Duration.minutes(0) : opts.wait ?? Duration.minutes(0); + const async = timeout.milliseconds === 0; + + const baseUrl = ensureString(opts.conn.getAuthInfoFields().instanceUrl); + + const stages = new BulkIngestStages({ + resume: false, + title: async ? `${opts.stageTitle} (async)` : opts.stageTitle, + baseUrl, + jsonEnabled: opts.jsonEnabled, + }); + + stages.start(); + + if (async) { + const job = await createIngestJob(conn, file, { + object, + operation, + lineEnding, + columnDelimiter: columnDelimiter ?? (await detectDelimiter(file)), + }); + + stages.update(job.getInfo()); + + stages.stop(); + + await opts.cache.createCacheEntryForRequest(job.id, ensureString(conn.getUsername()), conn.getApiVersion()); + + logFn(messages.getMessage('export.resume', [opts.resumeCmdId, job.id])); + + return { + jobId: job.id, + }; + } + + // synchronous flow + const job = await createIngestJob(conn, file, { + object, + operation, + lineEnding, + columnDelimiter: columnDelimiter ?? (await detectDelimiter(file)), + }); + + stages.setupJobListeners(job); + stages.processingJob(); + + try { + await job.poll(5000, timeout.milliseconds); + + const jobInfo = job.getInfo(); + + // send last data update so job status/num. of records processed/failed represent the last update + stages.update(jobInfo); + + if (jobInfo.numberRecordsFailed) { + stages.error(); + // TODO: replace this msg to point to `sf data bulk results` when it's added (W-12408034) + throw messages.createError('error.failedRecordDetails', [ + jobInfo.numberRecordsFailed, + conn.getUsername(), + job.id, + ]); + } + + stages.stop(); + + return { + jobId: jobInfo.id, + processedRecords: jobInfo.numberRecordsProcessed, + successfulRecords: jobInfo.numberRecordsProcessed - (jobInfo.numberRecordsFailed ?? 0), + failedRecords: jobInfo.numberRecordsFailed, + }; + } catch (err) { + const jobInfo = await job.check(); + + // send last data update so job status/num. of records processed/failed represent the last update + stages.update(jobInfo); + + if (err instanceof Error && err.name === 'JobPollingTimeout') { + stages.stop(); + throw messages.createError('error.timeout', [timeout.minutes, opts.resumeCmdId, job.id]); + } + + if (jobInfo.state === 'Failed') { + stages.error(); + throw messages.createError( + 'error.jobFailed', + [jobInfo.errorMessage, conn.getUsername(), job.id], + [], + err as Error + ); + } + + if (jobInfo.state === 'Aborted') { + stages.stop('aborted'); + // TODO: replace this msg to point to `sf data bulk results` when it's added (W-12408034) + throw messages.createError('error.jobAborted', [conn.getUsername(), job.id], [], err as Error); + } + + throw err; + } +} + +/** + * Bulk API 2.0 ingest resume handler for `sf` bulk commands + * + * This function should be used exclusively by `sf data bulk` commands that can resume a bulk ingest operation. + * + * It will set up the oclif/MSO stages, poll for the job status and return the job info. 
+ * */ +export async function bulkIngestResume(opts: { + cmdId: ResumeCommandIDs; + stageTitle: string; + cache: BulkUpdateRequestCache; + jobIdOrMostRecent: string | boolean; + jsonEnabled: boolean; + wait: Duration; +}): Promise { + const resumeOpts = await opts.cache.resolveResumeOptionsFromCache(opts.jobIdOrMostRecent); + + const conn = resumeOpts.options.connection; + + const stages = new BulkIngestStages({ + resume: true, + title: opts.stageTitle, + baseUrl: ensureString(conn.getAuthInfoFields().instanceUrl), + jsonEnabled: opts.jsonEnabled, + }); + + stages.start(); + + const job = conn.bulk2.job('ingest', { + id: resumeOpts.jobInfo.id, + }); + + stages.setupJobListeners(job); + + try { + await job.poll(5000, opts.wait.milliseconds); + + const jobInfo = await job.check(); + + // send last data update so job status/num. of records processed/failed represent the last update + stages.update(jobInfo); + + if (jobInfo.numberRecordsFailed) { + stages.error(); + // TODO: replace this msg to point to `sf data bulk results` when it's added (W-12408034) + throw messages.createError('error.failedRecordDetails', [ + jobInfo.numberRecordsFailed, + conn.getUsername(), + job.id, + ]); + } + + stages.stop(); + + return { + jobId: jobInfo.id, + processedRecords: jobInfo.numberRecordsProcessed, + successfulRecords: jobInfo.numberRecordsProcessed - (jobInfo.numberRecordsFailed ?? 0), + failedRecords: jobInfo.numberRecordsFailed, + }; + } catch (err) { + const jobInfo = await job.check(); + + // send last data update so job status/num. of records processed/failed represent the last update + stages.update(jobInfo); + + if (err instanceof Error && err.name === 'JobPollingTimeout') { + stages.error(); + throw messages.createError('error.timeout', [opts.wait.minutes, opts.cmdId, job.id]); + } + + if (jobInfo.state === 'Failed') { + stages.error(); + throw messages.createError( + 'error.jobFailed', + [jobInfo.errorMessage, conn.getUsername(), job.id], + [], + err as Error + ); + } + + if (jobInfo.state === 'Aborted') { + stages.stop('aborted'); + // TODO: replace this msg to point to `sf data bulk results` when it's added (W-12408034) + throw messages.createError('error.jobAborted', [conn.getUsername(), job.id], [], err as Error); + } + + throw err; + } +} + +/** + * Create an ingest job, upload data and mark it as ready for processing + * + * */ +export async function createIngestJob( + conn: Connection, + csvFile: string, + jobOpts: { + object: string; + operation: JobInfoV2['operation']; + lineEnding: JobInfoV2['lineEnding']; + columnDelimiter: JobInfoV2['columnDelimiter']; + } +): Promise> { + const job = conn.bulk2.createJob(jobOpts); + + // create the job in the org + await job.open(); + + // upload data + await job.uploadData(fs.createReadStream(csvFile)); + + // mark the job to be ready to be processed + await job.close(); + + return job; +} + +export const columnDelimiterFlag = Flags.option({ + summary: messages.getMessage('flags.column-delimiter.summary'), + options: ['BACKQUOTE', 'CARET', 'COMMA', 'PIPE', 'SEMICOLON', 'TAB'] as const, +})(); diff --git a/src/bulkUtils.ts b/src/bulkUtils.ts index f2b28795..2f9814b6 100644 --- a/src/bulkUtils.ts +++ b/src/bulkUtils.ts @@ -6,6 +6,7 @@ */ import { Transform, Readable } from 'node:stream'; +import { createInterface } from 'node:readline'; import { pipeline } from 'node:stream/promises'; import * as fs from 'node:fs'; import { EOL } from 'node:os'; @@ -22,7 +23,7 @@ import { } from '@jsforce/jsforce-node/lib/api/bulk2.js'; import { Parser as csvParse } 
from 'csv-parse'; import type { Schema } from '@jsforce/jsforce-node'; -import { Connection, Messages } from '@salesforce/core'; +import { Connection, Messages, SfError } from '@salesforce/core'; import { IngestJobV2 } from '@jsforce/jsforce-node/lib/api/bulk2.js'; import { SfCommand, Spinner } from '@salesforce/sf-plugins-core'; import { Duration } from '@salesforce/kit'; @@ -306,3 +307,74 @@ export async function exportRecords( return jobInfo; } + +async function readFirstFiveLines(filePath: string): Promise { + const fileStream = fs.createReadStream(filePath); + + const rl = createInterface({ + input: fileStream, + crlfDelay: Infinity, // Recognizes both CRLF and LF line endings + }); + + const lines = []; + + for await (const line of rl) { + lines.push(line); + if (lines.length === 5) break; + } + + return lines; +} + +export async function detectDelimiter(filePath: string): Promise { + const delimiterMap = new Map(); + delimiterMap.set('`', 'BACKQUOTE'); + delimiterMap.set('^', 'CARET'); + delimiterMap.set(',', 'COMMA'); + delimiterMap.set('|', 'PIPE'); + delimiterMap.set(';', 'SEMICOLON'); + delimiterMap.set('\t', 'TAB'); + + const delimiters = ['`', '^', ',', '|', ';', '\t']; + const delimiterCounts: { [key: string]: number } = {}; + + // Initialize counts + for (const delimiter of delimiters) { + delimiterCounts[delimiter] = 0; + } + + // Read the first few lines of the file + const data = await readFirstFiveLines(filePath); + + data.forEach((line) => { + // Ignore empty lines + if (line.trim() === '') return; + + delimiters.forEach((delimiter) => { + // Use regex to avoid counting delimiters inside quotes + const regexDelimiter = delimiter === '^' || delimiter === '|' ? `\\${delimiter}` : delimiter; + const regex = new RegExp(`(?<=^|[^"'])${regexDelimiter}(?=[^"']*$)`, 'g'); + const count = (line.match(regex) ?? []).length; + delimiterCounts[delimiter] += count; + }); + }); + + // Find the delimiter with the highest count + let detectedDelimiter: string | undefined; + let maxCount = 0; + + for (const [delimiter, count] of Object.entries(delimiterCounts)) { + if (count > maxCount) { + maxCount = count; + detectedDelimiter = delimiter; + } + } + + const columDelimiter = delimiterMap.get(detectedDelimiter ?? 
''); + + if (columDelimiter === undefined) { + throw new SfError(`Failed to detect column delimiter used in ${filePath}.`); + } + + return columDelimiter; +} diff --git a/src/commands/data/import/bulk.ts b/src/commands/data/import/bulk.ts index 9b744249..573a0c87 100644 --- a/src/commands/data/import/bulk.ts +++ b/src/commands/data/import/bulk.ts @@ -5,16 +5,10 @@ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause */ -import * as fs from 'node:fs'; -import { platform } from 'node:os'; import { SfCommand, Flags } from '@salesforce/sf-plugins-core'; -import { Connection, Messages, Org } from '@salesforce/core'; -import { Duration } from '@salesforce/kit'; -import { IngestJobV2 } from '@jsforce/jsforce-node/lib/api/bulk2.js'; -import { Schema } from '@jsforce/jsforce-node'; -import { ensureString } from '@salesforce/ts-types'; +import { Messages } from '@salesforce/core'; +import { bulkIngest, columnDelimiterFlag } from '../../../bulkIngest.js'; import { BulkImportRequestCache } from '../../../bulkDataRequestCache.js'; -import { BulkImportStages } from '../../../ux/bulkImportStages.js'; Messages.importMessagesDirectoryFromMetaUrl(import.meta.url); const messages = Messages.loadMessages('@salesforce/plugin-data', 'data.import.bulk'); @@ -61,132 +55,28 @@ export default class DataImportBulk extends SfCommand { dependsOn: ['file'], options: ['CRLF', 'LF'] as const, })(), + 'column-delimiter': columnDelimiterFlag, }; public async run(): Promise { const { flags } = await this.parse(DataImportBulk); - const conn = flags['target-org'].getConnection(flags['api-version']); - - const timeout = flags.async ? Duration.minutes(0) : flags.wait ?? Duration.minutes(0); - const async = timeout.milliseconds === 0; - - const baseUrl = flags['target-org'].getField(Org.Fields.INSTANCE_URL).toString(); - - const stages = new BulkImportStages({ - resume: false, - title: async ? 'Importing data (async)' : 'Importing data', - baseUrl, + return bulkIngest({ + resumeCmdId: 'data import resume', + stageTitle: 'Importing data', + object: flags.sobject, + operation: 'insert', + lineEnding: flags['line-ending'], + columnDelimiter: flags['column-delimiter'], + conn: flags['target-org'].getConnection(flags['api-version']), + cache: await BulkImportRequestCache.create(), + async: flags.async, + wait: flags.wait, + file: flags.file, jsonEnabled: this.jsonEnabled(), + logFn: (...args) => { + this.log(...args); + }, }); - - stages.start(); - - if (async) { - const job = await createIngestJob(conn, flags.file, flags.sobject, flags['line-ending']); - - stages.update(job.getInfo()); - - stages.stop(); - - const cache = await BulkImportRequestCache.create(); - await cache.createCacheEntryForRequest(job.id, ensureString(conn.getUsername()), conn.getApiVersion()); - - this.log(messages.getMessage('export.resume', [job.id])); - - return { - jobId: job.id, - }; - } - - // synchronous flow - const job = await createIngestJob(conn, flags.file, flags.sobject, flags['line-ending']); - - stages.setupJobListeners(job); - stages.processingJob(); - - try { - await job.poll(5000, timeout.milliseconds); - - const jobInfo = job.getInfo(); - - // send last data update so job status/num. 
of records processed/failed represent the last update - stages.update(jobInfo); - - if (jobInfo.numberRecordsFailed) { - stages.error(); - // TODO: replace this msg to point to `sf data bulk results` when it's added (W-12408034) - throw messages.createError('error.failedRecordDetails', [ - jobInfo.numberRecordsFailed, - conn.getUsername(), - job.id, - ]); - } - - stages.stop(); - - return { - jobId: jobInfo.id, - processedRecords: jobInfo.numberRecordsProcessed, - successfulRecords: jobInfo.numberRecordsProcessed - (jobInfo.numberRecordsFailed ?? 0), - failedRecords: jobInfo.numberRecordsFailed, - }; - } catch (err) { - const jobInfo = await job.check(); - - // send last data update so job status/num. of records processed/failed represent the last update - stages.update(jobInfo); - - if (err instanceof Error && err.name === 'JobPollingTimeout') { - stages.stop(); - throw messages.createError('error.timeout', [timeout.minutes, job.id]); - } - - if (jobInfo.state === 'Failed') { - stages.error(); - throw messages.createError( - 'error.jobFailed', - [jobInfo.errorMessage, conn.getUsername(), job.id], - [], - err as Error - ); - } - - if (jobInfo.state === 'Aborted') { - stages.error(); - // TODO: replace this msg to point to `sf data bulk results` when it's added (W-12408034) - throw messages.createError('error.jobAborted', [conn.getUsername(), job.id], [], err as Error); - } - - throw err; - } } } - -/** - * Create an ingest job, upload data and mark it as ready for processing - * - * */ -async function createIngestJob( - conn: Connection, - csvFile: string, - object: string, - lineEnding: 'CRLF' | 'LF' | undefined -): Promise> { - const job = conn.bulk2.createJob({ - operation: 'insert', - lineEnding: lineEnding ?? platform() === 'win32' ? 'CRLF' : 'LF', - object, - }); - - // create the job in the org - await job.open(); - - // upload data - await job.uploadData(fs.createReadStream(csvFile)); - - // mark the job to be ready to be processed - await job.close(); - - return job; -} diff --git a/src/commands/data/import/resume.ts b/src/commands/data/import/resume.ts index 224428f2..da117107 100644 --- a/src/commands/data/import/resume.ts +++ b/src/commands/data/import/resume.ts @@ -7,9 +7,8 @@ import { SfCommand, Flags } from '@salesforce/sf-plugins-core'; import { Messages } from '@salesforce/core'; -import { ensureString } from '@salesforce/ts-types'; import { BulkImportRequestCache } from '../../../bulkDataRequestCache.js'; -import { BulkImportStages } from '../../../ux/bulkImportStages.js'; +import { bulkIngestResume } from '../../../bulkIngest.js'; Messages.importMessagesDirectoryFromMetaUrl(import.meta.url); const messages = Messages.loadMessages('@salesforce/plugin-data', 'data.import.resume'); @@ -49,81 +48,13 @@ export default class DataImportResume extends SfCommand public async run(): Promise { const { flags } = await this.parse(DataImportResume); - const cache = await BulkImportRequestCache.create(); - - const resumeOpts = await cache.resolveResumeOptionsFromCache(flags['job-id'] ?? flags['use-most-recent']); - - const conn = resumeOpts.options.connection; - - const stages = new BulkImportStages({ - resume: true, - title: 'Importing data', - baseUrl: ensureString(conn.getAuthInfoFields().instanceUrl), + return bulkIngestResume({ + cmdId: 'data import resume', + stageTitle: 'Updating data', + cache: await BulkImportRequestCache.create(), + jobIdOrMostRecent: flags['job-id'] ?? 
flags['use-most-recent'], jsonEnabled: this.jsonEnabled(), + wait: flags.wait, }); - - stages.start(); - - const job = conn.bulk2.job('ingest', { - id: resumeOpts.jobInfo.id, - }); - - stages.setupJobListeners(job); - - try { - await job.poll(5000, flags.wait.milliseconds); - - const jobInfo = await job.check(); - - // send last data update so job status/num. of records processed/failed represent the last update - stages.update(jobInfo); - - if (jobInfo.numberRecordsFailed) { - stages.error(); - // TODO: replace this msg to point to `sf data bulk results` when it's added (W-12408034) - throw messages.createError('error.failedRecordDetails', [ - jobInfo.numberRecordsFailed, - conn.getUsername(), - job.id, - ]); - } - - stages.stop(); - - return { - jobId: jobInfo.id, - processedRecords: jobInfo.numberRecordsProcessed, - successfulRecords: jobInfo.numberRecordsProcessed - (jobInfo.numberRecordsFailed ?? 0), - failedRecords: jobInfo.numberRecordsFailed, - }; - } catch (err) { - const jobInfo = await job.check(); - - // send last data update so job status/num. of records processed/failed represent the last update - stages.update(jobInfo); - - if (err instanceof Error && err.name === 'JobPollingTimeout') { - stages.error(); - throw messages.createError('error.timeout', [flags.wait.minutes, job.id]); - } - - if (jobInfo.state === 'Failed') { - stages.error(); - throw messages.createError( - 'error.jobFailed', - [jobInfo.errorMessage, conn.getUsername(), job.id], - [], - err as Error - ); - } - - if (jobInfo.state === 'Aborted') { - stages.error(); - // TODO: replace this msg to point to `sf data bulk results` when it's added (W-12408034) - throw messages.createError('error.jobAborted', [conn.getUsername(), job.id], [], err as Error); - } - - throw err; - } } } diff --git a/src/commands/data/update/bulk.ts b/src/commands/data/update/bulk.ts new file mode 100644 index 00000000..fc8fcd4b --- /dev/null +++ b/src/commands/data/update/bulk.ts @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2024, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. 
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +import { SfCommand, Flags } from '@salesforce/sf-plugins-core'; +import { Messages } from '@salesforce/core'; +import { bulkIngest, columnDelimiterFlag } from '../../../bulkIngest.js'; +import { BulkUpdateRequestCache } from '../../../bulkDataRequestCache.js'; + +Messages.importMessagesDirectoryFromMetaUrl(import.meta.url); +const messages = Messages.loadMessages('@salesforce/plugin-data', 'data.update.bulk'); + +export type DataUpdateBulkResult = { + jobId: string; + processedRecords?: number; + successfulRecords?: number; + failedRecords?: number; +}; + +export default class DataUpdateBulk extends SfCommand { + public static readonly summary = messages.getMessage('summary'); + public static readonly description = messages.getMessage('description'); + public static readonly examples = messages.getMessages('examples'); + + public static readonly flags = { + async: Flags.boolean({ + summary: messages.getMessage('flags.async.summary'), + char: 'a', + }), + wait: Flags.duration({ + summary: messages.getMessage('flags.wait.summary'), + char: 'w', + unit: 'minutes', + }), + file: Flags.file({ + summary: messages.getMessage('flags.file.summary'), + char: 'f', + required: true, + exists: true, + }), + sobject: Flags.string({ + summary: messages.getMessage('flags.sobject.summary'), + char: 's', + required: true, + }), + 'api-version': Flags.orgApiVersion(), + 'target-org': Flags.requiredOrg(), + 'line-ending': Flags.option({ + summary: messages.getMessage('flags.line-ending.summary'), + dependsOn: ['file'], + options: ['CRLF', 'LF'] as const, + })(), + 'column-delimiter': columnDelimiterFlag, + }; + + public async run(): Promise { + const { flags } = await this.parse(DataUpdateBulk); + + return bulkIngest({ + resumeCmdId: 'data update resume', + stageTitle: 'Updating data', + object: flags.sobject, + operation: 'update', + lineEnding: flags['line-ending'], + columnDelimiter: flags['column-delimiter'], + conn: flags['target-org'].getConnection(flags['api-version']), + cache: await BulkUpdateRequestCache.create(), + async: flags.async, + wait: flags.wait, + file: flags.file, + jsonEnabled: this.jsonEnabled(), + logFn: (...args) => { + this.log(...args); + }, + }); + } +} diff --git a/src/commands/data/update/resume.ts b/src/commands/data/update/resume.ts new file mode 100644 index 00000000..c61b25f0 --- /dev/null +++ b/src/commands/data/update/resume.ts @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2024, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. 
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +import { SfCommand, Flags } from '@salesforce/sf-plugins-core'; +import { Messages } from '@salesforce/core'; +import { BulkUpdateRequestCache } from '../../../bulkDataRequestCache.js'; +import { bulkIngestResume } from '../../../bulkIngest.js'; + +Messages.importMessagesDirectoryFromMetaUrl(import.meta.url); +const messages = Messages.loadMessages('@salesforce/plugin-data', 'data.update.resume'); + +export type DataUpdateResumeResult = { + jobId: string; + processedRecords?: number; + successfulRecords?: number; + failedRecords?: number; +}; + +export default class DataUpdateResume extends SfCommand { + public static readonly summary = messages.getMessage('summary'); + public static readonly description = messages.getMessage('description'); + public static readonly examples = messages.getMessages('examples'); + + public static readonly flags = { + 'use-most-recent': Flags.boolean({ + summary: messages.getMessage('flags.use-most-recent.summary'), + exactlyOne: ['job-id'], + }), + 'job-id': Flags.salesforceId({ + summary: messages.getMessage('flags.job-id.summary'), + char: 'i', + length: 18, + startsWith: '750', + exactlyOne: ['use-most-recent'], + }), + wait: Flags.duration({ + char: 'w', + unit: 'minutes', + summary: messages.getMessage('flags.wait.summary'), + defaultValue: 5, + }), + }; + + public async run(): Promise { + const { flags } = await this.parse(DataUpdateResume); + + return bulkIngestResume({ + cmdId: 'data update resume', + stageTitle: 'Updating data', + cache: await BulkUpdateRequestCache.create(), + jobIdOrMostRecent: flags['job-id'] ?? flags['use-most-recent'], + jsonEnabled: this.jsonEnabled(), + wait: flags.wait, + }); + } +} diff --git a/src/ux/bulkImportStages.ts b/src/ux/bulkIngestStages.ts similarity index 94% rename from src/ux/bulkImportStages.ts rename to src/ux/bulkIngestStages.ts index cf030a40..6344c616 100644 --- a/src/ux/bulkImportStages.ts +++ b/src/ux/bulkIngestStages.ts @@ -9,6 +9,7 @@ import { MultiStageOutput } from '@oclif/multi-stage-output'; import { IngestJobV2, JobInfoV2 } from '@jsforce/jsforce-node/lib/api/bulk2.js'; import { Schema } from '@jsforce/jsforce-node'; import terminalLink from 'terminal-link'; +import { StageStatus } from 'node_modules/@oclif/multi-stage-output/lib/stage-tracker.js'; type Options = { resume: boolean; @@ -17,7 +18,7 @@ type Options = { jsonEnabled: boolean; }; -export class BulkImportStages { +export class BulkIngestStages { private mso: MultiStageOutput; private resume: boolean; @@ -110,8 +111,8 @@ export class BulkImportStages { this.mso.updateData(data); } - public stop(): void { - this.mso.stop(); + public stop(finalStatus?: StageStatus): void { + this.mso.stop(finalStatus); } public error(): void { diff --git a/test/bulkUtils.test.ts b/test/bulkUtils.test.ts index 01b2c749..afd14513 100644 --- a/test/bulkUtils.test.ts +++ b/test/bulkUtils.test.ts @@ -7,9 +7,19 @@ import { expect } from 'chai'; -import { remainingTime } from '../src/bulkUtils.js'; +import { remainingTime, detectDelimiter } from '../src/bulkUtils.js'; describe('bulkUtils', () => { + describe('csv', () => { + it('detects column separator', async () => { + expect(await detectDelimiter('./test/test-files/csv/backquote.csv')).to.equal('BACKQUOTE'); + expect(await detectDelimiter('./test/test-files/csv/caret.csv')).to.equal('CARET'); + expect(await detectDelimiter('./test/test-files/csv/comma.csv')).to.equal('COMMA'); + expect(await 
detectDelimiter('./test/test-files/csv/pipe.csv')).to.equal('PIPE'); + expect(await detectDelimiter('./test/test-files/csv/semicolon.csv')).to.equal('SEMICOLON'); + expect(await detectDelimiter('./test/test-files/csv/tab.csv')).to.equal('TAB'); + }); + }); describe('remainingTime', () => { it('returns the remaining time when endWaitTime is defined', () => { const now = Date.now(); diff --git a/test/commands/data/dataBulk.nut.ts b/test/commands/data/dataBulk.nut.ts index f07a9872..c305e3a0 100644 --- a/test/commands/data/dataBulk.nut.ts +++ b/test/commands/data/dataBulk.nut.ts @@ -14,7 +14,7 @@ import { sleep } from '@salesforce/kit'; import { ensurePlainObject } from '@salesforce/ts-types'; import type { SaveResult } from '@jsforce/jsforce-node'; import { BulkResultV2 } from '../../../src/types.js'; -import { QueryResult } from './dataSoqlQuery.nut.js'; +import { QueryResult } from '../data/query/query.nut.js'; chaiConfig.truncateThreshold = 0; diff --git a/test/commands/data/export/bulk.nut.ts b/test/commands/data/export/bulk.nut.ts index 1595fcbb..d898d3d4 100644 --- a/test/commands/data/export/bulk.nut.ts +++ b/test/commands/data/export/bulk.nut.ts @@ -47,43 +47,34 @@ describe('data export bulk NUTs', () => { const outputFile = 'export-accounts.csv'; const command = `data export bulk -q "${soqlQuery}" --output-file ${outputFile} --wait 10 --json`; - // about the type assertion at the end: - // I'm passing `--json` in and `ensureExitCode: 0` so I should always have a JSON result. - const result = execCmd(command, { ensureExitCode: 0 }).jsonOutput - ?.result as DataExportBulkResult; + const result = execCmd(command, { ensureExitCode: 0 }).jsonOutput?.result; - expect(result.totalSize).to.equal(totalAccountRecords); - expect(result.filePath).to.equal(outputFile); + expect(result?.totalSize).to.equal(totalAccountRecords); + expect(result?.filePath).to.equal(outputFile); - await validateCsv(path.join(session.dir, 'data-project', outputFile), 'COMMA', ensureNumber(result.totalSize)); + await validateCsv(path.join(session.dir, 'data-project', outputFile), 'COMMA', ensureNumber(result?.totalSize)); }); it('should export records in csv format with PIPE delimiter', async () => { const outputFile = 'export-accounts.csv'; const command = `data export bulk -q "${soqlQuery}" --output-file ${outputFile} --wait 10 --column-delimiter PIPE --json`; - // about the type assertion at the end: - // I'm passing `--json` in and `ensureExitCode: 0` so I should always have a JSON result. - const result = execCmd(command, { ensureExitCode: 0 }).jsonOutput - ?.result as DataExportBulkResult; + const result = execCmd(command, { ensureExitCode: 0 }).jsonOutput?.result; - expect(result.totalSize).to.equal(totalAccountRecords); - expect(result.filePath).to.equal(outputFile); + expect(result?.totalSize).to.equal(totalAccountRecords); + expect(result?.filePath).to.equal(outputFile); - await validateCsv(path.join(session.dir, 'data-project', outputFile), 'PIPE', ensureNumber(result.totalSize)); + await validateCsv(path.join(session.dir, 'data-project', outputFile), 'PIPE', ensureNumber(result?.totalSize)); }); it('should export records in json format', async () => { const outputFile = 'export-accounts.json'; const command = `data export bulk -q "${soqlQuery}" --output-file ${outputFile} --wait 10 --result-format json --json`; - // about the type assertion at the end: - // I'm passing `--json` in and `ensureExitCode: 0` so I should always have a JSON result. 
- const result = execCmd(command, { ensureExitCode: 0 }).jsonOutput - ?.result as DataExportBulkResult; + const result = execCmd(command, { ensureExitCode: 0 }).jsonOutput?.result; - expect(result.totalSize).to.equal(totalAccountRecords); - expect(result.filePath).to.equal(outputFile); + expect(result?.totalSize).to.equal(totalAccountRecords); + expect(result?.filePath).to.equal(outputFile); await validateJson(path.join(session.dir, 'data-project', outputFile), ensureNumber(totalAccountRecords)); }); diff --git a/test/commands/data/export/resume.nut.ts b/test/commands/data/export/resume.nut.ts index 58671eb1..8725bcc0 100644 --- a/test/commands/data/export/resume.nut.ts +++ b/test/commands/data/export/resume.nut.ts @@ -48,26 +48,23 @@ describe('data export resume NUTs', () => { const outputFile = 'export-accounts.csv'; const command = `data export bulk -q "${soqlQuery}" --output-file ${outputFile} --async --json`; - // about the type assertion at the end: - // I'm passing `--json` in and `ensureExitCode: 0` so I should always have a JSON result. - const exportAsyncResult = execCmd(command, { ensureExitCode: 0 }).jsonOutput - ?.result as DataExportBulkResult; + const exportAsyncResult = execCmd(command, { ensureExitCode: 0 }).jsonOutput?.result; - expect(exportAsyncResult.jobId).to.be.length(18); - expect(exportAsyncResult.filePath).to.equal(outputFile); + expect(exportAsyncResult?.jobId).to.be.length(18); + expect(exportAsyncResult?.filePath).to.equal(outputFile); const exportResumeResult = execCmd( - `data export resume -i ${exportAsyncResult.jobId} --json`, + `data export resume -i ${exportAsyncResult?.jobId} --json`, { ensureExitCode: 0 } - ).jsonOutput?.result as DataExportResumeResult; + ).jsonOutput?.result; - expect(exportResumeResult.totalSize).to.be.equal(totalAccountRecords); - expect(exportResumeResult.filePath).to.equal(outputFile); + expect(exportResumeResult?.totalSize).to.be.equal(totalAccountRecords); + expect(exportResumeResult?.filePath).to.equal(outputFile); await validateCsv( path.join(session.dir, 'data-project', outputFile), 'COMMA', - ensureNumber(exportResumeResult.totalSize) + ensureNumber(exportResumeResult?.totalSize) ); }); @@ -75,21 +72,18 @@ describe('data export resume NUTs', () => { const outputFile = 'export-accounts.json'; const command = `data export bulk -q "${soqlQuery}" --output-file ${outputFile} --async --result-format json --json`; - // about the type assertion at the end: - // I'm passing `--json` in and `ensureExitCode: 0` so I should always have a JSON result. 
- const exportAsyncResult = execCmd(command, { ensureExitCode: 0 }).jsonOutput - ?.result as DataExportBulkResult; + const exportAsyncResult = execCmd(command, { ensureExitCode: 0 }).jsonOutput?.result; - expect(exportAsyncResult.jobId).to.be.length(18); - expect(exportAsyncResult.filePath).to.equal(outputFile); + expect(exportAsyncResult?.jobId).to.be.length(18); + expect(exportAsyncResult?.filePath).to.equal(outputFile); const exportResumeResult = execCmd( - `data export resume -i ${exportAsyncResult.jobId} --json`, + `data export resume -i ${exportAsyncResult?.jobId} --json`, { ensureExitCode: 0 } - ).jsonOutput?.result as DataExportResumeResult; + ).jsonOutput?.result; - expect(exportResumeResult.totalSize).to.be.equal(totalAccountRecords); - expect(exportResumeResult.filePath).to.equal(outputFile); + expect(exportResumeResult?.totalSize).to.be.equal(totalAccountRecords); + expect(exportResumeResult?.filePath).to.equal(outputFile); await validateJson(path.join(session.dir, 'data-project', outputFile), ensureNumber(totalAccountRecords)); }); diff --git a/test/commands/data/import/bulk.nut.ts b/test/commands/data/import/bulk.nut.ts index 22506088..c54777e4 100644 --- a/test/commands/data/import/bulk.nut.ts +++ b/test/commands/data/import/bulk.nut.ts @@ -8,7 +8,7 @@ import path from 'node:path'; import { execCmd, TestSession } from '@salesforce/cli-plugins-testkit'; import { expect } from 'chai'; import { DataImportBulkResult } from '../../../../src/commands/data/import/bulk.js'; -import { generateAccountsCsv } from './resume.nut.js'; +import { generateAccountsCsv } from '../../../testUtil.js'; describe('data import bulk NUTs', () => { let session: TestSession; @@ -36,13 +36,28 @@ describe('data import bulk NUTs', () => { const result = execCmd( `data import bulk --file ${csvFile} --sobject Account --wait 10 --json`, { ensureExitCode: 0 } - ).jsonOutput?.result as DataImportBulkResult; + ).jsonOutput?.result; - expect(result.jobId).not.be.undefined; - expect(result.jobId.length).to.equal(18); - expect(result.processedRecords).to.equal(10_000); - expect(result.successfulRecords).to.equal(10_000); - expect(result.failedRecords).to.equal(0); + expect(result?.jobId).not.be.undefined; + expect(result?.jobId.length).to.equal(18); + expect(result?.processedRecords).to.equal(10_000); + expect(result?.successfulRecords).to.equal(10_000); + expect(result?.failedRecords).to.equal(0); + }); + + it('should import account records in csv format with PIPE delimiter', async () => { + const csvFile = await generateAccountsCsv(session.dir, 'PIPE'); + + const result = execCmd( + `data import bulk --file ${csvFile} --sobject Account --wait 10 --json`, + { ensureExitCode: 0 } + ).jsonOutput?.result; + + expect(result?.jobId).not.be.undefined; + expect(result?.jobId.length).to.equal(18); + expect(result?.processedRecords).to.equal(10_000); + expect(result?.successfulRecords).to.equal(10_000); + expect(result?.failedRecords).to.equal(0); }); it('should report error msg from a failed job', async () => { diff --git a/test/commands/data/import/resume.nut.ts b/test/commands/data/import/resume.nut.ts index 0b6a667b..1098ac1e 100644 --- a/test/commands/data/import/resume.nut.ts +++ b/test/commands/data/import/resume.nut.ts @@ -5,11 +5,11 @@ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause */ import path from 'node:path'; -import { writeFile } from 'node:fs/promises'; -import { EOL } from 'node:os'; import { execCmd, TestSession } from 
'@salesforce/cli-plugins-testkit'; import { expect } from 'chai'; +import { ensureString } from '@salesforce/ts-types'; import { DataImportBulkResult } from '../../../../src/commands/data/import/bulk.js'; +import { generateAccountsCsv, validateCacheFile } from '../../../testUtil.js'; describe('data import resume NUTs', () => { let session: TestSession; @@ -34,24 +34,26 @@ describe('data import resume NUTs', () => { it('should resume bulk import via --job-id', async () => { const csvFile = await generateAccountsCsv(session.dir); - // about the type assertion at the end: - // I'm passing `--json` in and `ensureExitCode: 0` so I should always have a JSON result. - const exportAsyncResult = execCmd( + const importAsyncRes = execCmd( `data import bulk --file ${csvFile} --sobject account --async --json`, { ensureExitCode: 0 } - ).jsonOutput?.result as DataImportBulkResult; + ).jsonOutput?.result; - expect(exportAsyncResult.jobId).not.to.be.undefined; - expect(exportAsyncResult.jobId).to.be.length(18); + expect(importAsyncRes?.jobId).not.to.be.undefined; + expect(importAsyncRes?.jobId).to.be.length(18); - const importResumeResult = execCmd( - `data import resume -i ${exportAsyncResult.jobId} --json`, - { ensureExitCode: 0 } - ).jsonOutput?.result as DataImportBulkResult; + await validateCacheFile( + path.join(session.homeDir, '.sf', 'bulk-data-import-cache.json'), + ensureString(importAsyncRes?.jobId) + ); + + const importResumeResult = execCmd(`data import resume -i ${importAsyncRes?.jobId} --json`, { + ensureExitCode: 0, + }).jsonOutput?.result; - expect(importResumeResult.processedRecords).to.equal(10_000); - expect(importResumeResult.successfulRecords).to.equal(10_000); - expect(importResumeResult.failedRecords).to.equal(0); + expect(importResumeResult?.processedRecords).to.equal(10_000); + expect(importResumeResult?.successfulRecords).to.equal(10_000); + expect(importResumeResult?.failedRecords).to.equal(0); }); it('should resume bulk import via--use-most-recent', async () => { @@ -59,47 +61,23 @@ describe('data import resume NUTs', () => { const command = `data import bulk --file ${csvFile} --sobject account --async --json`; - // about the type assertion at the end: - // I'm passing `--json` in and `ensureExitCode: 0` so I should always have a JSON result. 
- const exportAsyncResult = execCmd(command, { ensureExitCode: 0 }).jsonOutput - ?.result as DataImportBulkResult; + const exportAsyncResult = execCmd(command, { ensureExitCode: 0 }).jsonOutput?.result; - expect(exportAsyncResult.jobId).not.to.be.undefined; - expect(exportAsyncResult.jobId).to.be.length(18); + expect(exportAsyncResult?.jobId).not.to.be.undefined; + expect(exportAsyncResult?.jobId).to.be.length(18); const importResumeResult = execCmd('data import resume --use-most-recent --json', { ensureExitCode: 0, - }).jsonOutput?.result as DataImportBulkResult; + }).jsonOutput?.result; - expect(importResumeResult.jobId).not.to.be.undefined; - expect(importResumeResult.jobId).to.be.length(18); + expect(importResumeResult?.jobId).not.to.be.undefined; + expect(importResumeResult?.jobId).to.be.length(18); // validate the cache is returning the job ID from the last async import - expect(importResumeResult.jobId).to.equal(exportAsyncResult.jobId); + expect(importResumeResult?.jobId).to.equal(exportAsyncResult?.jobId); - expect(importResumeResult.processedRecords).to.equal(10_000); - expect(importResumeResult.successfulRecords).to.equal(10_000); - expect(importResumeResult.failedRecords).to.equal(0); + expect(importResumeResult?.processedRecords).to.equal(10_000); + expect(importResumeResult?.successfulRecords).to.equal(10_000); + expect(importResumeResult?.failedRecords).to.equal(0); }); }); - -/** - * Generates a CSV file with 10_000 account records to insert - * - * Each `Account.name` field has a unique timestamp for idempotent runs. - */ -export async function generateAccountsCsv(savePath: string): Promise { - const id = Date.now(); - - let csv = 'NAME,TYPE,PHONE,WEBSITE' + EOL; - - for (let i = 1; i <= 10_000; i++) { - csv += `account ${id} #${i},Account,415-555-0000,http://www.accountImport${i}.com${EOL}`; - } - - const accountsCsv = path.join(savePath, 'bulkImportAccounts1.csv'); - - await writeFile(accountsCsv, csv); - - return accountsCsv; -} diff --git a/test/commands/data/dataSoqlQuery.nut.ts b/test/commands/data/query/query.nut.ts similarity index 100% rename from test/commands/data/dataSoqlQuery.nut.ts rename to test/commands/data/query/query.nut.ts diff --git a/test/commands/data/tree/dataTree.nut.ts b/test/commands/data/tree/dataTree.nut.ts index c8c685f3..90557003 100644 --- a/test/commands/data/tree/dataTree.nut.ts +++ b/test/commands/data/tree/dataTree.nut.ts @@ -8,7 +8,7 @@ import path from 'node:path'; import { expect } from 'chai'; import { execCmd, TestSession } from '@salesforce/cli-plugins-testkit'; import { Dictionary, get, getString } from '@salesforce/ts-types'; -import { QueryResult } from '../dataSoqlQuery.nut.js'; +import { QueryResult } from '../../data/query/query.nut.js'; describe('data:tree commands', () => { let testSession: TestSession; diff --git a/test/commands/data/tree/dataTreeCommonChild.nut.ts b/test/commands/data/tree/dataTreeCommonChild.nut.ts index 247537f2..bd96ae32 100644 --- a/test/commands/data/tree/dataTreeCommonChild.nut.ts +++ b/test/commands/data/tree/dataTreeCommonChild.nut.ts @@ -8,7 +8,7 @@ import path from 'node:path'; import { expect } from 'chai'; import { execCmd, TestSession } from '@salesforce/cli-plugins-testkit'; import { ImportResult } from '../../../../src/api/data/tree/importTypes.js'; -import { QueryResult } from '../dataSoqlQuery.nut.js'; +import { QueryResult } from '../query/query.nut.js'; describe('data:tree commands with a polymorphic whatId (on tasks) shared between multiple parents', () => { let testSession: 
TestSession; diff --git a/test/commands/data/tree/dataTreeDeep.nut.ts b/test/commands/data/tree/dataTreeDeep.nut.ts index ed8ee358..479eb8c4 100644 --- a/test/commands/data/tree/dataTreeDeep.nut.ts +++ b/test/commands/data/tree/dataTreeDeep.nut.ts @@ -8,7 +8,7 @@ import path from 'node:path'; import { expect } from 'chai'; import { execCmd, TestSession } from '@salesforce/cli-plugins-testkit'; import { Dictionary, get, getString } from '@salesforce/ts-types'; -import { QueryResult } from '../dataSoqlQuery.nut.js'; +import { QueryResult } from '../query/query.nut.js'; describe('data:tree commands with more than 2 levels', () => { let testSession: TestSession; diff --git a/test/commands/data/tree/dataTreeDeepBeta.nut.ts b/test/commands/data/tree/dataTreeDeepBeta.nut.ts index 2a9d2ece..27f3d0e9 100644 --- a/test/commands/data/tree/dataTreeDeepBeta.nut.ts +++ b/test/commands/data/tree/dataTreeDeepBeta.nut.ts @@ -8,7 +8,7 @@ import path from 'node:path'; import { expect } from 'chai'; import { execCmd, TestSession } from '@salesforce/cli-plugins-testkit'; import { Dictionary, get, getString } from '@salesforce/ts-types'; -import { QueryResult } from '../dataSoqlQuery.nut.js'; +import { QueryResult } from '../query/query.nut.js'; describe('data:tree beta commands with more than 2 levels', () => { const prefix = 'DEEP'; diff --git a/test/commands/data/tree/dataTreeMoreThan200.nut.ts b/test/commands/data/tree/dataTreeMoreThan200.nut.ts index 8352800d..03c703dc 100644 --- a/test/commands/data/tree/dataTreeMoreThan200.nut.ts +++ b/test/commands/data/tree/dataTreeMoreThan200.nut.ts @@ -8,7 +8,7 @@ import path from 'node:path'; import { expect } from 'chai'; import { execCmd, TestSession } from '@salesforce/cli-plugins-testkit'; import { ImportResult } from '../../../../src/api/data/tree/importTypes.js'; -import { QueryResult } from '../dataSoqlQuery.nut.js'; +import { QueryResult } from '../query/query.nut.js'; describe('data:tree commands with more than 200 records are batches in safe groups', () => { let testSession: TestSession; diff --git a/test/commands/data/tree/dataTreeSelfReferencing.nut.ts b/test/commands/data/tree/dataTreeSelfReferencing.nut.ts index cc95af31..15bf6de3 100644 --- a/test/commands/data/tree/dataTreeSelfReferencing.nut.ts +++ b/test/commands/data/tree/dataTreeSelfReferencing.nut.ts @@ -7,7 +7,7 @@ import path from 'node:path'; import { expect } from 'chai'; import { execCmd, TestSession } from '@salesforce/cli-plugins-testkit'; -import { QueryResult } from '../dataSoqlQuery.nut.js'; +import { QueryResult } from '../query/query.nut.js'; describe('data:tree commands with records that refer to other records of the same type in the same file', () => { let testSession: TestSession; diff --git a/test/commands/data/update/bulk.nut.ts b/test/commands/data/update/bulk.nut.ts new file mode 100644 index 00000000..12a1134f --- /dev/null +++ b/test/commands/data/update/bulk.nut.ts @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2024, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. 
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ +import path from 'node:path'; +import { execCmd, TestSession } from '@salesforce/cli-plugins-testkit'; +import { expect } from 'chai'; +import { Org } from '@salesforce/core'; +import { ensureString } from '@salesforce/ts-types'; +import { generateUpdatedCsv, generateAccountsCsv } from '../../../testUtil.js'; +import { DataUpdateBulkResult } from '../../../../src/commands/data/update/bulk.js'; +import { DataImportBulkResult } from '../../../../src/commands/data/import/bulk.js'; + +describe('data update bulk NUTs', () => { + let session: TestSession; + + before(async () => { + session = await TestSession.create({ + scratchOrgs: [ + { + config: 'config/project-scratch-def.json', + setDefault: true, + }, + ], + project: { sourceDir: path.join('test', 'test-files', 'data-project') }, + devhubAuthStrategy: 'AUTO', + }); + }); + + after(async () => { + await session?.clean(); + }); + + it('should bulk update account records', async () => { + const csvFile = await generateAccountsCsv(session.dir); + + const result = execCmd( + `data import bulk --file ${csvFile} --sobject Account --wait 10 --json`, + { ensureExitCode: 0 } + ).jsonOutput?.result; + + const conn = ( + await Org.create({ + aliasOrUsername: session.orgs.get('default')?.username, + }) + ).getConnection(); + + const importJob = conn.bulk2.job('ingest', { + id: ensureString(result?.jobId), + }); + + const successfulIds = (await importJob.getSuccessfulResults()).map((r) => r.sf__Id); + + const updatedCsv = await generateUpdatedCsv( + csvFile, + successfulIds, + path.join(session.dir, 'data-project', 'updated.csv') + ); + + const dataUpdateResult = execCmd( + `data update bulk --file ${updatedCsv} --sobject account --wait 10 --json`, + { ensureExitCode: 0 } + ).jsonOutput?.result; + + expect(dataUpdateResult?.processedRecords).to.equal(10_000); + expect(dataUpdateResult?.successfulRecords).to.equal(10_000); + expect(dataUpdateResult?.failedRecords).to.equal(0); + }); +}); diff --git a/test/commands/data/update/resume.nut.ts b/test/commands/data/update/resume.nut.ts new file mode 100644 index 00000000..1acd927c --- /dev/null +++ b/test/commands/data/update/resume.nut.ts @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2024, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. 
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ +import path from 'node:path'; +import { execCmd, TestSession } from '@salesforce/cli-plugins-testkit'; +import { expect } from 'chai'; +import { Org } from '@salesforce/core'; +import { ensureString } from '@salesforce/ts-types'; +import { generateUpdatedCsv, generateAccountsCsv, validateCacheFile } from '../../../testUtil.js'; +import { DataUpdateBulkResult } from '../../../../src/commands/data/update/bulk.js'; +import { DataImportBulkResult } from '../../../../src/commands/data/import/bulk.js'; + +describe('data update resume NUTs', () => { + let session: TestSession; + + before(async () => { + session = await TestSession.create({ + scratchOrgs: [ + { + config: 'config/project-scratch-def.json', + setDefault: true, + }, + ], + project: { sourceDir: path.join('test', 'test-files', 'data-project') }, + devhubAuthStrategy: 'AUTO', + }); + }); + + after(async () => { + await session?.clean(); + }); + + it('should resume bulk update via --use-most-recent', async () => { + const csvFile = await generateAccountsCsv(session.dir); + + const result = execCmd( + `data import bulk --file ${csvFile} --sobject Account --wait 10 --json`, + { ensureExitCode: 0 } + ).jsonOutput?.result; + + const conn = ( + await Org.create({ + aliasOrUsername: session.orgs.get('default')?.username, + }) + ).getConnection(); + + const importJob = conn.bulk2.job('ingest', { + id: ensureString(result?.jobId), + }); + + const successfulIds = (await importJob.getSuccessfulResults()).map((r) => r.sf__Id); + + const updatedCsv = await generateUpdatedCsv( + csvFile, + successfulIds, + path.join(session.dir, 'data-project', 'updated.csv') + ); + + const dataUpdateAsyncRes = execCmd( + `data update bulk --file ${updatedCsv} --sobject account --async --json`, + { ensureExitCode: 0 } + ).jsonOutput?.result; + + expect(dataUpdateAsyncRes?.jobId).to.be.length(18); + + await validateCacheFile( + path.join(session.homeDir, '.sf', 'bulk-data-update-cache.json'), + ensureString(dataUpdateAsyncRes?.jobId) + ); + + const dataUpdateResumeRes = execCmd( + `data update resume -i ${ensureString(dataUpdateAsyncRes?.jobId)} --wait 10 --json`, + { ensureExitCode: 0 } + ).jsonOutput?.result; + + expect(dataUpdateResumeRes?.processedRecords).to.equal(10_000); + expect(dataUpdateResumeRes?.successfulRecords).to.equal(10_000); + expect(dataUpdateResumeRes?.failedRecords).to.equal(0); + }); +}); diff --git a/test/commands/force/data/bulk/dataBulk.nut.ts b/test/commands/force/data/bulk/dataBulk.nut.ts index 7a548764..bea7e718 100644 --- a/test/commands/force/data/bulk/dataBulk.nut.ts +++ b/test/commands/force/data/bulk/dataBulk.nut.ts @@ -12,7 +12,7 @@ import { execCmd, TestSession } from '@salesforce/cli-plugins-testkit'; import { sleep } from '@salesforce/kit'; import { BatcherReturnType } from '../../../../../src/batcher.js'; import { StatusResult } from '../../../../../src/types.js'; -import { QueryResult } from '../../../data/dataSoqlQuery.nut.js'; +import { QueryResult } from '../../../data/query/query.nut.js'; let testSession: TestSession; diff --git a/test/test-files/csv/backquote.csv b/test/test-files/csv/backquote.csv new file mode 100644 index 00000000..61397731 --- /dev/null +++ b/test/test-files/csv/backquote.csv @@ -0,0 +1,11 @@ +NAME`TYPE`PHONE`WEBSITE`ANNUALREVENUE +account Upsert #0`Account`415-555-0000`http://www.accountUpsert0.com`0 +account Upsert #1`Account`415-555-0000`http://www.accountUpsert1.com`1000 +account
Upsert #2`Account`415-555-0000`http://www.accountUpsert2.com`2000 +account Upsert #3`Account`415-555-0000`http://www.accountUpsert3.com`3000 +account Upsert #4`Account`415-555-0000`http://www.accountUpsert4.com`4000 +account Upsert #5`Account`415-555-0000`http://www.accountUpsert5.com`5000 +account Upsert #6`Account`415-555-0000`http://www.accountUpsert6.com`6000 +account Upsert #7`Account`415-555-0000`http://www.accountUpsert7.com`7000 +account Upsert #8`Account`415-555-0000`http://www.accountUpsert8.com`8000 +account Upsert #9`Account`415-555-0000`http://www.accountUpsert9.com`9000 diff --git a/test/test-files/csv/caret.csv b/test/test-files/csv/caret.csv new file mode 100644 index 00000000..ea4fddb3 --- /dev/null +++ b/test/test-files/csv/caret.csv @@ -0,0 +1,11 @@ +NAME^TYPE^PHONE^WEBSITE^ANNUALREVENUE +account Upsert #0^Account^415-555-0000^http://www.accountUpsert0.com^0 +account Upsert #1^Account^415-555-0000^http://www.accountUpsert1.com^1000 +account Upsert #2^Account^415-555-0000^http://www.accountUpsert2.com^2000 +account Upsert #3^Account^415-555-0000^http://www.accountUpsert3.com^3000 +account Upsert #4^Account^415-555-0000^http://www.accountUpsert4.com^4000 +account Upsert #5^Account^415-555-0000^http://www.accountUpsert5.com^5000 +account Upsert #6^Account^415-555-0000^http://www.accountUpsert6.com^6000 +account Upsert #7^Account^415-555-0000^http://www.accountUpsert7.com^7000 +account Upsert #8^Account^415-555-0000^http://www.accountUpsert8.com^8000 +account Upsert #9^Account^415-555-0000^http://www.accountUpsert9.com^9000 diff --git a/test/test-files/csv/comma.csv b/test/test-files/csv/comma.csv new file mode 100644 index 00000000..dfe5b11d --- /dev/null +++ b/test/test-files/csv/comma.csv @@ -0,0 +1,11 @@ +NAME,TYPE,PHONE,WEBSITE,ANNUALREVENUE +account Upsert #0,Account,415-555-0000,http://www.accountUpsert0.com,0 +account Upsert #1,Account,415-555-0000,http://www.accountUpsert1.com,1000 +account Upsert #2,Account,415-555-0000,http://www.accountUpsert2.com,2000 +account Upsert #3,Account,415-555-0000,http://www.accountUpsert3.com,3000 +account Upsert #4,Account,415-555-0000,http://www.accountUpsert4.com,4000 +account Upsert #5,Account,415-555-0000,http://www.accountUpsert5.com,5000 +account Upsert #6,Account,415-555-0000,http://www.accountUpsert6.com,6000 +account Upsert #7,Account,415-555-0000,http://www.accountUpsert7.com,7000 +account Upsert #8,Account,415-555-0000,http://www.accountUpsert8.com,8000 +account Upsert #9,Account,415-555-0000,http://www.accountUpsert9.com,9000 diff --git a/test/test-files/csv/pipe.csv b/test/test-files/csv/pipe.csv new file mode 100644 index 00000000..68acf614 --- /dev/null +++ b/test/test-files/csv/pipe.csv @@ -0,0 +1,11 @@ +NAME|TYPE|PHONE|WEBSITE|ANNUALREVENUE +account Upsert #0|Account|415-555-0000|http://www.accountUpsert0.com|0 +account Upsert #1|Account|415-555-0000|http://www.accountUpsert1.com|1000 +account Upsert #2|Account|415-555-0000|http://www.accountUpsert2.com|2000 +account Upsert #3|Account|415-555-0000|http://www.accountUpsert3.com|3000 +account Upsert #4|Account|415-555-0000|http://www.accountUpsert4.com|4000 +account Upsert #5|Account|415-555-0000|http://www.accountUpsert5.com|5000 +account Upsert #6|Account|415-555-0000|http://www.accountUpsert6.com|6000 +account Upsert #7|Account|415-555-0000|http://www.accountUpsert7.com|7000 +account Upsert #8|Account|415-555-0000|http://www.accountUpsert8.com|8000 +account Upsert #9|Account|415-555-0000|http://www.accountUpsert9.com|9000 diff --git 
a/test/test-files/csv/semicolon.csv b/test/test-files/csv/semicolon.csv new file mode 100644 index 00000000..b56d8664 --- /dev/null +++ b/test/test-files/csv/semicolon.csv @@ -0,0 +1,11 @@ +NAME;TYPE;PHONE;WEBSITE;ANNUALREVENUE +account Upsert #0;Account;415-555-0000;http://www.accountUpsert0.com;0 +account Upsert #1;Account;415-555-0000;http://www.accountUpsert1.com;1000 +account Upsert #2;Account;415-555-0000;http://www.accountUpsert2.com;2000 +account Upsert #3;Account;415-555-0000;http://www.accountUpsert3.com;3000 +account Upsert #4;Account;415-555-0000;http://www.accountUpsert4.com;4000 +account Upsert #5;Account;415-555-0000;http://www.accountUpsert5.com;5000 +account Upsert #6;Account;415-555-0000;http://www.accountUpsert6.com;6000 +account Upsert #7;Account;415-555-0000;http://www.accountUpsert7.com;7000 +account Upsert #8;Account;415-555-0000;http://www.accountUpsert8.com;8000 +account Upsert #9;Account;415-555-0000;http://www.accountUpsert9.com;9000 diff --git a/test/test-files/csv/tab.csv b/test/test-files/csv/tab.csv new file mode 100644 index 00000000..2ad69d1a --- /dev/null +++ b/test/test-files/csv/tab.csv @@ -0,0 +1,11 @@ +NAME TYPE PHONE WEBSITE ANNUALREVENUE +account Upsert #0 Account 415-555-0000 http://www.accountUpsert0.com 0 +account Upsert #1 Account 415-555-0000 http://www.accountUpsert1.com 1000 +account Upsert #2 Account 415-555-0000 http://www.accountUpsert2.com 2000 +account Upsert #3 Account 415-555-0000 http://www.accountUpsert3.com 3000 +account Upsert #4 Account 415-555-0000 http://www.accountUpsert4.com 4000 +account Upsert #5 Account 415-555-0000 http://www.accountUpsert5.com 5000 +account Upsert #6 Account 415-555-0000 http://www.accountUpsert6.com 6000 +account Upsert #7 Account 415-555-0000 http://www.accountUpsert7.com 7000 +account Upsert #8 Account 415-555-0000 http://www.accountUpsert8.com 8000 +account Upsert #9 Account 415-555-0000 http://www.accountUpsert9.com 9000 diff --git a/test/testUtil.ts b/test/testUtil.ts index 42379450..95112587 100644 --- a/test/testUtil.ts +++ b/test/testUtil.ts @@ -4,12 +4,17 @@ * Licensed under the BSD 3-Clause license. * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause */ + +import path from 'node:path'; import * as fs from 'node:fs'; +import { EOL, platform } from 'node:os'; +import { writeFile, stat, readFile } from 'node:fs/promises'; import { PassThrough, Writable } from 'node:stream'; import { pipeline } from 'node:stream/promises'; import { promisify } from 'node:util'; import { exec as execSync } from 'node:child_process'; import { Connection } from '@salesforce/core'; +import { stringify as csvStringify } from 'csv-stringify/sync'; /* eslint-disable @typescript-eslint/no-explicit-any */ /* eslint-disable @typescript-eslint/no-unused-vars */ @@ -213,3 +218,103 @@ export async function validateJson(filePath: string, totalqty: number): Promise< expect(parseInt(lengthRes.stdout.trim(), 10)).equal(totalqty); } + +/** + * Takes a CSV with account records and insert the `ID` column for a bulk update operation + * + * @param sourceCsv CSV file with imported account records, shouldn't have an `ID` column. + * @param ids Array of IDs of records inserted from sourceCsv. + * @param savePath path where to save the new CSV. + * + * Each `Account.name` field has a unique timestamp for idempotent runs. 
+ */ +export async function generateUpdatedCsv(sourceCsv: string, ids: string[], savePath: string) { + const csvReadStream = fs.createReadStream(sourceCsv); + const modifiedRows: Array<{ NAME: string; ID?: string }> = []; + let counter = 0; + + await pipeline( + csvReadStream, + new csvParse({ columns: true, delimiter: ',' }), + new PassThrough({ + objectMode: true, + transform(row: { NAME: string; ID?: string }, _encoding, callback) { + row.ID = ids[counter]; + const modifiedRow = { ID: row['ID'], ...row }; + modifiedRows.push(modifiedRow); + counter++; + callback(null, null); + }, + }), + // dummy writable + new Writable({ + write(_chunk, _encoding, callback) { + callback(); + }, + }) + ); + + await writeFile( + savePath, + csvStringify(modifiedRows, { + header: true, + // `csv-stringify` doesn't follow camelCase for its opts: https://csv.js.org/stringify/options/record_delimiter/ + /* eslint-disable-next-line camelcase */ + record_delimiter: platform() === 'win32' ? 'windows' : 'unix', + }) + ); + + return savePath; +} + +/** + * Generates a CSV file with 10_000 account records to insert + * + * @param savePath path where to save the new CSV. + * + * Each `Account.name` field has a unique timestamp for idempotent runs. + */ +export async function generateAccountsCsv( + savePath: string, + columnDelimiter: ColumnDelimiterKeys = 'COMMA' +): Promise<string> { + const id = Date.now(); + + const delimiter = ColumnDelimiter[columnDelimiter]; + + let csv = `NAME${delimiter}TYPE${delimiter}PHONE${delimiter}WEBSITE${EOL}`; + + for (let i = 1; i <= 10_000; i++) { + csv += `account ${id} #${i}${delimiter}Account${delimiter}415-555-0000${delimiter}http://www.accountImport${i}.com${EOL}`; + } + + const accountsCsv = path.join(savePath, 'bulkImportAccounts.csv'); + + await writeFile(accountsCsv, csv); + + return accountsCsv; +} + +/** + * Validate that the cache file created by a data command (async/timed out) exists and has the expected properties + * + * @param filePath cache file path + * @param jobId job ID + */ +export async function validateCacheFile(filePath: string, jobId: string) { + let fileStat: fs.Stats; + try { + fileStat = await stat(filePath); + } catch { + throw new Error(`No file found at ${filePath}`); + } + + if (!fileStat.isFile()) { + throw new Error(`${filePath} exists but is not a file`); + } + + const parsed = JSON.parse(await readFile(filePath, 'utf8')) as Record; + + expect(parsed[jobId]).to.exist; + expect(parsed[jobId]).to.have.all.keys(['jobId', 'username', 'apiVersion', 'timestamp']); +}
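
The six CSV fixtures above (backquote, caret, comma, pipe, semicolon, tab) exercise column-delimiter handling in the bulk import/update tests; generateAccountsCsv in testUtil.ts takes a ColumnDelimiterKeys argument for the same reason. The plugin's own delimiter-detection logic is not shown in this excerpt, so the TypeScript sketch below is only a rough illustration of the general idea (pick whichever candidate delimiter splits the header row into the most columns); detectDelimiter is a hypothetical name, and the ColumnDelimiter / ColumnDelimiterKeys definitions here are assumptions standing in for whatever the plugin actually exports.

import { readFile } from 'node:fs/promises';

// Candidate delimiters, keyed the way the --column-delimiter flag values name them (assumed).
const ColumnDelimiter = {
  BACKQUOTE: '`',
  CARET: '^',
  COMMA: ',',
  PIPE: '|',
  SEMICOLON: ';',
  TAB: '\t',
} as const;

type ColumnDelimiterKeys = keyof typeof ColumnDelimiter;

// Hypothetical sketch: read the header row and return the delimiter key that splits it
// into the most columns, defaulting to COMMA when nothing splits the header at all.
export async function detectDelimiter(csvPath: string): Promise<ColumnDelimiterKeys> {
  const [header = ''] = (await readFile(csvPath, 'utf8')).split(/\r?\n/);

  let best: ColumnDelimiterKeys = 'COMMA';
  let bestColumns = 1;

  for (const key of Object.keys(ColumnDelimiter) as ColumnDelimiterKeys[]) {
    const columns = header.split(ColumnDelimiter[key]).length;
    if (columns > bestColumns) {
      best = key;
      bestColumns = columns;
    }
  }

  return best;
}

Run against the fixtures above, this sketch would resolve 'PIPE' for pipe.csv and 'SEMICOLON' for semicolon.csv, which is the kind of check a NUT can make for each fixture without passing --column-delimiter explicitly.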