1use std::any::Any;
4use std::path::PathBuf;
5use std::process::{Command, Stdio};
6use std::sync::Arc;
7
8use arcstr::ArcStr;
9use derive_builder::Builder;
10
11#[cfg(test)]
12mod tests;
13
14#[derive(Clone, Debug, Eq, PartialEq)]
16pub struct ExecOpts {
17 pub cpus: Option<usize>,
19 pub machines: usize,
21 pub logs: LogOutput,
23}
24
25impl Default for ExecOpts {
26 #[inline]
27 fn default() -> Self {
28 Self {
29 cpus: None,
30 machines: 1,
31 logs: LogOutput::Stdio,
32 }
33 }
34}
35
36#[derive(Clone, Debug, Eq, PartialEq, Default)]
37pub enum LogOutput {
39 #[default]
41 Stdio,
42 File(PathBuf),
44}
45
46pub trait Executor: Any + Send + Sync {
48 fn execute(&self, command: Command, opts: ExecOpts) -> Result<(), crate::error::Error>;
50}
51
52#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Default)]
54pub struct LocalExecutor;
55
56impl Executor for LocalExecutor {
57 fn execute(&self, mut command: Command, opts: ExecOpts) -> Result<(), crate::error::Error> {
58 if let LogOutput::File(ref path) = opts.logs {
59 let fout = std::fs::File::create(path).map_err(Arc::new)?;
60 let ferr = fout.try_clone().map_err(Arc::new)?;
61 command.stdout(Stdio::from(fout)).stderr(Stdio::from(ferr));
62 }
63
64 let status = command.status().map_err(Arc::new)?;
65 if !status.success() {
66 return Err(crate::error::Error::CommandFailed(Arc::new(command)));
67 }
68
69 Ok(())
70 }
71}
72
73#[derive(Clone, Debug, Eq, PartialEq, Builder)]
75pub struct LsfExecutor {
76 #[builder(setter(into), default = "::arcstr::literal!(\"bsub\")")]
78 bsub: ArcStr,
79 #[builder(setter(into, strip_option))]
81 queue: Option<ArcStr>,
82}
83
84impl Default for LsfExecutor {
85 fn default() -> Self {
86 Self {
87 bsub: arcstr::literal!("bsub"),
88 queue: None,
89 }
90 }
91}
92
93impl LsfExecutor {
94 #[inline]
96 pub fn builder() -> LsfExecutorBuilder {
97 LsfExecutorBuilder::default()
98 }
99
100 pub fn command(&self, command: &Command, opts: ExecOpts) -> Command {
102 let mut submit = Command::new(&*self.bsub);
103
104 submit.arg("-K");
106 if let Some(ref queue) = self.queue {
107 submit.arg("-q").arg(queue.as_str());
108 }
109 if let Some(cpus) = opts.cpus {
110 submit.arg("-n").arg(cpus.to_string());
111 }
112 submit.arg(command.get_program());
113 for arg in command.get_args() {
114 submit.arg(arg);
115 }
116 if let Some(dir) = command.get_current_dir() {
117 submit.current_dir(dir);
118 }
119
120 for (key, val) in command.get_envs() {
121 match val {
122 None => submit.env_remove(key),
123 Some(val) => submit.env(key, val),
124 };
125 }
126
127 submit
128 }
129}
130
131impl Executor for LsfExecutor {
132 fn execute(&self, command: Command, opts: ExecOpts) -> Result<(), crate::error::Error> {
133 let mut submit = self.command(&command, opts);
134
135 let status = submit.status().map_err(Arc::new)?;
136 if !status.success() {
137 return Err(crate::error::Error::CommandFailed(Arc::new(submit)));
138 }
139
140 Ok(())
141 }
142}