Skip to content

Commit

Permalink
feat: refactor zip extractor
Browse files Browse the repository at this point in the history
  • Loading branch information
carlbrugger committed Apr 17, 2024
1 parent 6d01f4c commit 29b4a25
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 51 deletions.
5 changes: 5 additions & 0 deletions .changeset/old-spiders-begin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@flatfile/plugin-zip-extractor': patch
---

This release fixes an issue running the zip extractor when deployed.
4 changes: 3 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion plugins/zip-extractor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
"license": "ISC",
"dependencies": {
"@flatfile/api": "^1.7.4",
"@flatfile/plugin-job-handler": "^0.4.3",
"@flatfile/util-common": "^1.1.1",
"@flatfile/util-file-buffer": "^0.2.2",
"adm-zip": "^0.5.10"
"adm-zip": "^0.5.10",
"modern-async": "^2.0.0"
},
"peerDependencies": {
"@flatfile/listener": "^1.0.1"
Expand Down
124 changes: 75 additions & 49 deletions plugins/zip-extractor/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,69 @@
import type { FlatfileListener } from '@flatfile/listener'
import { fileBuffer } from '@flatfile/util-file-buffer'
import api from '@flatfile/api'
import * as fs from 'fs'
import path from 'path'
import type { FlatfileEvent, FlatfileListener } from '@flatfile/listener'

import api, { Flatfile } from '@flatfile/api'
import { jobHandler } from '@flatfile/plugin-job-handler'
import { logInfo } from '@flatfile/util-common'
import { getFileBuffer } from '@flatfile/util-file-buffer'
import AdmZip from 'adm-zip'
import fs from 'fs'
import { asyncForEach } from 'modern-async'
import { tmpdir } from 'os'
import { logInfo } from '@flatfile/util-common'
import path from 'path'

export interface PluginOptions {
readonly debug?: boolean
}

export const ZipExtractor = (options: PluginOptions = {}) => {
return (handler: FlatfileListener) => {
handler.use(
fileBuffer('.zip', async (file, buffer, event) => {
const { spaceId, environmentId } = event.context
const job = await api.jobs.create({
type: 'file',
operation: 'extract',
status: 'ready',
source: event.context.fileId,
})
try {
await api.jobs.update(job.data.id, { status: 'executing' })
await api.jobs.ack(job.data.id, {
progress: 10,
info: 'Unzipping file',
})
const zip = new AdmZip(buffer)
if (options.debug) {
logInfo('@flatfile/plugin-zip-extractor', `tmpdir ${tmpdir()}`)
}
const zipEntries = zip.getEntries()
await api.jobs.ack(job.data.id, {
progress: 50,
info: 'Uploading files',
})
const uploadPromises = zipEntries.map(async (zipEntry) => {
if (
!zipEntry.name.startsWith('.') &&
!zipEntry.entryName.startsWith('__MACOSX') &&
!zipEntry.isDirectory
) {
return (listener: FlatfileListener) => {
listener.on('file:created', async (event) => {
const { fileId } = event.context
const { data: file } = await api.files.get(fileId)
if (file.mode === 'export') {
return
}

if (!file.name.endsWith('.zip')) {
return
}

const jobs = await api.jobs.create({
type: Flatfile.JobType.File,
operation: 'extract-plugin-zip',
status: Flatfile.JobStatus.Ready,
source: fileId,
})
await api.jobs.execute(jobs.data.id)
})
listener.use(
jobHandler(
{ operation: 'extract-plugin-zip' },
async (
event: FlatfileEvent,
tick: (
progress: number,
message?: string
) => Promise<Flatfile.JobResponse>
) => {
const { spaceId, environmentId } = event.context

try {
await tick(1, 'Unzipping file')
const buffer = await getFileBuffer(event)
const zip = new AdmZip(buffer)
const zipEntries = zip
.getEntries()
.filter(
(zipEntry) =>
!zipEntry.name.startsWith('.') &&
!zipEntry.entryName.startsWith('__MACOSX') &&
!zipEntry.isDirectory
)
const zipEntryCount = zipEntries.length

await tick(10, 'Uploading files')
let i = 0
await asyncForEach(zipEntries, async (zipEntry) => {
zip.extractEntryTo(zipEntry, tmpdir(), false, true)
const filePath = path.join(tmpdir(), zipEntry.name)
if (options.debug) {
Expand All @@ -57,19 +78,24 @@ export const ZipExtractor = (options: PluginOptions = {}) => {
environmentId,
})
await fs.promises.unlink(filePath)
}
})
await Promise.all(uploadPromises)
await api.jobs.complete(job.data.id, {
info: 'Extraction complete',
})
} catch (e) {
logInfo('@flatfile/plugin-zip-extractor', `error ${e}`)
await api.jobs.fail(job.data.id, {
info: `Extraction failed ${e.message}`,
})
await tick(
10 + Math.round(((i + 1) / zipEntryCount) * 89),
'File uploaded'
)
i++
})

return {
outcome: {
message: 'Extraction complete',
},
} as Flatfile.JobCompleteDetails
} catch (e) {
logInfo('@flatfile/plugin-zip-extractor', `error ${e}`)
throw new Error(`Extraction failed ${e.message}`)
}
}
})
)
)
}
}
Expand Down

0 comments on commit 29b4a25

Please sign in to comment.