Skip to content

Commit

Permalink
Enable Arweave file data sources.
Browse files Browse the repository at this point in the history
- Add support for querying an Arweave file by hash
- Add host export `arweave_get` to get the contents of a file
  • Loading branch information
mangas committed Aug 2, 2023
1 parent 1535e24 commit 909c101
Show file tree
Hide file tree
Showing 26 changed files with 2,394 additions and 1,779 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 62 additions & 0 deletions core/src/polling_monitor/arweave_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use anyhow::Error;
use bytes::Bytes;
use futures::future::BoxFuture;
use graph::{
    components::link_resolver::{ArweaveClient, ArweaveResolver, FileSizeLimit},
    data_source::offchain::Base64,
    prelude::CheapClone,
    tokio,
};
use std::{sync::Arc, time::Duration};
use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};

/// Buffered, rate-limited Tower service resolving an Arweave file id
/// (base64url transaction id) to its contents.
pub type ArweaveService = Buffer<Base64, BoxFuture<'static, Result<Option<Bytes>, Error>>>;

/// Builds the shared Arweave fetch service.
///
/// Requests are limited to `rate_limit` per second and each fetch is
/// subject to `max_file_size`. Cloning the returned service is cheap,
/// and all clones share the same rate limit.
pub fn arweave_service(
    client: Arc<ArweaveClient>,
    timeout: Duration,
    rate_limit: u16,
    max_file_size: FileSizeLimit,
) -> ArweaveService {
    let inner = ArweaveServiceInner {
        client,
        timeout,
        max_file_size,
    };

    let rate_limited = ServiceBuilder::new()
        .rate_limit(u64::from(rate_limit), Duration::from_secs(1))
        .service_fn(move |req| inner.cheap_clone().call_inner(req))
        .boxed();

    // The `Buffer` makes it so the rate limit is shared among clones.
    // Make it unbounded to avoid any risk of starvation.
    Buffer::new(rate_limited, u32::MAX as usize)
}

/// Per-request state shared by every fetch: the Arweave client plus the
/// limits applied to each download.
#[derive(Clone)]
struct ArweaveServiceInner {
    // Client used to talk to the Arweave gateway.
    client: Arc<ArweaveClient>,
    // Time budget for a single fetch.
    timeout: Duration,
    // Largest file the service is willing to download.
    max_file_size: FileSizeLimit,
}

impl CheapClone for ArweaveServiceInner {
fn cheap_clone(&self) -> Self {
Self {
client: self.client.cheap_clone(),
timeout: self.timeout,
max_file_size: self.max_file_size.cheap_clone(),
}
}
}

impl ArweaveServiceInner {
async fn call_inner(self, req: Base64) -> Result<Option<Bytes>, Error> {
self.client
.get_with_limit(&req, &self.max_file_size)
.await
.map(Bytes::from)
.map(Some)
.map_err(Error::from)
}
}
21 changes: 20 additions & 1 deletion core/src/polling_monitor/ipfs_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,12 @@ mod test {
use tower::ServiceExt;

use cid::Cid;
use graph::{ipfs_client::IpfsClient, tokio};
use graph::{
components::link_resolver::{ArweaveClient, ArweaveResolver},
data::value::Word,
ipfs_client::IpfsClient,
tokio,
};

use uuid::Uuid;

Expand Down Expand Up @@ -156,4 +161,18 @@ mod test {
.unwrap();
assert_eq!(content.to_vec(), uid.as_bytes().to_vec());
}

// NOTE(review): integration-style test — performs a live HTTP request
// against the default Arweave gateway, so it needs network access and
// depends on this transaction staying retrievable; consider gating it.
#[tokio::test]
async fn arweave_get() {
    // Arweave transaction id (base64url) of a known NFT metadata file.
    const ID: &str = "8APeQ5lW0-csTcBaGdPBDLAL2ci2AT9pTn2tppGPU_8";

    let cl = ArweaveClient::default();
    let body = cl.get(&Word::from(ID)).await.unwrap();
    let body = String::from_utf8(body).unwrap();

    // Expected file contents; the raw string's leading/trailing
    // newlines are trimmed before comparison.
    let expected = r#"
{"name":"Arloader NFT #1","description":"Super dope, one of a kind NFT","collection":{"name":"Arloader NFT","family":"We AR"},"attributes":[{"trait_type":"cx","value":-0.4042198883730073},{"trait_type":"cy","value":0.5641681708263335},{"trait_type":"iters","value":44}],"properties":{"category":"image","files":[{"uri":"https://arweave.net/7gWCr96zc0QQCXOsn5Vk9ROVGFbMaA9-cYpzZI8ZMDs","type":"image/png"},{"uri":"https://arweave.net/URwQtoqrbYlc5183STNy3ZPwSCRY4o8goaF7MJay3xY/1.png","type":"image/png"}]},"image":"https://arweave.net/URwQtoqrbYlc5183STNy3ZPwSCRY4o8goaF7MJay3xY/1.png"}
"#.trim_start().trim_end();
    assert_eq!(expected, body);
}
}
1 change: 1 addition & 0 deletions core/src/polling_monitor/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use graph::{
prometheus::{Counter, Gauge},
};

