Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce 'pipes', allowing users to pipe data to and control plugins from the command line #3066

Merged
merged 32 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c755ab3
prototype - working with message from the cli
imsnif Dec 3, 2023
6e3ac24
prototype - pipe from the CLI to plugins
imsnif Dec 5, 2023
dfe90cc
prototype - pipe from the CLI to plugins and back again
imsnif Dec 5, 2023
dcdb31e
prototype - working with better cli interface
imsnif Dec 18, 2023
76710e7
prototype - working after removing unused stuff
imsnif Dec 19, 2023
806a72a
prototype - working with launching plugin if it is not launched, also…
imsnif Dec 22, 2023
54f75e1
refactor: change message to cli-message
imsnif Dec 22, 2023
6c9cf4b
prototype - allow plugins to send messages to each other
imsnif Dec 26, 2023
e1fb349
fix: allow cli messages to send plugin parameters (and implement back…
imsnif Dec 27, 2023
0aa5324
fix: use input_pipe_id to identify cli pipes instead of their message…
imsnif Dec 27, 2023
e23ba43
fix: come cleanups and add skip_cache parameter
imsnif Dec 27, 2023
124c675
fix: pipe/client-server communication robustness
imsnif Dec 28, 2023
22d3306
fix: leaking messages between plugins while loading
imsnif Dec 28, 2023
9ade600
feat: allow plugins to specify how a new plugin instance is launched …
imsnif Dec 29, 2023
ac31898
fix: add permissions
imsnif Dec 29, 2023
cf4a8a6
refactor: adjust cli api
imsnif Dec 31, 2023
a3959e0
fix: improve cli plugin loading error messages
imsnif Jan 1, 2024
a0c28ba
docs: cli pipe
imsnif Jan 1, 2024
5ce97e0
fix: take plugin configuration into account when messaging between pl…
imsnif Jan 5, 2024
3538ce5
refactor: pipe message protobuf interface
imsnif Jan 8, 2024
b9c3d89
refactor: update(event) -> pipe
imsnif Jan 9, 2024
241168a
refactor - rename CliMessage to CliPipe
imsnif Jan 10, 2024
9b45f91
fix: add is_private to pipes and change some naming
imsnif Jan 10, 2024
6a8d5f6
refactor - cli client
imsnif Jan 10, 2024
7dd6b4d
refactor: various cleanups
imsnif Jan 11, 2024
4915877
style(fmt): rustfmt
imsnif Jan 11, 2024
c3e8121
fix(pipes): backpressure across multiple plugins
imsnif Jan 16, 2024
463202b
style: some cleanups
imsnif Jan 16, 2024
981e92a
style(fmt): rustfmt
imsnif Jan 16, 2024
423cfe8
Merge branch 'main' into plugin-cli-interface
imsnif Jan 16, 2024
b1a096f
style: fix merge conflict mistake
imsnif Jan 16, 2024
ead3566
style(wording): clarify pipe permission
imsnif Jan 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions default-plugins/fixture-plugin-for-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ struct State {
received_events: Vec<Event>,
received_payload: Option<String>,
configuration: BTreeMap<String, String>,
message_to_plugin_payload: Option<String>,
}

#[derive(Default, Serialize, Deserialize)]
Expand All @@ -34,9 +35,12 @@ impl<'de> ZellijWorker<'de> for TestWorker {
}
}

#[cfg(target_family = "wasm")]
register_plugin!(State);
#[cfg(target_family = "wasm")]
register_worker!(TestWorker, test_worker, TEST_WORKER);

