diff --git a/readyset-mysql/tests/query_cache.rs b/readyset-mysql/tests/query_cache.rs index 9ccd6b3e2a..d36785e415 100644 --- a/readyset-mysql/tests/query_cache.rs +++ b/readyset-mysql/tests/query_cache.rs @@ -758,3 +758,140 @@ async fn test_binlog_transaction_compression() { shutdown_tx.shutdown().await; } + +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn test_char_padding_lookup() { + let query_status_cache: &'static _ = Box::leak(Box::new(QueryStatusCache::new())); + let (opts, _handle, shutdown_tx) = setup( + query_status_cache, + true, // fallback enabled + MigrationMode::OutOfBand, + UnsupportedSetMode::Error, + ) + .await; + let mut conn = Conn::new(opts).await.unwrap(); + conn.query_drop( + "CREATE TABLE `col_pad_lookup` ( + id int NOT NULL PRIMARY KEY, + c CHAR(3) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;", + ) + .await + .unwrap(); + conn.query_drop("INSERT INTO `col_pad_lookup` VALUES (1, 'ࠈࠈ');") + .await + .unwrap(); + conn.query_drop("INSERT INTO `col_pad_lookup` VALUES (2, 'A');") + .await + .unwrap(); + conn.query_drop("INSERT INTO `col_pad_lookup` VALUES (3, 'AAA');") + .await + .unwrap(); + sleep().await; + conn.query_drop("CREATE CACHE test FROM SELECT id, c FROM col_pad_lookup WHERE c = ?") + .await + .unwrap(); + let row: Vec<(u32, String)> = conn + .query("SELECT id, c FROM col_pad_lookup WHERE c = 'ࠈࠈ'") + .await + .unwrap(); + assert_eq!(row.len(), 1); + let last_status = last_query_info(&mut conn).await; + assert_eq!(last_status.destination, QueryDestination::Readyset); + assert_eq!(row[0].1, "ࠈࠈ "); + + let row: Vec<(u32, String)> = conn + .query("SELECT id, c FROM col_pad_lookup WHERE c = 'A'") + .await + .unwrap(); + assert_eq!(row.len(), 1); + let last_status = last_query_info(&mut conn).await; + assert_eq!(last_status.destination, QueryDestination::Readyset); + assert_eq!(row[0].1, "A "); + + let row: Vec<(u32, String)> = conn + .query("SELECT id, c FROM col_pad_lookup WHERE c = 'AAA'") + .await + .unwrap(); + assert_eq!(row.len(), 1); + let last_status = last_query_info(&mut conn).await; + assert_eq!(last_status.destination, QueryDestination::Readyset); + assert_eq!(row[0].1, "AAA"); + shutdown_tx.shutdown().await; +} + +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn test_binary_padding_lookup() { + let query_status_cache: &'static _ = Box::leak(Box::new(QueryStatusCache::new())); + let (opts, _handle, shutdown_tx) = setup( + query_status_cache, + true, // fallback enabled + MigrationMode::OutOfBand, + UnsupportedSetMode::Error, + ) + .await; + let mut conn = Conn::new(opts).await.unwrap(); + conn.query_drop( + "CREATE TABLE `col_pad_bin_lookup` ( + id int NOT NULL PRIMARY KEY, + b BINARY(3) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;", + ) + .await + .unwrap(); + conn.query_drop("INSERT INTO `col_pad_bin_lookup` VALUES (1, 'ࠈ');") + .await + .unwrap(); + conn.query_drop("INSERT INTO `col_pad_bin_lookup` VALUES (2, '¥');") + .await + .unwrap(); + conn.query_drop("INSERT INTO `col_pad_bin_lookup` VALUES (3, 'A');") + .await + .unwrap(); + conn.query_drop("INSERT INTO `col_pad_bin_lookup` VALUES (4, 'A¥');") + .await + .unwrap(); + + sleep().await; + conn.query_drop("CREATE CACHE test FROM SELECT id, b FROM col_pad_bin_lookup WHERE b = ?") + .await + .unwrap(); + let row: Vec<(u32, String)> = conn + .query("SELECT id, b FROM col_pad_bin_lookup WHERE b = 'ࠈ'") + .await + .unwrap(); + assert_eq!(row.len(), 1); + let last_status = last_query_info(&mut conn).await; + assert_eq!(last_status.destination, QueryDestination::Readyset); + assert_eq!(row[0].1, "0xe0a088"); + + let row: Vec<(u32, String)> = conn + .query("SELECT id, b FROM col_pad_bin_lookup WHERE b = '¥'") + .await + .unwrap(); + assert_eq!(row.len(), 1); + let last_status = last_query_info(&mut conn).await; + assert_eq!(last_status.destination, QueryDestination::Readyset); + assert_eq!(row[0].1, "0xc2a500"); + + let row: Vec<(u32, String)> = conn + .query("SELECT id, b FROM col_pad_bin_lookup WHERE b = 'A'") + .await + .unwrap(); + assert_eq!(row.len(), 1); + let last_status = last_query_info(&mut conn).await; + assert_eq!(last_status.destination, QueryDestination::Readyset); + assert_eq!(row[0].1, "0x410000"); + + let row: Vec<(u32, String)> = conn + .query("SELECT id, b FROM col_pad_bin_lookup WHERE b = 'A¥'") + .await + .unwrap(); + assert_eq!(row.len(), 1); + let last_status = last_query_info(&mut conn).await; + assert_eq!(last_status.destination, QueryDestination::Readyset); + assert_eq!(row[0].1, "0x41c2a5"); + shutdown_tx.shutdown().await; +} diff --git a/replicators/src/mysql_connector/connector.rs b/replicators/src/mysql_connector/connector.rs index 39e574dbda..a0f430c3e7 100644 --- a/replicators/src/mysql_connector/connector.rs +++ b/replicators/src/mysql_connector/connector.rs @@ -5,7 +5,7 @@ use std::io; use async_trait::async_trait; use binlog::consts::{BinlogChecksumAlg, EventType}; use metrics::counter; -use mysql::binlog::events::StatusVarVal; +use mysql::binlog::events::{OptionalMetaExtractor, StatusVarVal}; use mysql::binlog::jsonb::{self, JsonbToJsonError}; use mysql::prelude::Queryable; use mysql_async as mysql; @@ -22,6 +22,7 @@ use replication_offset::mysql::MySqlPosition; use replication_offset::ReplicationOffset; use tracing::{error, info, warn}; +use crate::mysql_connector::utils::mysql_pad_collation_column; use crate::noria_adapter::{Connector, ReplicationAction}; const CHECKSUM_QUERY: &str = "SET @source_binlog_checksum='CRC32'"; @@ -847,6 +848,7 @@ fn binlog_val_to_noria_val( val: &mysql_common::value::Value, col_kind: mysql_common::constants::ColumnType, meta: &[u8], + collation: u16, ) -> mysql::Result { // Not all values are coerced to the value expected by ReadySet directly @@ -861,8 +863,8 @@ fn binlog_val_to_noria_val( } }; - match (col_kind, meta) { - (ColumnType::MYSQL_TYPE_TIMESTAMP2, &[0]) => { + match (col_kind, meta, collation) { + (ColumnType::MYSQL_TYPE_TIMESTAMP2, &[0], _) => { //https://github.com/blackbeam/rust_mysql_common/blob/408effed435c059d80a9e708bcfa5d974527f476/src/binlog/value.rs#L144 // When meta is 0, `mysql_common` encodes this value as number of seconds (since UNIX // EPOCH) @@ -876,7 +878,7 @@ fn binlog_val_to_noria_val( // Can unwrap because we know it maps directly to [`DfValue`] Ok(time.into()) } - (ColumnType::MYSQL_TYPE_TIMESTAMP2, _) => { + (ColumnType::MYSQL_TYPE_TIMESTAMP2, _, _) => { // When meta is anything else, `mysql_common` encodes this value as number of // seconds.microseconds (since UNIX EPOCH) let s = String::from_utf8_lossy(buf); @@ -887,6 +889,14 @@ fn binlog_val_to_noria_val( // Can wrap because we know this maps directly to [`DfValue`] Ok(time.into()) } + (ColumnType::MYSQL_TYPE_STRING, meta, collation) => { + Ok(mysql_pad_collation_column( + buf, + col_kind, + collation, + meta[1] as usize, // 2nd byte of meta is the length of the string + )) + } _ => Ok(val.try_into().map_err(|e| { mysql_async::Error::Other(Box::new(internal_err!("Unable to coerce value {}", e))) })?), @@ -897,6 +907,9 @@ fn binlog_row_to_noria_row( binlog_row: &BinlogRow, tme: &binlog::events::TableMapEvent<'static>, ) -> mysql::Result> { + let opt_meta_extractor = OptionalMetaExtractor::new(tme.iter_optional_meta()).unwrap(); + let mut charset_iter = opt_meta_extractor.iter_charset(); + let mut enum_and_set_charset_iter = opt_meta_extractor.iter_enum_and_set_charset(); (0..binlog_row.len()) .map(|idx| { match binlog_row.as_ref(idx).unwrap() { @@ -912,7 +925,17 @@ fn binlog_row_to_noria_row( .unwrap(), tme.get_column_metadata(idx).unwrap(), ); - binlog_val_to_noria_val(val, kind, meta) + let charset = if kind.is_character_type() { + charset_iter.next().transpose()?.unwrap_or_default() + } else if kind.is_enum_or_set_type() { + enum_and_set_charset_iter + .next() + .transpose()? + .unwrap_or_default() + } else { + Default::default() + }; + binlog_val_to_noria_val(val, kind, meta, charset) } BinlogValue::Jsonb(val) => { let json: Result = val.clone().try_into(); // urgh no TryFrom impl diff --git a/replicators/src/mysql_connector/mod.rs b/replicators/src/mysql_connector/mod.rs index 8cb1b9043d..af4f20af2d 100644 --- a/replicators/src/mysql_connector/mod.rs +++ b/replicators/src/mysql_connector/mod.rs @@ -1,5 +1,6 @@ mod connector; mod snapshot; +mod utils; pub(crate) use connector::MySqlBinlogConnector; pub(crate) use snapshot::MySqlReplicator; diff --git a/replicators/src/mysql_connector/snapshot.rs b/replicators/src/mysql_connector/snapshot.rs index 6f1da46d1e..7bcddb3177 100644 --- a/replicators/src/mysql_connector/snapshot.rs +++ b/replicators/src/mysql_connector/snapshot.rs @@ -11,6 +11,8 @@ use itertools::Itertools; use mysql::prelude::Queryable; use mysql::{Transaction, TxOpts}; use mysql_async as mysql; +use mysql_common::constants::ColumnType; +use mysql_srv::ColumnFlags; use nom_sql::{DialectDisplay, NonReplicatedRelation, NotReplicatedReason, Relation}; use readyset_client::recipe::changelist::{Change, ChangeList}; use readyset_data::Dialect; @@ -21,6 +23,7 @@ use tokio::task::JoinHandle; use tracing::{debug, error, info, info_span, warn}; use tracing_futures::Instrument; +use super::utils::mysql_pad_collation_column; use crate::db_util::DatabaseSchemas; use crate::table_filter::TableFilter; use crate::TablesSnapshottingGaugeGuard; @@ -765,7 +768,33 @@ fn mysql_row_to_noria_row(row: mysql::Row) -> ReadySetResult { + let bytes = match val.clone() { + mysql_common::value::Value::Bytes(b) => b, + _ => { + return Err(internal_err!( + "Expected MYSQL_TYPE_STRING column to be of value Bytes, got {:?}", + val + )); + } + }; + noria_row.push(mysql_pad_collation_column( + &bytes, + col.column_type(), + col.character_set(), + col.column_length() as usize, + )); + } + false => noria_row.push(readyset_data::DfValue::try_from(val)?), + } } Ok(noria_row) } diff --git a/replicators/src/mysql_connector/utils.rs b/replicators/src/mysql_connector/utils.rs new file mode 100644 index 0000000000..b3a11218c0 --- /dev/null +++ b/replicators/src/mysql_connector/utils.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; + +use mysql_common::collations::{self, Collation, CollationId}; +use mysql_srv::ColumnType; +use readyset_data::DfValue; + +/// Pad a MYSQL_TYPE_STRING (CHAR / BINARY) column value to the correct length for the given column +/// type and charset. +/// +/// Parameters: +/// - `val`: The current column value as a vector of bytes. +/// - `col`: The column type. +/// - `collation`: The collation ID of the column. +/// - `col_len`: The length of the column in bytes. +/// +/// Returns: +/// - A `DfValue` representing the padded column value - `CHAR` will return a `TinyText` or `Text` +/// and `BINARY` will return a `ByteArray`. +pub fn mysql_pad_collation_column( + val: &Vec, + col: ColumnType, + collation: u16, + col_len: usize, +) -> DfValue { + assert_eq!(col, ColumnType::MYSQL_TYPE_STRING); + let collation: Collation = collations::CollationId::from(collation).into(); + match collation.id() { + CollationId::BINARY => { + if val.len() < col_len { + let mut padded = val.clone(); + padded.extend(std::iter::repeat(0).take(col_len - val.len())); + return DfValue::ByteArray(Arc::new(padded)); + } + DfValue::ByteArray(Arc::new(val.to_vec())) + } + _ => { + let column_length_characters = col_len / collation.max_len() as usize; + let mut str = String::from_utf8_lossy(val).to_string(); + let str_len = str.chars().count(); + if str_len < column_length_characters { + str.extend(std::iter::repeat(' ').take(column_length_characters - str_len)); + } + DfValue::from(str) + } + } +} diff --git a/replicators/tests/tests.rs b/replicators/tests/tests.rs index dc8ae0cf50..58bd3ffea3 100644 --- a/replicators/tests/tests.rs +++ b/replicators/tests/tests.rs @@ -683,6 +683,238 @@ async fn mysql_datetime_replication() { mysql_datetime_replication_inner().await.unwrap(); } +async fn mysql_binary_collation_padding_inner() -> ReadySetResult<()> { + let url = &mysql_url(); + let mut client = DbConnection::connect(url).await?; + client + .query( + " + DROP TABLE IF EXISTS `col_bin_pad` CASCADE; + CREATE TABLE `col_bin_pad` ( + id int NOT NULL PRIMARY KEY, + c BINARY(3) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; + INSERT INTO `col_bin_pad` VALUES (1, 'ࠈ'); + INSERT INTO `col_bin_pad` VALUES (2, 'A'); + INSERT INTO `col_bin_pad` VALUES (3, 'AAA'); + INSERT INTO `col_bin_pad` VALUES (4, '¥');", + ) + .await?; + let (mut ctx, shutdown_tx) = TestHandle::start_noria(url.to_string(), None).await?; + ctx.notification_channel + .as_mut() + .unwrap() + .snapshot_completed() + .await + .unwrap(); + ctx.check_results( + "col_bin_pad", + "Snapshot", + &[ + &[ + DfValue::Int(1), + // 'ࠈ' is the UTF-8 encoding of U+E0A088 + DfValue::ByteArray(vec![0xE0, 0xA0, 0x88].into()), + ], + &[ + DfValue::Int(2), + DfValue::ByteArray(vec![0x41, 0x0, 0x0].into()), + ], + &[ + DfValue::Int(3), + DfValue::ByteArray(vec![0x41, 0x41, 0x41].into()), + ], + &[ + DfValue::Int(4), + // '¥' is the UTF-8 encoding of U+C2A5 + DfValue::ByteArray(vec![0xC2, 0xA5, 0x0].into()), + ], + ], + ) + .await?; + + // Replication and mix of characters from 1st and 2rd byte on the same row + client + .query( + " + INSERT INTO `col_bin_pad` VALUES (5, 'B¥'); + INSERT INTO `col_bin_pad` VALUES (6, 'B'); + INSERT INTO `col_bin_pad` VALUES (7, 'BBB'); + INSERT INTO `col_bin_pad` VALUES (8, '¥'); + ", + ) + .await + .unwrap(); + ctx.check_results( + "col_bin_pad", + "Replication", + &[ + &[ + DfValue::Int(1), + // 'ࠈ' is the UTF-8 encoding of U+E0A088 + DfValue::ByteArray(vec![0xE0, 0xA0, 0x88].into()), + ], + &[ + DfValue::Int(2), + DfValue::ByteArray(vec![0x41, 0x0, 0x0].into()), + ], + &[ + DfValue::Int(3), + DfValue::ByteArray(vec![0x41, 0x41, 0x41].into()), + ], + &[ + DfValue::Int(4), + // '¥' is the UTF-8 encoding of U+C2A5 + DfValue::ByteArray(vec![0xC2, 0xA5, 0x0].into()), + ], + &[ + DfValue::Int(5), + // '¥' is the UTF-8 encoding of U+C2A5 + DfValue::ByteArray(vec![0x42, 0xC2, 0xA5].into()), + ], + &[ + DfValue::Int(6), + DfValue::ByteArray(vec![0x42, 0x0, 0x0].into()), + ], + &[ + DfValue::Int(7), + DfValue::ByteArray(vec![0x42, 0x42, 0x42].into()), + ], + &[ + DfValue::Int(8), + // '¥' is the UTF-8 encoding of U+C2A5 + DfValue::ByteArray(vec![0xC2, 0xA5, 0x0].into()), + ], + ], + ) + .await?; + + client.stop().await; + ctx.stop().await; + shutdown_tx.shutdown().await; + + Ok(()) +} + +async fn mysql_char_collation_padding_inner() -> ReadySetResult<()> { + let url = &mysql_url(); + let mut client = DbConnection::connect(url).await?; + client + .query( + " + DROP TABLE IF EXISTS `col_pad` CASCADE; + CREATE TABLE `col_pad` ( + id int NOT NULL PRIMARY KEY, + c CHAR(3) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; + INSERT INTO `col_pad` VALUES (1, 'ࠈࠈ'); + INSERT INTO `col_pad` VALUES (2, 'A'); + INSERT INTO `col_pad` VALUES (3, 'AAA');", + ) + .await?; + let (mut ctx, shutdown_tx) = TestHandle::start_noria(url.to_string(), None).await?; + ctx.notification_channel + .as_mut() + .unwrap() + .snapshot_completed() + .await + .unwrap(); + ctx.check_results( + "col_pad", + "Snapshot", + &[ + &[ + DfValue::Int(1), + // 'ࠈࠈ ' is the UTF-8 encoding of U+E0A088 U+E0A088 U+20 + DfValue::TinyText( + TinyText::from_slice(vec![0xE0, 0xA0, 0x88, 0xE0, 0xA0, 0x88, 0x20].as_slice()) + .unwrap_or_else(|_| TinyText::from_arr(b"")), + ), + ], + &[ + DfValue::Int(2), + DfValue::TinyText(TinyText::from_arr(b"A ")), + ], + &[ + DfValue::Int(3), + DfValue::TinyText(TinyText::from_arr(b"AAA")), + ], + ], + ) + .await?; + + // Replication and mix of characters from 1st and 3rd byte on the same row + client + .query( + " + INSERT INTO `col_pad` VALUES (4, 'Bࠉ'); + INSERT INTO `col_pad` VALUES (5, 'B'); + INSERT INTO `col_pad` VALUES (6, 'BBB'); + ", + ) + .await + .unwrap(); + ctx.check_results( + "col_pad", + "Replication", + &[ + &[ + DfValue::Int(1), + // 'ࠈࠈ ' is the UTF-8 encoding of U+E0A088 U+E0A088 U+20 + DfValue::TinyText( + TinyText::from_slice(vec![0xE0, 0xA0, 0x88, 0xE0, 0xA0, 0x88, 0x20].as_slice()) + .unwrap_or_else(|_| TinyText::from_arr(b"")), + ), + ], + &[ + DfValue::Int(2), + DfValue::TinyText(TinyText::from_arr(b"A ")), + ], + &[ + DfValue::Int(3), + DfValue::TinyText(TinyText::from_arr(b"AAA")), + ], + &[ + DfValue::Int(4), + // 'Bࠉ ' is the UTF-8 encoding of U+E42 U+E0A089 U+20 + DfValue::TinyText( + TinyText::from_slice(vec![0x42, 0xE0, 0xA0, 0x89, 0x20].as_slice()) + .unwrap_or_else(|_| TinyText::from_arr(b"")), + ), + ], + &[ + DfValue::Int(5), + DfValue::TinyText(TinyText::from_arr(b"B ")), + ], + &[ + DfValue::Int(6), + DfValue::TinyText(TinyText::from_arr(b"BBB")), + ], + ], + ) + .await?; + + client.stop().await; + ctx.stop().await; + shutdown_tx.shutdown().await; + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +#[serial_test::serial] +#[slow] +async fn mysql_binary_collation_padding() { + mysql_binary_collation_padding_inner().await.unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +#[serial_test::serial] +#[slow] +async fn mysql_char_collation_padding() { + mysql_char_collation_padding_inner().await.unwrap(); +} + #[tokio::test(flavor = "multi_thread")] #[serial_test::serial] #[slow]