Skip to content

Commit

Permalink
chore: add fix create definition cmd in rise ctl (#19744)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored and zwang28 committed Jan 16, 2025
1 parent fa87bab commit 13d4dd8
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/ctl/src/cmd_impl/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod fix_create_definition;
mod fix_table_fragments;
mod meta_store;

pub use fix_create_definition::*;
pub use fix_table_fragments::*;
pub use meta_store::*;
83 changes: 83 additions & 0 deletions src/ctl/src/cmd_impl/debug/fix_create_definition.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use etcd_client::ConnectOptions;
use risingwave_meta::model::MetadataModel;
use risingwave_meta::storage::{EtcdMetaStore, WrappedEtcdClient};
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::catalog::{PbSource, PbTable};

use crate::DebugCommon;

pub async fn fix_create_definition(common: DebugCommon, dry_run: bool) -> anyhow::Result<()> {
let DebugCommon {
etcd_endpoints,
etcd_username,
etcd_password,
enable_etcd_auth,
..
} = common;

let client = if enable_etcd_auth {
let options = ConnectOptions::default().with_user(
etcd_username.clone().unwrap_or_default(),
etcd_password.clone().unwrap_or_default(),
);
WrappedEtcdClient::connect(etcd_endpoints.clone(), Some(options), true).await?
} else {
WrappedEtcdClient::connect(etcd_endpoints.clone(), None, false).await?
};

let meta_store = EtcdMetaStore::new(client);

let mut tables = PbTable::list(&meta_store).await?;
for table in &mut tables {
if table.table_type() != PbTableType::Internal
&& table.table_type() != PbTableType::Index
&& (table.definition.contains("*WATERMARK") || table.definition.contains(")WATERMARK"))
{
println!("table: id={}, name={}: ", table.id, table.name);
let new_definition = table
.definition
.replace("*WATERMARK", "*, WATERMARK")
.replace(")WATERMARK", "), WATERMARK");
println!("\told definition: {}", table.definition);
println!("\tnew definition: {}", new_definition);
if !dry_run {
table.definition = new_definition;
table.insert(&meta_store).await?;
println!("table definition updated");
}
}
}
let mut sources = PbSource::list(&meta_store).await?;
for source in &mut sources {
if source.definition.contains("*WATERMARK") || source.definition.contains(")WATERMARK") {
println!("source: id={}, name={}: ", source.id, source.name);
let new_definition = source
.definition
.replace("*WATERMARK", "*, WATERMARK")
.replace(")WATERMARK", "), WATERMARK");
println!("\told definition: {}", source.definition);
println!("\tnew definition: {}", new_definition);
if !dry_run {
source.definition = new_definition;
source.insert(&meta_store).await?;
println!("source definition updated");
}
}
}

Ok(())
}
12 changes: 12 additions & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ pub enum DebugCommands {
#[clap(long, value_delimiter = ',')]
dirty_fragment_ids: Vec<u32>,
},
/// Fix table or source definition caused by the incorrect watermark syntax.
/// Related: <https://github.com/risingwavelabs/risingwave/pull/18393>
FixCreateDefinition {
#[command(flatten)]
common: DebugCommon,

#[clap(long)]
dry_run: bool,
},
}

#[derive(Subcommand)]
Expand Down Expand Up @@ -907,6 +916,9 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
table_id,
dirty_fragment_ids,
}) => cmd_impl::debug::fix_table_fragments(common, table_id, dirty_fragment_ids).await?,
Commands::Debug(DebugCommands::FixCreateDefinition { common, dry_run }) => {
cmd_impl::debug::fix_create_definition(common, dry_run).await?
}
Commands::Throttle(ThrottleCommands::Source(args)) => {
apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
}
Expand Down

0 comments on commit 13d4dd8

Please sign in to comment.