Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dont log message contents #1862

Merged
merged 1 commit into from
Jan 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ reqwest = "0.12.0"
redis = { version = "0.24.0", features = ["tokio-comp", "cluster"] }
cdrs-tokio = "8.0"
cassandra-protocol = "3.0"
tracing = "0.1.15"
# https://docs.rs/tracing/latest/tracing/level_filters/index.html#compile-time-filters
# `trace` level is considered development only, and may contain sensitive data, do not include it in release builds.
tracing = { version = "0.1.15", features = ["release_max_level_debug"] }
tracing-subscriber = { version = "0.3.1", features = ["env-filter", "json"] }
tracing-appender = "0.2.0"
serde_json = "1.0"
Expand Down
3 changes: 1 addition & 2 deletions shotover/src/codec/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,7 @@ impl Decoder for CassandraDecoder {
Err(CodecReadError::Parser(anyhow!(msg)))
}
err => Err(CodecReadError::Parser(anyhow!(
"Failed to parse frame {:?}",
err
"Failed to parse frame {err:?}"
))),
}
}
Expand Down
4 changes: 3 additions & 1 deletion shotover/src/codec/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,9 @@ impl Encoder<Messages> for KafkaEncoder {
}) => {
dst.extend_from_slice(&body.auth_bytes);
}
_ => unreachable!("not expected {frame:?}"),
_ => unreachable!(
"Expected kafka sasl authenticate request or response but was not"
),
}
Ok(())
} else {
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,8 @@ async fn reader_task<C: CodecBuilder + 'static, R: AsyncRead + Unpin + Send + 's
force_run_chain.notify_one();
}
Err(CodecReadError::RespondAndThenCloseConnection(messages)) => {
if let Err(err) = out_tx.send(messages) {
error!("Failed to send RespondAndThenCloseConnection message: {:?}", err);
if out_tx.send(messages).is_err() {
error!("Failed to send RespondAndThenCloseConnection message");
}
return Err(ConnectionError::ShotoverClosed);
}
Expand Down
20 changes: 20 additions & 0 deletions shotover/src/frame/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,26 @@ impl CassandraFrame {
}
}
}
pub(crate) fn operation_name(operation: &CassandraOperation) -> &'static str {
match operation {
CassandraOperation::Query { .. } => "Query",
CassandraOperation::Result(_) => "Result",
CassandraOperation::Error(_) => "Error",
CassandraOperation::Prepare(_) => "Prepare",
CassandraOperation::Execute(_) => "Execute",
CassandraOperation::Register(_) => "Register",
CassandraOperation::Event(_) => "Event",
CassandraOperation::Batch(_) => "Batch",
CassandraOperation::Startup(_) => "Startup",
CassandraOperation::Ready(_) => "Ready",
CassandraOperation::Authenticate(_) => "Authenticate",
CassandraOperation::Options(_) => "Options",
CassandraOperation::Supported(_) => "Supported",
CassandraOperation::AuthChallenge(_) => "AuthChallenge",
CassandraOperation::AuthResponse(_) => "AuthResponse",
CassandraOperation::AuthSuccess(_) => "AuthSuccess",
}
}

