diff options
author | Hristo Venev <hristo@venev.name> | 2019-09-28 16:04:53 +0000 |
---|---|---|
committer | Hristo Venev <hristo@venev.name> | 2019-09-28 16:04:53 +0000 |
commit | a66bdaee155d9012b1fd75bf31332df638d49857 (patch) | |
tree | b75058baf614a6fac894123ba21143382fb5e546 /src/manager/mod.rs | |
parent | 5bcc0210923e8cb50fc97b318309d1b62c5cf62e (diff) |
Move builder inside manager.
Diffstat (limited to 'src/manager/mod.rs')
-rw-r--r-- | src/manager/mod.rs | 423 |
1 files changed, 423 insertions, 0 deletions
diff --git a/src/manager/mod.rs b/src/manager/mod.rs new file mode 100644 index 0000000..8e8792d --- /dev/null +++ b/src/manager/mod.rs @@ -0,0 +1,423 @@ +// Copyright 2019 Hristo Venev +// +// See COPYING. + +use crate::{config, model, proto, wg}; +use std::ffi::{OsStr, OsString}; +#[cfg(unix)] +use std::os::unix::fs::OpenOptionsExt; +use std::path::{Path, PathBuf}; +use std::time::{Duration, Instant, SystemTime}; +use std::{fs, io}; + +mod builder; + +struct Source { + name: String, + config: config::Source, + data: proto::Source, + next_update: Instant, + backoff: Option<Duration>, +} + +struct Updater { + config: config::UpdateConfig, + cache_directory: Option<PathBuf>, +} + +fn update_file(path: &Path, data: &[u8]) -> io::Result<()> { + let mut tmp_path = OsString::from(path); + tmp_path.push(".tmp"); + let tmp_path = PathBuf::from(tmp_path); + + let mut file = { + let mut file = fs::OpenOptions::new(); + file.append(true); + file.create_new(true); + #[cfg(unix)] + file.mode(0o0600); + file.open(&tmp_path)? + }; + + let r = io::Write::write_all(&mut file, data) + .and_then(|_| file.sync_data()) + .and_then(|_| fs::rename(&tmp_path, &path)); + + if r.is_err() { + fs::remove_file(&tmp_path).unwrap_or_else(|e2| { + eprintln!("<3>Failed to clean up [{}]: {}", tmp_path.display(), e2); + }); + } + r +} + +fn load_file(path: &Path) -> io::Result<Option<Vec<u8>>> { + let mut file = match fs::File::open(&path) { + Ok(file) => file, + Err(e) => { + if e.kind() == io::ErrorKind::NotFound { + return Ok(None); + } + return Err(e); + } + }; + + let mut data = Vec::new(); + io::Read::read_to_end(&mut file, &mut data)?; + Ok(Some(data)) +} + +impl Updater { + fn cache_path(&self, s: &Source) -> Option<PathBuf> { + if let Some(ref dir) = self.cache_directory { + let mut p = dir.clone(); + p.push(&s.name); + Some(p) + } else { + None + } + } + + fn cache_update(&self, src: &Source) { + let path = if let Some(path) = self.cache_path(src) { + path + } else { + return; + }; + + let data = serde_json::to_vec(&src.data).unwrap(); + match update_file(&path, &data) { + Ok(()) => {} + Err(e) => { + eprintln!("<4>Failed to cache [{}]: {}", &src.name, e); + } + } + } + + fn cache_load(&self, src: &mut Source) -> bool { + let path = if let Some(path) = self.cache_path(src) { + path + } else { + return false; + }; + + let data = match load_file(&path) { + Ok(Some(data)) => data, + Ok(None) => { + return false; + } + Err(e) => { + eprintln!("<3>Failed to read [{}] from cache: {}", &src.name, e); + return false; + } + }; + + let mut de = serde_json::Deserializer::from_slice(&data); + src.data = match serde::Deserialize::deserialize(&mut de) { + Ok(r) => r, + Err(e) => { + eprintln!("<3>Failed to load [{}] from cache: {}", &src.name, e); + return false; + } + }; + + true + } + + fn update(&self, src: &mut Source) -> (bool, Instant) { + let refresh = Duration::from_secs(u64::from(self.config.refresh_sec)); + + let r = fetch_source(&src.config.url); + let now = Instant::now(); + let r = match r { + Ok(r) => { + eprintln!("<6>Updated [{}]", &src.config.url); + src.data = r; + src.backoff = None; + src.next_update = now + refresh; + self.cache_update(src); + return (true, now); + } + Err(r) => r, + }; + + let b = src + .backoff + .unwrap_or_else(|| Duration::from_secs(10).min(refresh / 10)); + src.next_update = now + b; + src.backoff = Some((b + b / 3).min(refresh / 3)); + eprintln!( + "<3>Failed to update [{}], retrying after {:.1?}: {}", + &src.config.url, b, &r + ); + (false, now) + } +} + +pub struct Manager { + dev: wg::Device, + peer_config: config::PeerConfig, + sources: Vec<Source>, + current: model::Config, + runtime_directory: Option<PathBuf>, + updater: Updater, +} + +impl Manager { + pub fn new(ifname: OsString, c: config::Config) -> io::Result<Self> { + let mut m = Self { + dev: wg::Device::new(ifname)?, + peer_config: c.peer_config, + sources: vec![], + current: model::Config::default(), + runtime_directory: c.runtime_directory, + updater: Updater { + config: c.update_config, + cache_directory: c.cache_directory, + }, + }; + + let _ = m.current_load(); + + for (name, cfg) in c.sources { + m.add_source(name, cfg)?; + } + + Ok(m) + } + + fn state_path(&self) -> Option<PathBuf> { + let mut path = if let Some(ref path) = self.runtime_directory { + path.clone() + } else { + return None; + }; + path.push("state.json"); + Some(path) + } + + fn current_load(&mut self) -> bool { + let path = if let Some(path) = self.state_path() { + path + } else { + return false; + }; + + let data = match load_file(&path) { + Ok(Some(data)) => data, + Ok(None) => { + return false; + } + Err(e) => { + eprintln!("<3>Failed to read interface state: {}", e); + return false; + } + }; + + let mut de = serde_json::Deserializer::from_slice(&data); + match serde::Deserialize::deserialize(&mut de) { + Ok(c) => { + self.current = c; + true + } + Err(e) => { + eprintln!("<3>Failed to load interface state: {}", e); + false + } + } + } + + fn current_update(&mut self, c: &model::Config) { + let path = if let Some(path) = self.state_path() { + path + } else { + return; + }; + + let data = serde_json::to_vec(c).unwrap(); + match update_file(&path, &data) { + Ok(()) => {} + Err(e) => { + eprintln!("<3>Failed to persist interface state: {}", e); + } + } + } + + fn add_source(&mut self, name: String, config: config::Source) -> io::Result<()> { + let mut s = Source { + name, + config, + data: proto::Source::empty(), + next_update: Instant::now(), + backoff: None, + }; + + self.init_source(&mut s)?; + self.sources.push(s); + Ok(()) + } + + fn init_source(&mut self, s: &mut Source) -> io::Result<()> { + if self.updater.update(s).0 { + return Ok(()); + } + if self.updater.cache_load(s) { + return Ok(()); + } + if !s.config.required { + return Ok(()); + } + if self.updater.update(s).0 { + return Ok(()); + } + if self.updater.update(s).0 { + return Ok(()); + } + Err(io::Error::new( + io::ErrorKind::Other, + format!("Failed to update required source [{}]", &s.config.url), + )) + } + + fn make_config( + &self, + public_key: model::Key, + ts: SystemTime, + ) -> (model::Config, Vec<builder::ConfigError>, SystemTime) { + let mut t_cfg = ts + Duration::from_secs(1 << 20); + let mut sources: Vec<(&Source, &proto::SourceConfig)> = vec![]; + for src in &self.sources { + let sc = src + .data + .next + .as_ref() + .and_then(|next| { + if ts >= next.update_at { + Some(&next.config) + } else { + t_cfg = t_cfg.min(next.update_at); + None + } + }) + .unwrap_or(&src.data.config); + sources.push((src, sc)); + } + + let mut cfg = builder::ConfigBuilder::new(public_key, &self.peer_config); + + for (src, sc) in &sources { + for peer in &sc.servers { + cfg.add_server(&src.config, peer); + } + } + + for (src, sc) in &sources { + for peer in &sc.road_warriors { + cfg.add_road_warrior(&src.config, peer); + } + } + + let (cfg, errs) = cfg.build(); + (cfg, errs, t_cfg) + } + + fn refresh(&mut self) -> io::Result<Instant> { + let refresh = Duration::from_secs(u64::from(self.updater.config.refresh_sec)); + let mut now = Instant::now(); + let mut t_refresh = now + refresh; + + for src in &mut self.sources { + if now >= src.next_update { + now = self.updater.update(src).1; + } + t_refresh = t_refresh.min(src.next_update); + } + + Ok(t_refresh) + } + + pub fn update(&mut self) -> io::Result<Instant> { + let t_refresh = self.refresh()?; + + let public_key = self.dev.get_public_key()?; + let now = Instant::now(); + let sysnow = SystemTime::now(); + let (config, errors, t_cfg) = self.make_config(public_key, sysnow); + let time_to_cfg = t_cfg + .duration_since(sysnow) + .unwrap_or(Duration::from_secs(0)); + let t_cfg = now + time_to_cfg; + + if config != self.current { + eprintln!("<5>Applying configuration update"); + for err in &errors { + eprintln!("<{}>{}", if err.important { '4' } else { '5' }, err); + } + self.dev.apply_diff(&self.current, &config)?; + self.current_update(&config); + self.current = config; + } + + Ok(if t_cfg < t_refresh { + eprintln!("<6>Next configuration update after {:.1?}", time_to_cfg); + t_cfg + } else if t_refresh > now { + t_refresh + } else { + eprintln!("<4>Next refresh immediately?"); + now + }) + } +} + +pub fn fetch_source(url: &str) -> io::Result<proto::Source> { + use std::env; + use std::process::{Command, Stdio}; + + let curl = match env::var_os("CURL") { + None => OsString::new(), + Some(v) => v, + }; + let mut proc = Command::new(if curl.is_empty() { + OsStr::new("curl") + } else { + curl.as_os_str() + }); + + proc.stdin(Stdio::null()); + proc.stdout(Stdio::piped()); + proc.stderr(Stdio::piped()); + proc.arg("-gsSfL"); + proc.arg("--fail-early"); + proc.arg("--max-time"); + proc.arg("10"); + proc.arg("--max-filesize"); + proc.arg("1M"); + proc.arg("--"); + proc.arg(url); + + let out = proc.output()?; + + if !out.status.success() { + let msg = String::from_utf8_lossy(&out.stderr); + let msg = msg.replace('\n', "; "); + return Err(io::Error::new(io::ErrorKind::Other, msg)); + } + + let mut de = serde_json::Deserializer::from_slice(&out.stdout); + let r = serde::Deserialize::deserialize(&mut de)?; + Ok(r) +} + +pub fn load_source(path: &OsStr) -> io::Result<proto::Source> { + let mut data = Vec::new(); + { + use std::io::Read; + let mut f = fs::File::open(&path)?; + f.read_to_end(&mut data)?; + } + + let mut de = serde_json::Deserializer::from_slice(&data); + let r = serde::Deserialize::deserialize(&mut de)?; + Ok(r) +} |