Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Index order events #145

Merged
merged 56 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
e6aeeb2
Processed OrderStarted.
mariacarmina Dec 6, 2023
8523561
Assert type json.
mariacarmina Dec 6, 2023
f201b13
Index order events.
mariacarmina Dec 6, 2023
f246123
Merged develop branch + resolved conflicts.
mariacarmina Dec 7, 2023
c6bcc23
Added test for order.
mariacarmina Dec 7, 2023
bd64514
Update price inside DDO.
mariacarmina Dec 8, 2023
fbf69c1
fix import.
mariacarmina Dec 8, 2023
196220a
Changed address for generating DID.
mariacarmina Dec 8, 2023
8e8b45b
Added logs.
mariacarmina Dec 8, 2023
f3def8f
Added more specific logs.
mariacarmina Dec 8, 2023
d00f907
Added more logs again.
mariacarmina Dec 8, 2023
2896a37
Changed imports to import Enterprise.
mariacarmina Dec 8, 2023
64e58b4
Resolve commits.
mariacarmina Dec 11, 2023
73148df
Added SSH debug.
mariacarmina Dec 11, 2023
5c954c9
Replaced ssh with displaying the addresses.
mariacarmina Dec 11, 2023
00f9eae
Updated test.
mariacarmina Dec 11, 2023
24f5859
Add logs. Fix type.
mariacarmina Dec 11, 2023
2b82dde
added more debug logs.
mariacarmina Dec 11, 2023
ff105dc
Commented the code.
mariacarmina Dec 11, 2023
380d08e
debug x2.
mariacarmina Dec 11, 2023
c62cb79
tweak
mariacarmina Dec 11, 2023
6570b54
Commented assert.
mariacarmina Dec 11, 2023
161630f
lazy loading config eventProcessor
bogdanfazakas Dec 11, 2023
e1cd623
pull changes
bogdanfazakas Dec 11, 2023
419e223
update test
bogdanfazakas Dec 11, 2023
62ec442
Added test for DDO.
mariacarmina Dec 11, 2023
a73291f
tweaks.
mariacarmina Dec 11, 2023
25a3f70
Updated OrderReused.
mariacarmina Dec 11, 2023
637388e
Added new test for OrderReused.
mariacarmina Dec 11, 2023
3d3ffbe
Added logs.
mariacarmina Dec 11, 2023
06d99c6
tweak.
mariacarmina Dec 11, 2023
52eaed7
Added tests back. Cleanup logs.
mariacarmina Dec 11, 2023
d941b5d
Added orderSchema + tests.
mariacarmina Dec 12, 2023
a643116
Merge branch 'develop' into feature/order-events
mariacarmina Dec 12, 2023
aaabd24
Fix typo.
mariacarmina Dec 12, 2023
dfecc1e
Added log.
mariacarmina Dec 12, 2023
cde58df
Removed orderTx.
mariacarmina Dec 12, 2023
abba9c5
Stored orders in db.
mariacarmina Dec 12, 2023
931cbe5
Updated schema for orders. Updated tests and logic for reuse and star…
mariacarmina Dec 12, 2023
433fffa
Remove validity.
mariacarmina Dec 12, 2023
76254cd
Fixed test regarding db.
mariacarmina Dec 12, 2023
0cd799c
Added more asserts.
mariacarmina Dec 12, 2023
9371125
Fix typo.
mariacarmina Dec 12, 2023
40d49f8
resolve conflicts.
mariacarmina Dec 13, 2023
13af6c2
added utils inside tests.
mariacarmina Dec 13, 2023
8d5b984
fix expect statement.
mariacarmina Dec 13, 2023
ce1dfb1
remove suggested logs.
mariacarmina Dec 13, 2023
035130b
Merge branch 'develop' into feature/order-events
mariacarmina Dec 14, 2023
c324a45
Merge branch 'develop' into feature/order-events
mariacarmina Dec 14, 2023
2333fb0
Merge branch 'develop' into feature/order-events
mariacarmina Dec 14, 2023
2ec5b5a
Merge branch 'develop' into feature/order-events
mariacarmina Dec 15, 2023
2cc74a8
Merge branch 'develop' into feature/order-events
mariacarmina Dec 18, 2023
5de7699
fix review.
mariacarmina Dec 18, 2023
4a90ea9
Fix conflicts.
mariacarmina Dec 18, 2023
a393874
Merge branch 'develop' into feature/order-events
mariacarmina Dec 19, 2023
83d07eb
Merge branch 'develop' into feature/order-events
mariacarmina Dec 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 151 additions & 3 deletions src/components/Indexer/eventProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
Contract,
Interface,
JsonRpcApiProvider,
ethers,
Expand All @@ -15,16 +16,24 @@ import {
getCustomLoggerForModule
} from '../../utils/logging/Logger.js'
import ERC721Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC721Template.sol/ERC721Template.json' assert { type: 'json' }
import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC20TemplateEnterprise.sol/ERC20TemplateEnterprise.json' assert { type: 'json' }
import { getConfig } from '../../utils/config.js'
import { Database } from '../database/index.js'

