Skip to content

Commit

Permalink
Walk through the db using id offset instead of seq offset
Browse files Browse the repository at this point in the history
  • Loading branch information
Falinor committed Jul 14, 2023
1 parent 1a71640 commit 9df7f6a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 64 deletions.
6 changes: 0 additions & 6 deletions database/migrations/068-housing-occupancy.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import { Knex } from 'knex';

exports.up = async function (knex: Knex) {
// Leave some space for UPDATEs and allow Postgres to do use HOT update
// instead of DELETE + INSERT (useful for performance)
await knex.raw('ALTER TABLE housing SET (FILLFACTOR = 80)');

await knex.schema.alterTable('housing', (table) => {
table.renameColumn('occupancy', 'occupancy_registered');
});
Expand All @@ -21,8 +17,6 @@ exports.up = async function (knex: Knex) {
await knex.schema.alterTable('housing', (table) => {
table.string('occupancy').notNullable().alter();
});

await knex.raw('ALTER TABLE housing SET (FILLFACTOR = 100)');
};

exports.down = function (knex: Knex) {
Expand Down
100 changes: 42 additions & 58 deletions database/migrations/073-housing-refactor-statuses.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import async from 'async';
import { formatDuration, intervalToDuration, isValid } from 'date-fns';
import { Knex } from 'knex';
import fp from 'lodash/fp';
import { v4 as uuidv4 } from 'uuid';
Expand All @@ -11,7 +12,7 @@ import { housingTable } from '../../server/repositories/housingRepository';
import { usersTable } from '../../server/repositories/userRepository';
import { isNotNull } from '../../frontend/src/utils/compareUtils';

const BATCH_SIZE = 1000;
const BATCH_SIZE = 100_000;

exports.up = async function (knex: Knex) {
const email = 'admin@zerologementvacant.beta.gouv.fr';
Expand All @@ -21,32 +22,29 @@ exports.up = async function (knex: Knex) {
throw new Error(`${email} not found`);
}

// Leave some space for UPDATEs and allow Postgres to do use HOT update
// instead of DELETE + INSERT (useful for performance)
await knex.raw('ALTER TABLE housing SET (FILLFACTOR = 80)');

await knex.schema.createTable('housing_updated', (table) => {
table.uuid('id').primary().notNullable();
table.integer('status').notNullable();
table.string('sub_status');
table.specificType('precisions', 'text[]');
table.specificType('vacancy_reasons', 'text[]');
table.string('occupancy');
});
const start = new Date();

let count = 0;
let offset = 0;
let length = 0;
let count = 0;
let currentId: string | undefined;

const result = await knex(housingTable).count().first();
const total = Number(result?.count);

console.log(`${total} housing found.`);

await async.doUntil(
async () => {
const housingList = await knex(housingTable)
const housingList: Housing[] = await knex(housingTable)
.modify((builder) => {
if (currentId) {
builder.where('id', '>', currentId);
}
})
.orderBy('id')
.limit(BATCH_SIZE)
.offset(offset);
.limit(BATCH_SIZE);
// Remember the last id for pagination
currentId = fp.last(housingList)?.id;

const items = housingList
.map((oldHousing) => {
Expand Down Expand Up @@ -100,44 +98,23 @@ exports.up = async function (knex: Knex) {
]
: event
);
await saveEvents(knex)(events);
const compactHousingList = items
.map((item) => item.housing)
.map(
fp.pick([
'id',
'status',
'sub_status',
'precisions',
'vacancy_reasons',
'occupancy',
])
);
if (compactHousingList.length > 0) {
await knex('housing_updated').insert(compactHousingList);
}
await async.forEach(fp.chunk(1000, events), async (events) => {
await saveEvents(knex)(events);
});
const compactHousingList = items.map((item) => item.housing);
await async.forEach(fp.chunk(1000, compactHousingList), async (hl) => {
await saveHousingList(knex)(hl);
});

length = housingList.length;
count += length;
offset += BATCH_SIZE;
console.log(`${count} / ${total} housing.`);
console.log(`${count} / ${total} housing`);
},
async () => length < BATCH_SIZE
);

await knex.raw(`
UPDATE housing h
SET
status = hu.status,
sub_status = hu.sub_status,
precisions = hu.precisions,
vacancy_reasons = hu.vacancy_reasons,
occupancy = hu.occupancy
FROM housing_updated hu
WHERE h.id = hu.id
`);
await knex.schema.dropTable('housing_updated');
await knex.raw('ALTER TABLE housing SET (FILLFACTOR = 100)');
const end = new Date();
console.log(`Done in ${formatDuration(intervalToDuration({ start, end }))}`);
};

exports.down = async function (knex: Knex) {
Expand All @@ -148,8 +125,8 @@ exports.down = async function (knex: Knex) {
throw new Error(`${email} not found`);
}

let offset = 0;
let length = 0;
let currentId: string | undefined;

// knex stream does not work in a migration...
await async.doUntil(
Expand All @@ -158,9 +135,15 @@ exports.down = async function (knex: Knex) {
.where({
name: 'Modification arborescence de suivi',
})
.modify((builder) => {
if (currentId) {
builder.where('id', '>', currentId);
}
})
.orderBy('id')
.limit(BATCH_SIZE)
.offset(offset);
.limit(BATCH_SIZE);
// Remember the last id for pagination
currentId = fp.last(events)?.id;

const housingList: Housing[] = events
.map((event) => event.old)
Expand All @@ -172,17 +155,17 @@ exports.down = async function (knex: Knex) {
...h,
mutation_date:
// @ts-ignore
h.mutation_date === '-000001-12-31T23:50:39.000Z'
? null
: // @ts-ignore
h.mutation_date,
isValid(h.mutation_date) ? h.mutation_date : null,
}))
.map(normalizeStatus);

await saveHousingList(knex)(housingList);
await async.forEach(fp.chunk(1000, housingList), async (_) => {
console.log('Saving housing list...');
await saveHousingList(knex)(_);
console.log('Save housing list');
});

length = events.length;
offset += events.length;
},
async () => length < BATCH_SIZE
);
Expand Down Expand Up @@ -653,6 +636,7 @@ const denormalizeNewStatus = denormalizeStatus([
'Suivi terminé',
]);

// @ts-ignore
function saveEvents(knex: Knex) {
return async (events: HousingEvent[]): Promise<void> => {
const housingEvents = events.map((event) => ({
Expand Down

0 comments on commit 9df7f6a

Please sign in to comment.