Skip to content

Commit

Permalink
Index order events (#145)
Browse files Browse the repository at this point in the history
* Processed OrderStarted.

* Assert type json.

* Index order events.

* Added test for order.

* Update price inside DDO.

* fix import.

* Changed address for generating DID.

* Added logs.

* Added more specific logs.

* Added more logs again.

* Changed imports to import Enterprise.

* Added SSH debug.

* Replaced ssh with displaying the addresses.

* Updated test.

* Add logs. Fix type.

* added more debug logs.

* Commented the code.

* debug x2.

* tweak

* Commented assert.

* lazy loading config eventProcessor

* update test

* Added test for DDO.

* tweaks.

* Updated OrderReused.

* Added new test for OrderReused.

* Added logs.

* tweak.

* Added tests back. Cleanup logs.

* Added orderSchema + tests.

* Fix typo.

* Added log.

* Removed orderTx.

* Stored orders in db.

* Updated schema for orders. Updated tests and logic for reuse and startOrder processing events.

* Remove validity.

* Fixed test regarding db.

* Added more asserts.

* Fix typo.

* added utils inside tests.

* fix expect statement.

* remove suggested logs.

* fix review.

---------

Co-authored-by: Bogdan Fazakas <bogdan.fazakas@gmail.com>
  • Loading branch information
mariacarmina and bogdanfazakas authored Dec 19, 2023
1 parent 7c0d826 commit 8b69f76
Show file tree
Hide file tree
Showing 10 changed files with 517 additions and 29 deletions.
151 changes: 149 additions & 2 deletions src/components/Indexer/eventProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
Contract,
Interface,
JsonRpcApiProvider,
ethers,
Expand All @@ -15,8 +16,10 @@ import {
getCustomLoggerForModule
} from '../../utils/logging/Logger.js'
import ERC721Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC721Template.sol/ERC721Template.json' assert { type: 'json' }
import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC20TemplateEnterprise.sol/ERC20TemplateEnterprise.json' assert { type: 'json' }
import { getConfig } from '../../utils/config.js'
import { Database } from '../database/index.js'
import { OceanNodeConfig } from '../../@types/OceanNode.js'
import { MetadataStates } from '../../utils/constants.js'

export const INDEXER_LOGGER: CustomNodeLogger = getCustomLoggerForModule(
Expand All @@ -25,7 +28,14 @@ export const INDEXER_LOGGER: CustomNodeLogger = getCustomLoggerForModule(
defaultConsoleTransport
)

const config = await getConfig()
let config: OceanNodeConfig
// Lazy load configuration
async function getConfiguration(): Promise<OceanNodeConfig> {
if (!config) {
config = await getConfig()
}
return config
}

function getTokenInfo(services: any[]): any[] {
const datatokens: any[] = []
Expand Down Expand Up @@ -86,7 +96,11 @@ export const processMetadataStateEvent = async (
const decodedEventData = iface.parseLog(eventObj)
const metadataState = parseInt(decodedEventData.args[1].toString())
INDEXER_LOGGER.logMessage(`Processed new metadata state ${metadataState} `, true)
const dbconn = await new Database(config.dbConfig)
const dbconn = await new Database(await (await getConfiguration()).dbConfig)
INDEXER_LOGGER.logMessage(
`NFT address in processing MetadataState: ${event.address} `,
true
)
const did =
'did:op:' +
createHash('sha256')
Expand Down Expand Up @@ -152,3 +166,136 @@ export const processMetadataStateEvent = async (
INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true)
}
}

export const processOrderStartedEvent = async (
event: ethers.Log,
chainId: number,
provider: JsonRpcApiProvider
) => {
const receipt = await provider.getTransactionReceipt(event.transactionHash)
const iface = new Interface(ERC20Template.abi)
const eventObj = {
topics: receipt.logs[0].topics as string[],
data: receipt.logs[0].data
}
const decodedEventData = iface.parseLog(eventObj)
const serviceIndex = parseInt(decodedEventData.args[3].toString())
const timestamp = parseInt(decodedEventData.args[4].toString())
const consumer = decodedEventData.args[0].toString()
const payer = decodedEventData.args[1].toString()
INDEXER_LOGGER.logMessage(
`Processed new order for service index ${serviceIndex} at ${timestamp}`,
true
)
const config = await getConfiguration()
const dbconn = await new Database(config.dbConfig)
const datatokenContract = new Contract(
event.address,
ERC20Template.abi,
await provider.getSigner()
)
const nftAddress = await datatokenContract.getERC721Address()
const did =
'did:op:' +
createHash('sha256')
.update(getAddress(nftAddress) + chainId.toString(10))
.digest('hex')
try {
const ddo = await dbconn.ddo.retrieve(did)
if (!ddo) {
INDEXER_LOGGER.logMessage(
`Detected OrderStarted changed for ${did}, but it does not exists.`
)
return
}
if ('stats' in ddo && ddo.services[serviceIndex].datatoken === event.address) {
ddo.stats.orders += 1
} else {
// Still update until we validate and polish schemas for DDO.
// But it should update ONLY if first condition is met.
ddo.stats = {
orders: 1
}
}
await dbconn.order.create(
event.transactionHash,
'startOrder',
timestamp,
consumer,
payer
)
INDEXER_LOGGER.logMessage(`Found did ${did} for order starting on network ${chainId}`)
return ddo
} catch (err) {
INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true)
}
}

export const processOrderReusedEvent = async (
event: ethers.Log,
chainId: number,
provider: JsonRpcApiProvider
) => {
const receipt = await provider.getTransactionReceipt(event.transactionHash)
const iface = new Interface(ERC20Template.abi)
const eventObj = {
topics: receipt.logs[0].topics as string[],
data: receipt.logs[0].data
}
const decodedEventData = iface.parseLog(eventObj)
const startOrderId = decodedEventData.args[0].toString()
const timestamp = parseInt(decodedEventData.args[2].toString())
const payer = decodedEventData.args[1].toString()
INDEXER_LOGGER.logMessage(`Processed reused order at ${timestamp}`, true)
const config = await getConfiguration()
const dbconn = await new Database(config.dbConfig)
const datatokenContract = new Contract(
event.address,
ERC20Template.abi,
await provider.getSigner()
)
const nftAddress = await datatokenContract.getERC721Address()
const did =
'did:op:' +
createHash('sha256')
.update(getAddress(nftAddress) + chainId.toString(10))
.digest('hex')
try {
const ddo = await dbconn.ddo.retrieve(did)
if (!ddo) {
INDEXER_LOGGER.logMessage(
`Detected OrderReused changed for ${did}, but it does not exists.`
)
return
}
ddo.stats.orders += 1

try {
const startOrder = await dbconn.order.retrieve(startOrderId)
if (!startOrder) {
INDEXER_LOGGER.logMessage(
`Detected OrderReused changed for order ${startOrderId}, but it does not exists.`
)
return
}
await dbconn.order.create(
event.transactionHash,
'reuseOrder',
timestamp,
startOrder.consumer,
payer,
startOrderId
)
} catch (error) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error retrieving startOrder for reuseOrder: ${error}`,
true
)
}

return ddo
} catch (err) {
INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true)
}
}
4 changes: 3 additions & 1 deletion src/components/Indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ export class OceanIndexer {
if (
event.method === EVENTS.METADATA_CREATED ||
event.method === EVENTS.METADATA_UPDATED ||
event.method === EVENTS.METADATA_STATE
event.method === EVENTS.METADATA_STATE ||
event.method === EVENTS.ORDER_STARTED ||
event.method === EVENTS.ORDER_REUSED
) {
this.createOrUpdateDDO(event.network, event.data, event.method)
}
Expand Down
38 changes: 33 additions & 5 deletions src/components/Indexer/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ import {
defaultConsoleTransport,
getCustomLoggerForModule
} from '../../utils/logging/Logger.js'
import { processMetadataEvents, processMetadataStateEvent } from './eventProcessor.js'
import {
processMetadataEvents,
processOrderStartedEvent,
processOrderReusedEvent,
processMetadataStateEvent
} from './eventProcessor.js'

export const INDEXER_LOGGER: CustomNodeLogger = getCustomLoggerForModule(
LOGGER_MODULE_NAMES.INDEXER,
Expand Down Expand Up @@ -108,8 +113,11 @@ export const processChunkLogs = async (
INDEXER_LOGGER.logMessage('-- EXCHANGE_RATE_CHANGED -- ', true)
storeEvents[event.type] = await processExchangeRateChanged()
} else if (event && event.type === EVENTS.ORDER_STARTED) {
INDEXER_LOGGER.logMessage('-- ORDER_STARTED -- ', true)
storeEvents[event.type] = await procesOrderStarted()
INDEXER_LOGGER.logMessage(`-- ${event.type} triggered`, true)
storeEvents[event.type] = await procesOrderStarted(log, provider, chainId)
} else if (event && event.type === EVENTS.ORDER_REUSED) {
INDEXER_LOGGER.logMessage(`-- ${event.type} triggered`, true)
storeEvents[event.type] = await processOrderReused(log, provider, chainId)
} else if (event && event.type === EVENTS.TOKEN_URI_UPDATE) {
INDEXER_LOGGER.logMessage('-- TOKEN_URI_UPDATE -- ', true)
storeEvents[event.type] = await processTokenUriUpadate()
Expand All @@ -129,8 +137,28 @@ const processExchangeRateChanged = async (): Promise<string> => {
return 'EXCHANGE_RATE_CHANGED'
}

const procesOrderStarted = async (): Promise<string> => {
return 'ORDER_STARTED'
const procesOrderStarted = async (
log: ethers.Log,
provider: JsonRpcApiProvider,
chainId: number
): Promise<any> => {
try {
return await processOrderStartedEvent(log, chainId, provider)
} catch (e) {
INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error proccessing order: ${e}`)
}
}