import { OceanNodeConfig } from '../../@types/OceanNode.js'
export const INDEXER_LOGGER: CustomNodeLogger = getCustomLoggerForModule(
LOGGER_MODULE_NAMES.INDEXER,
LOG_LEVELS_STR.LEVEL_INFO,
defaultConsoleTransport
)

const config = await getConfig()
let config: OceanNodeConfig
// Lazy load configuration
async function getConfiguration(): Promise<OceanNodeConfig> {
if (!config) {
config = await getConfig()
}
return config
}

export const processMetadataEvents = async (
event: ethers.Log,
Expand Down Expand Up @@ -72,7 +81,11 @@ export const processMetadataStateEvent = async (
const decodedEventData = iface.parseLog(eventObj)
const metadataState = parseInt(decodedEventData.args[1].toString())
INDEXER_LOGGER.logMessage(`Processed new metadata state ${metadataState} `, true)
const dbconn = await new Database(config.dbConfig)
const dbconn = await new Database(await (await getConfiguration()).dbConfig)
INDEXER_LOGGER.logMessage(
`NFT address in processing MetadataState: ${event.address} `,
true
)
const did =
'did:op:' +
createHash('sha256')
Expand Down Expand Up @@ -102,3 +115,138 @@ export const processMetadataStateEvent = async (
INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true)
}
}

export const processOrderStartedEvent = async (
event: ethers.Log,
chainId: number,
provider: JsonRpcApiProvider
) => {
const receipt = await provider.getTransactionReceipt(event.transactionHash)
const iface = new Interface(ERC20Template.abi)
const eventObj = {
topics: receipt.logs[0].topics as string[],
data: receipt.logs[0].data
}
const decodedEventData = iface.parseLog(eventObj)
INDEXER_LOGGER.logMessage(`Decoded event data args: ${decodedEventData.args}`, true)
bogdanfazakas marked this conversation as resolved.
Show resolved Hide resolved
const serviceIndex = parseInt(decodedEventData.args[3].toString())
const timestamp = parseInt(decodedEventData.args[4].toString())
const consumer = decodedEventData.args[0].toString()
const payer = decodedEventData.args[1].toString()
INDEXER_LOGGER.logMessage(
`Processed new order for service index ${serviceIndex} at ${timestamp}`,
true
)
const config = await getConfiguration()
const dbconn = await new Database(config.dbConfig)
const datatokenContract = new Contract(
event.address,
ERC20Template.abi,
await provider.getSigner()
)
const nftAddress = await datatokenContract.getERC721Address()
const did =
'did:op:' +
createHash('sha256')
.update(getAddress(nftAddress) + chainId.toString(10))
.digest('hex')
try {
const ddo = await dbconn.ddo.retrieve(did)
if (!ddo) {
INDEXER_LOGGER.logMessage(
`Detected OrderStarted changed for ${did}, but it does not exists.`
)
return
}
if ('stats' in ddo && ddo.services[serviceIndex].datatoken === event.address) {
ddo.stats.orders += 1
} else {
// Still update until we validate and polish schemas for DDO.
// But it should update ONLY if first condition is met.
ddo.stats = {
orders: 1
}
}
await dbconn.order.create(
event.transactionHash,
'startOrder',
timestamp,
consumer,
payer
)
INDEXER_LOGGER.logMessage(`Found did ${did} for order starting on network ${chainId}`)
return ddo
} catch (err) {
INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true)
}
}

