substrate/execute/mod.rs

1//! Executor (e.g. LSF, Slurm) API.
2
3use std::any::Any;
4use std::path::PathBuf;
5use std::process::{Command, Stdio};
6use std::sync::Arc;
7
8use arcstr::ArcStr;
9use derive_builder::Builder;
10
11#[cfg(test)]
12mod tests;
13
14/// Job submission options.
15#[derive(Clone, Debug, Eq, PartialEq)]
16pub struct ExecOpts {
17    /// Number of CPUs to use.
18    pub cpus: Option<usize>,
19    /// Number of machines to use.
20    pub machines: usize,
21    /// Where to place logs.
22    pub logs: LogOutput,
23}
24
25impl Default for ExecOpts {
26    #[inline]
27    fn default() -> Self {
28        Self {
29            cpus: None,
30            machines: 1,
31            logs: LogOutput::Stdio,
32        }
33    }
34}
35
/// Destination for the logs a job produces.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum LogOutput {
    /// Leave the job's output on standard output and standard error (the default).
    #[default]
    Stdio,
    /// Write the job's output to the file at the given path.
    File(PathBuf),
}
45
46/// A job executor.
47pub trait Executor: Any + Send + Sync {
48    /// Execute the given command with the given options, waiting until the command completes.
49    fn execute(&self, command: Command, opts: ExecOpts) -> Result<(), crate::error::Error>;
50}
51
/// An [`Executor`] that runs commands as child processes on the current machine.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct LocalExecutor;
55
56impl Executor for LocalExecutor {
57    fn execute(&self, mut command: Command, opts: ExecOpts) -> Result<(), crate::error::Error> {
58        if let LogOutput::File(ref path) = opts.logs {
59            let fout = std::fs::File::create(path).map_err(Arc::new)?;
60            let ferr = fout.try_clone().map_err(Arc::new)?;
61            command.stdout(Stdio::from(fout)).stderr(Stdio::from(ferr));
62        }
63
64        let status = command.status().map_err(Arc::new)?;
65        if !status.success() {
66            return Err(crate::error::Error::CommandFailed(Arc::new(command)));
67        }
68
69        Ok(())
70    }
71}
72
73/// An executor for submitting jobs to an LSF cluster.
74#[derive(Clone, Debug, Eq, PartialEq, Builder)]
75pub struct LsfExecutor {
76    /// The command to use to submit jobs.
77    #[builder(setter(into), default = "::arcstr::literal!(\"bsub\")")]
78    bsub: ArcStr,
79    /// The queue to which jobs should be submitted.
80    #[builder(setter(into, strip_option))]
81    queue: Option<ArcStr>,
82}
83
84impl Default for LsfExecutor {
85    fn default() -> Self {
86        Self {
87            bsub: arcstr::literal!("bsub"),
88            queue: None,
89        }
90    }
91}
92
93impl LsfExecutor {
94    /// A builder for constructing an [`LsfExecutor`].
95    #[inline]
96    pub fn builder() -> LsfExecutorBuilder {
97        LsfExecutorBuilder::default()
98    }
99
100    /// Gets the LSF submission command.
101    pub fn command(&self, command: &Command, opts: ExecOpts) -> Command {
102        let mut submit = Command::new(&*self.bsub);
103
104        // -K makes bsub wait until the job completes
105        submit.arg("-K");
106        if let Some(ref queue) = self.queue {
107            submit.arg("-q").arg(queue.as_str());
108        }
109        if let Some(cpus) = opts.cpus {
110            submit.arg("-n").arg(cpus.to_string());
111        }
112        submit.arg(command.get_program());
113        for arg in command.get_args() {
114            submit.arg(arg);
115        }
116        if let Some(dir) = command.get_current_dir() {
117            submit.current_dir(dir);
118        }
119
120        for (key, val) in command.get_envs() {
121            match val {
122                None => submit.env_remove(key),
123                Some(val) => submit.env(key, val),
124            };
125        }
126
127        submit
128    }
129}
130
131impl Executor for LsfExecutor {
132    fn execute(&self, command: Command, opts: ExecOpts) -> Result<(), crate::error::Error> {
133        let mut submit = self.command(&command, opts);
134
135        let status = submit.status().map_err(Arc::new)?;
136        if !status.success() {
137            return Err(crate::error::Error::CommandFailed(Arc::new(submit)));
138        }
139
140        Ok(())
141    }
142}