Commit

Add size limit for storing in the DB

michelemin committed Jan 7, 2025
1 parent b796214 commit bce21f6
Showing 10 changed files with 288 additions and 51 deletions.
16 changes: 15 additions & 1 deletion modules/tests/test_db/harness/harness.sh
@@ -27,6 +27,17 @@ curl -d "delete:my_key" http://$PLAID_LOCATION/webhook/$URL
curl -d "get:my_key" http://$PLAID_LOCATION/webhook/$URL
curl -d "delete:another_key" http://$PLAID_LOCATION/webhook/$URL
curl -d "get:another_key" http://$PLAID_LOCATION/webhook/$URL
# At this point the DB is empty
curl -d "insert:my_key:first_value" http://$PLAID_LOCATION/webhook/$URL
curl -d "insert:a_key:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" http://$PLAID_LOCATION/webhook/$URL # too many bytes for the configured storage limit
curl -d "get:a_key" http://$PLAID_LOCATION/webhook/$URL # Empty because insertion failed
curl -d "insert:a_key:a" http://$PLAID_LOCATION/webhook/$URL # this is within the limit, so it's fine
curl -d "get:a_key" http://$PLAID_LOCATION/webhook/$URL # a
curl -d "delete:my_key" http://$PLAID_LOCATION/webhook/$URL
curl -d "delete:a_key" http://$PLAID_LOCATION/webhook/$URL
# now the DB is empty, so we can insert the long key/value pair
curl -d "insert:a_key:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" http://$PLAID_LOCATION/webhook/$URL
curl -d "get:a_key" http://$PLAID_LOCATION/webhook/$URL

sleep 2

@@ -40,8 +51,11 @@ kill $RH_PID 2>&1 > /dev/null
# second_value
# Empty
# Empty
# Empty
# a
# aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa

echo -e "Empty\nEmpty\nfirst_value\nEmpty\nsecond_value\nEmpty\nEmpty" > expected.txt
echo -e "Empty\nEmpty\nfirst_value\nEmpty\nsecond_value\nEmpty\nEmpty\nEmpty\na\naaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" > expected.txt
diff expected.txt $FILE
RESULT=$?

6 changes: 6 additions & 0 deletions runtime/plaid/resources/plaid.toml
@@ -60,6 +60,12 @@ okta = 200
"example_rule.wasm" = 50
"test_crashtest.wasm" = 150

[loading.storage_size]
default = 1_048_576 # 1 MiB
[loading.storage_size.log_type]
[loading.storage_size.module_overrides]
"test_db.wasm" = 50

# [apis."okta"]
# token = ""
# domain = ""
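
The three [loading.storage_size] tables added above suggest that a module's byte limit is resolved from the most specific setting available: a module_overrides entry if present, otherwise a value keyed by the module's log type, otherwise default. The loader changes themselves are not expanded in this view, so the following is only a sketch of that assumed resolution order; the function and parameter names are illustrative.

use std::collections::HashMap;

// Sketch (assumed precedence): per-module override, then per-log-type value, then the default.
fn resolve_storage_limit(
    module_name: &str,
    log_type: &str,
    overrides: &HashMap<String, u64>,
    by_log_type: &HashMap<String, u64>,
    default: u64,
) -> u64 {
    overrides
        .get(module_name)
        .or_else(|| by_log_type.get(log_type))
        .copied()
        .unwrap_or(default)
}

Under that assumption, test_db.wasm resolves to 50 bytes while every other module falls back to the 1 MiB default.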
4 changes: 2 additions & 2 deletions runtime/plaid/src/bin/plaid.rs
@@ -85,7 +85,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

info!("Loading all the modules");
// Load all the modules that form our Nanoservices and Plaid rules
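// Note: the loader now also receives the storage handle and runs async, presumably so that each
// module's storage counter can be seeded from already-persisted data (the loader changes are not
// expanded in this view)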
let modules = Arc::new(loader::load(config.loading).unwrap());
let modules = Arc::new(loader::load(config.loading, storage.clone()).await.unwrap());
let modules_by_name = Arc::new(modules.get_modules());

info!(
@@ -353,4 +353,4 @@ where
T: Send + Sync + Clone,
{
warp::any().map(move || users.clone())
}
}
53 changes: 53 additions & 0 deletions runtime/plaid/src/executor/mod.rs
@@ -104,6 +104,10 @@ pub struct Env {
pub api: Arc<Api>,
// A handle to the storage system if one is configured
pub storage: Option<Arc<Storage>>,
// Number of bytes the module is currently saving in persistent storage
pub storage_current: Arc<RwLock<u64>>,
// Max number of bytes the module can save to persistent storage
pub storage_limit: u64,
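// Both fields are populated from the owning PlaidModule in prepare_for_execution, and the counter
// is written back to the module after each execution (see update_used_storage below)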
// A sender to the external logging system
pub external_logging_system: Logger,
/// Memory for host-guest communication
@@ -237,6 +241,8 @@ fn prepare_for_execution(
message: message.create_duplicate(),
api: api.clone(),
storage: storage.clone(),
storage_current: plaid_module.storage_current.clone(),
storage_limit: plaid_module.storage_limit,
external_logging_system: els.clone(),
memory: None,
response,
@@ -307,6 +313,43 @@ fn prepare_for_execution(
Ok((store, instance, ep, envr))
}

/// Update the byte counter for storage used by a module
fn update_used_storage(
plaid_module: &Arc<PlaidModule>,
env: &FunctionEnv<Env>,
store: &Store,
) -> Result<(), ExecutorError> {
// Take the new value for used storage from the env
let new_used_storage = match env.as_ref(&store).storage_current.read() {
Ok(data) => *data,
Err(e) => {
error!(
"Critical error getting a read lock on used storage: {:?}",
e
);
return Err(ExecutorError::MemoryError(
"Critical error getting a read lock on used storage".to_string(),
));
}
};
// Write the new value in the long-lived struct that represents the module
match plaid_module.storage_current.write() {
Ok(mut v) => {
*v = new_used_storage;
Ok(())
}
Err(e) => {
error!(
"Critical error getting a write lock on used storage: {:?}",
e
);
Err(ExecutorError::MemoryError(
"Critical error getting a write lock on used storage".to_string(),
))
}
}
}

/// Update a module's persistent response
fn update_persistent_response(
plaid_module: &Arc<PlaidModule>,
@@ -504,6 +547,16 @@ fn process_message_with_module(
.unwrap();
}

// Update the counter for used storage
if let Err(e) = update_used_storage(&module, &env, &mut store) {
els.log_module_error(
module.name.clone(),
format!("Failed to update used storage counter: {e}"),
message.data.clone(),
)
.unwrap();
}

Ok(())
}

1 change: 1 addition & 0 deletions runtime/plaid/src/functions/mod.rs
@@ -25,6 +25,7 @@ pub enum FunctionErrors {
CacheDisabled = -7,
CouldNotGetAdequateMemory = -8,
FailedToWriteGuestMemory = -9,
StorageLimitReached = -10,
}

#[derive(Debug)]
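The new StorageLimitReached = -10 variant is what a module sees when an insert would push it past its configured limit: the host-side storage functions below return these errors as negative i32 values, while non-negative return values are the number of bytes copied back into the guest buffer. A minimal, hypothetical guest-side mapping of those codes (the plaid-stl bindings are not part of this diff and may already provide this) could look like:

// Hypothetical helper: interpret the i32 returned by the storage host calls.
// The constants mirror the FunctionErrors discriminants above.
fn interpret_storage_result(ret: i32) -> Result<usize, &'static str> {
    match ret {
        n if n >= 0 => Ok(n as usize), // bytes written back to the guest buffer
        -10 => Err("StorageLimitReached"),
        -9 => Err("FailedToWriteGuestMemory"),
        -8 => Err("CouldNotGetAdequateMemory"),
        -7 => Err("CacheDisabled"),
        _ => Err("other runtime error"),
    }
}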
123 changes: 101 additions & 22 deletions runtime/plaid/src/functions/storage.rs
@@ -49,31 +49,84 @@ pub fn insert(
};

let storage_key = key.clone();
let get_key = key.clone();
let key_len = storage_key.as_bytes().len();

// We check whether this insert would overwrite existing data. If so, we need to take that into account when
// computing the storage that would be occupied at the end of the insert operation.
// Note: if data already exists under this key, we count the key's length as part of the existing size as well,
// because after the insertion the key would still be stored only once.
let existing_data_size = match env_data
.api
.clone()
.runtime
.block_on(async move { storage.get(&env_data.name, &get_key).await })
{
Ok(data) => match data {
None => 0u64,
Some(d) => d.len() as u64 + key_len as u64,
},
Err(_) => {
return FunctionErrors::InternalApiError as i32;
}
};

// Check that adding this new data (key + value) would not bring us above the storage limit for the module
// Note: we _subtract_ the size of the existing data because inserting the new data would overwrite it.
let used_storage = match env_data.storage_current.read() {
Ok(data) => *data,
Err(_) => panic!(),
};
let would_be_used_storage =
used_storage + key_len as u64 + value.len() as u64 - existing_data_size; // no problem with underflowing because the result will never be negative (since used_storage >= existing_data_size)
if would_be_used_storage > env_data.storage_limit {
error!("{}: Could not insert key/value with key {storage_key} as that would bring us above the configured storage limit. Used: {used_storage}, would be used: {would_be_used_storage}, limit: {}", env_data.name, env_data.storage_limit);
return FunctionErrors::StorageLimitReached as i32;
}
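// For example, with a 50-byte limit and "a_key" (5 bytes) currently holding a 10-byte value
// (used_storage = 15): overwriting "a_key" with a 30-byte value gives existing_data_size = 10 + 5 = 15
// and would_be_used_storage = 15 + 5 + 30 - 15 = 35, which is allowed, whereas a 48-byte value would
// give 53 and be rejected above. (Illustrative numbers, not taken from the tests.)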

let result = env_data.api.clone().runtime.block_on(async move {
storage
.insert(env_data.name.clone(), storage_key, value)
.await
});

match result {
Ok(Some(data)) => {
// If the data is too large to fit in the buffer that was passed to us, this is unfortunately a somewhat
// unrecoverable state because we've overwritten the value already. In the future we could fail the insertion
// if the data buffer passed is too small, but that would mean doing a get call first, which the client can do
// too.
match safely_write_data_back(&memory_view, &data, data_buffer, data_buffer_len) {
Ok(x) => x,
Ok(data) => {
// The insertion went fine: update counter for used storage
match env_data.storage_current.write() {
Ok(mut storage) => {
*storage = would_be_used_storage;
}
Err(e) => {
error!(
"{}: Data write error in storage_insert: {:?}",
env_data.name, e
"Critical error getting a write lock on used storage: {:?}",
e
);
e as i32
return FunctionErrors::InternalApiError as i32;
}
}
match data {
Some(data) => {
// If the data is too large to fit in the buffer that was passed to us, this is unfortunately a somewhat
// unrecoverable state because we've overwritten the value already. In the future we could fail the insertion
// if the data buffer passed is too small, but that would mean doing a get call first, which the client can do
// too.
match safely_write_data_back(&memory_view, &data, data_buffer, data_buffer_len)
{
Ok(x) => x,
Err(e) => {
error!(
"{}: Data write error in storage_insert: {:?}",
env_data.name, e
);
e as i32
}
}
}
// This occurs when there is no such key, so the number of bytes copied back is 0
None => 0,
}
}
// This occurs when there is no such key, so the number of bytes copied back is 0
Ok(None) => 0,
// If the storage system errors (for example a network problem if using a networked storage provider)
// the error is made opaque to the client here and we log what happened
Err(e) => {
@@ -255,6 +308,7 @@ pub fn delete(
return FunctionErrors::ParametersNotUtf8 as i32;
}
};
let key_len = key.as_bytes().len();

let result = match data_buffer_len {
// This is a call just to get the size of the buffer, so we do storage.get
@@ -272,19 +326,44 @@
};

match result {
Ok(Some(data)) => {
match safely_write_data_back(&memory_view, &data, data_buffer, data_buffer_len) {
Ok(x) => x,
Err(e) => {
error!(
"{}: Data write error in storage_delete: {:?}",
env_data.name, e
);
e as i32
Ok(data) => {
match data {
Some(data) => {
// Check if we were _actually_ deleting something
match data_buffer_len {
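// A data_buffer_len of 0 means this call was only a size query (a storage.get above),
// so nothing was removed and the counter stays unchanged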
0 => {}
_ => {
// We were deleting something, so we decrease the counter for used storage
match env_data.storage_current.write() {
Ok(mut storage) => {
// no underflow: the result can never become negative
*storage = *storage - key_len as u64 - data.len() as u64;
}
Err(e) => {
error!(
"Critical error getting a write lock on used storage: {:?}",
e
);
return FunctionErrors::InternalApiError as i32;
}
}
}
}
match safely_write_data_back(&memory_view, &data, data_buffer, data_buffer_len)
{
Ok(x) => x,
Err(e) => {
error!(
"{}: Data write error in storage_delete: {:?}",
env_data.name, e
);
e as i32
}
}
}
None => 0,
}
}
Ok(None) => 0,
Err(_) => 0,
}
}