export const processOrderReusedEvent = async (
mariacarmina marked this conversation as resolved.
Show resolved Hide resolved
event: ethers.Log,
chainId: number,
provider: JsonRpcApiProvider
) => {
const receipt = await provider.getTransactionReceipt(event.transactionHash)
const iface = new Interface(ERC20Template.abi)
const eventObj = {
topics: receipt.logs[0].topics as string[],
data: receipt.logs[0].data
}
const decodedEventData = iface.parseLog(eventObj)
const startOrderId = decodedEventData.args[0].toString()
const timestamp = parseInt(decodedEventData.args[2].toString())
const payer = decodedEventData.args[1].toString()
INDEXER_LOGGER.logMessage(`Processed reused order at ${timestamp}`, true)
const config = await getConfiguration()
const dbconn = await new Database(config.dbConfig)
const datatokenContract = new Contract(
event.address,
ERC20Template.abi,
await provider.getSigner()
)
const nftAddress = await datatokenContract.getERC721Address()
const did =
'did:op:' +
createHash('sha256')
.update(getAddress(nftAddress) + chainId.toString(10))
.digest('hex')
try {
const ddo = await dbconn.ddo.retrieve(did)
if (!ddo) {
INDEXER_LOGGER.logMessage(
`Detected OrderReused changed for ${did}, but it does not exists.`
)
return
}
INDEXER_LOGGER.logMessage(`Found did ${did} on network ${chainId}.`)
bogdanfazakas marked this conversation as resolved.
Show resolved Hide resolved
ddo.stats.orders += 1

try {
const startOrder = await dbconn.order.retrieve(startOrderId)
if (!startOrder) {
INDEXER_LOGGER.logMessage(
`Detected OrderReused changed for order ${startOrderId}, but it does not exists.`
)
return
}
await dbconn.order.create(
mariacarmina marked this conversation as resolved.
Show resolved Hide resolved
event.transactionHash,
'reuseOrder',
timestamp,
startOrder.consumer,
payer,
startOrderId
)
} catch (error) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error retrieving startOrder for reuseOrder: ${error}`,
true
)
}

return ddo
} catch (err) {
INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error retrieving DDO: ${err}`, true)
}
}
4 changes: 3 additions & 1 deletion src/components/Indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ export class OceanIndexer {
if (
event.method === EVENTS.METADATA_CREATED ||
event.method === EVENTS.METADATA_UPDATED ||
event.method === EVENTS.METADATA_STATE
event.method === EVENTS.METADATA_STATE ||
event.method === EVENTS.ORDER_STARTED ||
event.method === EVENTS.ORDER_REUSED
) {
this.createOrUpdateDDO(event.network, event.data, event.method)
}
Expand Down
45 changes: 39 additions & 6 deletions src/components/Indexer/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ import {
defaultConsoleTransport,
getCustomLoggerForModule
} from '../../utils/logging/Logger.js'
import { processMetadataEvents, processMetadataStateEvent } from './eventProcessor.js'
import {
processMetadataEvents,
processOrderStartedEvent,
processOrderReusedEvent,
processMetadataStateEvent
} from './eventProcessor.js'

export const INDEXER_LOGGER: CustomNodeLogger = getCustomLoggerForModule(
LOGGER_MODULE_NAMES.INDEXER,
Expand Down Expand Up @@ -107,9 +112,17 @@ export const processChunkLogs = async (
} else if (event && event.type === EVENTS.EXCHANGE_RATE_CHANGED) {
INDEXER_LOGGER.logMessage('-- EXCHANGE_RATE_CHANGED -- ', true)
storeEvents[event.type] = await processExchangeRateChanged()
} else if (event && event.type === EVENTS.ORDER_STARTED) {
INDEXER_LOGGER.logMessage('-- ORDER_STARTED -- ', true)
storeEvents[event.type] = await procesOrderStarted()
} else if (
event &&
(event.type === EVENTS.ORDER_STARTED || event.type === EVENTS.ORDER_REUSED)
) {
INDEXER_LOGGER.logMessage('-- ORDER_STARTED || ORDER_REUSED -- ', true)
storeEvents[event.type] = await procesOrdersEvents(
log,
event.type,
provider,
chainId
)
} else if (event && event.type === EVENTS.TOKEN_URI_UPDATE) {
INDEXER_LOGGER.logMessage('-- TOKEN_URI_UPDATE -- ', true)
storeEvents[event.type] = await processTokenUriUpadate()
Expand All @@ -129,8 +142,28 @@ const processExchangeRateChanged = async (): Promise<string> => {
return 'EXCHANGE_RATE_CHANGED'
}

const procesOrderStarted = async (): Promise<string> => {
return 'ORDER_STARTED'
const procesOrdersEvents = async (
mariacarmina marked this conversation as resolved.
Show resolved Hide resolved
log: ethers.Log,
eventType: string,
provider: JsonRpcApiProvider,
chainId: number
): Promise<any> => {
if (eventType === EVENTS.ORDER_STARTED) {
try {
return await processOrderStartedEvent(log, chainId, provider)
} catch (e) {
INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Error proccessing order: ${e}`)
}
} else if (eventType === EVENTS.ORDER_REUSED) {
try {
return await processOrderReusedEvent(log, chainId, provider)
} catch (e) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error proccessing order reused: ${e}`
)
}
}
}

const processTokenUriUpadate = async (): Promise<string> => {
Expand Down
99 changes: 98 additions & 1 deletion src/components/database/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,109 @@ import {
LOGGER_MODULE_NAMES,
newCustomDBTransport
} from '../../utils/logging/Logger.js'
import { Logger } from 'winston'

export const DATABASE_LOGGER: CustomNodeLogger = getCustomLoggerForModule(
LOGGER_MODULE_NAMES.DATABASE,
LOG_LEVELS_STR.LEVEL_INFO,
defaultConsoleTransport
)

export class OrderDatabase {
private provider: Typesense

constructor(
private config: OceanNodeDBConfig,
private schema: Schema
) {
return (async (): Promise<OrderDatabase> => {
this.provider = new Typesense(convertTypesenseConfig(this.config.url))
try {
await this.provider.collections(this.schema.name).retrieve()
} catch (error) {
if (error instanceof TypesenseError && error.httpStatus === 404) {
await this.provider.collections().create(this.schema)
}
}
return this
})() as unknown as OrderDatabase
}

async search(query: Record<string, any>) {
try {
const results = []
const result = await this.provider
.collections(this.schema.name)
.documents()
.search(query as TypesenseSearchParams)
results.push(result)
return results
} catch (error) {
return null
}
}

async create(
orderId: string,
type: string,
timestamp: number,
consumer: string,
payer: string,
startOrderId?: string
) {
try {
return await this.provider
.collections(this.schema.name)
.documents()
.create({ id: orderId, type, timestamp, consumer, payer, startOrderId })
} catch (error) {
return null
}
}

async retrieve(orderId: string) {
try {
return await this.provider
.collections(this.schema.name)
.documents()
.retrieve(orderId)
} catch (error) {
return null
}
}

async update(
orderId: string,
type: string,
timestamp: number,
consumer: string,
payer: string,
startOrderId?: string
) {
try {
return await this.provider
.collections(this.schema.name)
.documents()
.update(orderId, { type, timestamp, consumer, payer, startOrderId })
} catch (error) {
if (error instanceof TypesenseError && error.httpStatus === 404) {
return await this.provider
.collections(this.schema.name)
.documents()
.create({ id: orderId, type, timestamp, consumer, payer, startOrderId })
}
return null
}
}

async delete(orderId: string) {
try {
return await this.provider.collections(this.schema.name).documents().delete(orderId)
} catch (error) {
return null
}
}
}

export class DdoDatabase {
private provider: Typesense

Expand Down Expand Up @@ -352,6 +447,7 @@ export class Database {
nonce: NonceDatabase
indexer: IndexerDatabase
logs: LogDatabase
order: OrderDatabase

constructor(private config: OceanNodeDBConfig) {
return (async (): Promise<Database> => {
Expand All @@ -363,6 +459,7 @@ export class Database {
this.nonce = await new NonceDatabase(config, schemas.nonceSchemas)
this.indexer = await new IndexerDatabase(config, schemas.indexerSchemas)
this.logs = await new LogDatabase(config, schemas.logSchemas)
this.order = await new OrderDatabase(config, schemas.orderSchema)
return this
})() as unknown as Database
}
Expand Down
Loading
Loading