#[derive(Clone)]
pub struct PollingMonitorMetrics {
pub requests: Counter,
pub errors: Counter,
Expand Down
13 changes: 10 additions & 3 deletions core/src/polling_monitor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod arweave_service;
mod ipfs_service;
mod metrics;

Expand All @@ -23,6 +24,7 @@ use tower::util::rng::HasherRng;
use tower::{Service, ServiceExt};

pub use self::metrics::PollingMonitorMetrics;
pub use arweave_service::{arweave_service, ArweaveService};
pub use ipfs_service::{ipfs_service, IpfsService};

const MIN_BACKOFF: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -100,7 +102,7 @@ pub fn spawn_monitor<ID, S, E, Res: Send + 'static>(
service: S,
response_sender: mpsc::UnboundedSender<(ID, Res)>,
logger: Logger,
metrics: PollingMonitorMetrics,
metrics: Arc<PollingMonitorMetrics>,
) -> PollingMonitor<ID>
where
S: Service<ID, Response = Option<Res>, Error = E> + Send + 'static,
Expand Down Expand Up @@ -257,7 +259,12 @@ mod tests {
) {
let (svc, handle) = mock::pair();
let (tx, rx) = mpsc::unbounded_channel();
let monitor = spawn_monitor(svc, tx, log::discard(), PollingMonitorMetrics::mock());
let monitor = spawn_monitor(
svc,
tx,
log::discard(),
Arc::new(PollingMonitorMetrics::mock()),
);
(handle, monitor, rx)
}