#[derive(PartialEq, Debug, Clone)]
pub enum CassandraOperation {
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/frame/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Display for KafkaFrame {
header.unknown_tagged_fields
)?;
}
write!(f, " {:?}", body)?;
write!(f, " {body:?}")?;
}
KafkaFrame::Response {
version,
Expand All @@ -63,7 +63,7 @@ impl Display for KafkaFrame {
header.unknown_tagged_fields
)?;
}
write!(f, " {body:?}",)?;
write!(f, " {body:?}")?;
}
}
Ok(())
Expand Down
8 changes: 4 additions & 4 deletions shotover/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,14 @@ impl Display for Frame {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
match self {
#[cfg(feature = "cassandra")]
Frame::Cassandra(frame) => write!(f, "Cassandra {}", frame),
Frame::Cassandra(frame) => write!(f, "Cassandra {frame}"),
#[cfg(feature = "valkey")]
Frame::Valkey(frame) => write!(f, "Valkey {:?}", frame),
Frame::Valkey(frame) => write!(f, "Valkey {frame:?}"),
#[cfg(feature = "kafka")]
Frame::Kafka(frame) => write!(f, "Kafka {}", frame),
Frame::Kafka(frame) => write!(f, "Kafka {frame}"),
Frame::Dummy => write!(f, "Shotover internal dummy message"),
#[cfg(feature = "opensearch")]
Frame::OpenSearch(frame) => write!(f, "OpenSearch: {:?}", frame),
Frame::OpenSearch(frame) => write!(f, "OpenSearch: {frame:?}"),
}
}
}
5 changes: 1 addition & 4 deletions shotover/src/frame/valkey.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ pub fn valkey_query_name(frame: &ValkeyFrame) -> Option<String> {
return Some(query_type);
}
Err(err) => {
tracing::error!(
"Failed to convert valkey bulkstring to string, err: {:?}",
err
)
tracing::error!("Failed to convert valkey bulkstring to string, err: {err:?}")
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ impl Message {
message_type,
}) = &self.inner
{
format!("Unparseable {:?} message {:?}", message_type, bytes)
format!("Unparseable {message_type:?} message {bytes:?}")
} else {
unreachable!("self.frame() failed so MessageInner must still be RawBytes")
}
Expand Down
20 changes: 11 additions & 9 deletions shotover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use tokio_tungstenite::tungstenite::{
protocol::Message as WsMessage,
};
use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite};
use tracing::Instrument;
use tracing::{debug, error, warn};
use tracing::{trace, Instrument};

