Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filipe/arweave file ds #4789

Merged
merged 2 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 62 additions & 0 deletions core/src/polling_monitor/arweave_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use anyhow::Error;
use bytes::Bytes;
use futures::future::BoxFuture;
use graph::{
components::link_resolver::{ArweaveClient, ArweaveResolver, FileSizeLimit},
data_source::offchain::Base64,
prelude::CheapClone,
};
use std::{sync::Arc, time::Duration};
use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};

pub type ArweaveService = Buffer<Base64, BoxFuture<'static, Result<Option<Bytes>, Error>>>;

pub fn arweave_service(
client: Arc<ArweaveClient>,
timeout: Duration,
rate_limit: u16,
max_file_size: FileSizeLimit,
) -> ArweaveService {
let arweave = ArweaveServiceInner {
client,
timeout,
max_file_size,
};

let svc = ServiceBuilder::new()
.rate_limit(rate_limit.into(), Duration::from_secs(1))
.service_fn(move |req| arweave.cheap_clone().call_inner(req))
.boxed();

// The `Buffer` makes it so the rate limit is shared among clones.
// Make it unbounded to avoid any risk of starvation.
Buffer::new(svc, u32::MAX as usize)
}

#[derive(Clone)]
struct ArweaveServiceInner {
client: Arc<ArweaveClient>,
timeout: Duration,
max_file_size: FileSizeLimit,
}

impl CheapClone for ArweaveServiceInner {
fn cheap_clone(&self) -> Self {
Self {
client: self.client.cheap_clone(),
timeout: self.timeout,
max_file_size: self.max_file_size.cheap_clone(),
}
}
}

impl ArweaveServiceInner {
async fn call_inner(self, req: Base64) -> Result<Option<Bytes>, Error> {
self.client
.get_with_limit(&req, &self.max_file_size)
.await
.map(Bytes::from)
.map(Some)
.map_err(Error::from)
}
}
21 changes: 20 additions & 1 deletion core/src/polling_monitor/ipfs_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,12 @@ mod test {
use tower::ServiceExt;

use cid::Cid;
use graph::{ipfs_client::IpfsClient, tokio};
use graph::{
components::link_resolver::{ArweaveClient, ArweaveResolver},
data::value::Word,
ipfs_client::IpfsClient,
tokio,
};

use uuid::Uuid;

Expand Down Expand Up @@ -156,4 +161,18 @@ mod test {
.unwrap();
assert_eq!(content.to_vec(), uid.as_bytes().to_vec());
}

#[tokio::test]
async fn arweave_get() {
const ID: &str = "8APeQ5lW0-csTcBaGdPBDLAL2ci2AT9pTn2tppGPU_8";

let cl = ArweaveClient::default();
let body = cl.get(&Word::from(ID)).await.unwrap();
let body = String::from_utf8(body).unwrap();

let expected = r#"
{"name":"Arloader NFT #1","description":"Super dope, one of a kind NFT","collection":{"name":"Arloader NFT","family":"We AR"},"attributes":[{"trait_type":"cx","value":-0.4042198883730073},{"trait_type":"cy","value":0.5641681708263335},{"trait_type":"iters","value":44}],"properties":{"category":"image","files":[{"uri":"https://arweave.net/7gWCr96zc0QQCXOsn5Vk9ROVGFbMaA9-cYpzZI8ZMDs","type":"image/png"},{"uri":"https://arweave.net/URwQtoqrbYlc5183STNy3ZPwSCRY4o8goaF7MJay3xY/1.png","type":"image/png"}]},"image":"https://arweave.net/URwQtoqrbYlc5183STNy3ZPwSCRY4o8goaF7MJay3xY/1.png"}
"#.trim_start().trim_end();
assert_eq!(expected, body);
}
}
1 change: 1 addition & 0 deletions core/src/polling_monitor/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use graph::{
prometheus::{Counter, Gauge},
};

#[derive(Clone)]
pub struct PollingMonitorMetrics {
pub requests: Counter,
pub errors: Counter,
Expand Down
13 changes: 10 additions & 3 deletions core/src/polling_monitor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod arweave_service;
mod ipfs_service;
mod metrics;

Expand All @@ -23,6 +24,7 @@ use tower::util::rng::HasherRng;
use tower::{Service, ServiceExt};

pub use self::metrics::PollingMonitorMetrics;
pub use arweave_service::{arweave_service, ArweaveService};
pub use ipfs_service::{ipfs_service, IpfsService};

const MIN_BACKOFF: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -100,7 +102,7 @@ pub fn spawn_monitor<ID, S, E, Res: Send + 'static>(
service: S,
response_sender: mpsc::UnboundedSender<(ID, Res)>,
logger: Logger,
metrics: PollingMonitorMetrics,
metrics: Arc<PollingMonitorMetrics>,
) -> PollingMonitor<ID>
where
S: Service<ID, Response = Option<Res>, Error = E> + Send + 'static,
Expand Down Expand Up @@ -257,7 +259,12 @@ mod tests {
) {
let (svc, handle) = mock::pair();
let (tx, rx) = mpsc::unbounded_channel();
let monitor = spawn_monitor(svc, tx, log::discard(), PollingMonitorMetrics::mock());
let monitor = spawn_monitor(
svc,
tx,
log::discard(),
Arc::new(PollingMonitorMetrics::mock()),
);
(handle, monitor, rx)
}

