Skip to content

Commit

Permalink
ApiVersions rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Dec 2, 2024
1 parent 059dc73 commit 66c16d7
Show file tree
Hide file tree
Showing 8 changed files with 411 additions and 349 deletions.
584 changes: 289 additions & 295 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ aws-config = { version = "1.0.0", optional = true }
aws-sdk-kms = { version = "1.1.0", optional = true }
chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true }
generic-array = { version = "0.14", features = ["serde"], optional = true }
kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] }
kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"], git = "https://github.com/tychedelia/kafka-protocol-rs" }
rustls = { version = "0.23.18", default-features = false, features = ["tls12"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
rustls-pemfile = "2.0.0"
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/codec/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl Decoder for KafkaDecoder {
} else if self.expect_raw_sasl.is_some() {
Meta {
request_header: RequestHeader {
api_key: ApiKey::SaslAuthenticateKey,
api_key: ApiKey::SaslAuthenticate,
version: 0,
},
// This code path is only used for requests, so message_id can be None.
Expand Down Expand Up @@ -210,7 +210,7 @@ impl Decoder for KafkaDecoder {
}
} else {
// set expect_raw_sasl for requests
if meta.request_header.api_key == ApiKey::SaslHandshakeKey
if meta.request_header.api_key == ApiKey::SaslHandshake
&& meta.request_header.version == 0
{
// Only parse the full frame once we manually check its a v0 sasl handshake
Expand Down Expand Up @@ -326,7 +326,7 @@ impl Encoder<Messages> for KafkaEncoder {
if let Some(tx) = self.request_header_tx.as_ref() {
let header = if message_contains_raw_sasl {
RequestHeader {
api_key: ApiKey::SaslAuthenticateKey,
api_key: ApiKey::SaslAuthenticate,
version: 0,
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/frame/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl KafkaFrame {
}),
None => Ok(KafkaFrame::Request {
header: RequestHeader::default()
.with_request_api_key(ApiKey::SaslAuthenticateKey as i16),
.with_request_api_key(ApiKey::SaslAuthenticate as i16),
body: RequestBody::SaslAuthenticate(
SaslAuthenticateRequest::default().with_auth_bytes(bytes),
),
Expand Down
154 changes: 111 additions & 43 deletions shotover/src/transforms/kafka/sink_cluster/api_versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,123 @@ use kafka_protocol::{messages::ApiKey, protocol::VersionRange};
// + Make sure any new fields do not break any of the requirements listed above
pub(crate) fn versions_supported_by_key(api_key: i16) -> Option<VersionRange> {
match ApiKey::try_from(api_key) {
Ok(ApiKey::ProduceKey) => Some(VersionRange { min: 0, max: 11 }),
Ok(ApiKey::FetchKey) => Some(VersionRange { min: 0, max: 16 }),
Ok(ApiKey::ListOffsetsKey) => Some(VersionRange { min: 0, max: 8 }),
Ok(ApiKey::MetadataKey) => Some(VersionRange { min: 0, max: 12 }),
Ok(ApiKey::OffsetCommitKey) => Some(VersionRange { min: 0, max: 9 }),
Ok(ApiKey::OffsetFetchKey) => Some(VersionRange { min: 0, max: 9 }),
Ok(ApiKey::FindCoordinatorKey) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::JoinGroupKey) => Some(VersionRange { min: 0, max: 9 }),
Ok(ApiKey::HeartbeatKey) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::LeaveGroupKey) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::SyncGroupKey) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::DescribeGroupsKey) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::ListGroupsKey) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::SaslHandshakeKey) => Some(VersionRange { min: 0, max: 1 }),
Ok(ApiKey::ApiVersionsKey) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::CreateTopicsKey) => Some(VersionRange { min: 0, max: 7 }),
Ok(ApiKey::DeleteTopicsKey) => Some(VersionRange { min: 0, max: 6 }),
Ok(ApiKey::DeleteRecordsKey) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::InitProducerIdKey) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::OffsetForLeaderEpochKey) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::AddPartitionsToTxnKey) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::AddOffsetsToTxnKey) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::EndTxnKey) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::TxnOffsetCommitKey) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::CreateAclsKey) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::DescribeConfigsKey) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::AlterConfigsKey) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::DescribeLogDirsKey) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::SaslAuthenticateKey) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::CreatePartitionsKey) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::DeleteGroupsKey) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::ElectLeadersKey) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::AlterPartitionReassignmentsKey) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::ListPartitionReassignmentsKey) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::OffsetDeleteKey) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::AlterPartitionKey) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::DescribeClusterKey) => Some(VersionRange { min: 0, max: 1 }),
Ok(ApiKey::DescribeProducersKey) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::DescribeTransactionsKey) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::ListTransactionsKey) => Some(VersionRange { min: 0, max: 1 }),
Ok(ApiKey::ConsumerGroupHeartbeatKey) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::ConsumerGroupDescribeKey) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::Produce) => Some(VersionRange { min: 0, max: 11 }),
Ok(ApiKey::Fetch) => Some(VersionRange { min: 0, max: 16 }),
Ok(ApiKey::ListOffsets) => Some(VersionRange { min: 0, max: 8 }),
Ok(ApiKey::Metadata) => Some(VersionRange { min: 0, max: 12 }),
Ok(ApiKey::OffsetCommit) => Some(VersionRange { min: 0, max: 9 }),
Ok(ApiKey::OffsetFetch) => Some(VersionRange { min: 0, max: 9 }),
Ok(ApiKey::FindCoordinator) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::JoinGroup) => Some(VersionRange { min: 0, max: 9 }),
Ok(ApiKey::Heartbeat) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::LeaveGroup) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::SyncGroup) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::DescribeGroups) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::ListGroups) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::SaslHandshake) => Some(VersionRange { min: 0, max: 1 }),
Ok(ApiKey::ApiVersions) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::CreateTopics) => Some(VersionRange { min: 0, max: 7 }),
Ok(ApiKey::DeleteTopics) => Some(VersionRange { min: 0, max: 6 }),
Ok(ApiKey::DeleteRecords) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::InitProducerId) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::OffsetForLeaderEpoch) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::AddPartitionsToTxn) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::AddOffsetsToTxn) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::EndTxn) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::TxnOffsetCommit) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::CreateAcls) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::DescribeConfigs) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::AlterConfigs) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::DescribeLogDirs) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::SaslAuthenticate) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::CreatePartitions) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::DeleteGroups) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::ElectLeaders) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::AlterPartitionReassignments) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::ListPartitionReassignments) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::OffsetDelete) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::AlterPartition) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::DescribeCluster) => Some(VersionRange { min: 0, max: 1 }),
Ok(ApiKey::DescribeProducers) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::DescribeTransactions) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::ListTransactions) => Some(VersionRange { min: 0, max: 1 }),
Ok(ApiKey::ConsumerGroupHeartbeat) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::ConsumerGroupDescribe) => Some(VersionRange { min: 0, max: 0 }),
// This message type has very little documentation available and kafka responds to it with an error code 35 UNSUPPORTED_VERSION
// So its not clear at all how to implement this and its not even possible to test it.
// Instead lets just ask the client to not send it at all.
// We can consider supporting it when kafka itself starts to support it but we will need to be very
// careful to correctly implement the pagination/cursor logic.
Ok(ApiKey::DescribeTopicPartitionsKey) => None,
Ok(ApiKey::DescribeTopicPartitions) => None,
Ok(_) => None,
Err(_) => None,
}
}