pub struct TcpCodecListener<C: CodecBuilder> {
chain_builder: TransformChainBuilder,
Expand Down Expand Up @@ -358,13 +358,13 @@ async fn spawn_websocket_read_write_tasks<
Err(CodecReadError::RespondAndThenCloseConnection(messages)) => {
if let Err(err) = out_tx.send(messages) {
// TODO we need to send a close message to the client
error!("Failed to send RespondAndThenCloseConnection message: {:?}", err);
error!("Failed to send RespondAndThenCloseConnection message: {err}");
}
return;
}
Err(CodecReadError::Parser(err)) => {
// TODO we need to send a close message to the client, protocol error
warn!("failed to decode message: {:?}", err);
warn!("failed to decode message: {err:?}");
return;
}
Err(CodecReadError::Io(_err)) => {
Expand Down Expand Up @@ -480,12 +480,12 @@ pub fn spawn_read_write_tasks<
}
Err(CodecReadError::RespondAndThenCloseConnection(messages)) => {
if let Err(err) = out_tx.send(messages) {
error!("Failed to send RespondAndThenCloseConnection message: {:?}", err);
error!("Failed to send RespondAndThenCloseConnection message: {err}");
}
return;
}
Err(CodecReadError::Parser(err)) => {
warn!("failed to decode message: {:?}", err);
warn!("failed to decode message: {err:?}");
return;
}
Err(CodecReadError::Io(err)) => {
Expand All @@ -494,7 +494,7 @@ pub fn spawn_read_write_tasks<
// We shouldnt report that as a warning because its common for clients to do
// that for performance reasons.
if !matches!(err.kind(), ErrorKind::UnexpectedEof) {
warn!("failed to receive message on tcp stream: {:?}", err);
warn!("failed to receive message on tcp stream: {err:?}");
}
return;
}
Expand Down Expand Up @@ -699,7 +699,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
while let Ok(x) = in_rx.try_recv() {
requests.extend(x);
}
debug!("A transform in the chain requested that a chain run occur, requests {:?}", requests);
debug!("running transform chain because a transform in the chain requested that a chain run occur");
if let Some(close_reason) = self.send_receive_chain(local_addr, &out_tx, requests).await? {
return Ok(close_reason)
}
Expand All @@ -710,7 +710,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
while let Ok(x) = in_rx.try_recv() {
requests.extend(x);
}
debug!("Received requests from client {:?}", requests);
debug!("running transform chain because requests received from client");
if let Some(close_reason) = self.send_receive_chain(local_addr, &out_tx, requests).await? {
return Ok(close_reason)
}
Expand All @@ -731,6 +731,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
out_tx: &mpsc::UnboundedSender<Messages>,
requests: Messages,
) -> Result<Option<CloseReason>> {
trace!("running transform chain with requests: {requests:?}");
let mut wrapper = ChainState::new_with_addr(requests, local_addr);

self.pending_requests.process_requests(&wrapper.requests);
Expand All @@ -748,7 +749,8 @@ impl<C: CodecBuilder + 'static> Handler<C> {

// send the result of the process up stream
if !responses.is_empty() {
debug!("sending response to client: {:?}", responses);
debug!("sending {} responses to client", responses.len());
trace!("sending response to client: {responses:?}");
if out_tx.send(responses).is_err() {
// the client has disconnected so we should terminate this connection
return Ok(Some(CloseReason::ClientClosed));
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ impl CassandraSinkCluster {
}
Err(GetReplicaErr::NoPreparedMetadata) => {
let id = execute.id.clone();
tracing::info!("forcing re-prepare on {:?}", id);
tracing::info!("forcing re-prepare on {id:?}");
// this shotover node doesn't have the metadata.
// send an unprepared error in response to force
// the client to reprepare the query
Expand Down Expand Up @@ -632,7 +632,7 @@ impl CassandraSinkCluster {
self.set_control_connection(connection, address);
}
tracing::info!(
"Control connection finalized against node at: {:?}",
"Control connection finalized against node at: {}",
self.control_connection_address.unwrap()
);

Expand Down
41 changes: 23 additions & 18 deletions shotover/src/transforms/cassandra/sink_cluster/rewrite.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::node::ConnectionFactory;
use super::node_pool::NodePool;
use super::ShotoverNode;
use crate::frame::cassandra::operation_name;
use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use crate::message::{Message, MessageIdMap, Messages};
use crate::{
Expand Down Expand Up @@ -350,9 +351,16 @@ impl MessageRewriter {
CassandraOperation::Error(_),
..
})) => None,
other => {
tracing::error!("Response to Prepare query was not a Prepared, was instead: {other:?}");
warnings.push(format!("Shotover: Response to Prepare query was not a Prepared, was instead: {other:?}"));
Some(Frame::Cassandra(CassandraFrame { operation, .. })) => {
let operation_name = operation_name(operation);
tracing::error!("Response to Prepare query was not a Prepared, was instead: {operation_name}");
warnings.push(format!("Shotover: Response to Prepare query was not a Prepared, was instead: {operation_name}"));
None
}
Some(_) => unreachable!("Response to prepare was not cassandra message"),
None => {
tracing::error!("Response to Prepare query was not parseable");
warnings.push("Shotover: Response to Prepare query was not parseable".to_owned());
None
}
})
Expand All @@ -366,12 +374,15 @@ impl MessageRewriter {
output
});

tracing::error!(
"Nodes did not return the same response to PREPARE statement {err_str}"
tracing::warn!(
"Nodes did not return the same response to PREPARE statement"
);
tracing::trace!(
"Nodes did not return the same response to PREPARE statement:{err_str}"
);
warnings.push(format!(
"Shotover: Nodes did not return the same response to PREPARE statement {err_str}"
));
"Shotover: Nodes did not return the same response to PREPARE statement {err_str}"
));
}
}

Expand Down Expand Up @@ -544,10 +555,7 @@ impl MessageRewriter {
}
Ok(())
} else {
Err(anyhow!(
"Failed to parse system.local response {:?}",
peers_response
))
Err(anyhow!("Failed to parse system.local response"))
}
}

Expand Down Expand Up @@ -670,10 +678,7 @@ impl MessageRewriter {
}
Ok(())
} else {
Err(anyhow!(
"Failed to parse system.local response {:?}",
local_response
))
Err(anyhow!("Failed to parse system.local response"))
}
}
}
Expand Down Expand Up @@ -831,13 +836,13 @@ fn parse_system_nodes(mut response: Message) -> Result<Vec<NodeInfo>, MessagePar
"system.local returned error: {error:?}",
))),
operation => Err(MessageParseError::ParseFailure(anyhow!(
"system.local returned unexpected cassandra operation: {operation:?}",
"system.local returned unexpected cassandra operation: {:?}",
operation_name(operation)
))),
}
} else {
Err(MessageParseError::ParseFailure(anyhow!(
"Failed to parse system.local response {:?}",
response
"Failed to parse system.local response"
)))
}
}
Loading
Loading