const processOrderReused = async (
log: ethers.Log,
provider: JsonRpcApiProvider,
chainId: number
): Promise<any> => {
try {
return await processOrderReusedEvent(log, chainId, provider)
} catch (e) {
INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error proccessing order reused: ${e}`)
}
}

const processTokenUriUpadate = async (): Promise<string> => {
Expand Down
99 changes: 98 additions & 1 deletion src/components/database/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,109 @@ import {
LOGGER_MODULE_NAMES,
newCustomDBTransport
} from '../../utils/logging/Logger.js'
import { Logger } from 'winston'

export const DATABASE_LOGGER: CustomNodeLogger = getCustomLoggerForModule(
LOGGER_MODULE_NAMES.DATABASE,
LOG_LEVELS_STR.LEVEL_INFO,
defaultConsoleTransport
)

export class OrderDatabase {
private provider: Typesense

constructor(
private config: OceanNodeDBConfig,
private schema: Schema
) {
return (async (): Promise<OrderDatabase> => {
this.provider = new Typesense(convertTypesenseConfig(this.config.url))
try {
await this.provider.collections(this.schema.name).retrieve()
} catch (error) {
if (error instanceof TypesenseError && error.httpStatus === 404) {
await this.provider.collections().create(this.schema)
}
}
return this
})() as unknown as OrderDatabase
}

async search(query: Record<string, any>) {
try {
const results = []
const result = await this.provider
.collections(this.schema.name)
.documents()
.search(query as TypesenseSearchParams)
results.push(result)
return results
} catch (error) {
return null
}
}

async create(
orderId: string,
type: string,
timestamp: number,
consumer: string,
payer: string,
startOrderId?: string
) {
try {
return await this.provider
.collections(this.schema.name)
.documents()
.create({ id: orderId, type, timestamp, consumer, payer, startOrderId })
} catch (error) {
return null
}
}

async retrieve(orderId: string) {
try {
return await this.provider
.collections(this.schema.name)
.documents()
.retrieve(orderId)
} catch (error) {
return null
}
}

async update(
orderId: string,
type: string,
timestamp: number,
consumer: string,
payer: string,
startOrderId?: string
) {
try {
return await this.provider
.collections(this.schema.name)
.documents()
.update(orderId, { type, timestamp, consumer, payer, startOrderId })
} catch (error) {
if (error instanceof TypesenseError && error.httpStatus === 404) {
return await this.provider
.collections(this.schema.name)
.documents()
.create({ id: orderId, type, timestamp, consumer, payer, startOrderId })
}
return null
}
}

async delete(orderId: string) {
try {
return await this.provider.collections(this.schema.name).documents().delete(orderId)
} catch (error) {
return null
}
}
}

export class DdoDatabase {
private provider: Typesense

Expand Down Expand Up @@ -352,6 +447,7 @@ export class Database {
nonce: NonceDatabase
indexer: IndexerDatabase
logs: LogDatabase
order: OrderDatabase

constructor(private config: OceanNodeDBConfig) {
return (async (): Promise<Database> => {
Expand All @@ -363,6 +459,7 @@ export class Database {
this.nonce = await new NonceDatabase(config, schemas.nonceSchemas)
this.indexer = await new IndexerDatabase(config, schemas.indexerSchemas)
this.logs = await new LogDatabase(config, schemas.logSchemas)
this.order = await new OrderDatabase(config, schemas.orderSchema)
return this
})() as unknown as Database
}
Expand Down
Loading

0 comments on commit 8b69f76

Please sign in to comment.