From b06338ec1d282a762440ad72c935717e404badca Mon Sep 17 00:00:00 2001 From: Hristo Venev Date: Tue, 2 Apr 2019 15:56:06 +0300 Subject: Reorg, sources have names. --- src/manager.rs | 332 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 332 insertions(+) create mode 100644 src/manager.rs (limited to 'src/manager.rs') diff --git a/src/manager.rs b/src/manager.rs new file mode 100644 index 0000000..20d7f50 --- /dev/null +++ b/src/manager.rs @@ -0,0 +1,332 @@ +// Copyright 2019 Hristo Venev +// +// See COPYING. + +use std::{io, fs}; +use crate::{builder, config, model, proto, wg}; +use std::time::{Duration, Instant, SystemTime}; +use std::ffi::{OsStr, OsString}; +use std::path::PathBuf; + +struct Source { + name: String, + config: config::Source, + data: proto::Source, + next_update: Instant, + backoff: Option, +} + +struct Updater { + config: config::UpdateConfig, + cache_directory: Option, +} + +impl Updater { + fn cache_path(&self, s: &Source) -> Option { + if let Some(ref dir) = self.cache_directory { + let mut p = dir.clone(); + p.push(&s.name); + Some(p) + } else { + None + } + } + + fn cache_update(&self, src: &Source) -> io::Result { + let path = match self.cache_path(src) { + Some(path) => path, + None => { + return Ok(false); + } + }; + + let mut tmp_path = OsString::from(path.clone()); + tmp_path.push(".tmp"); + let tmp_path = PathBuf::from(tmp_path); + + let data = serde_json::to_vec(&src.data).unwrap(); + + let mut file = fs::File::create(&tmp_path)?; + match io::Write::write_all(&mut file, &data) + .and_then(|_| file.sync_data()) + .and_then(|_| fs::rename(&tmp_path, &path)) + { + Ok(()) => {} + Err(e) => { + fs::remove_file(&tmp_path).unwrap_or_else(|e2| { + eprintln!("<3>Failed to clean up [{}]: {}", tmp_path.display(), e2); + }); + return Err(e); + } + } + + Ok(true) + } + + fn cache_load(&self, src: &mut Source) -> bool { + let path = match self.cache_path(src) { + Some(path) => path, + None => { + return false; + } + }; + + let mut file = match fs::File::open(&path) { + Ok(file) => file, + Err(_) => { + return false; + } + }; + + let mut data = Vec::new(); + match io::Read::read_to_end(&mut file, &mut data) { + Ok(_) => {} + Err(e) => { + eprintln!("<3>Failed to read [{}] from cache: {}", src.config.url, e); + return false; + } + }; + + let mut de = serde_json::Deserializer::from_slice(&data); + src.data = match serde::Deserialize::deserialize(&mut de) { + Ok(r) => r, + Err(e) => { + eprintln!("<3>Failed to load [{}] from cache: {}", src.config.url, e); + return false; + } + }; + + true + } + + fn update(&self, src: &mut Source) -> (bool, Instant) { + let refresh = Duration::from_secs(u64::from(self.config.refresh_sec)); + + let r = fetch_source(&src.config.url); + let now = Instant::now(); + let r = match r { + Ok(r) => { + eprintln!("<6>Updated [{}]", &src.config.url); + src.data = r; + src.backoff = None; + src.next_update = now + refresh; + match self.cache_update(src) { + Ok(_) => {} + Err(e) => { + eprintln!("<4>Failed to cache [{}]: {}", &src.config.url, e); + } + } + return (true, now); + } + Err(r) => r, + }; + + let b = src.backoff.unwrap_or_else(|| Duration::from_secs(10).min(refresh / 10)); + src.next_update = now + b; + src.backoff = Some((b + b / 3).min(refresh / 3)); + eprintln!("<3>Failed to update [{}], retrying after {:.1?}: {}", &src.config.url, b, &r); + (false, now) + } + +} + +pub struct Manager { + dev: wg::Device, + peer_config: config::PeerConfig, + sources: Vec, + current: model::Config, + updater: Updater, +} + +impl Manager { + pub fn new(ifname: OsString, c: config::Config) -> io::Result { + let mut m = Manager { + dev: wg::Device::new(ifname)?, + peer_config: c.peer_config, + sources: vec![], + current: model::Config::default(), + updater: Updater { + config: c.update_config, + cache_directory: c.cache_directory, + }, + }; + + for (name, cfg) in c.sources.into_iter() { + m.add_source(name, cfg)?; + } + + Ok(m) + } + + fn add_source(&mut self, name: String, config: config::Source) -> io::Result<()> { + let mut s = Source { + name, + config, + data: proto::Source::empty(), + next_update: Instant::now(), + backoff: None, + }; + + self.init_source(&mut s)?; + self.sources.push(s); + Ok(()) + } + + fn init_source(&mut self, s: &mut Source) -> io::Result<()> { + if self.updater.update(s).0 { + return Ok(()); + } + if self.updater.cache_load(s) { + return Ok(()); + } + if !s.config.required { + return Ok(()); + } + if self.updater.update(s).0 { + return Ok(()); + } + if self.updater.update(s).0 { + return Ok(()); + } + Err(io::Error::new(io::ErrorKind::Other, format!("Failed to update required source [{}]", &s.config.url))) + } + + fn make_config( + &self, + public_key: model::Key, + ts: SystemTime, + ) -> (model::Config, Vec, SystemTime) { + let mut t_cfg = ts + Duration::from_secs(1 << 30); + let mut sources: Vec<(&Source, &proto::SourceConfig)> = vec![]; + for src in self.sources.iter() { + let sc = src.data.next + .as_ref() + .and_then(|next| { + if ts >= next.update_at { + Some(&next.config) + } else { + t_cfg = t_cfg.min(next.update_at); + None + } + }) + .unwrap_or(&src.data.config); + sources.push((src, sc)); + } + + let mut cfg = builder::ConfigBuilder::new(public_key, &self.peer_config); + + for (src, sc) in sources.iter() { + for peer in sc.servers.iter() { + cfg.add_server(&src.config, peer); + } + } + + for (src, sc) in sources.iter() { + for peer in sc.road_warriors.iter() { + cfg.add_road_warrior(&src.config, peer); + } + } + + let (cfg, errs) = cfg.build(); + (cfg, errs, t_cfg) + } + + fn refresh(&mut self) -> io::Result { + let refresh = Duration::from_secs(u64::from(self.updater.config.refresh_sec)); + let mut now = Instant::now(); + let mut t_refresh = now + refresh; + + for src in self.sources.iter_mut() { + if now >= src.next_update { + now = self.updater.update(src).1; + } + t_refresh = t_refresh.min(src.next_update); + } + + Ok(t_refresh) + } + + pub fn update(&mut self) -> io::Result { + let t_refresh = self.refresh()?; + + let public_key = self.dev.get_public_key()?; + let now = Instant::now(); + let sysnow = SystemTime::now(); + let (config, errors, t_cfg) = self.make_config(public_key, sysnow); + let time_to_cfg = t_cfg + .duration_since(sysnow) + .unwrap_or(Duration::from_secs(0)); + let t_cfg = now + time_to_cfg; + + if config != self.current { + eprintln!("<5>Applying configuration update"); + for err in errors.iter() { + eprintln!("<{}>{}", if err.important { '4' } else { '5' }, err); + } + self.dev.apply_diff(&self.current, &config)?; + self.current = config; + } + + Ok(if t_cfg < t_refresh { + eprintln!("<6>Next configuration update after {:.1?}", time_to_cfg); + t_cfg + } else if t_refresh > now { + t_refresh + } else { + eprintln!("<4>Next refresh immediately?"); + now + }) + } +} + +pub fn fetch_source(url: &str) -> io::Result { + use std::env; + use std::process::{Command, Stdio}; + + let curl = match env::var_os("CURL") { + None => OsString::new(), + Some(v) => v, + }; + let mut proc = Command::new(if curl.is_empty() { + OsStr::new("curl") + } else { + curl.as_os_str() + }); + + proc.stdin(Stdio::null()); + proc.stdout(Stdio::piped()); + proc.stderr(Stdio::piped()); + proc.arg("-gsSfL"); + proc.arg("--fail-early"); + proc.arg("--max-time"); + proc.arg("10"); + proc.arg("--max-filesize"); + proc.arg("1M"); + proc.arg("--"); + proc.arg(url); + + let out = proc.output()?; + + if !out.status.success() { + let msg = String::from_utf8_lossy(&out.stderr); + let msg = msg.replace('\n', "; "); + return Err(io::Error::new(io::ErrorKind::Other, msg)); + } + + let mut de = serde_json::Deserializer::from_slice(&out.stdout); + let r = serde::Deserialize::deserialize(&mut de)?; + Ok(r) +} + +pub fn load_source(path: &OsStr) -> io::Result { + let mut data = Vec::new(); + { + use std::io::Read; + let mut f = fs::File::open(&path)?; + f.read_to_end(&mut data)?; + } + + let mut de = serde_json::Deserializer::from_slice(&data); + let r = serde::Deserialize::deserialize(&mut de)?; + Ok(r) +} -- cgit