Expand All @@ -267,7 +274,7 @@ mod tests {
let shared_svc = tower::buffer::Buffer::new(tower::limit::ConcurrencyLimit::new(svc, 1), 1);
let make_monitor = |svc| {
let (tx, rx) = mpsc::unbounded_channel();
let metrics = PollingMonitorMetrics::mock();
let metrics = Arc::new(PollingMonitorMetrics::mock());
let monitor = spawn_monitor(svc, tx, log::discard(), metrics);
(monitor, rx)
};
Expand Down
38 changes: 34 additions & 4 deletions core/src/subgraph/context.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
pub mod instance;

use crate::polling_monitor::{spawn_monitor, IpfsService, PollingMonitor, PollingMonitorMetrics};
use crate::polling_monitor::{
spawn_monitor, ArweaveService, IpfsService, PollingMonitor, PollingMonitorMetrics,
};
use anyhow::{self, Error};
use bytes::Bytes;
use graph::{
Expand All @@ -9,7 +11,10 @@ use graph::{
store::{DeploymentId, SubgraphFork},
subgraph::{MappingError, SharedProofOfIndexing},
},
data_source::{offchain, CausalityRegion, DataSource, TriggerData},
data_source::{
offchain::{self, Base64},
CausalityRegion, DataSource, TriggerData,
},
ipfs_client::CidFile,
prelude::{
BlockNumber, BlockState, CancelGuard, CheapClone, DeploymentHash, MetricsRegistry,
Expand Down Expand Up @@ -186,6 +191,8 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
pub struct OffchainMonitor {
ipfs_monitor: PollingMonitor<CidFile>,
ipfs_monitor_rx: mpsc::UnboundedReceiver<(CidFile, Bytes)>,
arweave_monitor: PollingMonitor<Base64>,
arweave_monitor_rx: mpsc::UnboundedReceiver<(Base64, Bytes)>,
}

impl OffchainMonitor {
Expand All @@ -194,25 +201,34 @@ impl OffchainMonitor {
registry: Arc<MetricsRegistry>,
subgraph_hash: &DeploymentHash,
ipfs_service: IpfsService,
arweave_service: ArweaveService,
) -> Self {
let metrics = Arc::new(PollingMonitorMetrics::new(registry, subgraph_hash));
// The channel is unbounded, as it is expected that `fn ready_offchain_events` is called
// frequently, or at least with the same frequency that requests are sent.
let (ipfs_monitor_tx, ipfs_monitor_rx) = mpsc::unbounded_channel();
let (arweave_monitor_tx, arweave_monitor_rx) = mpsc::unbounded_channel();

let ipfs_monitor = spawn_monitor(
ipfs_service,
ipfs_monitor_tx,
logger,
PollingMonitorMetrics::new(registry, subgraph_hash),
logger.cheap_clone(),
metrics.cheap_clone(),
);

let arweave_monitor = spawn_monitor(arweave_service, arweave_monitor_tx, logger, metrics);
Self {
ipfs_monitor,
ipfs_monitor_rx,
arweave_monitor,
arweave_monitor_rx,
}
}

fn add_source(&mut self, source: offchain::Source) -> Result<(), Error> {
match source {
offchain::Source::Ipfs(cid_file) => self.ipfs_monitor.monitor(cid_file),
offchain::Source::Arweave(base64) => self.arweave_monitor.monitor(base64),
};
Ok(())
}
Expand All @@ -233,6 +249,20 @@ impl OffchainMonitor {
Err(TryRecvError::Empty) => break,
}
}

loop {
match self.arweave_monitor_rx.try_recv() {
Ok((base64, data)) => triggers.push(offchain::TriggerData {
source: offchain::Source::Arweave(base64),
data: Arc::new(data),
}),
Err(TryRecvError::Disconnected) => {
anyhow::bail!("arweave monitor unexpectedly terminated")
}
Err(TryRecvError::Empty) => break,
}
}

Ok(triggers)
}
}
6 changes: 5 additions & 1 deletion core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::polling_monitor::IpfsService;
use crate::polling_monitor::{ArweaveService, IpfsService};
use crate::subgraph::context::{IndexingContext, SubgraphKeepAlive};
use crate::subgraph::inputs::IndexingInputs;
use crate::subgraph::loader::load_dynamic_data_sources;
Expand Down Expand Up @@ -30,6 +30,7 @@ pub struct SubgraphInstanceManager<S: SubgraphStore> {
instances: SubgraphKeepAlive,
link_resolver: Arc<dyn LinkResolver>,
ipfs_service: IpfsService,
arweave_service: ArweaveService,
static_filters: bool,
env_vars: Arc<EnvVars>,
}
Expand Down Expand Up @@ -165,6 +166,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
metrics_registry: Arc<MetricsRegistry>,
link_resolver: Arc<dyn LinkResolver>,
ipfs_service: IpfsService,
arweave_service: ArweaveService,
static_filters: bool,
) -> Self {
let logger = logger_factory.component_logger("SubgraphInstanceManager", None);
Expand All @@ -180,6 +182,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
ipfs_service,
static_filters,
env_vars,
arweave_service,
}
}

Expand Down Expand Up @@ -387,6 +390,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
registry.cheap_clone(),
&manifest.id,
self.ipfs_service.clone(),
self.arweave_service.clone(),
);

// Initialize deployment_head with current deployment head. Any sort of trouble in
Expand Down
2 changes: 1 addition & 1 deletion graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ chrono = "0.4.25"
envconfig = "0.10.0"
Inflector = "0.11.3"
isatty = "0.1.9"
reqwest = { version = "0.11.14", features = ["json", "stream", "multipart"] }
reqwest = { version = "0.11.18", features = ["json", "stream", "multipart"] }
ethabi = "17.2"
hex = "0.4.3"
http = "0.2.3"
Expand Down
Loading