Expand All @@ -267,7 +274,7 @@ mod tests {
let shared_svc = tower::buffer::Buffer::new(tower::limit::ConcurrencyLimit::new(svc, 1), 1);
let make_monitor = |svc| {
let (tx, rx) = mpsc::unbounded_channel();
let metrics = PollingMonitorMetrics::mock();
let metrics = Arc::new(PollingMonitorMetrics::mock());
let monitor = spawn_monitor(svc, tx, log::discard(), metrics);
(monitor, rx)
};
Expand Down
38 changes: 34 additions & 4 deletions core/src/subgraph/context.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
pub mod instance;

use crate::polling_monitor::{spawn_monitor, IpfsService, PollingMonitor, PollingMonitorMetrics};
use crate::polling_monitor::{
spawn_monitor, ArweaveService, IpfsService, PollingMonitor, PollingMonitorMetrics,
};
use anyhow::{self, Error};
use bytes::Bytes;
use graph::{
Expand All @@ -9,7 +11,10 @@ use graph::{
store::{DeploymentId, SubgraphFork},
subgraph::{MappingError, SharedProofOfIndexing},
},
data_source::{offchain, CausalityRegion, DataSource, TriggerData},
data_source::{
offchain::{self, Base64},
CausalityRegion, DataSource, TriggerData,
},
ipfs_client::CidFile,
prelude::{
BlockNumber, BlockState, CancelGuard, CheapClone, DeploymentHash, MetricsRegistry,
Expand Down Expand Up @@ -186,6 +191,8 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
pub struct OffchainMonitor {
ipfs_monitor: PollingMonitor<CidFile>,
ipfs_monitor_rx: mpsc::UnboundedReceiver<(CidFile, Bytes)>,
arweave_monitor: PollingMonitor<Base64>,
arweave_monitor_rx: mpsc::UnboundedReceiver<(Base64, Bytes)>,
}

impl OffchainMonitor {
Expand All @@ -194,25 +201,34 @@ impl OffchainMonitor {
registry: Arc<MetricsRegistry>,
subgraph_hash: &DeploymentHash,
ipfs_service: IpfsService,
arweave_service: ArweaveService,
) -> Self {
let metrics = Arc::new(PollingMonitorMetrics::new(registry, subgraph_hash));
// The channel is unbounded, as it is expected that `fn ready_offchain_events` is called
// frequently, or at least with the same frequency that requests are sent.
let (ipfs_monitor_tx, ipfs_monitor_rx) = mpsc::unbounded_channel();
let (arweave_monitor_tx, arweave_monitor_rx) = mpsc::unbounded_channel();

let ipfs_monitor = spawn_monitor(
ipfs_service,
ipfs_monitor_tx,
logger,
PollingMonitorMetrics::new(registry, subgraph_hash),
logger.cheap_clone(),
metrics.cheap_clone(),
);

let arweave_monitor = spawn_monitor(arweave_service, arweave_monitor_tx, logger, metrics);
Self {
ipfs_monitor,
ipfs_monitor_rx,
arweave_monitor,
arweave_monitor_rx,
}
}

/// Starts monitoring the given offchain source, routing it to the
/// poller matching its kind (IPFS or Arweave).
fn add_source(&mut self, source: offchain::Source) -> Result<(), Error> {
    match source {
        offchain::Source::Ipfs(cid_file) => self.ipfs_monitor.monitor(cid_file),
        offchain::Source::Arweave(base64) => self.arweave_monitor.monitor(base64),
    };
    // No failure path at present; the `Result` keeps the signature
    // uniform for callers.
    Ok(())
}
Expand All @@ -233,6 +249,20 @@ impl OffchainMonitor {
Err(TryRecvError::Empty) => break,
}
}

loop {
match self.arweave_monitor_rx.try_recv() {
Ok((base64, data)) => triggers.push(offchain::TriggerData {
source: offchain::Source::Arweave(base64),
data: Arc::new(data),
}),
Err(TryRecvError::Disconnected) => {
anyhow::bail!("arweave monitor unexpectedly terminated")
}
Err(TryRecvError::Empty) => break,
}
}

Ok(triggers)
}
}
6 changes: 5 additions & 1 deletion core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::polling_monitor::IpfsService;
use crate::polling_monitor::{ArweaveService, IpfsService};
use crate::subgraph::context::{IndexingContext, SubgraphKeepAlive};
use crate::subgraph::inputs::IndexingInputs;
use crate::subgraph::loader::load_dynamic_data_sources;
Expand Down Expand Up @@ -30,6 +30,7 @@ pub struct SubgraphInstanceManager<S: SubgraphStore> {
instances: SubgraphKeepAlive,
link_resolver: Arc<dyn LinkResolver>,
ipfs_service: IpfsService,
arweave_service: ArweaveService,
static_filters: bool,
env_vars: Arc<EnvVars>,
}
Expand Down Expand Up @@ -165,6 +166,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
metrics_registry: Arc<MetricsRegistry>,
link_resolver: Arc<dyn LinkResolver>,
ipfs_service: IpfsService,
arweave_service: ArweaveService,
static_filters: bool,
) -> Self {
let logger = logger_factory.component_logger("SubgraphInstanceManager", None);
Expand All @@ -180,6 +182,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
ipfs_service,
static_filters,
env_vars,
arweave_service,
}
}

Expand Down Expand Up @@ -387,6 +390,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
registry.cheap_clone(),
&manifest.id,
self.ipfs_service.clone(),
self.arweave_service.clone(),
);

// Initialize deployment_head with current deployment head. Any sort of trouble in
Expand Down
2 changes: 1 addition & 1 deletion graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ chrono = "0.4.25"
envconfig = "0.10.0"
Inflector = "0.11.3"
isatty = "0.1.9"
reqwest = { version = "0.11.14", features = ["json", "stream", "multipart"] }
reqwest = { version = "0.11.18", features = ["json", "stream", "multipart"] }
ethabi = "17.2"
hex = "0.4.3"
http = "0.2.3"
Expand Down
Loading

0 comments on commit 909c101

Please sign in to comment.