// This test gives visibility into the api versions that shotover doesnt support yet.
// If this test is failing after a `cargo update`, you can just alter EXPECTED_ERROR_MESSAGE to include the new versions.
// The actual upgrade can be done later.
#[test]
fn check_api_version_backlog() {
use std::fmt::Write;
const EXPECTED_ERROR_MESSAGE: &str = r#"
LeaderAndIsrKey kafka-protocol=0..7 shotover=NotSupported
StopReplicaKey kafka-protocol=0..4 shotover=NotSupported
UpdateMetadataKey kafka-protocol=0..8 shotover=NotSupported
ControlledShutdownKey kafka-protocol=0..3 shotover=NotSupported
WriteTxnMarkersKey kafka-protocol=0..1 shotover=NotSupported
DescribeAclsKey kafka-protocol=0..3 shotover=NotSupported
DeleteAclsKey kafka-protocol=0..3 shotover=NotSupported
AlterReplicaLogDirsKey kafka-protocol=0..2 shotover=NotSupported
CreateDelegationTokenKey kafka-protocol=0..3 shotover=NotSupported
RenewDelegationTokenKey kafka-protocol=0..2 shotover=NotSupported
ExpireDelegationTokenKey kafka-protocol=0..2 shotover=NotSupported
DescribeDelegationTokenKey kafka-protocol=0..3 shotover=NotSupported
IncrementalAlterConfigsKey kafka-protocol=0..1 shotover=NotSupported
DescribeClientQuotasKey kafka-protocol=0..1 shotover=NotSupported
AlterClientQuotasKey kafka-protocol=0..1 shotover=NotSupported
DescribeUserScramCredentialsKey kafka-protocol=0..0 shotover=NotSupported
AlterUserScramCredentialsKey kafka-protocol=0..0 shotover=NotSupported
VoteKey kafka-protocol=0..0 shotover=NotSupported
BeginQuorumEpochKey kafka-protocol=0..0 shotover=NotSupported
EndQuorumEpochKey kafka-protocol=0..0 shotover=NotSupported
DescribeQuorumKey kafka-protocol=0..1 shotover=NotSupported
UpdateFeaturesKey kafka-protocol=0..1 shotover=NotSupported
EnvelopeKey kafka-protocol=0..0 shotover=NotSupported
FetchSnapshotKey kafka-protocol=0..0 shotover=NotSupported
BrokerRegistrationKey kafka-protocol=0..3 shotover=NotSupported
BrokerHeartbeatKey kafka-protocol=0..1 shotover=NotSupported
UnregisterBrokerKey kafka-protocol=0..0 shotover=NotSupported
AllocateProducerIdsKey kafka-protocol=0..0 shotover=NotSupported
ControllerRegistrationKey kafka-protocol=0..0 shotover=NotSupported
GetTelemetrySubscriptionsKey kafka-protocol=0..0 shotover=NotSupported
PushTelemetryKey kafka-protocol=0..0 shotover=NotSupported
AssignReplicasToDirsKey kafka-protocol=0..0 shotover=NotSupported
ListClientMetricsResourcesKey kafka-protocol=0..0 shotover=NotSupported
DescribeTopicPartitionsKey kafka-protocol=0..0 shotover=NotSupported
"#;

let mut error_message = String::new();
for api_key in ApiKey::iter() {
let shotover_version = versions_supported_by_key(api_key as i16);

let kafka_protocol_version = api_key.valid_versions();
if shotover_version != Some(kafka_protocol_version) {
let shotover_version = match shotover_version {
Some(version) => format!("{version}"),
None => "NotSupported".to_owned(),
};
writeln!(
error_message,
"{api_key:?} kafka-protocol={kafka_protocol_version} shotover={shotover_version}"
)
.unwrap();
}
}

pretty_assertions::assert_eq!(
error_message.trim(),
EXPECTED_ERROR_MESSAGE.trim(),
"The list of message types not supported by shotover differs from the expected list defined in EXPECTED_ERROR_MESSAGE",
);
}
2 changes: 1 addition & 1 deletion shotover/src/transforms/kafka/sink_cluster/kafka_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl ConnectionFactory {
fn create_auth_request(bytes: Vec<u8>) -> Message {
Message::from_frame(Frame::Kafka(KafkaFrame::Request {
header: RequestHeader::default()
.with_request_api_key(ApiKey::SaslAuthenticateKey as i16)
.with_request_api_key(ApiKey::SaslAuthenticate as i16)
.with_request_api_version(2),
body: RequestBody::SaslAuthenticate(
SaslAuthenticateRequest::default().with_auth_bytes(bytes.into()),
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1951,7 +1951,7 @@ The connection to the client has been closed."
) -> Result<KafkaNode, FindCoordinatorError> {
let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
header: RequestHeader::default()
.with_request_api_key(ApiKey::FindCoordinatorKey as i16)
.with_request_api_key(ApiKey::FindCoordinator as i16)
.with_request_api_version(2)
.with_correlation_id(0),
body: RequestBody::FindCoordinator(
Expand Down Expand Up @@ -2035,7 +2035,7 @@ The connection to the client has been closed."
let api_version = if topic_ids.is_empty() { 4 } else { 12 };
let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
header: RequestHeader::default()
.with_request_api_key(ApiKey::MetadataKey as i16)
.with_request_api_key(ApiKey::Metadata as i16)
.with_request_api_version(api_version)
.with_correlation_id(0),
body: RequestBody::Metadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn find_new_brokers(nodes: &mut Vec<Node>, rng: &mut SmallRng) -> Result<(

let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
header: RequestHeader::default()
.with_request_api_key(ApiKey::MetadataKey as i16)
.with_request_api_key(ApiKey::Metadata as i16)
.with_request_api_version(4)
.with_correlation_id(0),
body: RequestBody::Metadata(MetadataRequest::default()),
Expand Down Expand Up @@ -127,7 +127,7 @@ async fn create_delegation_token_for_user(
connection.send(vec![Message::from_frame(Frame::Kafka(
KafkaFrame::Request {
header: RequestHeader::default()
.with_request_api_key(ApiKey::CreateDelegationTokenKey as i16)
.with_request_api_key(ApiKey::CreateDelegationToken as i16)
.with_request_api_version(3),
body: RequestBody::CreateDelegationToken(
CreateDelegationTokenRequest::default()
Expand Down Expand Up @@ -241,7 +241,7 @@ async fn is_delegation_token_ready(
connection.send(vec![Message::from_frame(Frame::Kafka(
KafkaFrame::Request {
header: RequestHeader::default()
.with_request_api_key(ApiKey::DescribeDelegationTokenKey as i16)
.with_request_api_key(ApiKey::DescribeDelegationToken as i16)
.with_request_api_version(3),
body: RequestBody::DescribeDelegationToken(
DescribeDelegationTokenRequest::default().with_owners(Some(vec![
Expand Down

0 comments on commit 66c16d7

Please sign in to comment.