Skip to content

Commit

Permalink
Update spin deps (#14)
Browse files Browse the repository at this point in the history
* Update spin deps

Signed-off-by: Darwin Boersma <darwin@sadlark.com>

* Add regex for template ARN

Signed-off-by: Darwin Boersma <darwin@sadlark.com>

---------

Signed-off-by: Darwin Boersma <darwin@sadlark.com>
  • Loading branch information
ogghead authored Oct 5, 2024
1 parent e9c9636 commit eb58fba
Show file tree
Hide file tree
Showing 11 changed files with 882 additions and 3,140 deletions.
3,786 changes: 750 additions & 3,036 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 8 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,27 @@ rust-version = "1.78"

[workspace.package]
authors = ["Darwin Boersma <darwin@sadlark.com>", "Fermyon Engineering <engineering@fermyon.com>"]
version = "0.3.0"
version = "0.3.1"
edition = "2021"

[workspace]
members = ["sdk", "sdk/macro"]

[dependencies]
anyhow = "1.0.68"
async-trait = "0.1"
aws-config = "1.5.5"
aws-sdk-kinesis = "1.40.0"
clap = { version = "3.1.15", features = ["derive", "env"] }
futures = "0.3.25"
serde = "1.0"
spin-core = { git = "https://github.com/fermyon/spin", tag = "v2.6.0" }
spin-telemetry = { git = "https://github.com/fermyon/spin", tag = "v2.6.0" }
spin-trigger = { git = "https://github.com/fermyon/spin", tag = "v2.6.0" }
tokio = { version = "1.37", features = ["full"] }
tokio-scoped = "0.2.0"
tokio-stream = "0.1.15"
spin-core = { git = "https://github.com/fermyon/spin", rev = "4fbf872505976ded774005f602584c2feefbfd8b" }
spin-factors = { git = "https://github.com/fermyon/spin", rev = "4fbf872505976ded774005f602584c2feefbfd8b" }
spin-runtime-factors = { git = "https://github.com/fermyon/spin", rev = "4fbf872505976ded774005f602584c2feefbfd8b" }
spin-telemetry = { git = "https://github.com/fermyon/spin", rev = "4fbf872505976ded774005f602584c2feefbfd8b" }
spin-trigger = { git = "https://github.com/fermyon/spin", rev = "4fbf872505976ded774005f602584c2feefbfd8b" }
tokio = { version = "1.40", features = ["rt", "macros", "time", "signal"] }
tracing = { version = "0.1", features = ["log"] }
wasmtime = { version = "21.0.1" }
wasmtime = { version = "22.0" }

[target.'cfg(target_os = "linux")'.dependencies]
# This needs to be an explicit dependency to enable
Expand Down
2 changes: 1 addition & 1 deletion guest_rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ name = "spin-kinesis-sdk"
version = { workspace = true }
authors = { workspace = true }
edition = { workspace = true }
rust-version = "1.71"
rust-version = "1.78"
include = ["../kinesis.wit"]

[lib]
name = "spin_kinesis_sdk"

[dependencies]
spin-executor = "3.0.1"
spin-executor = "3.0"
spin-kinesis-macro = { path = "macro" }
wit-bindgen = { workspace = true }
2 changes: 1 addition & 1 deletion sdk/macro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "spin-kinesis-macro"
version = "0.1.0"
edition = "2021"
rust-version = "1.71"
rust-version = "1.78"
include = ["../../kinesis.wit"]

[lib]
Expand Down
2 changes: 1 addition & 1 deletion spin-pluginify.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "trigger-kinesis"
description = "A Spin trigger for Amazon Kinesis events"
version = "0.3"
version = "0.3.1"
spin_compatibility = ">=2.6"
license = "Apache-2.0"
package = "./target/release/trigger-kinesis"
117 changes: 76 additions & 41 deletions src/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use aws_sdk_kinesis::{
types::{Shard, ShardIteratorType},
Client,
};
use spin_trigger::TriggerAppEngine;
use spin_factors::RuntimeFactors;
use spin_trigger::TriggerApp;
use tracing::{instrument, Instrument};

use crate::{KinesisRecordProcessor, KinesisTrigger};

pub struct ShardProcessor {
engine: Arc<TriggerAppEngine<KinesisTrigger>>,
pub struct ShardProcessor<F: RuntimeFactors> {
app: Arc<TriggerApp<KinesisTrigger, F>>,
component_id: Arc<String>,
tx_finished_shard: tokio::sync::mpsc::Sender<String>,
kinesis_client: Client,
Expand All @@ -23,11 +24,11 @@ pub struct ShardProcessor {
shard_iterator_type: ShardIteratorType,
}

impl ShardProcessor {
impl<F: RuntimeFactors> ShardProcessor<F> {
/// Try to create a new poller for the given stream and shard
#[instrument(name = "spin_trigger_kinesis.new_shard_processor", skip_all, fields(otel.name = format!("new_shard_processor {}", component_id)))]
pub fn new(
engine: &Arc<TriggerAppEngine<KinesisTrigger>>,
app: &Arc<TriggerApp<KinesisTrigger, F>>,
component_id: &Arc<String>,
tx_finished_shard: &tokio::sync::mpsc::Sender<String>,
kinesis_client: &Client,
Expand All @@ -38,7 +39,7 @@ impl ShardProcessor {
shard_iterator_type: ShardIteratorType,
) -> Self {
Self {
engine: engine.clone(),
app: app.clone(),
component_id: component_id.clone(),
tx_finished_shard: tx_finished_shard.clone(),
kinesis_client: kinesis_client.clone(),
Expand All @@ -50,6 +51,33 @@ impl ShardProcessor {
}
}

/// Get records from the shard using this poller. This will return an empty vector if there is no shard iterator
pub async fn poll(self) {
match self.get_shard_iterator().await {
Some(mut shard_iterator) => {
while let Some(new_shard_iterator) = self.process_batch(shard_iterator).await {
shard_iterator = new_shard_iterator;
}
}
None => {
tracing::error!(
"[Kinesis] Null shard iterator for poller {} on component {}. Exiting poll loop",
self.shard_id,
self.component_id,
);
}
}

self.close().await;
}

#[instrument(name = "spin_trigger_kinesis.close_shard_processor", skip_all, fields(otel.name = format!("close_shard_processor {}", self.component_id)))]
async fn close(self) {
if let Err(err) = self.tx_finished_shard.send(self.shard_id).await {
tracing::error!("[Kinesis] Error sending shard id to finished channel: {err}");
}
}

#[instrument(name = "spin_trigger_kinesis.get_shard_iterator", skip_all, fields(otel.name = format!("get_shard_iterator {}", self.component_id)))]
async fn get_shard_iterator(&self) -> Option<String> {
self.kinesis_client
Expand Down Expand Up @@ -94,7 +122,7 @@ impl ShardProcessor {
))
.await;
} else if !records.is_empty() {
let processor = KinesisRecordProcessor::new(&self.engine, &self.component_id);
let processor = KinesisRecordProcessor::new(&self.app, &self.component_id);
// Wait until processing is completed for these records
processor.process_records(records).in_current_span().await
}
Expand All @@ -109,25 +137,6 @@ impl ShardProcessor {
}
}
}

/// Get records from the shard using this poller. This will return an empty vector if there is no shard iterator
pub async fn poll(self) {
let Some(mut shard_iterator) = self.get_shard_iterator().await else {
tracing::trace!(
"[Kinesis] Null shard iterator for poller {}. Exiting poll loop",
self.shard_id
);
return;
};

while let Some(new_shard_iterator) = self.process_batch(shard_iterator).await {
shard_iterator = new_shard_iterator;
}

if let Err(err) = self.tx_finished_shard.send(self.shard_id).await {
tracing::error!("[Kinesis] Error sending shard id to finished channel: {err}");
}
}
}

pub struct ShardDetector {
Expand All @@ -137,6 +146,7 @@ pub struct ShardDetector {
rx_finished_shard: tokio::sync::mpsc::Receiver<String>,
tx_new_shard: tokio::sync::mpsc::Sender<String>,
running_shards: HashSet<String>,
component_id: Arc<String>,
}

impl ShardDetector {
Expand All @@ -156,23 +166,36 @@ impl ShardDetector {
client: client.clone(),
rx_finished_shard,
tx_new_shard,
component_id: component_id.clone(),
running_shards: HashSet::new(),
}
}

/// Poll for new shards -- if we find new shards, we need to create a new shard poller for each
pub async fn poll_new_shards(mut self) {
loop {
tokio::select! {
Some(shard) = self.rx_finished_shard.recv() => {
self.running_shards.remove(&shard);
},
Ok(ListShardsOutput { shards: Some(new_shards), .. }) = self
if !self.rx_finished_shard.is_empty() {
match self.rx_finished_shard.recv().await {
Some(shard) => {
self.running_shards.remove(&shard);
}
None => {
tracing::error!("[Kinesis] Finished shard channel closed.");
}
}
} else {
match self
.client
.list_shards()
.stream_arn(self.stream_arn.as_ref())
.send() => {
for Shard {shard_id, ..} in new_shards {
.send()
.await
{
Ok(ListShardsOutput {
shards: Some(shards),
..
}) => {
for Shard { shard_id, .. } in shards {
if self.running_shards.contains(&shard_id) {
continue;
}
Expand All @@ -182,19 +205,31 @@ impl ShardDetector {
"[Kinesis] Error sending new shard to main loop: {:?}",
e
);
return;
} else {
self.running_shards.insert(shard_id);
}
}
tokio::time::sleep(self.detector_poll_millis).await;
},
else => {
tracing::error!(
"[Kinesis] Unable to fetch shards from iterator. Shard detector exiting."
);
return;
}
Ok(ListShardsOutput { shards: None, .. }) => {
tracing::error!(
"[Kinesis] No shards found in stream for {}",
self.component_id
);
}
Err(e) => {
tracing::error!(
"[Kinesis] Error listing shards in detector for {}: {:?}",
self.component_id,
e
);
}
}
tokio::time::sleep(self.detector_poll_millis)
.instrument(tracing::info_span!(
"spin_trigger_kinesis.detector_poll_idle",
"otel.name" = format!("detector_poll_idle {}", self.component_id)
))
.await;
}
}
}
Expand Down
Loading

0 comments on commit eb58fba

Please sign in to comment.