Skip to content

Commit

Permalink
fix: empty sync
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelTaylor3D committed May 7, 2024
1 parent 69c9472 commit 1a19a04
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 83 deletions.
186 changes: 104 additions & 82 deletions src/tasks/sync-registries.js
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,23 @@ const syncOrganizationAudit = async (organization) => {

if (!CONFIG.USE_SIMULATOR) {
await new Promise((resolve) => setTimeout(resolve, 30000));

const { sync_status } = await datalayer.getSyncStatus(
organization.orgUid,
);

if (lastProcessedIndex > sync_status.generation) {
const warningMsg = [
`No data found for ${organization.name} in the current datalayer generation.`,
`DataLayer not yet caught up to generation ${lastProcessedIndex}.`,
`This issue is often temporary and could be due to a lag in data propagation.`,
'Syncing for this organization will be paused until this is resolved.',
'For ongoing issues, please contact the organization.',
].join(' ');

logger.warn(warningMsg);
return;
}
}

logger.debug(`5 Last processed index: ${lastProcessedIndex}`);
Expand All @@ -276,19 +293,6 @@ const syncOrganizationAudit = async (organization) => {
root2.root_hash,
);

if (_.isEmpty(kvDiff)) {
const warningMsg = [
`No data found for ${organization.name} in the current datalayer generation.`,
`Missing data for root hash: ${root2.root_hash}.`,
`This issue is often temporary and could be due to a lag in data propagation.`,
'Syncing for this organization will be paused until this is resolved.',
'For ongoing issues, please contact the organization.',
].join(' ');

logger.warn(warningMsg);
return;
}

const comment = kvDiff.filter(
(diff) =>
(diff.key === encodeHex('comment') ||
Expand All @@ -312,88 +316,106 @@ const syncOrganizationAudit = async (organization) => {
logger.info(
`Syncing ${organization.name} generation ${toBeProcessedIndex}`,
);
for (const diff of optimizedKvDiff) {
const key = decodeHex(diff.key);
const modelKey = key.split('|')[0];

if (_.isEmpty(optimizedKvDiff)) {
const auditData = {
orgUid: organization.orgUid,
registryId: organization.registryId,
rootHash: root2.root_hash,
type: diff.type,
table: modelKey,
change: decodeHex(diff.value),
type: 'NO CHANGE',
table: null,
change: null,
onchainConfirmationTimeStamp: root2.timestamp,
generation: toBeProcessedIndex,
comment: _.get(
tryParseJSON(
decodeHex(_.get(comment, '[0].value', encodeHex('{}'))),
),
'comment',
'',
),
author: _.get(
tryParseJSON(
decodeHex(_.get(author, '[0].value', encodeHex('{}'))),
),
'author',
'',
),
comment: '',
author: '',
};

if (modelKey && Object.keys(ModelKeys).includes(modelKey)) {
const record = JSON.parse(decodeHex(diff.value));
const primaryKeyValue =
record[ModelKeys[modelKey].primaryKeyAttributes[0]];

if (diff.type === 'INSERT') {
logger.info(`UPSERTING: ${modelKey} - ${primaryKeyValue}`);
await ModelKeys[modelKey].upsert(record, {
transaction,
mirrorTransaction,
});
} else if (diff.type === 'DELETE') {
logger.info(`DELETING: ${modelKey} - ${primaryKeyValue}`);
await ModelKeys[modelKey].destroy({
where: {
[ModelKeys[modelKey].primaryKeyAttributes[0]]: primaryKeyValue,
},
transaction,
mirrorTransaction,
});
}
await Audit.create(auditData, { transaction, mirrorTransaction });
} else {
for (const diff of optimizedKvDiff) {
const key = decodeHex(diff.key);
const modelKey = key.split('|')[0];

const auditData = {
orgUid: organization.orgUid,
registryId: organization.registryId,
rootHash: root2.root_hash,
type: diff.type,
table: modelKey,
change: decodeHex(diff.value),
onchainConfirmationTimeStamp: root2.timestamp,
generation: toBeProcessedIndex,
comment: _.get(
tryParseJSON(
decodeHex(_.get(comment, '[0].value', encodeHex('{}'))),
),
'comment',
'',
),
author: _.get(
tryParseJSON(
decodeHex(_.get(author, '[0].value', encodeHex('{}'))),
),
'author',
'',
),
};

if (modelKey && Object.keys(ModelKeys).includes(modelKey)) {
const record = JSON.parse(decodeHex(diff.value));
const primaryKeyValue =
record[ModelKeys[modelKey].primaryKeyAttributes[0]];

if (diff.type === 'INSERT') {
logger.info(`UPSERTING: ${modelKey} - ${primaryKeyValue}`);
await ModelKeys[modelKey].upsert(record, {
transaction,
mirrorTransaction,
});
} else if (diff.type === 'DELETE') {
logger.info(`DELETING: ${modelKey} - ${primaryKeyValue}`);
await ModelKeys[modelKey].destroy({
where: {
[ModelKeys[modelKey].primaryKeyAttributes[0]]:
primaryKeyValue,
},
transaction,
mirrorTransaction,
});
}

if (organization.orgUid === homeOrg?.orgUid) {
const stagingUuid = [
'unit',
'project',
'units',
'projects',
].includes(modelKey)
? primaryKeyValue
: undefined;

if (stagingUuid) {
afterCommitCallbacks.push(async () => {
logger.info(`DELETING STAGING: ${stagingUuid}`);
await Staging.destroy({
where: { uuid: stagingUuid },
if (organization.orgUid === homeOrg?.orgUid) {
const stagingUuid = [
'unit',
'project',
'units',
'projects',
].includes(modelKey)
? primaryKeyValue
: undefined;

if (stagingUuid) {
afterCommitCallbacks.push(async () => {
logger.info(`DELETING STAGING: ${stagingUuid}`);
await Staging.destroy({
where: { uuid: stagingUuid },
});
});
});
}
}
}
}

// Create the Audit record
await Audit.create(auditData, { transaction, mirrorTransaction });
await Organization.update(
{ registryHash: root2.root_hash },
{
where: { orgUid: organization.orgUid },
transaction,
mirrorTransaction,
},
);
// Create the Audit record
await Audit.create(auditData, { transaction, mirrorTransaction });
await Organization.update(
{ registryHash: root2.root_hash },
{
where: { orgUid: organization.orgUid },
transaction,
mirrorTransaction,
},
);
}
}
};

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/project.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe('Project Resource Integration Tests', function () {
homeOrgUid = await testFixtures.getHomeOrgId();
});

it.only('deletes a project end-to-end (with simulator)', async function () {
it('deletes a project end-to-end (with simulator)', async function () {
/*
Basic Idea for this test is that we are going to create a project and verify that
the new project propagates through the data layer and into our db. Then we are going
Expand Down

0 comments on commit 1a19a04

Please sign in to comment.