From 60e0a61ccc17c421c91948a2a12847e97df564dc Mon Sep 17 00:00:00 2001 From: Brett Sun Date: Wed, 17 Apr 2019 15:50:00 +0200 Subject: [PATCH] Wrapper: add installedRepos observable (#268) Adds an `installedRepos` observable that emits an array of `InstalledRepos`. --- packages/aragon-wrapper/src/core/apm/index.js | 30 ++ .../aragon-wrapper/src/core/aragonOS/index.js | 10 +- packages/aragon-wrapper/src/index.js | 302 +++++++++++++++++- packages/aragon-wrapper/src/interfaces.js | 2 + .../aragon-wrapper/src/interfaces.test.js | 3 +- 5 files changed, 338 insertions(+), 9 deletions(-) create mode 100644 packages/aragon-wrapper/src/core/apm/index.js diff --git a/packages/aragon-wrapper/src/core/apm/index.js b/packages/aragon-wrapper/src/core/apm/index.js new file mode 100644 index 00000000..7c94369d --- /dev/null +++ b/packages/aragon-wrapper/src/core/apm/index.js @@ -0,0 +1,30 @@ +import { makeProxy } from '../../utils' + +export async function makeRepoProxy (appId, apm, web3) { + const repoAddress = await apm.ensResolve(appId) + return makeProxy(repoAddress, 'Repo', web3) +} + +export async function getAllRepoVersions (repoProxy) { + const versions = [] + const versionCount = await repoProxy.call('getVersionsCount') + + // Versions index starts at 1 + for (let versionId = 1; versionId <= versionCount; ++versionId) { + versions.push(await getRepoVersionById(repoProxy, versionId)) + } + + return Promise.all(versions) +} + +export function getRepoVersionById (repoProxy, versionId) { + return repoProxy + .call('getByVersionId', versionId) + .then(({ contentURI, contractAddress, semanticVersion }) => ({ + contentURI, + contractAddress, + version: semanticVersion.join('.'), + // Keeping this as a string makes comparisons a bit easier down the line + versionId: versionId.toString() + })) +} diff --git a/packages/aragon-wrapper/src/core/aragonOS/index.js b/packages/aragon-wrapper/src/core/aragonOS/index.js index 4fd84480..89ae26a2 100644 --- a/packages/aragon-wrapper/src/core/aragonOS/index.js +++ b/packages/aragon-wrapper/src/core/aragonOS/index.js @@ -40,4 +40,12 @@ function getKernelNamespace (hash) { } } -export { getAragonOsInternalAppInfo, getKernelNamespace } +function isAragonOsInternalApp (appId) { + return APP_MAPPINGS.has(appId) +} + +export { + getAragonOsInternalAppInfo, + getKernelNamespace, + isAragonOsInternalApp +} diff --git a/packages/aragon-wrapper/src/index.js b/packages/aragon-wrapper/src/index.js index 9e147518..12318402 100644 --- a/packages/aragon-wrapper/src/index.js +++ b/packages/aragon-wrapper/src/index.js @@ -1,15 +1,18 @@ // Externals -import { ReplaySubject, Subject, BehaviorSubject, merge } from 'rxjs' +import { ReplaySubject, Subject, BehaviorSubject, merge, of } from 'rxjs' import { + concatMap, debounceTime, distinctUntilChanged, filter, first, map, + mergeAll, mergeMap, publishReplay, scan, skipWhile, + startWith, switchMap, tap, withLatestFrom @@ -30,6 +33,12 @@ import Messenger from '@aragon/rpc-messenger' import * as handlers from './rpc/handlers' // Utilities +import { makeRepoProxy, getAllRepoVersions, getRepoVersionById } from './core/apm' +import { + getAragonOsInternalAppInfo, + getKernelNamespace, + isAragonOsInternalApp +} from './core/aragonOS' import { CALLSCRIPT_ID, encodeCallScript } from './evmscript' import { addressesEqual, @@ -42,8 +51,6 @@ import { ANY_ENTITY } from './utils' -import { getAragonOsInternalAppInfo, getKernelNamespace } from './core/aragonOS' - // Templates import Templates from './templates' @@ -303,7 +310,12 @@ export default class Aragon { * @return {void} */ initApps () { - // Cache requests so we don't make unnecessary calls when a call is already in-flight + /****************************** + * * + * CACHING * + * * + ******************************/ + const applicationInfoCache = new AsyncRequestCache((cacheKey) => { const [appId, codeAddress] = cacheKey.split('.') return getAragonOsInternalAppInfo(appId) || @@ -344,6 +356,12 @@ export default class Aragon { })) }) + /****************************** + * * + * APPS * + * * + ******************************/ + // Get all installed app proxy addresses const installedApps$ = this.permissions.pipe( map(Object.keys), @@ -424,8 +442,8 @@ export default class Aragon { }) } ), - // Emit resolved array of promises - mergeMap(updatedApps => Promise.all(updatedApps)) + // Emit resolved array of promises, one at a time + concatMap(updatedApps => Promise.all(updatedApps)) ) // We merge these two observables, which both return the full list of apps attached with their @@ -436,7 +454,7 @@ export default class Aragon { // Get artifact info for apps const appsWithInfo$ = apps$.pipe( - mergeMap( + concatMap( (apps) => Promise.all( apps.map(async (app) => { let appInfo @@ -457,6 +475,276 @@ export default class Aragon { publishReplay(1) ) this.apps.connect() + + /******************************* + * * + * REPOS * + * * + ******************************/ + + // Initialize installed repos from the list of apps + const installedRepoCache = new Map() + const repo$ = apps$.pipe( + // Map installed apps into a deduped list of their aragonPM repos, with these assumptions: + // - No apps are lying about their appId (malicious apps _could_ masquerade as other + // apps by setting this value themselves) + // - `contractAddress`s will stay the same across all installed apps. + // This is technically not true as apps could set this value themselves + // (e.g. as pinned apps do), but these apps wouldn't be able to upgrade anyway + // + // Ultimately returns an array of objects, holding the repo's: + // - appId + // - base contractAddress + map((apps) => Object.values( + apps + .filter(({ appId }) => !isAragonOsInternalApp(appId)) + .reduce((installedRepos, { appId, codeAddress, updated }) => { + installedRepos[appId] = { + appId, + updated, + contractAddress: codeAddress + } + return installedRepos + }, {}) + )), + + // Filter list of installed repos into: + // - New repos we haven't seen before (so we only subscribe once to their events) + // - Repos with apps that were updated in the kernel, to recalculate their current version + map((repos) => { + const newRepoAppIds = [] + const updatedRepoAppIds = [] + + repos.forEach((repo) => { + const { appId, updated } = repo + if (!installedRepoCache.has(appId)) { + newRepoAppIds.push(appId) + } else if (updated) { + updatedRepoAppIds.push(appId) + } + + // Mark repo as seen and cache installed information + installedRepoCache.set(appId, repo) + }) + + return [newRepoAppIds, updatedRepoAppIds] + }), + + // Stop if there's no new repos or updated apps + filter(([newRepoAppIds, updatedRepoAppIds]) => + newRepoAppIds.length || updatedRepoAppIds.length + ), + + // Project new repos into their ids and web3 proxy objects + concatMap(async ([newRepoAppIds, updatedRepoAppIds]) => { + const newRepos = await Promise.all( + newRepoAppIds.map(async (appId) => { + const repoProxy = await makeRepoProxy(appId, this.apm, this.web3) + await repoProxy.updateInitializationBlock() + + return { + appId, + repoProxy + } + }) + ) + return [newRepos, updatedRepoAppIds] + }), + + // Here's where the fun begins! + // It'll be easy to get lost, so remember to take it slowly. + // Just remember, with this `mergeMap()`, we'll be subscribing to all the projected (returned) + // observables and merging their respective emissions into a single observable. + // + // The output of this merged observable are update events containing the following: + // - `appId`: mandatory, signifies which repo was updated + // - `repoAddress`: optional, address of the repo contract itself + // - `versions`: optional, new version information + mergeMap(([newRepos, updatedRepoAppIds]) => { + // Create a new observable to project each new update as its own update emission. + const update$ = of(...updatedRepoAppIds).pipe( + map((appId) => ({ appId })) + ) + + // Create a new observable to project each new repo as its own emission. + const newRepo$ = of(...newRepos) + + // Create a new observable to project each new repo's address as its own update emission. + const repoAddress$ = newRepo$.pipe( + map(({ appId, repoProxy }) => ({ + appId, + repoAddress: repoProxy.address + })) + ) + + // Create a new observable that projects each NewVersion event as its own update event + // emission. + // This one is a bit trickier, due to the higher order observable. Keep reading. + const version$ = newRepo$.pipe( + // `mergeMap()` to "flatten" the async transformation. This async function returns an + // observable, which is ultimately the NewVersion stream. More on this, after the break. + // Note: we don't care about the ordering, so we use `mergeMap()` instead of `concatMap()` + mergeMap(async ({ appId, repoProxy }) => { + const initialVersions = [ + // Immediately query state from the repo contract, to avoid having to wait until all + // past events sync (may be long) + ...await getAllRepoVersions(repoProxy) + ] + + // Return an observable subscribed to NewVersion events, giving us: + // - Timestamps for versions that were published prior to this process running + // - Notifications for newly published versions + // + // Reduce this with the cached version information to emit version updates for the repo. + return repoProxy.events('NewVersion').pipe( + // Project each event to a new version info object, one at a time + concatMap(async (event) => { + const { versionId: eventVersionId } = event.returnValues + + // Adjust from Ethereum time + const timestamp = (await this.web3.eth.getBlock(event.blockNumber)).timestamp * 1000 + + const versionIndex = initialVersions.findIndex(({ versionId }) => versionId === eventVersionId) + const versionInfo = + versionIndex === -1 + ? await getRepoVersionById(repoProxy, eventVersionId) + : initialVersions[versionIndex] + + return { + ...versionInfo, + timestamp + } + }), + + // Trick to immediately emit (e.g. similar to a do/while loop) + startWith(null), + + // Reduce newly emitted versions into the full list of versions + scan(({ appId, versions }, newVersionInfo) => { + let newVersions = versions + if (newVersionInfo) { + const versionIndex = versions.findIndex(({ versionId }) => versionId === newVersionInfo.versionId) + + if (versionIndex === -1) { + newVersions = versions.concat(newVersionInfo) + } else { + newVersions = Array.from(versions) + newVersions[versionIndex] = newVersionInfo + } + } + + return { + appId, + versions: newVersions + } + }, { + appId, + versions: initialVersions + }) + ) + }), + + // This bit is interesting. + // We've "flattened" our async transformation with the `mergeMap()` above, but it still + // returns an observable. We need to flatten this observable's emissions into the upper + // stream, which is what `mergeAll()` achieves. + mergeAll() + ) + + // Merge all of the repo update events resulting from the apps being updated, and return it + // to the upper `mergeMap()` so it can be re-flattened into a single event stream. + return merge(repoAddress$, version$, update$) + }), + + // Reduce the event stream into a current representation of the installed repos, and which + // repo to update next. + scan(({ repos }, repoUpdate) => { + const { appId: updatedAppId, ...update } = repoUpdate + const updatedRepoInfo = { + ...repos[updatedAppId], + ...update + } + + return { + repos: { + ...repos, + [updatedAppId]: updatedRepoInfo + }, + updatedRepoAppId: updatedAppId + } + }, { + repos: {}, + updatedRepoAppId: null + }), + + // Stop if we don't have enough information yet to continue + filter(({ repos, updatedRepoAppId }) => + !!updatedRepoAppId && Array.isArray(repos[updatedRepoAppId].versions) + ), + + // Grab the full information of the updated repo using its latest values. + // With this, we've taken the basic stream of updates for events and mapped them onto their + // full repo objects. + concatMap(async ({ repos, updatedRepoAppId: appId }) => { + const { repoAddress, versions } = repos[appId] + const installedRepoInfo = installedRepoCache.get(appId) + + const latestVersion = versions[versions.length - 1] + const currentVersion = Array.from(versions) + // Apply reverse to find the latest version with the currently installed contract address + .reverse() + .find(version => version.contractAddress === installedRepoInfo.contractAddress) + + // Get info for the current and latest versions of the repo + const currentVersionRequest = applicationInfoCache + .request(`${appId}.${currentVersion.contractAddress}`) + .catch(() => ({})) + .then(content => ({ + content, + version: currentVersion.version + })) + + const versionInfos = await Promise.all([ + currentVersionRequest, + currentVersion.contractAddress === latestVersion.contractAddress + ? currentVersionRequest // current version is also the latest, no need to refetch + : applicationInfoCache + .request(`${appId}.${latestVersion.contractAddress}`) + .catch(() => ({})) + .then(content => ({ + content, + version: latestVersion.version + })) + ]) + + // Emit updated repo information + return { + appId, + repoAddress, + versions, + currentVersion: versionInfos[0], + latestVersion: versionInfos[1] + } + }) + ) + + this.installedRepos = repo$.pipe( + // Finally, we reduce the merged updates from individual repos into one final, expanding array + // of the installed repos + scan((repos, updatedRepo) => { + const repoIndex = repos.findIndex(repo => repo.repoAddress === updatedRepo.repoAddress) + if (repoIndex === -1) { + return repos.concat(updatedRepo) + } else { + const nextRepos = Array.from(repos) + nextRepos[repoIndex] = updatedRepo + return nextRepos + } + }, []), + debounceTime(100), + publishReplay(1) + ) + this.installedRepos.connect() } /** diff --git a/packages/aragon-wrapper/src/interfaces.js b/packages/aragon-wrapper/src/interfaces.js index 711d26af..fefccbd6 100644 --- a/packages/aragon-wrapper/src/interfaces.js +++ b/packages/aragon-wrapper/src/interfaces.js @@ -5,6 +5,7 @@ import abiAragonERCProxy from '@aragon/os/abi/ERCProxy' import abiAragonForwarder from '@aragon/os/abi/IForwarder' import abiAragonKernel from '@aragon/os/abi/Kernel' import abiAragonEVMScriptRegistry from '@aragon/os/abi/EVMScriptRegistry' +import abiAragonRepo from '@aragon/os/abi/Repo' import abiERC20 from '@aragon/os/abi/ERC20' // Artifacts @@ -19,6 +20,7 @@ const ABIS = { 'aragon/Forwarder': abiAragonForwarder.abi, 'aragon/Kernel': abiAragonKernel.abi, 'aragon/EVM Script Registry': abiAragonEVMScriptRegistry.abi, + 'aragon/Repo': abiAragonRepo.abi, 'standard/ERC20': abiERC20.abi } diff --git a/packages/aragon-wrapper/src/interfaces.test.js b/packages/aragon-wrapper/src/interfaces.test.js index 5369b393..42dc3b04 100644 --- a/packages/aragon-wrapper/src/interfaces.test.js +++ b/packages/aragon-wrapper/src/interfaces.test.js @@ -7,7 +7,7 @@ test.afterEach.always(() => { }) test('interfaces: getAbi', async (t) => { - t.plan(8) + t.plan(9) // arrange const availableABIs = [ 'aragon/ACL', @@ -16,6 +16,7 @@ test('interfaces: getAbi', async (t) => { 'aragon/Forwarder', 'aragon/Kernel', 'aragon/EVM Script Registry', + 'aragon/Repo', 'standard/ERC20' ] // act