Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

N8N-3022 fix for postgres columns containing spaces #2989

Merged
merged 12 commits into from
Apr 19, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 51 additions & 25 deletions packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IDataObject, INodeExecutionData } from 'n8n-workflow';
import { IDataObject, INodeExecutionData, JsonObject } from 'n8n-workflow';
import pgPromise from 'pg-promise';
import pg from 'pg-promise/typescript/pg-subset';

Expand All @@ -10,12 +10,18 @@ import pg from 'pg-promise/typescript/pg-subset';
* @param {string[]} properties The properties it should include
* @returns
*/
export function getItemsCopy(items: INodeExecutionData[], properties: string[]): IDataObject[] {
export function getItemsCopy(items: INodeExecutionData[], properties: string[], guardedColumns?: {[key: string]: string}): IDataObject[] {
let newItem: IDataObject;
return items.map(item => {
newItem = {};
for (const property of properties) {
newItem[property] = item.json[property];
if (guardedColumns) {
Object.keys(guardedColumns).forEach( column => {
newItem[column] = item.json[guardedColumns[column]];
});
} else {
for (const property of properties) {
newItem[property] = item.json[property];
}
}
return newItem;
});
Expand All @@ -29,10 +35,16 @@ export function getItemsCopy(items: INodeExecutionData[], properties: string[]):
* @param {string[]} properties The properties it should include
* @returns
*/
export function getItemCopy(item: INodeExecutionData, properties: string[]): IDataObject {
export function getItemCopy(item: INodeExecutionData, properties: string[], guardedColumns?: {[key: string]: string}): IDataObject {
const newItem: IDataObject = {};
for (const property of properties) {
newItem[property] = item.json[property];
if (guardedColumns) {
Object.keys(guardedColumns).forEach( column => {
newItem[column] = item.json[guardedColumns[column]];
});
} else {
for (const property of properties) {
newItem[property] = item.json[property];
}
}
return newItem;
}
Expand Down Expand Up @@ -93,7 +105,7 @@ export async function pgQuery(
Array.prototype.push.apply(result, await t.any(allQueries[i].query, allQueries[i].values));
} catch (err) {
if (continueOnFail === false) throw err;
result.push({ ...items[i].json, code: err.code, message: err.message });
result.push({ ...items[i].json, code: (err as JsonObject).code, message: (err as JsonObject).message });
return result;
}
}
Expand All @@ -107,7 +119,7 @@ export async function pgQuery(
Array.prototype.push.apply(result, await t.any(allQueries[i].query, allQueries[i].values));
} catch (err) {
if (continueOnFail === false) throw err;
result.push({ ...items[i].json, code: err.code, message: err.message });
result.push({ ...items[i].json, code: (err as JsonObject).code, message: (err as JsonObject).message });
}
}
return result;
Expand Down Expand Up @@ -136,9 +148,15 @@ export async function pgInsert(
const table = getNodeParam('table', 0) as string;
const schema = getNodeParam('schema', 0) as string;
const columnString = getNodeParam('columns', 0) as string;
const guardedColumns: {[key: string]: string} = {};

const columns = columnString.split(',')
.map(column => column.trim().split(':'))
.map(([name, cast]) => ({ name, cast }));
.map(([name, cast], i) => {
guardedColumns[`column${i}`] = name;
return { name, cast, prop: `column${i}` };
});

const columnNames = columns.map(column => column.name);

const cs = new pgp.helpers.ColumnSet(columns, { table: { table, schema } });
Expand All @@ -148,18 +166,18 @@ export async function pgInsert(

const returning = generateReturning(pgp, getNodeParam('returnFields', 0) as string);
if (mode === 'multiple') {
const query = pgp.helpers.insert(getItemsCopy(items, columnNames), cs) + returning;
const query = pgp.helpers.insert(getItemsCopy(items, columnNames, guardedColumns), cs) + returning;
return db.any(query);
} else if (mode === 'transaction') {
return db.tx(async t => {
const result: IDataObject[] = [];
for (let i = 0; i < items.length; i++) {
const itemCopy = getItemCopy(items[i], columnNames);
const itemCopy = getItemCopy(items[i], columnNames, guardedColumns);
try {
result.push(await t.one(pgp.helpers.insert(itemCopy, cs) + returning));
} catch (err) {
if (continueOnFail === false) throw err;
result.push({ ...itemCopy, code: err.code, message: err.message });
result.push({ ...itemCopy, code: (err as JsonObject).code, message: (err as JsonObject).message });
return result;
}
}
Expand All @@ -169,7 +187,7 @@ export async function pgInsert(
return db.task(async t => {
const result: IDataObject[] = [];
for (let i = 0; i < items.length; i++) {
const itemCopy = getItemCopy(items[i], columnNames);
const itemCopy = getItemCopy(items[i], columnNames, guardedColumns);
try {
const insertResult = await t.oneOrNone(pgp.helpers.insert(itemCopy, cs) + returning);
if (insertResult !== null) {
Expand All @@ -179,7 +197,7 @@ export async function pgInsert(
if (continueOnFail === false) {
throw err;
}
result.push({ ...itemCopy, code: err.code, message: err.message });
result.push({ ...itemCopy, code: (err as JsonObject).code, message: (err as JsonObject).message });
}
}
return result;
Expand Down Expand Up @@ -209,15 +227,21 @@ export async function pgUpdate(
const schema = getNodeParam('schema', 0) as string;
const updateKey = getNodeParam('updateKey', 0) as string;
const columnString = getNodeParam('columns', 0) as string;
const columns = columnString.split(',')
const guardedColumns: {[key: string]: string} = {};

const columns: Array<{name:string, cast: string, prop:string}> = columnString.split(',')
.map(column => column.trim().split(':'))
.map(([name, cast]) => ({ name, cast }));
.map(([name, cast], i) => {
guardedColumns[`column${i}`] = name;
return { name, cast, prop: `column${i}` };
});

const updateKeys = updateKey.split(',').map(key => {
const updateKeys = updateKey.split(',').map((key, i) => {
const [name, cast] = key.trim().split(':');
const updateColumn = { name, cast };
const targetCol = columns.find((column) => column.name === name);
const updateColumn = { name, cast, prop: targetCol ? targetCol.prop : `updateColumn${i}` };
if (!targetCol) {
guardedColumns[updateColumn.prop] = name;
columns.unshift(updateColumn);
}
else if (!targetCol.cast) {
Expand All @@ -233,7 +257,7 @@ export async function pgUpdate(

// Prepare the data to update and copy it to be returned
const columnNames = columns.map(column => column.name);
const updateItems = getItemsCopy(items, columnNames);
const updateItems = getItemsCopy(items, columnNames, guardedColumns);

const returning = generateReturning(pgp, getNodeParam('returnFields', 0) as string);
if (mode === 'multiple') {
Expand All @@ -246,17 +270,19 @@ export async function pgUpdate(
+ returning;
return await db.any(query);
} else {
const where = ' WHERE ' + updateKeys.map(updateKey => pgp.as.name(updateKey.name) + ' = ${' + updateKey.name + '}').join(' AND ');
const where = ' WHERE ' +
updateKeys.map(updateKey => pgp.as.name(updateKey.name) +
' = ${' + updateKey.prop + '}').join(' AND ');
if (mode === 'transaction') {
return db.tx(async t => {
const result: IDataObject[] = [];
for (let i = 0; i < items.length; i++) {
const itemCopy = getItemCopy(items[i], columnNames);
const itemCopy = getItemCopy(items[i], columnNames, guardedColumns);
try {
Array.prototype.push.apply(result, await t.any(pgp.helpers.update(itemCopy, cs) + pgp.as.format(where, itemCopy) + returning));
} catch (err) {
if (continueOnFail === false) throw err;
result.push({ ...itemCopy, code: err.code, message: err.message });
result.push({ ...itemCopy, code: (err as JsonObject).code, message: (err as JsonObject).message });
return result;
}
}
Expand All @@ -266,12 +292,12 @@ export async function pgUpdate(
return db.task(async t => {
const result: IDataObject[] = [];
for (let i = 0; i < items.length; i++) {
const itemCopy = getItemCopy(items[i], columnNames);
const itemCopy = getItemCopy(items[i], columnNames, guardedColumns);
try {
Array.prototype.push.apply(result, await t.any(pgp.helpers.update(itemCopy, cs) + pgp.as.format(where, itemCopy) + returning));
} catch (err) {
if (continueOnFail === false) throw err;
result.push({ ...itemCopy, code: err.code, message: err.message });
result.push({ ...itemCopy, code: (err as JsonObject).code, message: (err as JsonObject).message });
}
}
return result;
Expand Down