Skip to content

Commit

Permalink
Merge pull request #38 from mcarvin8/refactoring
Browse files Browse the repository at this point in the history
fix: add parallel processing to handlers
  • Loading branch information
mcarvin8 authored Jan 21, 2025
2 parents e145f7d + 75ff605 commit e6035e6
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 33 deletions.
69 changes: 48 additions & 21 deletions src/service/buildDisassembledFiles.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { processElement } from "@src/service/processElement";
import { buildRootElementHeader } from "@src/service/buildRootElementHeader";
import { buildLeafFile } from "@src/service/buildLeafFile";
import { parseXML } from "@src/service/parseXML";
import { getConcurrencyThreshold } from "./getConcurrencyThreshold";

export async function buildDisassembledFiles(
filePath: string,
Expand All @@ -19,40 +20,51 @@ export async function buildDisassembledFiles(
): Promise<void> {
const parsedXml = await parseXML(filePath);
if (parsedXml === undefined) return;
const rootElementName = Object.keys(parsedXml)[1];

const rootElementName = Object.keys(parsedXml)[1];
const rootElement: XmlElement = parsedXml[rootElementName];
const rootElementHeader = buildRootElementHeader(
rootElement,
rootElementName,
);

let leafContent = "";
let leafCount = 0;
let hasNestedElements: boolean = false;

// Iterate through child elements to find the field name for each
for (const key of Object.keys(rootElement).filter(
const childKeys = Object.keys(rootElement).filter(
(key: string) => !key.startsWith("@"),
)) {
);

const concurrencyLimit = getConcurrencyThreshold();
const activePromises: Promise<void>[] = [];
let currentIndex = 0;

const processChildKey = async (key: string) => {
if (Array.isArray(rootElement[key])) {
for (const element of rootElement[key] as XmlElement[]) {
const [updatedLeafContent, updatedLeafCount, updatedHasNestedElements] =
await processElement({
await Promise.all(
(rootElement[key] as XmlElement[]).map(async (element) => {
const [
updatedLeafContent,
updatedLeafCount,
updatedHasNestedElements,
] = await processElement({
element,
disassembledPath,
uniqueIdElements,
rootElementName,
rootElementHeader,
key,
indent,
leafContent,
leafCount,
hasNestedElements,
leafContent: "",
leafCount: 0,
hasNestedElements: false,
});
leafContent = updatedLeafContent;
leafCount = updatedLeafCount;
hasNestedElements = updatedHasNestedElements;
}
leafContent += updatedLeafContent;
leafCount += updatedLeafCount;
hasNestedElements = hasNestedElements || updatedHasNestedElements;
}),
);
} else {
const [updatedLeafContent, updatedLeafCount, updatedHasNestedElements] =
await processElement({
Expand All @@ -63,13 +75,28 @@ export async function buildDisassembledFiles(
rootElementHeader,
key,
indent,
leafContent,
leafCount,
hasNestedElements,
leafContent: "",
leafCount: 0,
hasNestedElements: false,
});
leafContent = updatedLeafContent;
leafCount = updatedLeafCount;
hasNestedElements = updatedHasNestedElements;
leafContent += updatedLeafContent;
leafCount += updatedLeafCount;
hasNestedElements = hasNestedElements || updatedHasNestedElements;
}
};

while (currentIndex < childKeys.length || activePromises.length > 0) {
if (
currentIndex < childKeys.length &&
activePromises.length < concurrencyLimit
) {
const key = childKeys[currentIndex++];
const promise = processChildKey(key).finally(() => {
activePromises.splice(activePromises.indexOf(promise), 1);
});
activePromises.push(promise);
} else {
await Promise.race(activePromises);
}
}

Expand All @@ -90,6 +117,6 @@ export async function buildDisassembledFiles(
);
}
if (postPurge) {
unlink(filePath);
await unlink(filePath);
}
}
33 changes: 28 additions & 5 deletions src/service/disassembleXMLFileHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import ignore, { Ignore } from "ignore";
import { logger } from "@src/index";
import { INDENT } from "@src/helpers/constants";
import { buildDisassembledFiles } from "@src/service/buildDisassembledFiles";
import { getConcurrencyThreshold } from "./getConcurrencyThreshold";

export class DisassembleXMLFileHandler {
private ign: Ignore = ignore();
private readonly ign: Ignore = ignore();

async disassemble(xmlAttributes: {
filePath: string;
Expand Down Expand Up @@ -58,11 +59,18 @@ export class DisassembleXMLFileHandler {
});
} else if (fileStat.isDirectory()) {
const subFiles = await readdir(filePath);
for (const subFile of subFiles) {

const concurrencyLimit = getConcurrencyThreshold();
const activePromises: Promise<void>[] = [];
let currentIndex = 0;

// Function to process a single file
const processSubFile = async (subFile: string) => {
const subFilePath = join(filePath, subFile);
const relativeSubFilePath = this.posixPath(
relative(process.cwd(), subFilePath),
);

if (
subFilePath.endsWith(".xml") &&
!this.ign.ignores(relativeSubFilePath)
Expand All @@ -77,6 +85,21 @@ export class DisassembleXMLFileHandler {
} else if (this.ign.ignores(relativeSubFilePath)) {
logger.warn(`File ignored by ${ignorePath}: ${subFilePath}`);
}
};

while (currentIndex < subFiles.length || activePromises.length > 0) {
if (
currentIndex < subFiles.length &&
activePromises.length < concurrencyLimit
) {
const subFile = subFiles[currentIndex++];
const promise = processSubFile(subFile).finally(() => {
activePromises.splice(activePromises.indexOf(promise), 1);
});
activePromises.push(promise);
} else {
await Promise.race(activePromises); // Wait for any promise to resolve
}
}
}
}
Expand All @@ -95,11 +118,11 @@ export class DisassembleXMLFileHandler {
const fullName = basename(filePath, extname(filePath));
const baseName = fullName.split(".")[0];

let outputPath;
outputPath = join(dirPath, baseName);
let outputPath = join(dirPath, baseName);

if (prePurge && existsSync(outputPath))
if (prePurge && existsSync(outputPath)) {
await rm(outputPath, { recursive: true });
}

await buildDisassembledFiles(
filePath,
Expand Down
10 changes: 10 additions & 0 deletions src/service/getConcurrencyThreshold.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"use strict";

import { availableParallelism } from "node:os";

/** Cap applied when the caller does not supply (or supplies an unusable) limit. */
const DEFAULT_MAX_CONCURRENCY = 6;

/**
 * Returns how many tasks may be run concurrently.
 *
 * The result is the parallelism reported by `os.availableParallelism()`,
 * capped at `maxConcurrency`. When `availableParallelism` is not provided by
 * the runtime, the cap itself is returned.
 *
 * @param maxConcurrency - Upper bound for the result. Defaults to 6.
 *   Fractional values are rounded down, values below 1 are raised to 1, and
 *   non-finite values (`NaN`, `Infinity`) fall back to the default of 6.
 * @returns A finite integer between 1 and the effective cap, inclusive.
 */
export function getConcurrencyThreshold(
  maxConcurrency: number = DEFAULT_MAX_CONCURRENCY,
): number {
  // `availableParallelism` only exists on newer Node.js releases; on older
  // runtimes the imported binding is undefined, so test before calling it.
  const detectedParallelism =
    typeof availableParallelism === "function"
      ? availableParallelism()
      : Infinity;

  // Keep the cap a finite integer >= 1: a limit of 0 (or NaN) would leave a
  // slot-based scheduler with no slot to ever start work in.
  const effectiveCap = Number.isFinite(maxConcurrency)
    ? Math.max(1, Math.floor(maxConcurrency))
    : DEFAULT_MAX_CONCURRENCY;

  return Math.min(detectedParallelism, effectiveCap);
}
41 changes: 34 additions & 7 deletions src/service/reassembleXMLFileHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ import { buildReassembledFile } from "@src/service/buildReassembledFiles";
import { buildXMLString } from "@src/service/buildXMLString";
import { parseXML } from "@src/service/parseXML";
import { processFilesForRootElement } from "@src/service/processFilesForRootElement";
import { getConcurrencyThreshold } from "./getConcurrencyThreshold";

export class ReassembleXMLFileHandler {
async processFilesInDirectory(
dirPath: string,
): Promise<[string[], [string, string | undefined] | undefined]> {
const combinedXmlContents: string[] = [];
let rootResult: [string, string | undefined] | undefined = undefined;
const files = await readdir(dirPath);

// Sort files based on the name
Expand All @@ -24,24 +23,52 @@ export class ReassembleXMLFileHandler {
return fullNameA.localeCompare(fullNameB);
});

for (const file of files) {
const combinedXmlContents: string[] = [];
let rootResult: [string, string | undefined] | undefined = undefined;

const concurrencyLimit = getConcurrencyThreshold();
const activePromises: Promise<void>[] = [];
let currentIndex = 0;

// Function to process a single file
const processFile = async (file: string, index: number) => {
const filePath = join(dirPath, file);
const fileStat = await stat(filePath);

if (fileStat.isFile() && filePath.endsWith(".xml")) {
const xmlParsed = await parseXML(filePath);
if (xmlParsed === undefined) continue;
if (xmlParsed === undefined) return;

const rootResultFromFile = await processFilesForRootElement(xmlParsed);
rootResult = rootResultFromFile;

const combinedXmlString = buildXMLString(xmlParsed);
combinedXmlContents.push(combinedXmlString);
combinedXmlContents[index] = combinedXmlString;
} else if (fileStat.isDirectory()) {
const [subCombinedXmlContents, subRootResult] =
await this.processFilesInDirectory(filePath);
combinedXmlContents.push(...subCombinedXmlContents);
rootResult = subRootResult;
combinedXmlContents[index] = subCombinedXmlContents.join("");
}
};

while (currentIndex < files.length || activePromises.length > 0) {
if (
currentIndex < files.length &&
activePromises.length < concurrencyLimit
) {
const index = currentIndex++;
const file = files[index];
const promise = processFile(file, index).finally(() => {
activePromises.splice(activePromises.indexOf(promise), 1);
});
activePromises.push(promise);
} else {
await Promise.race(activePromises); // Wait for any promise to complete
}
}
return [combinedXmlContents, rootResult];

return [combinedXmlContents.filter(Boolean), rootResult];
}

async reassemble(xmlAttributes: {
Expand Down
33 changes: 33 additions & 0 deletions test/main.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ describe("main function", () => {

afterEach(async () => {
jest.restoreAllMocks();
jest.resetModules();
});

afterAll(async () => {
Expand Down Expand Up @@ -253,6 +254,38 @@ describe("main function", () => {

expect(logger.error).not.toHaveBeenCalled();
});
// Case: the mocked parallelism (4) is below 6, so 4 is the expected result.
it("should return the minimum of available parallelism and 6", () => {
jest.mock("node:os", () => ({
availableParallelism: jest.fn(() => 4), // Mock availableParallelism to return 4
}));

// Loaded with require inside the test, after the jest.mock call above —
// presumably so the module is evaluated against the mocked node:os.
const {
getConcurrencyThreshold,
} = require("../src/service/getConcurrencyThreshold");
expect(getConcurrencyThreshold()).toBe(4);
});

// Case: the mocked parallelism (10) is above 6, so the result is expected to be 6.
it("should return 6 if availableParallelism returns a higher value", () => {
jest.mock("node:os", () => ({
availableParallelism: jest.fn(() => 10), // Mock availableParallelism to return 10
}));

// Loaded with require inside the test, after the jest.mock call above —
// presumably so the module is evaluated against the mocked node:os.
const {
getConcurrencyThreshold,
} = require("../src/service/getConcurrencyThreshold");
expect(getConcurrencyThreshold()).toBe(6);
});

// Case: node:os is mocked without a usable availableParallelism export, and
// the result is expected to be 6.
it("should return 6 if availableParallelism is undefined", () => {
jest.mock("node:os", () => ({
availableParallelism: undefined, // Simulate unavailable function
}));

// Loaded with require inside the test, after the jest.mock call above —
// presumably so the module is evaluated against the mocked node:os.
const {
getConcurrencyThreshold,
} = require("../src/service/getConcurrencyThreshold");
expect(getConcurrencyThreshold()).toBe(6);
});
// This should always be the final test
it("should compare the files created in the mock directory against the baselines to confirm no changes.", async () => {
await compareDirectories(baselineDir, mockDir);
Expand Down

0 comments on commit e6035e6

Please sign in to comment.