#[cfg(target_family = "wasm")]
impl ZellijPlugin for State {
fn load(&mut self, configuration: BTreeMap<String, String>) {
request_permission(&[
Expand All @@ -49,6 +53,8 @@ impl ZellijPlugin for State {
PermissionType::OpenTerminalsOrPlugins,
PermissionType::WriteToStdin,
PermissionType::WebAccess,
PermissionType::ReadCliPipes,
PermissionType::MessageAndLaunchOtherPlugins,
]);
self.configuration = configuration;
subscribe(&[
Expand Down Expand Up @@ -295,10 +301,35 @@ impl ZellijPlugin for State {
self.received_events.push(event);
should_render
}
fn pipe(&mut self, pipe_message: PipeMessage) -> bool {
let input_pipe_id = match pipe_message.source {
PipeSource::Cli(id) => id.clone(),
PipeSource::Plugin(id) => format!("{}", id),
};
let name = pipe_message.name;
let payload = pipe_message.payload;
if name == "message_name" && payload == Some("message_payload".to_owned()) {
unblock_cli_pipe_input(&input_pipe_id);
} else if name == "message_name_block" {
block_cli_pipe_input(&input_pipe_id);
} else if name == "pipe_output" {
cli_pipe_output(&name, "this_is_my_output");
} else if name == "pipe_message_to_plugin" {
pipe_message_to_plugin(
MessageToPlugin::new("message_to_plugin").with_payload("my_cool_payload"),
);
} else if name == "message_to_plugin" {
self.message_to_plugin_payload = payload.clone();
}
let should_render = true;
should_render
}

fn render(&mut self, rows: usize, cols: usize) {
if let Some(payload) = self.received_payload.as_ref() {
println!("Payload from worker: {:?}", payload);
} else if let Some(payload) = self.message_to_plugin_payload.take() {
println!("Payload from self: {:?}", payload);
} else {
println!(
"Rows: {:?}, Cols: {:?}, Received events: {:?}",
Expand Down
25 changes: 25 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,31 @@ fn main() {
commands::convert_old_theme_file(old_theme_file);
std::process::exit(0);
}
if let Some(Command::Sessions(Sessions::Pipe {
name,
payload,
args,
plugin,
plugin_configuration,
})) = opts.command
{
let command_cli_action = CliAction::Pipe {
name,
payload,
args,
plugin,
plugin_configuration,

force_launch_plugin: false,
skip_plugin_cache: false,
floating_plugin: None,
in_place_plugin: None,
plugin_cwd: None,
plugin_title: None,
};
commands::send_action_to_session(command_cli_action, opts.session, config);
std::process::exit(0);
}
}

if let Some(Command::Sessions(Sessions::ListSessions {
Expand Down
181 changes: 177 additions & 4 deletions zellij-client/src/cli_client.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
//! The `[cli_client]` is used to attach to a running server session
//! and dispatch actions, that are specified through the command line.
use std::collections::BTreeMap;
use std::io::BufRead;
use std::process;
use std::{fs, path::PathBuf};

use crate::os_input_output::ClientOsApi;
use zellij_utils::{
errors::prelude::*,
input::actions::Action,
ipc::{ClientToServerMsg, ServerToClientMsg},
ipc::{ClientToServerMsg, ExitReason, ServerToClientMsg},
uuid::Uuid,
};

pub fn start_cli_client(os_input: Box<dyn ClientOsApi>, session_name: &str, actions: Vec<Action>) {
pub fn start_cli_client(
mut os_input: Box<dyn ClientOsApi>,
session_name: &str,
actions: Vec<Action>,
) {
let zellij_ipc_pipe: PathBuf = {
let mut sock_dir = zellij_utils::consts::ZELLIJ_SOCK_DIR.clone();
fs::create_dir_all(&sock_dir).unwrap();
Expand All @@ -21,10 +29,166 @@ pub fn start_cli_client(os_input: Box<dyn ClientOsApi>, session_name: &str, acti
let pane_id = os_input
.env_variable("ZELLIJ_PANE_ID")
.and_then(|e| e.trim().parse().ok());

for action in actions {
let msg = ClientToServerMsg::Action(action, pane_id, None);
os_input.send_to_server(msg);
match action {
Action::CliPipe {
pipe_id,
name,
payload,
plugin,
args,
configuration,
launch_new,
skip_cache,
floating,
in_place,
cwd,
pane_title,
} => {
pipe_client(
&mut os_input,
pipe_id,
name,
payload,
plugin,
args,
configuration,
launch_new,
skip_cache,
floating,
in_place,
pane_id,
cwd,
pane_title,
);
},
action => {
single_message_client(&mut os_input, action, pane_id);
},
}
}
}

fn pipe_client(
os_input: &mut Box<dyn ClientOsApi>,
pipe_id: String,
mut name: Option<String>,
payload: Option<String>,
plugin: Option<String>,
args: Option<BTreeMap<String, String>>,
mut configuration: Option<BTreeMap<String, String>>,
launch_new: bool,
skip_cache: bool,
floating: Option<bool>,
in_place: Option<bool>,
pane_id: Option<u32>,
cwd: Option<PathBuf>,
pane_title: Option<String>,
) {
let mut stdin = os_input.get_stdin_reader();
let name = name.take().or_else(|| Some(Uuid::new_v4().to_string()));
if launch_new {
// we do this to make sure the plugin is unique (has a unique configuration parameter) so
// that a new one would be launched, but we'll still send it to the same instance rather
// than launching a new one in every iteration of the loop
configuration
.get_or_insert_with(BTreeMap::new)
.insert("_zellij_id".to_owned(), Uuid::new_v4().to_string());
}
let create_msg = |payload: Option<String>| -> ClientToServerMsg {
ClientToServerMsg::Action(
Action::CliPipe {
pipe_id: pipe_id.clone(),
name: name.clone(),
payload,
args: args.clone(),
plugin: plugin.clone(),
configuration: configuration.clone(),
floating,
in_place,
launch_new,
skip_cache,
cwd: cwd.clone(),
pane_title: pane_title.clone(),
},
pane_id,
None,
)
};
loop {
if payload.is_some() {
// we got payload from the command line, we should use it and not wait for more
let msg = create_msg(payload);
os_input.send_to_server(msg);
break;
}
// we didn't get payload from the command line, meaning we listen on STDIN because this
// signifies the user is about to pipe more (eg. cat my-large-file | zellij pipe ...)
let mut buffer = String::new();
let _ = stdin.read_line(&mut buffer);
if buffer.is_empty() {
// end of pipe, send an empty message down the pipe
let msg = create_msg(None);
os_input.send_to_server(msg);
break;
} else {
// we've got data! send it down the pipe (most common)
let msg = create_msg(Some(buffer));
os_input.send_to_server(msg);
}
loop {
// wait for a response and act accordingly
match os_input.recv_from_server() {
Some((ServerToClientMsg::UnblockCliPipeInput(pipe_name), _)) => {
// unblock this pipe, meaning we need to stop waiting for a response and read
// once more from STDIN
if pipe_name == pipe_id {
break;
}
},
Some((ServerToClientMsg::CliPipeOutput(pipe_name, output), _)) => {
// send data to STDOUT, this *does not* mean we need to unblock the input
let err_context = "Failed to write to stdout";
if pipe_name == pipe_id {
let mut stdout = os_input.get_stdout_writer();
stdout
.write_all(output.as_bytes())
.context(err_context)
.non_fatal();
stdout.flush().context(err_context).non_fatal();
}
},
Some((ServerToClientMsg::Log(log_lines), _)) => {
log_lines.iter().for_each(|line| println!("{line}"));
process::exit(0);
},
Some((ServerToClientMsg::LogError(log_lines), _)) => {
log_lines.iter().for_each(|line| eprintln!("{line}"));
process::exit(2);
},
Some((ServerToClientMsg::Exit(exit_reason), _)) => match exit_reason {
ExitReason::Error(e) => {
eprintln!("{}", e);
process::exit(2);
},
_ => {
process::exit(0);
},
},
_ => {},
}
}
}
}

fn single_message_client(
os_input: &mut Box<dyn ClientOsApi>,
action: Action,
pane_id: Option<u32>,
) {
let msg = ClientToServerMsg::Action(action, pane_id, None);
os_input.send_to_server(msg);
loop {
match os_input.recv_from_server() {
Some((ServerToClientMsg::UnblockInputThread, _)) => {
Expand All @@ -39,6 +203,15 @@ pub fn start_cli_client(os_input: Box<dyn ClientOsApi>, session_name: &str, acti
log_lines.iter().for_each(|line| eprintln!("{line}"));
process::exit(2);
},
Some((ServerToClientMsg::Exit(exit_reason), _)) => match exit_reason {
ExitReason::Error(e) => {
eprintln!("{}", e);
process::exit(2);
},
_ => {
process::exit(0);
},
},
_ => {},
}
}
Expand Down
10 changes: 10 additions & 0 deletions zellij-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub(crate) enum ClientInstruction {
LogError(Vec<String>),
SwitchSession(ConnectToSession),
SetSynchronizedOutput(Option<SyncOutput>),
UnblockCliPipeInput(String), // String -> pipe name
CliPipeOutput(String, String), // String -> pipe name, String -> output
}

impl From<ServerToClientMsg> for ClientInstruction {
Expand All @@ -67,6 +69,12 @@ impl From<ServerToClientMsg> for ClientInstruction {
ServerToClientMsg::SwitchSession(connect_to_session) => {
ClientInstruction::SwitchSession(connect_to_session)
},
ServerToClientMsg::UnblockCliPipeInput(pipe_name) => {
ClientInstruction::UnblockCliPipeInput(pipe_name)
},
ServerToClientMsg::CliPipeOutput(pipe_name, output) => {
ClientInstruction::CliPipeOutput(pipe_name, output)
},
}
}
}
Expand All @@ -87,6 +95,8 @@ impl From<&ClientInstruction> for ClientContext {
ClientInstruction::DoneParsingStdinQuery => ClientContext::DoneParsingStdinQuery,
ClientInstruction::SwitchSession(..) => ClientContext::SwitchSession,
ClientInstruction::SetSynchronizedOutput(..) => ClientContext::SetSynchronisedOutput,
ClientInstruction::UnblockCliPipeInput(..) => ClientContext::UnblockCliPipeInput,
ClientInstruction::CliPipeOutput(..) => ClientContext::CliPipeOutput,
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions zellij-client/src/os_input_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ pub trait ClientOsApi: Send + Sync {
fn unset_raw_mode(&self, fd: RawFd) -> Result<(), nix::Error>;
/// Returns the writer that allows writing to standard output.
fn get_stdout_writer(&self) -> Box<dyn io::Write>;
fn get_stdin_reader(&self) -> Box<dyn io::Read>;
/// Returns a BufReader that allows to read from STDIN line by line, also locks STDIN
fn get_stdin_reader(&self) -> Box<dyn io::BufRead>;
fn update_session_name(&mut self, new_session_name: String);
/// Returns the raw contents of standard input.
fn read_from_stdin(&mut self) -> Result<Vec<u8>, &'static str>;
Expand Down Expand Up @@ -186,9 +187,10 @@ impl ClientOsApi for ClientOsInputOutput {
let stdout = ::std::io::stdout();
Box::new(stdout)
}
fn get_stdin_reader(&self) -> Box<dyn io::Read> {

fn get_stdin_reader(&self) -> Box<dyn io::BufRead> {
let stdin = ::std::io::stdin();
Box::new(stdin)
Box::new(stdin.lock())
}

fn send_to_server(&self, msg: ClientToServerMsg) {
Expand Down
4 changes: 2 additions & 2 deletions zellij-client/src/unit/stdin_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,10 @@ impl ClientOsApi for FakeClientOsApi {
let fake_stdout_writer = FakeStdoutWriter::new(self.stdout_buffer.clone());
Box::new(fake_stdout_writer)
}
fn get_stdin_reader(&self) -> Box<dyn io::Read> {
fn get_stdin_reader(&self) -> Box<dyn io::BufRead> {
unimplemented!()
}
fn update_session_name(&mut self, new_session_name: String) {}
fn update_session_name(&mut self, _new_session_name: String) {}
fn read_from_stdin(&mut self) -> Result<Vec<u8>, &'static str> {
Ok(self.stdin_buffer.drain(..).collect())
}
Expand Down
Loading