Skip to content

Commit

Permalink
normalize responses
Browse files Browse the repository at this point in the history
  • Loading branch information
paulo-ocean committed Oct 16, 2024
1 parent 55f6306 commit 584b6c0
Showing 1 changed file with 161 additions and 136 deletions.
297 changes: 161 additions & 136 deletions src/components/database/ElasticSearchDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {
AbstractDdoStateDatabase,
AbstractIndexerDatabase,
AbstractLogDatabase,
AbstractNonceDatabase,
AbstractOrderDatabase
} from './BaseDatabase.js'
import { createElasticsearchClient } from './ElasticsearchConfigHelper.js'
Expand All @@ -14,131 +13,131 @@ import { DATABASE_LOGGER } from '../../utils/logging/common.js'
import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js'
import { validateObject } from '../core/utils/validateDdoHandler.js'

export class ElasticsearchNonceDatabase extends AbstractNonceDatabase {
private client: Client
private index: string

constructor(config: OceanNodeDBConfig) {
super(config)
this.client = new Client({ node: config.url })
this.index = 'nonce'
this.initializeIndex()
}

private async initializeIndex() {
try {
const indexExists = await this.client.indices.exists({ index: this.index })
if (!indexExists) {
await this.client.indices.create({
index: this.index,
body: {
mappings: {
properties: {
id: { type: 'keyword' },
nonce: { type: 'integer' }
}
}
}
})
}
} catch (e) {
DATABASE_LOGGER.error(e.message)
}
}

async create(address: string, nonce: number) {
try {
await this.client.index({
index: this.index,
id: address,
body: { nonce },
refresh: 'wait_for'
})
return { id: address, nonce }
} catch (error) {
const errorMsg = `Error when creating new nonce entry ${nonce} for address ${address}: ${error.message}`
DATABASE_LOGGER.logMessageWithEmoji(
errorMsg,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
return null
}
}

async retrieve(address: string) {
try {
const result = await this.client.get({
index: this.index,
id: address
})
return result._source
} catch (error) {
const errorMsg = `Error when retrieving nonce entry for address ${address}: ${error.message}`
DATABASE_LOGGER.logMessageWithEmoji(
errorMsg,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
return null
}
}

async update(address: string, nonce: number) {
try {
const exists = await this.client.exists({
index: this.index,
id: address
})

if (exists) {
await this.client.update({
index: this.index,
id: address,
body: {
doc: { nonce }
},
refresh: 'wait_for'
})
} else {
await this.create(address, nonce)
}

return { id: address, nonce }
} catch (error) {
const errorMsg = `Error when updating nonce entry ${nonce} for address ${address}: ${error.message}`
DATABASE_LOGGER.logMessageWithEmoji(
errorMsg,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
return null
}
}

async delete(address: string) {
try {
await this.client.delete({
index: this.index,
id: address,
refresh: 'wait_for'
})
return { id: address }
} catch (error) {
const errorMsg = `Error when deleting nonce entry for address ${address}: ${error.message}`
DATABASE_LOGGER.logMessageWithEmoji(
errorMsg,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
return null
}
}
}
// export class ElasticsearchNonceDatabase extends AbstractNonceDatabase {
// private client: Client
// private index: string

// constructor(config: OceanNodeDBConfig) {
// super(config)
// this.client = new Client({ node: config.url })
// this.index = 'nonce'
// this.initializeIndex()
// }

// private async initializeIndex() {
// try {
// const indexExists = await this.client.indices.exists({ index: this.index })
// if (!indexExists) {
// await this.client.indices.create({
// index: this.index,
// body: {
// mappings: {
// properties: {
// id: { type: 'keyword' },
// nonce: { type: 'integer' }
// }
// }
// }
// })
// }
// } catch (e) {
// DATABASE_LOGGER.error(e.message)
// }
// }

// async create(address: string, nonce: number) {
// try {
// await this.client.index({
// index: this.index,
// id: address,
// body: { nonce },
// refresh: 'wait_for'
// })
// return { id: address, nonce }
// } catch (error) {
// const errorMsg = `Error when creating new nonce entry ${nonce} for address ${address}: ${error.message}`
// DATABASE_LOGGER.logMessageWithEmoji(
// errorMsg,
// true,
// GENERIC_EMOJIS.EMOJI_CROSS_MARK,
// LOG_LEVELS_STR.LEVEL_ERROR
// )
// return null
// }
// }

// async retrieve(address: string) {
// try {
// const result = await this.client.get({
// index: this.index,
// id: address
// })
// return result._source
// } catch (error) {
// const errorMsg = `Error when retrieving nonce entry for address ${address}: ${error.message}`
// DATABASE_LOGGER.logMessageWithEmoji(
// errorMsg,
// true,
// GENERIC_EMOJIS.EMOJI_CROSS_MARK,
// LOG_LEVELS_STR.LEVEL_ERROR
// )
// return null
// }
// }

// async update(address: string, nonce: number) {
// try {
// const exists = await this.client.exists({
// index: this.index,
// id: address
// })

// if (exists) {
// await this.client.update({
// index: this.index,
// id: address,
// body: {
// doc: { nonce }
// },
// refresh: 'wait_for'
// })
// } else {
// await this.create(address, nonce)
// }

// return { id: address, nonce }
// } catch (error) {
// const errorMsg = `Error when updating nonce entry ${nonce} for address ${address}: ${error.message}`
// DATABASE_LOGGER.logMessageWithEmoji(
// errorMsg,
// true,
// GENERIC_EMOJIS.EMOJI_CROSS_MARK,
// LOG_LEVELS_STR.LEVEL_ERROR
// )
// return null
// }
// }

// async delete(address: string) {
// try {
// await this.client.delete({
// index: this.index,
// id: address,
// refresh: 'wait_for'
// })
// return { id: address }
// } catch (error) {
// const errorMsg = `Error when deleting nonce entry for address ${address}: ${error.message}`
// DATABASE_LOGGER.logMessageWithEmoji(
// errorMsg,
// true,
// GENERIC_EMOJIS.EMOJI_CROSS_MARK,
// LOG_LEVELS_STR.LEVEL_ERROR
// )
// return null
// }
// }
// }

export class ElasticsearchIndexerDatabase extends AbstractIndexerDatabase {
private client: Client
Expand Down Expand Up @@ -361,7 +360,9 @@ export class ElasticsearchDdoStateDatabase extends AbstractDdoStateDatabase {
}
}
})
return result.hits.hits.map((hit: any) => hit._source)
return result.hits.hits.map((hit: any) => {
return normalizeDocumentId(hit._source, hit._id)
})
} catch (error) {
const errorMsg = `Error when searching by query ${JSON.stringify(query)}: ${
error.message
Expand Down Expand Up @@ -467,7 +468,9 @@ export class ElasticsearchOrderDatabase extends AbstractOrderDatabase {
}
}
const result = await this.provider.search(searchParams)
return result.hits.hits.map((hit: any) => hit._source)
return result.hits.hits.map((hit: any) => {
return normalizeDocumentId(hit._source, hit._id)
})
} catch (error) {
const errorMsg =
`Error when searching order entry by query ${JSON.stringify(query)}: ` +
Expand Down Expand Up @@ -521,7 +524,7 @@ export class ElasticsearchOrderDatabase extends AbstractOrderDatabase {
index: this.getSchema().index,
id: orderId
})
return result._source
return normalizeDocumentId(result._source, result._id)
} catch (error) {
const errorMsg = `Error when retrieving order ${orderId}: ` + error.message
DATABASE_LOGGER.logMessageWithEmoji(errorMsg, true, LOG_LEVELS_STR.LEVEL_ERROR)
Expand Down Expand Up @@ -652,7 +655,10 @@ export class ElasticsearchDdoDatabase extends AbstractDdoDatabase {
}
})
if (response.hits?.hits.length > 0) {
results.push(response)
const nomalizedResponse = response.hits.hits.map((hit: any) => {
return normalizeDocumentId(hit._source, hit._id)
})
results.push(nomalizedResponse)
}
} catch (error) {
const schemaErrorMsg = `Error for schema ${query.index}: ${error.message}`
Expand All @@ -675,7 +681,10 @@ export class ElasticsearchDdoDatabase extends AbstractDdoDatabase {
}
})
if (response.hits?.hits.length > 0) {
results.push(response)
const nomalizedResponse = response.hits.hits.map((hit: any) => {
return normalizeDocumentId(hit._source, hit._id)
})
results.push(nomalizedResponse)
}
} catch (error) {
const schemaErrorMsg = `Error for schema ${schema.index}: ${error.message}`
Expand Down Expand Up @@ -779,7 +788,7 @@ export class ElasticsearchDdoDatabase extends AbstractDdoDatabase {
// make sure we do not have different responses 4 between DBs
// do the same thing on other methods
if (response._id === ddo.id) {
response.id = response._id
return normalizeDocumentId(response, response._id)
}
return response
} else {
Expand All @@ -792,7 +801,7 @@ export class ElasticsearchDdoDatabase extends AbstractDdoDatabase {
await this.delete(ddo.id)
const response = await this.create(ddo)
if (response._id === ddo.id) {
response.id = response._id
return normalizeDocumentId(response, response._id)
}
return response
}
Expand Down Expand Up @@ -925,7 +934,7 @@ export class ElasticsearchLogDatabase extends AbstractLogDatabase {
})
// uniformize result response (we need an id for the retrieveLog API)
if (result._id) {
logEntry.id = result._id
return normalizeDocumentId(logEntry, result._id)
}
return logEntry
} catch (error) {
Expand All @@ -946,7 +955,7 @@ export class ElasticsearchLogDatabase extends AbstractLogDatabase {
index: this.index,
id
})
return result._source
return normalizeDocumentId(result._source, result._id)
} catch (error) {
const errorMsg = `Error when retrieving log entry: ${error.message}`
DATABASE_LOGGER.logMessageWithEmoji(
Expand Down Expand Up @@ -1009,7 +1018,9 @@ export class ElasticsearchLogDatabase extends AbstractLogDatabase {
console.log('logs results hits:', result.hits)
console.log('logs results hits hits:', result.hits.hits)

return result.hits.hits.map((hit: any) => hit._source)
return result.hits.hits.map((hit: any) => {
return normalizeDocumentId(hit._source, hit._id)
})
} catch (error) {
const errorMsg = `Error when retrieving multiple log entries: ${error.message}`
DATABASE_LOGGER.logMessageWithEmoji(
Expand Down Expand Up @@ -1089,3 +1100,17 @@ export class ElasticsearchLogDatabase extends AbstractLogDatabase {
}
}
}

/**
* Make DB agnostic APIs. The response should be similar, no matter what DB engine is used
* Normalizes the document responses to match same kind of typesense ones
* @param dbResult response from DB
* @param _id id of the element
* @returns result object with id property
*/
export function normalizeDocumentId(dbResult: any, _id?: any): any {
if (_id && !dbResult.id) {
dbResult.id = _id
}
return dbResult
}

0 comments on commit 584b6c0

Please sign in to comment.