Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Ethereum and update Iotex actions #132

Merged
merged 8 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/data/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
},
"devDependencies": {
"@types/node": "^17.0.38",
"esbuild": "^0.14.42",
"esbuild": "^0.15.9",
"esno": "^0.16.3"
},
"dependencies": {
Expand Down
4 changes: 2 additions & 2 deletions common/data/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export function schemaToGlueColumns(jsonSchema: JsonSchema): glue.Column[] {

let type: glue.Type = glue.Schema[typeKey]

if (name.endsWith('at')) type = glue.Schema.DATE
if (name.endsWith('at')) type = glue.Schema.TIMESTAMP

if (name === 'candidate_list') type = glue.Schema.array(glue.Schema.STRING)

Expand All @@ -41,7 +41,7 @@ export type EventTableColumn = {
to_address: string
candidate: string
candidate_list: string[]
amount: number
amount: string
duration: number
auto_stake: boolean
// payload: Record<string, unknown>
Expand Down
2 changes: 1 addition & 1 deletion common/data/src/schemas/agg.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
},
"first_staked_at": {
"type": "string",
"description": "The first date (MM-DD-YYYY) that a wallet staked"
"description": "First staked at datestring"
},
"total_staked_amount": {
"type": "string",
Expand Down
12 changes: 6 additions & 6 deletions common/data/src/schemas/event.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@
},
"height": {
"type": "integer",
"description": "The block height of the event"
"description": "The height of the event"
},
"created_at": {
"type": "string",
"description": "The date (MM-DD-YYYY) of the event"
"description": "The date and time of the event in ISO 8601 format e.g. 2015-03-04T22:44:30.652Z"
},
"address": {
"type": "string",
"description": "The address that initiated the event"
"description": "The address which initiated the event"
},
"to_address": {
"type": "string",
"description": "The address which received the action event"
"description": "The recipient's address"
},
"candidate": {
"type": "string",
Expand All @@ -46,11 +46,11 @@
},
"amount": {
"type": "string",
"description": "The amount of the currency in the event"
"description": "The amount of currency associated with the event"
},
"duration":{
"type": "string",
"description": "The duration of the action"
"description": "The duration of the event"
},
"auto_stake": {
"type": "boolean",
Expand Down
2 changes: 1 addition & 1 deletion common/helpers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
},
"devDependencies": {
"@types/node": "^17.0.38",
"esbuild": "^0.14.42",
"esbuild": "^0.15.9",
"esno": "^0.16.3"
}
}
232 changes: 232 additions & 0 deletions common/helpers/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
import { S3Client, S3ClientConfig, PutObjectCommand, GetObjectCommand } from '@aws-sdk/client-s3'
import { AthenaClient, AthenaClientConfig } from '@aws-sdk/client-athena'
import { defaultProvider } from '@aws-sdk/credential-provider-node'
import { StartQueryExecutionCommand, GetQueryExecutionCommand } from '@aws-sdk/client-athena'
import { EventTableColumn } from '@casimir/data'

/**
* Converts any string to PascalCase.
*
Expand All @@ -9,4 +15,230 @@ export function pascalCase(str: string): string {
return str.replace(/\w+/g, (word) => {
return word[0].toUpperCase() + word.slice(1).toLowerCase()
})
}

// Module-level client instances shared by the helpers in this file.
// Both start as null and are assigned when the corresponding client is created.
let athena: AthenaClient | null = null
let s3: S3Client | null = null

/**
 * Creates a new Athena client and stores it as the module-level client
 * used by the Athena helpers in this file.
 *
 * Options supplied by the caller are always preserved; only a missing
 * `region` or missing `credentials` is filled in with a default
 * (`us-east-2` and the default Node.js credential provider chain).
 *
 * @param opt - Athena client config (optional)
 * @returns The newly created Athena client
 */
export async function newAthenaClient(opt?: AthenaClientConfig): Promise<AthenaClient> {
    // Merge the defaults into the caller's config instead of replacing it.
    // Reassigning `opt` to a fresh object for each default would drop every
    // other option, including the region that had just been defaulted.
    const config: AthenaClientConfig = {
        ...opt,
        region: opt?.region ?? 'us-east-2',
        credentials: opt?.credentials ?? defaultProvider()
    }

    const client = new AthenaClient(config)
    athena = client

    return client
}

/**
 * Returns the shared S3 client, creating it on first use.
 *
 * If a client has already been created, it is returned as-is and `opt` is
 * ignored. Otherwise a client is built from `opt`, with a missing `region`
 * or missing `credentials` filled in with a default (`us-east-2` and the
 * default Node.js credential provider chain). All other caller options are
 * preserved.
 *
 * @param opt - S3 client config (optional)
 * @returns The shared S3 client
 */
export async function newS3Client (opt?: S3ClientConfig): Promise<S3Client> {
    if (s3) {
        return s3
    }

    // Merge the defaults into the caller's config instead of replacing it.
    // Reassigning `opt` to a fresh object for each default would drop every
    // other option, including the region that had just been defaulted.
    const config: S3ClientConfig = {
        ...opt,
        region: opt?.region ?? 'us-east-2',
        credentials: opt?.credentials ?? defaultProvider()
    }

    const client = new S3Client(config)
    s3 = client

    return client
}

/**
 * Stores a string payload as an S3 object.
 *
 * The shared S3 client is created on demand if it does not exist yet.
 *
 * @param input.bucket - Name of the bucket to write to
 * @param input.key - Object key to write
 * @param input.data - Content of the object
 * @throws Error when the PutObject response status is not 200
 */
export async function uploadToS3( input: { bucket: string, key: string, data: string }): Promise<void> {
    const client = s3 ?? (s3 = await newS3Client())

    const { bucket, key, data } = input
    const response = await client.send(
        new PutObjectCommand({ Bucket: bucket, Key: key, Body: data })
    )

    const status = response.$metadata.httpStatusCode
    if (status === 200) {
        return
    }
    throw new Error('Error uploading to s3')
}

/**
 * Reads an S3 object and returns its content decoded as UTF-8 text.
 *
 * The shared S3 client is created on demand if it does not exist yet.
 *
 * @param bucket - Name of the bucket to read from
 * @param key - Key of the object to read
 * @returns The full object content as a string
 * @throws Error when the GetObject response status is not 200 or the
 * response has no body
 */
export async function getFromS3(bucket: string, key: string): Promise<string> {
    if (!s3) {
        s3 = await newS3Client()
    }

    const { $metadata, Body } = await s3.send(new GetObjectCommand({
        Bucket: bucket,
        Key: key
    }))

    if ($metadata.httpStatusCode !== 200) throw new Error('FailedQuery: unable retrieve result from S3')
    if (Body === undefined) throw new Error('InvalidQueryResult: query result is undefined')

    // Decode with one streaming decoder rather than calling toString() on each
    // chunk: a multi-byte UTF-8 character can be split across two chunks, and
    // decoding the halves separately yields replacement characters.
    // `ignoreBOM: true` keeps a leading byte-order mark in the output, matching
    // what a plain Buffer-to-string conversion produces.
    const decoder = new TextDecoder('utf-8', { ignoreBOM: true })
    let content = ''

    // The body is iterated as an async stream of chunks; the SDK's declared
    // type is a cross-platform union that is not directly async-iterable.
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore
    for await (const data of Body) {
        content += typeof data === 'string' ? data : decoder.decode(data, { stream: true })
    }

    // Flush any bytes the decoder is still holding back.
    content += decoder.decode()
    return content
}

/** Delay before the first status re-check, and the amount added to the delay after each re-check (ms). */
const ATHENA_POLL_BACKOFF_STEP_MS = 500

/** Upper bound for the delay between two consecutive status checks (ms). */
const ATHENA_POLL_MAX_BACKOFF_MS = 5000

/**
 * Waits until an Athena query has finished.
 *
 * The query status is fetched repeatedly, sleeping between checks with a
 * linearly growing (and capped) delay, until the query leaves the
 * QUEUED/RUNNING states. The returned promise settles only once the query has
 * reached a final state, so callers can simply `await` it.
 *
 * @param queryId - Execution id of the query to wait for
 * @throws Error when the status request does not return HTTP 200, when the
 * response lacks execution/status information, or when the query ends in any
 * state other than SUCCEEDED
 */
async function pollAthenaQueryOutput(queryId: string): Promise<void> {
    if (!athena) {
        athena = await newAthenaClient()
    }
    const client = athena

    // Backoff is local to this call so concurrent or later queries start
    // from the initial delay again.
    let backoff = ATHENA_POLL_BACKOFF_STEP_MS

    for (;;) {
        const getStateCmd = new GetQueryExecutionCommand({
            QueryExecutionId: queryId
        })

        const { $metadata, QueryExecution } = await client.send(getStateCmd)

        if ($metadata.httpStatusCode !== 200) throw new Error('FailedQuery: unable to query Athena')
        if (QueryExecution === undefined) throw new Error('InvalidQueryExecution: query execution is undefined')
        if (QueryExecution.Status === undefined) throw new Error('InvalidQueryExecutionStatus: query execution status is undefined')

        const state = QueryExecution.Status.State

        if (state === 'SUCCEEDED') {
            return
        }

        if (state === 'FAILED') {
            const reason = QueryExecution.Status.StateChangeReason
            if (reason && reason.includes('HIVE_BAD_DATA')) {
                throw new Error('FailedQuery: Check the table for bad data')
            } else {
                throw new Error('QueryFailed: query failed')
            }
        }

        if (state !== 'QUEUED' && state !== 'RUNNING') {
            // e.g. CANCELLED: there will be no result, so fail here instead
            // of letting the caller try to read a missing result file.
            throw new Error(`QueryFailed: query ended in state ${String(state)}`)
        }

        // Still in progress: wait, then check again. Awaiting the delay here
        // (rather than scheduling a detached timer) keeps the caller waiting
        // and lets errors from later checks propagate to it.
        await new Promise<void>(resolve => setTimeout(resolve, backoff))
        backoff = Math.min(backoff + ATHENA_POLL_BACKOFF_STEP_MS, ATHENA_POLL_MAX_BACKOFF_MS)
    }
}

/** Bucket that Athena writes query results to. */
const ATHENA_OUTPUT_BUCKET = 'cms-lds-agg'

/** Key prefix (folder) inside the output bucket for query results. */
const ATHENA_OUTPUT_PREFIX = 'cms_hcf_aggregates'

/**
 * Parses CSV text into rows of field values.
 *
 * Handles double-quoted fields (which may contain commas, line breaks and
 * `""`-escaped quotes). Unquoted fields are trimmed; the content of quoted
 * fields is kept verbatim. Blank lines are skipped.
 *
 * @param text - Raw CSV text
 * @returns One array of field values per non-blank row
 */
function parseCsvRows(text: string): string[][] {
    const rows: string[][] = []
    let row: string[] = []
    let field = ''
    let quoted = false // the current field contains a quoted section
    let inQuotes = false // the cursor is currently inside quotes

    const endField = (): void => {
        row.push(quoted ? field : field.trim())
        field = ''
        quoted = false
    }

    const endRow = (): void => {
        // A line holding nothing but whitespace is not a record.
        const blank = row.length === 0 && !quoted && field.trim() === ''
        if (blank) {
            field = ''
            return
        }
        endField()
        rows.push(row)
        row = []
    }

    for (let i = 0; i < text.length; i++) {
        const ch = text.charAt(i)
        if (inQuotes) {
            if (ch !== '"') {
                field += ch
            } else if (text.charAt(i + 1) === '"') {
                // Escaped quote inside a quoted field.
                field += '"'
                i++
            } else {
                inQuotes = false
            }
        } else if (ch === '"') {
            // Drop whitespace that precedes the opening quote.
            if (!quoted) field = field.trimStart()
            quoted = true
            inQuotes = true
        } else if (ch === ',') {
            endField()
        } else if (ch === '\n') {
            endRow()
        } else if (!quoted) {
            field += ch
        }
        // Characters after a closing quote (e.g. a trailing '\r') are ignored.
    }
    // Final row when the text does not end with a line break.
    endRow()

    return rows
}

/**
 * Runs a SQL query on Athena and returns the result rows.
 *
 * The query is started in the `primary` workgroup, awaited, and its CSV
 * result file is then read from S3 and converted into one object per row,
 * keyed by the CSV header names.
 *
 * NOTE: every value in the returned objects is the raw string from the CSV;
 * no conversion to the number/boolean/array types declared on
 * `EventTableColumn` is performed.
 *
 * @param query - SQL query to run (make sure the correct permissions are set)
 * @returns The result rows, or `null` when the result contains no data rows
 * @throws Error when the query cannot be started, fails, or its result cannot
 * be read
 */
export async function queryAthena(query: string): Promise<EventTableColumn[] | null> {

    if (!athena) {
        athena = await newAthenaClient()
    }

    const execCmd = new StartQueryExecutionCommand({
        QueryString: query,
        WorkGroup: 'primary',
        ResultConfiguration: {
            OutputLocation: `s3://${ATHENA_OUTPUT_BUCKET}/${ATHENA_OUTPUT_PREFIX}/`
        }
    })

    const { $metadata, QueryExecutionId } = await athena.send(execCmd)

    if ($metadata.httpStatusCode !== 200) {
        throw new Error('FailedQuery: unable to query Athena')
    }

    if (QueryExecutionId === undefined) {
        throw new Error('InvalidQueryExecutionId: query execution id is undefined')
    }

    await pollAthenaQueryOutput(QueryExecutionId)

    // wait for athena to finish writing to s3
    await new Promise(resolve => setTimeout(resolve, 2000))

    const raw = await getFromS3(ATHENA_OUTPUT_BUCKET, `${ATHENA_OUTPUT_PREFIX}/${QueryExecutionId}.csv`)

    const [header, ...records] = parseCsvRows(raw)

    // Only a header (or nothing at all) means the query matched no rows.
    if (header === undefined || records.length === 0) {
        return null
    }

    return records.map((fields) => {
        const event: Record<string, string> = {}
        fields.forEach((value, i) => {
            const column = header[i]
            // Ignore surplus fields that have no matching header column.
            if (column !== undefined) {
                event[column] = value
            }
        })
        // Values are untyped CSV strings, hence the explicit cast (see NOTE above).
        return event as unknown as EventTableColumn
    })
}
Loading