From 29b4a253d2a6f2b09ff4c198554b7b41e8f565bf Mon Sep 17 00:00:00 2001 From: Carl Brugger Date: Wed, 17 Apr 2024 13:25:18 -0500 Subject: [PATCH] feat: refactor zip extractor --- .changeset/old-spiders-begin.md | 5 ++ package-lock.json | 4 +- plugins/zip-extractor/package.json | 4 +- plugins/zip-extractor/src/index.ts | 124 +++++++++++++++++------------ 4 files changed, 86 insertions(+), 51 deletions(-) create mode 100644 .changeset/old-spiders-begin.md diff --git a/.changeset/old-spiders-begin.md b/.changeset/old-spiders-begin.md new file mode 100644 index 000000000..8eafb432c --- /dev/null +++ b/.changeset/old-spiders-begin.md @@ -0,0 +1,5 @@ +--- +'@flatfile/plugin-zip-extractor': patch +--- + +This release fixes an issue running the zip extractor when deployed. diff --git a/package-lock.json b/package-lock.json index ec41e6a8e..779307fe2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16976,9 +16976,11 @@ "license": "ISC", "dependencies": { "@flatfile/api": "^1.7.4", + "@flatfile/plugin-job-handler": "^0.4.3", "@flatfile/util-common": "^1.1.1", "@flatfile/util-file-buffer": "^0.2.2", - "adm-zip": "^0.5.10" + "adm-zip": "^0.5.10", + "modern-async": "^2.0.0" }, "devDependencies": { "@flatfile/utils-testing": "^0.1.4", diff --git a/plugins/zip-extractor/package.json b/plugins/zip-extractor/package.json index 979cffcdd..c090414ed 100644 --- a/plugins/zip-extractor/package.json +++ b/plugins/zip-extractor/package.json @@ -32,9 +32,11 @@ "license": "ISC", "dependencies": { "@flatfile/api": "^1.7.4", + "@flatfile/plugin-job-handler": "^0.4.3", "@flatfile/util-common": "^1.1.1", "@flatfile/util-file-buffer": "^0.2.2", - "adm-zip": "^0.5.10" + "adm-zip": "^0.5.10", + "modern-async": "^2.0.0" }, "peerDependencies": { "@flatfile/listener": "^1.0.1" diff --git a/plugins/zip-extractor/src/index.ts b/plugins/zip-extractor/src/index.ts index 93bad4f59..a00f8cfc7 100644 --- a/plugins/zip-extractor/src/index.ts +++ b/plugins/zip-extractor/src/index.ts @@ -1,48 +1,69 @@ -import type { FlatfileListener } from '@flatfile/listener' -import { fileBuffer } from '@flatfile/util-file-buffer' -import api from '@flatfile/api' -import * as fs from 'fs' -import path from 'path' +import type { FlatfileEvent, FlatfileListener } from '@flatfile/listener' + +import api, { Flatfile } from '@flatfile/api' +import { jobHandler } from '@flatfile/plugin-job-handler' +import { logInfo } from '@flatfile/util-common' +import { getFileBuffer } from '@flatfile/util-file-buffer' import AdmZip from 'adm-zip' +import fs from 'fs' +import { asyncForEach } from 'modern-async' import { tmpdir } from 'os' -import { logInfo } from '@flatfile/util-common' +import path from 'path' export interface PluginOptions { readonly debug?: boolean } export const ZipExtractor = (options: PluginOptions = {}) => { - return (handler: FlatfileListener) => { - handler.use( - fileBuffer('.zip', async (file, buffer, event) => { - const { spaceId, environmentId } = event.context - const job = await api.jobs.create({ - type: 'file', - operation: 'extract', - status: 'ready', - source: event.context.fileId, - }) - try { - await api.jobs.update(job.data.id, { status: 'executing' }) - await api.jobs.ack(job.data.id, { - progress: 10, - info: 'Unzipping file', - }) - const zip = new AdmZip(buffer) - if (options.debug) { - logInfo('@flatfile/plugin-zip-extractor', `tmpdir ${tmpdir()}`) - } - const zipEntries = zip.getEntries() - await api.jobs.ack(job.data.id, { - progress: 50, - info: 'Uploading files', - }) - const uploadPromises = zipEntries.map(async (zipEntry) => { - if ( - !zipEntry.name.startsWith('.') && - !zipEntry.entryName.startsWith('__MACOSX') && - !zipEntry.isDirectory - ) { + return (listener: FlatfileListener) => { + listener.on('file:created', async (event) => { + const { fileId } = event.context + const { data: file } = await api.files.get(fileId) + if (file.mode === 'export') { + return + } + + if (!file.name.endsWith('.zip')) { + return + } + + const jobs = await api.jobs.create({ + type: Flatfile.JobType.File, + operation: 'extract-plugin-zip', + status: Flatfile.JobStatus.Ready, + source: fileId, + }) + await api.jobs.execute(jobs.data.id) + }) + listener.use( + jobHandler( + { operation: 'extract-plugin-zip' }, + async ( + event: FlatfileEvent, + tick: ( + progress: number, + message?: string + ) => Promise + ) => { + const { spaceId, environmentId } = event.context + + try { + await tick(1, 'Unzipping file') + const buffer = await getFileBuffer(event) + const zip = new AdmZip(buffer) + const zipEntries = zip + .getEntries() + .filter( + (zipEntry) => + !zipEntry.name.startsWith('.') && + !zipEntry.entryName.startsWith('__MACOSX') && + !zipEntry.isDirectory + ) + const zipEntryCount = zipEntries.length + + await tick(10, 'Uploading files') + let i = 0 + await asyncForEach(zipEntries, async (zipEntry) => { zip.extractEntryTo(zipEntry, tmpdir(), false, true) const filePath = path.join(tmpdir(), zipEntry.name) if (options.debug) { @@ -57,19 +78,24 @@ export const ZipExtractor = (options: PluginOptions = {}) => { environmentId, }) await fs.promises.unlink(filePath) - } - }) - await Promise.all(uploadPromises) - await api.jobs.complete(job.data.id, { - info: 'Extraction complete', - }) - } catch (e) { - logInfo('@flatfile/plugin-zip-extractor', `error ${e}`) - await api.jobs.fail(job.data.id, { - info: `Extraction failed ${e.message}`, - }) + await tick( + 10 + Math.round(((i + 1) / zipEntryCount) * 89), + 'File uploaded' + ) + i++ + }) + + return { + outcome: { + message: 'Extraction complete', + }, + } as Flatfile.JobCompleteDetails + } catch (e) { + logInfo('@flatfile/plugin-zip-extractor', `error ${e}`) + throw new Error(`Extraction failed ${e.message}`) + } } - }) + ) ) } }