Skip to content

Commit

Permalink
prevent race conditions and add logs (#889)
Browse files Browse the repository at this point in the history
  • Loading branch information
menduz authored Feb 1, 2022
1 parent 39df832 commit 6105dcf
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 31 deletions.
6 changes: 2 additions & 4 deletions content/src/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { Environment, EnvironmentConfig } from './Environment'
import { FetcherFactory } from './helpers/FetcherFactory'
import { metricsDeclaration } from './metrics'
import { MigrationManagerFactory } from './migrations/MigrationManagerFactory'
import { createBloomFilterComponent } from './ports/bloomFilter'
import { createDeploymentListComponent } from './ports/deploymentListComponent'
import { createFailedDeploymentsCache } from './ports/failedDeploymentsCache'
import { createFetchComponent } from './ports/fetcher'
import { createDatabaseComponent } from './ports/postgres'
Expand Down Expand Up @@ -74,9 +74,7 @@ export async function initComponentsWithEnv(env: Environment): Promise<AppCompon
const validator = createValidator({ storage, authenticator, catalystFetcher, env, logs })
const serverValidator = createServerValidator({ failedDeploymentsCache })

const deployedEntitiesFilter = createBloomFilterComponent({
sizeInBytes: 512
})
const deployedEntitiesFilter = createDeploymentListComponent({ database, logs })

let deployer: MetaverseContentService = ServiceFactory.create({
metrics,
Expand Down
2 changes: 1 addition & 1 deletion content/src/logic/database-queries/deployments-queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export async function* streamAllEntityIds(components: Pick<AppComponents, 'datab
SQL`
SELECT entity_id FROM deployments
`,
{ batchSize: 1000 }
{ batchSize: 10000 }
)) {
yield {
entityId: row.entity_id
Expand Down
2 changes: 1 addition & 1 deletion content/src/logic/deployments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export async function isEntityDeployed(
// this condition should be carefully handled:
// 1) it first uses the bloom filter to know wheter or not an entity may exist or definitely don't exist (.check)
// 2) then it checks against the DB (deploymentExists)
return components.deployedEntitiesFilter.check(entityId) && (await deploymentExists(components, entityId))
return (await components.deployedEntitiesFilter.check(entityId)) && (await deploymentExists(components, entityId))
}

export async function retryFailedDeploymentExecution(
Expand Down
45 changes: 45 additions & 0 deletions content/src/ports/deploymentListComponent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { IBaseComponent } from '@well-known-components/interfaces'
import future from 'fp-future'
import { streamAllEntityIds } from '../logic/database-queries/deployments-queries'
import { AppComponents } from '../types'
import { createBloomFilterComponent } from './bloomFilter'

export type DeploymentListComponent = {
add(entityId: string): void
check(entityId: string): Promise<boolean>
}

export function createDeploymentListComponent(
components: Pick<AppComponents, 'database' | 'logs'>
): DeploymentListComponent & IBaseComponent {
const bloom = createBloomFilterComponent({ sizeInBytes: 512 })

const initialized = future<void>()

const logs = components.logs.getLogger('DeploymentListComponent')

async function addFromDb() {
const start = Date.now()
logs.info(`Creating bloom filter`, {})
let elements = 0
for await (const row of streamAllEntityIds(components)) {
elements++
bloom.add(row.entityId)
}
logs.info(`Bloom filter recreated.`, { timeMs: Date.now() - start, elements })
}

return {
add(entityId: string) {
bloom.add(entityId)
},
async check(entityId: string) {
await initialized
return bloom.check(entityId)
},
async start() {
await addFromDb()
initialized.resolve()
}
}
}
16 changes: 15 additions & 1 deletion content/src/service/ServiceImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export class ServiceImpl implements MetaverseContentService {
}

// Update the current list of pointers being deployed
if (!entity.pointers)
if (!entity.pointers || entity.pointers.length == 0)
return InvalidResult({
errors: [`The entity does not have any pointer.`]
})
Expand All @@ -126,6 +126,11 @@ export class ServiceImpl implements MetaverseContentService {
const contextToDeploy: DeploymentContext = this.calculateIfLegacy(entity, auditInfo.authChain, context)

try {
ServiceImpl.LOGGER.error(`Deploying entity`, {
entityId,
pointers: entity.pointers.join(' ')
})

const storeResult = await this.storeDeploymentInDatabase(
task,
entityId,
Expand All @@ -145,6 +150,11 @@ export class ServiceImpl implements MetaverseContentService {
})
return InvalidResult({ errors: ['An internal server error occured. This will raise an automatic alarm.'] })
} else if (isInvalidDeployment(storeResult)) {
ServiceImpl.LOGGER.error(`Error deploying entity`, {
entityId,
pointers: entity.pointers.join(' '),
errors: storeResult.errors.join(' ')
})
if (storeResult.errors.length == 0) {
ServiceImpl.LOGGER.error(`Invalid InvalidResult, got 0 errors`, {
entityId,
Expand All @@ -155,6 +165,10 @@ export class ServiceImpl implements MetaverseContentService {
}
return storeResult
} else if (storeResult.wasEntityDeployed) {
ServiceImpl.LOGGER.error(`Entity deployed`, {
entityId,
pointers: entity.pointers.join(' ')
})
this.components.metrics.increment('total_deployments_count', { entity_type: entity.type }, 1)

// Invalidate cache for retrieving entities by id
Expand Down
18 changes: 1 addition & 17 deletions content/src/service/synchronization/batchDeployer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { createJobQueue } from '@dcl/snapshots-fetcher/dist/job-queue-port'
import { IDeployerComponent, RemoteEntityDeployment } from '@dcl/snapshots-fetcher/dist/types'
import { IBaseComponent } from '@well-known-components/interfaces'
import { streamAllEntityIds } from '../../logic/database-queries/deployments-queries'
import { isEntityDeployed } from '../../logic/deployments'
import { FailureReason } from '../../ports/failedDeploymentsCache'
import { AppComponents, CannonicalEntityDeployment } from '../../types'
Expand Down Expand Up @@ -115,26 +114,11 @@ export function createBatchDeployerComponent(
}
}

async function createBloomFilterDeployments() {
const start = Date.now()

const filter = components.deployedEntitiesFilter
filter.reset()
let elements = 0
for await (const row of streamAllEntityIds(components)) {
elements++
filter.add(row.entityId)
}
logs.info(`Bloom filter recreated.`, { timeMs: Date.now() - start, elements })
}

// TODO: [new-sync] every now and then cleanup the deploymentsMap of old deployments

return {
async start() {
await createBloomFilterDeployments()
},
async stop() {
// stop will wait for the queue to end.
return parallelDeploymentJobs.onIdle()
},
onIdle() {
Expand Down
4 changes: 2 additions & 2 deletions content/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { Denylist } from './denylist/Denylist'
import { Environment } from './Environment'
import { metricsDeclaration } from './metrics'
import { MigrationManager } from './migrations/MigrationManager'
import { IBloomFilterComponent } from './ports/bloomFilter'
import { DeploymentListComponent } from './ports/deploymentListComponent'
import { IFailedDeploymentsCacheComponent } from './ports/failedDeploymentsCache'
import { IDatabaseComponent } from './ports/postgres'
import { Repository } from './repository/Repository'
Expand Down Expand Up @@ -44,7 +44,7 @@ export type AppComponents = {
}
batchDeployer: IDeployerComponent
synchronizationJobManager: JobLifecycleManagerComponent
deployedEntitiesFilter: IBloomFilterComponent
deployedEntitiesFilter: DeploymentListComponent
synchronizationManager: ClusterSynchronizationManager
controller: Controller
snapshotManager: ISnapshotManager
Expand Down
7 changes: 2 additions & 5 deletions content/test/unit/service/Service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { ContentFileHash, Deployment, Entity, EntityType, EntityVersion, Hashing
import { Authenticator } from 'dcl-crypto'
import { Environment } from '../../../src/Environment'
import { metricsDeclaration } from '../../../src/metrics'
import { createBloomFilterComponent } from '../../../src/ports/bloomFilter'
import { createDeploymentListComponent } from '../../../src/ports/deploymentListComponent'
import { createFailedDeploymentsCache } from '../../../src/ports/failedDeploymentsCache'
import { createDatabaseComponent } from '../../../src/ports/postgres'
import { ContentAuthenticator } from '../../../src/service/auth/Authenticator'
Expand Down Expand Up @@ -246,10 +246,7 @@ describe('Service', function () {
const pointerManager = NoOpPointerManager.build()
const authenticator = new ContentAuthenticator('', DECENTRALAND_ADDRESS)
const database = await createDatabaseComponent({ logs, env })

const deployedEntitiesFilter = createBloomFilterComponent({
sizeInBytes: 512
})
const deployedEntitiesFilter = createDeploymentListComponent({ database, logs })

return ServiceFactory.create({
env,
Expand Down

0 comments on commit 6105dcf

Please sign in to comment.