Skip to content

Commit

Permalink
Fix coordinates and switch to kamoot (#440)
Browse files Browse the repository at this point in the history
* Fix coordinates and switch to kamoot

* Add limit to webhook

* Make photon url configurable

* Rm lat lng, street
  • Loading branch information
ChewingGlass authored Oct 6, 2023
1 parent dbb2495 commit fea7ec9
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const parseH3BNLocation = (location: BN) =>

export class ReverseGeoCache extends Model {
declare h3: string;
declare streetAddress: string;
declare street: string;
declare city: string;
declare state: string;
declare country: string;
Expand All @@ -26,7 +26,7 @@ ReverseGeoCache.init(
type: DECIMAL,
primaryKey: true,
},
streetAddress: DataTypes.STRING,
street: DataTypes.STRING,
lat: DataTypes.DECIMAL(8, 6),
long: DataTypes.DECIMAL(9, 6),
city: DataTypes.STRING,
Expand All @@ -50,7 +50,7 @@ export const ExtractHexLocationPlugin = ((): IPlugin => {
"city",
"state",
"country",
"street_address",
"street",
"lat",
"long",
];
Expand Down Expand Up @@ -81,7 +81,7 @@ export const ExtractHexLocationPlugin = ((): IPlugin => {
...schema[accountName],
lat: DataTypes.DECIMAL(8, 6),
long: DataTypes.DECIMAL(9, 6),
streetAddress: DataTypes.STRING,
street: DataTypes.STRING,
city: DataTypes.STRING,
state: DataTypes.STRING,
country: DataTypes.STRING,
Expand All @@ -99,26 +99,19 @@ export const ExtractHexLocationPlugin = ((): IPlugin => {
if (!reverseGeod) {
if (!locationFetchCache[location]) {
locationFetchCache[location] = (async () => {
const coords = parseH3BNLocation(location);
const response = await mapbox.fetchLocation(coords);
let placeName, parts, streetAddress, city, state, country;
if (response.features && response.features.length > 0) {
placeName = response.features[0].place_name;
parts = placeName.split(",");
streetAddress = parts[parts.length - 4]?.trim();
city = parts[parts.length - 3]?.trim();
state = parts[parts.length - 2]?.split(" ")[1]?.trim();
country = parts[parts.length - 1]?.trim();
}
const coords = parseH3BNLocation(new BN(location));
const { city, state, country, name, raw } =
await mapbox.fetchParsedLocation(coords);

return await ReverseGeoCache.create({
location: location.toString(),
streetAddress,
street: name,
city,
state,
country,
lat: coords[0],
long: coords[1],
raw: response.features,
raw,
});
})();
}
Expand All @@ -131,17 +124,14 @@ export const ExtractHexLocationPlugin = ((): IPlugin => {
// Remove raw response, format camelcase
if (reverseGeod) {
delete reverseGeod.dataValues.raw;
reverseGeod.dataValues.streetAddress =
reverseGeod?.dataValues.street_address;
delete reverseGeod.dataValues.street_address;
}

return {
...account,
city: null,
state: null,
country: null,
streetAddress: null,
street: null,
lat: null,
long: null,
..._omit(reverseGeod?.dataValues || {}, ["createdAt", "updatedAt"]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,86 +7,91 @@ import { sanitizeAccount } from "./sanitizeAccount";
import cachedIdlFetch from "./cachedIdlFetch";
import { FastifyInstance } from "fastify";
import { IAccountConfig, IInitedPlugin } from "../types";
import { initPlugins } from "../plugins";
import { truthy } from "./upsertProgramAccounts";
import pLimit from "p-limit";

interface HandleAccountWebhookArgs {
fastify: FastifyInstance;
programId: PublicKey;
accounts: IAccountConfig[];
account: any;
sequelize?: Sequelize;
pluginsByAccountType: Record<string, IInitedPlugin[]>
pluginsByAccountType: Record<string, IInitedPlugin[]>;
}

export async function handleAccountWebhook({
// Ensure we never have more txns open than the pool size - 1
const limit = pLimit(
(process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 5) - 1
);
export function handleAccountWebhook({
fastify,
programId,
accounts,
account,
sequelize = database,
pluginsByAccountType
pluginsByAccountType,
}: HandleAccountWebhookArgs) {
const idl = await cachedIdlFetch.fetchIdl({
programId: programId.toBase58(),
provider,
});
return limit(async () => {
const idl = await cachedIdlFetch.fetchIdl({
programId: programId.toBase58(),
provider,
});

if (!idl) {
throw new Error(`unable to fetch idl for ${programId}`);
}
if (!idl) {
throw new Error(`unable to fetch idl for ${programId}`);
}

if (
!accounts.every(({ type }) =>
idl.accounts!.some(({ name }) => name === type)
)
) {
throw new Error("idl does not have every account type");
}
if (
!accounts.every(({ type }) =>
idl.accounts!.some(({ name }) => name === type)
)
) {
throw new Error("idl does not have every account type");
}

const t = await sequelize.transaction();
const now = new Date().toISOString();
try {
const program = new anchor.Program(idl, programId, provider);
const data = Buffer.from(account.data[0], account.data[1]);
const t = await sequelize.transaction();
const now = new Date().toISOString();
try {
const program = new anchor.Program(idl, programId, provider);
const data = Buffer.from(account.data[0], account.data[1]);

const accName = accounts.find(({ type }) => {
return (
data &&
anchor.BorshAccountsCoder.accountDiscriminator(type).equals(
data.subarray(0, 8)
)
);
})?.type;
const accName = accounts.find(({ type }) => {
return (
data &&
anchor.BorshAccountsCoder.accountDiscriminator(type).equals(
data.subarray(0, 8)
)
);
})?.type;

if (accName) {
const decodedAcc = program.coder.accounts.decode(
accName!,
data as Buffer
);
let sanitized = sanitizeAccount(decodedAcc);
for (const plugin of pluginsByAccountType[accName]) {
if (plugin?.processAccount) {
sanitized = await plugin.processAccount(sanitized, t);
if (accName) {
const decodedAcc = program.coder.accounts.decode(
accName!,
data as Buffer
);
let sanitized = sanitizeAccount(decodedAcc);
for (const plugin of pluginsByAccountType[accName]) {
if (plugin?.processAccount) {
sanitized = await plugin.processAccount(sanitized, t);
}
}
const model = sequelize.models[accName];
await model.upsert(
{
address: account.pubkey,
refreshed_at: now,
...sanitized,
},
{ transaction: t }
);
}
const model = sequelize.models[accName];
await model.upsert(
{
address: account.pubkey,
refreshed_at: now,
...sanitized,
},
{ transaction: t }
);
}

await t.commit();
// @ts-ignore
fastify.customMetrics.accountWebhookCounter.inc();
} catch (err) {
await t.rollback();
console.error("While inserting, err", err);
throw err;
}
await t.commit();
// @ts-ignore
fastify.customMetrics.accountWebhookCounter.inc();
} catch (err) {
await t.rollback();
console.error("While inserting, err", err);
throw err;
}
});
}
46 changes: 44 additions & 2 deletions packages/account-postgres-sink-service/src/utils/mapboxService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,52 @@ export class MapboxService {

public async fetchLocation(coords: [number, number]): Promise<any> {
return this.limit(async () => {
const response = await this.axiosInstance.get(
`https://api.mapbox.com/geocoding/v5/mapbox.places/${coords[0]},${coords[1]}.json?access_token=${process.env.MAPBOX_ACCESS_TOKEN}`
const response = await axios.get(
`${process.env.PHOTON_URL!}/reverse?lat=${coords[0]}&lon=${coords[1]}`
);

// Uncomment for mapbox impl
// const response = await this.axiosInstance.get(
// `https://api.mapbox.com/geocoding/v5/mapbox.places/${coords[0]},${coords[1]}.json?access_token=${process.env.MAPBOX_ACCESS_TOKEN}`
// );
return response.data;
});
}

public async fetchParsedLocation(
coords: [number, number]
): Promise<{
city?: string;
state?: string;
country?: string;
name?: string;
raw: any[];
}> {
const response = await this.fetchLocation(coords);
let city, state, country, name;
if (response.features && response.features.length > 0) {
({
properties: { city, state, country, name },
} = response.features[0]);
}

// Uncomment for mapbox impl
// let placeName, parts, street, city, state, country;
// if (response.features && response.features.length > 0) {
// placeName = response.features[0].place_name;
// parts = placeName.split(",");
// street = parts[parts.length - 4]?.trim();
// city = parts[parts.length - 3]?.trim();
// state = parts[parts.length - 2]?.split(" ")[1]?.trim();
// country = parts[parts.length - 1]?.trim();
// }

return {
city,
state,
country,
name,
raw: response.features,
};
}
}
5 changes: 1 addition & 4 deletions packages/metadata-service/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ server.get<{ Params: { eccCompact: string } }>(
},
...locationAttributes("iot", record?.iot_hotspot_info),
...locationAttributes("iot", record?.mobile_hotspot_info),
],
]
};
}
);
Expand All @@ -146,12 +146,9 @@ function locationAttributes(
}

return [
{ trait_type: `${name}_street_address`, value: info.streetAddress },
{ trait_type: `${name}_city`, value: info.city },
{ trait_type: `${name}_state`, value: info.state },
{ trait_type: `${name}_country`, value: info.country },
{ trait_type: `${name}_lat`, value: info.lat },
{ trait_type: `${name}_long`, value: info.long },
];
}

Expand Down
8 changes: 4 additions & 4 deletions packages/metadata-service/src/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export const sequelize = new Sequelize({

export class MobileHotspotInfo extends Model {
declare address: string;
declare streetAddress: string;
declare street: string;
declare city: string;
declare state: string;
declare country: string;
Expand All @@ -65,7 +65,7 @@ MobileHotspotInfo.init(
type: STRING,
primaryKey: true,
},
streetAddress: DataTypes.STRING,
street: DataTypes.STRING,
lat: DataTypes.DECIMAL(8, 6),
long: DataTypes.DECIMAL(9, 6),
city: DataTypes.STRING,
Expand All @@ -84,7 +84,7 @@ MobileHotspotInfo.init(
export class IotHotspotInfo extends Model {
declare address: string;
declare asset: string;
declare streetAddress: string;
declare street: string;
declare city: string;
declare state: string;
declare country: string;
Expand All @@ -97,7 +97,7 @@ IotHotspotInfo.init(
type: STRING,
primaryKey: true,
},
streetAddress: DataTypes.STRING,
street: DataTypes.STRING,
lat: DataTypes.DECIMAL(8, 6),
long: DataTypes.DECIMAL(9, 6),
city: DataTypes.STRING,
Expand Down

0 comments on commit fea7ec9

Please sign in to comment.