From a66bdaee155d9012b1fd75bf31332df638d49857 Mon Sep 17 00:00:00 2001 From: Hristo Venev Date: Sat, 28 Sep 2019 16:04:53 +0000 Subject: Move builder inside manager. --- src/manager.rs | 421 --------------------------------------------------------- 1 file changed, 421 deletions(-) delete mode 100644 src/manager.rs (limited to 'src/manager.rs') diff --git a/src/manager.rs b/src/manager.rs deleted file mode 100644 index 3f487bf..0000000 --- a/src/manager.rs +++ /dev/null @@ -1,421 +0,0 @@ -// Copyright 2019 Hristo Venev -// -// See COPYING. - -use crate::{builder, config, model, proto, wg}; -use std::ffi::{OsStr, OsString}; -#[cfg(unix)] -use std::os::unix::fs::OpenOptionsExt; -use std::path::{Path, PathBuf}; -use std::time::{Duration, Instant, SystemTime}; -use std::{fs, io}; - -struct Source { - name: String, - config: config::Source, - data: proto::Source, - next_update: Instant, - backoff: Option, -} - -struct Updater { - config: config::UpdateConfig, - cache_directory: Option, -} - -fn update_file(path: &Path, data: &[u8]) -> io::Result<()> { - let mut tmp_path = OsString::from(path); - tmp_path.push(".tmp"); - let tmp_path = PathBuf::from(tmp_path); - - let mut file = { - let mut file = fs::OpenOptions::new(); - file.append(true); - file.create_new(true); - #[cfg(unix)] - file.mode(0o0600); - file.open(&tmp_path)? - }; - - let r = io::Write::write_all(&mut file, data) - .and_then(|_| file.sync_data()) - .and_then(|_| fs::rename(&tmp_path, &path)); - - if r.is_err() { - fs::remove_file(&tmp_path).unwrap_or_else(|e2| { - eprintln!("<3>Failed to clean up [{}]: {}", tmp_path.display(), e2); - }); - } - r -} - -fn load_file(path: &Path) -> io::Result>> { - let mut file = match fs::File::open(&path) { - Ok(file) => file, - Err(e) => { - if e.kind() == io::ErrorKind::NotFound { - return Ok(None); - } - return Err(e); - } - }; - - let mut data = Vec::new(); - io::Read::read_to_end(&mut file, &mut data)?; - Ok(Some(data)) -} - -impl Updater { - fn cache_path(&self, s: &Source) -> Option { - if let Some(ref dir) = self.cache_directory { - let mut p = dir.clone(); - p.push(&s.name); - Some(p) - } else { - None - } - } - - fn cache_update(&self, src: &Source) { - let path = if let Some(path) = self.cache_path(src) { - path - } else { - return; - }; - - let data = serde_json::to_vec(&src.data).unwrap(); - match update_file(&path, &data) { - Ok(()) => {} - Err(e) => { - eprintln!("<4>Failed to cache [{}]: {}", &src.name, e); - } - } - } - - fn cache_load(&self, src: &mut Source) -> bool { - let path = if let Some(path) = self.cache_path(src) { - path - } else { - return false; - }; - - let data = match load_file(&path) { - Ok(Some(data)) => data, - Ok(None) => { - return false; - } - Err(e) => { - eprintln!("<3>Failed to read [{}] from cache: {}", &src.name, e); - return false; - } - }; - - let mut de = serde_json::Deserializer::from_slice(&data); - src.data = match serde::Deserialize::deserialize(&mut de) { - Ok(r) => r, - Err(e) => { - eprintln!("<3>Failed to load [{}] from cache: {}", &src.name, e); - return false; - } - }; - - true - } - - fn update(&self, src: &mut Source) -> (bool, Instant) { - let refresh = Duration::from_secs(u64::from(self.config.refresh_sec)); - - let r = fetch_source(&src.config.url); - let now = Instant::now(); - let r = match r { - Ok(r) => { - eprintln!("<6>Updated [{}]", &src.config.url); - src.data = r; - src.backoff = None; - src.next_update = now + refresh; - self.cache_update(src); - return (true, now); - } - Err(r) => r, - }; - - let b = src - .backoff - .unwrap_or_else(|| Duration::from_secs(10).min(refresh / 10)); - src.next_update = now + b; - src.backoff = Some((b + b / 3).min(refresh / 3)); - eprintln!( - "<3>Failed to update [{}], retrying after {:.1?}: {}", - &src.config.url, b, &r - ); - (false, now) - } -} - -pub struct Manager { - dev: wg::Device, - peer_config: config::PeerConfig, - sources: Vec, - current: model::Config, - runtime_directory: Option, - updater: Updater, -} - -impl Manager { - pub fn new(ifname: OsString, c: config::Config) -> io::Result { - let mut m = Self { - dev: wg::Device::new(ifname)?, - peer_config: c.peer_config, - sources: vec![], - current: model::Config::default(), - runtime_directory: c.runtime_directory, - updater: Updater { - config: c.update_config, - cache_directory: c.cache_directory, - }, - }; - - let _ = m.current_load(); - - for (name, cfg) in c.sources { - m.add_source(name, cfg)?; - } - - Ok(m) - } - - fn state_path(&self) -> Option { - let mut path = if let Some(ref path) = self.runtime_directory { - path.clone() - } else { - return None; - }; - path.push("state.json"); - Some(path) - } - - fn current_load(&mut self) -> bool { - let path = if let Some(path) = self.state_path() { - path - } else { - return false; - }; - - let data = match load_file(&path) { - Ok(Some(data)) => data, - Ok(None) => { - return false; - } - Err(e) => { - eprintln!("<3>Failed to read interface state: {}", e); - return false; - } - }; - - let mut de = serde_json::Deserializer::from_slice(&data); - match serde::Deserialize::deserialize(&mut de) { - Ok(c) => { - self.current = c; - true - } - Err(e) => { - eprintln!("<3>Failed to load interface state: {}", e); - false - } - } - } - - fn current_update(&mut self, c: &model::Config) { - let path = if let Some(path) = self.state_path() { - path - } else { - return; - }; - - let data = serde_json::to_vec(c).unwrap(); - match update_file(&path, &data) { - Ok(()) => {} - Err(e) => { - eprintln!("<3>Failed to persist interface state: {}", e); - } - } - } - - fn add_source(&mut self, name: String, config: config::Source) -> io::Result<()> { - let mut s = Source { - name, - config, - data: proto::Source::empty(), - next_update: Instant::now(), - backoff: None, - }; - - self.init_source(&mut s)?; - self.sources.push(s); - Ok(()) - } - - fn init_source(&mut self, s: &mut Source) -> io::Result<()> { - if self.updater.update(s).0 { - return Ok(()); - } - if self.updater.cache_load(s) { - return Ok(()); - } - if !s.config.required { - return Ok(()); - } - if self.updater.update(s).0 { - return Ok(()); - } - if self.updater.update(s).0 { - return Ok(()); - } - Err(io::Error::new( - io::ErrorKind::Other, - format!("Failed to update required source [{}]", &s.config.url), - )) - } - - fn make_config( - &self, - public_key: model::Key, - ts: SystemTime, - ) -> (model::Config, Vec, SystemTime) { - let mut t_cfg = ts + Duration::from_secs(1 << 20); - let mut sources: Vec<(&Source, &proto::SourceConfig)> = vec![]; - for src in &self.sources { - let sc = src - .data - .next - .as_ref() - .and_then(|next| { - if ts >= next.update_at { - Some(&next.config) - } else { - t_cfg = t_cfg.min(next.update_at); - None - } - }) - .unwrap_or(&src.data.config); - sources.push((src, sc)); - } - - let mut cfg = builder::ConfigBuilder::new(public_key, &self.peer_config); - - for (src, sc) in &sources { - for peer in &sc.servers { - cfg.add_server(&src.config, peer); - } - } - - for (src, sc) in &sources { - for peer in &sc.road_warriors { - cfg.add_road_warrior(&src.config, peer); - } - } - - let (cfg, errs) = cfg.build(); - (cfg, errs, t_cfg) - } - - fn refresh(&mut self) -> io::Result { - let refresh = Duration::from_secs(u64::from(self.updater.config.refresh_sec)); - let mut now = Instant::now(); - let mut t_refresh = now + refresh; - - for src in &mut self.sources { - if now >= src.next_update { - now = self.updater.update(src).1; - } - t_refresh = t_refresh.min(src.next_update); - } - - Ok(t_refresh) - } - - pub fn update(&mut self) -> io::Result { - let t_refresh = self.refresh()?; - - let public_key = self.dev.get_public_key()?; - let now = Instant::now(); - let sysnow = SystemTime::now(); - let (config, errors, t_cfg) = self.make_config(public_key, sysnow); - let time_to_cfg = t_cfg - .duration_since(sysnow) - .unwrap_or(Duration::from_secs(0)); - let t_cfg = now + time_to_cfg; - - if config != self.current { - eprintln!("<5>Applying configuration update"); - for err in &errors { - eprintln!("<{}>{}", if err.important { '4' } else { '5' }, err); - } - self.dev.apply_diff(&self.current, &config)?; - self.current_update(&config); - self.current = config; - } - - Ok(if t_cfg < t_refresh { - eprintln!("<6>Next configuration update after {:.1?}", time_to_cfg); - t_cfg - } else if t_refresh > now { - t_refresh - } else { - eprintln!("<4>Next refresh immediately?"); - now - }) - } -} - -pub fn fetch_source(url: &str) -> io::Result { - use std::env; - use std::process::{Command, Stdio}; - - let curl = match env::var_os("CURL") { - None => OsString::new(), - Some(v) => v, - }; - let mut proc = Command::new(if curl.is_empty() { - OsStr::new("curl") - } else { - curl.as_os_str() - }); - - proc.stdin(Stdio::null()); - proc.stdout(Stdio::piped()); - proc.stderr(Stdio::piped()); - proc.arg("-gsSfL"); - proc.arg("--fail-early"); - proc.arg("--max-time"); - proc.arg("10"); - proc.arg("--max-filesize"); - proc.arg("1M"); - proc.arg("--"); - proc.arg(url); - - let out = proc.output()?; - - if !out.status.success() { - let msg = String::from_utf8_lossy(&out.stderr); - let msg = msg.replace('\n', "; "); - return Err(io::Error::new(io::ErrorKind::Other, msg)); - } - - let mut de = serde_json::Deserializer::from_slice(&out.stdout); - let r = serde::Deserialize::deserialize(&mut de)?; - Ok(r) -} - -pub fn load_source(path: &OsStr) -> io::Result { - let mut data = Vec::new(); - { - use std::io::Read; - let mut f = fs::File::open(&path)?; - f.read_to_end(&mut data)?; - } - - let mut de = serde_json::Deserializer::from_slice(&data); - let r = serde::Deserialize::deserialize(&mut de)?; - Ok(r) -} -- cgit