From a66bdaee155d9012b1fd75bf31332df638d49857 Mon Sep 17 00:00:00 2001 From: Hristo Venev Date: Sat, 28 Sep 2019 16:04:53 +0000 Subject: Move builder inside manager. --- src/builder.rs | 171 -------------------- src/main.rs | 1 - src/manager.rs | 421 ------------------------------------------------ src/manager/builder.rs | 171 ++++++++++++++++++++ src/manager/mod.rs | 423 +++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 594 insertions(+), 593 deletions(-) delete mode 100644 src/builder.rs delete mode 100644 src/manager.rs create mode 100644 src/manager/builder.rs create mode 100644 src/manager/mod.rs diff --git a/src/builder.rs b/src/builder.rs deleted file mode 100644 index 9fc2291..0000000 --- a/src/builder.rs +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright 2019 Hristo Venev -// -// See COPYING. - -use crate::{config, model, proto}; -use std::collections::hash_map; -use std::{error, fmt}; - -#[derive(Debug)] -pub struct ConfigError { - pub url: String, - pub peer: model::Key, - pub important: bool, - err: &'static str, -} - -impl ConfigError { - fn new(err: &'static str, s: &config::Source, p: &proto::Peer, important: bool) -> Self { - Self { - url: s.url.clone(), - peer: p.public_key.clone(), - important, - err, - } - } -} - -impl error::Error for ConfigError {} -impl fmt::Display for ConfigError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "{} [{}] from [{}]: {}", - if self.important { - "Invalid peer" - } else { - "Misconfigured peer" - }, - self.peer, - self.url, - self.err - ) - } -} - -pub struct ConfigBuilder<'a> { - c: model::Config, - err: Vec, - public_key: model::Key, - pc: &'a config::PeerConfig, -} - -impl<'a> ConfigBuilder<'a> { - #[inline] - pub fn new(public_key: model::Key, pc: &'a config::PeerConfig) -> Self { - Self { - c: model::Config::default(), - err: vec![], - public_key, - pc, - } - } - - #[inline] - pub fn build(self) -> (model::Config, Vec) { - (self.c, self.err) - } - - #[inline] - pub fn add_server(&mut self, s: &config::Source, p: &proto::Server) { - if p.peer.public_key == self.public_key { - return; - } - - let pc = self.pc; - let ent = insert_peer(&mut self.c, &mut self.err, s, &p.peer, |ent| { - ent.psk = s.psk.clone(); - ent.endpoint = Some(p.endpoint.clone()); - ent.keepalive = pc.fix_keepalive(p.keepalive); - }); - - add_peer(&mut self.err, ent, s, &p.peer) - } - - #[inline] - pub fn add_road_warrior(&mut self, s: &config::Source, p: &proto::RoadWarrior) { - if p.peer.public_key == self.public_key { - self.err.push(ConfigError::new( - "The local peer cannot be a road warrior", - s, - &p.peer, - true, - )); - return; - } - - let ent = if p.base == self.public_key { - insert_peer(&mut self.c, &mut self.err, s, &p.peer, |_| {}) - } else if let Some(ent) = self.c.peers.get_mut(&p.base) { - ent - } else { - self.err - .push(ConfigError::new("Unknown base peer", s, &p.peer, true)); - return; - }; - add_peer(&mut self.err, ent, s, &p.peer) - } -} - -#[inline] -fn insert_peer<'b>( - c: &'b mut model::Config, - err: &mut Vec, - s: &config::Source, - p: &proto::Peer, - update: impl for<'c> FnOnce(&'c mut model::Peer) -> (), -) -> &'b mut model::Peer { - match c.peers.entry(p.public_key.clone()) { - hash_map::Entry::Occupied(ent) => { - err.push(ConfigError::new("Duplicate public key", s, p, true)); - ent.into_mut() - } - hash_map::Entry::Vacant(ent) => { - let ent = ent.insert(model::Peer { - endpoint: None, - psk: None, - keepalive: 0, - ipv4: vec![], - ipv6: vec![], - }); - update(ent); - ent - } - } -} - -fn add_peer( - err: &mut Vec, - ent: &mut model::Peer, - s: &config::Source, - p: &proto::Peer, -) { - let mut added = false; - let mut removed = false; - - for i in &p.ipv4 { - if s.ipv4.contains(i) { - ent.ipv4.push(*i); - added = true; - } else { - removed = true; - } - } - for i in &p.ipv6 { - if s.ipv6.contains(i) { - ent.ipv6.push(*i); - added = true; - } else { - removed = true; - } - } - - if removed { - let msg = if added { - "Some IPs removed" - } else { - "All IPs removed" - }; - err.push(ConfigError::new(msg, s, p, !added)); - } -} diff --git a/src/main.rs b/src/main.rs index 3d74674..4ae628e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,7 +16,6 @@ use std::{fs, io}; #[cfg(feature = "toml")] use toml; -mod builder; mod config; mod manager; mod model; diff --git a/src/manager.rs b/src/manager.rs deleted file mode 100644 index 3f487bf..0000000 --- a/src/manager.rs +++ /dev/null @@ -1,421 +0,0 @@ -// Copyright 2019 Hristo Venev -// -// See COPYING. - -use crate::{builder, config, model, proto, wg}; -use std::ffi::{OsStr, OsString}; -#[cfg(unix)] -use std::os::unix::fs::OpenOptionsExt; -use std::path::{Path, PathBuf}; -use std::time::{Duration, Instant, SystemTime}; -use std::{fs, io}; - -struct Source { - name: String, - config: config::Source, - data: proto::Source, - next_update: Instant, - backoff: Option, -} - -struct Updater { - config: config::UpdateConfig, - cache_directory: Option, -} - -fn update_file(path: &Path, data: &[u8]) -> io::Result<()> { - let mut tmp_path = OsString::from(path); - tmp_path.push(".tmp"); - let tmp_path = PathBuf::from(tmp_path); - - let mut file = { - let mut file = fs::OpenOptions::new(); - file.append(true); - file.create_new(true); - #[cfg(unix)] - file.mode(0o0600); - file.open(&tmp_path)? - }; - - let r = io::Write::write_all(&mut file, data) - .and_then(|_| file.sync_data()) - .and_then(|_| fs::rename(&tmp_path, &path)); - - if r.is_err() { - fs::remove_file(&tmp_path).unwrap_or_else(|e2| { - eprintln!("<3>Failed to clean up [{}]: {}", tmp_path.display(), e2); - }); - } - r -} - -fn load_file(path: &Path) -> io::Result>> { - let mut file = match fs::File::open(&path) { - Ok(file) => file, - Err(e) => { - if e.kind() == io::ErrorKind::NotFound { - return Ok(None); - } - return Err(e); - } - }; - - let mut data = Vec::new(); - io::Read::read_to_end(&mut file, &mut data)?; - Ok(Some(data)) -} - -impl Updater { - fn cache_path(&self, s: &Source) -> Option { - if let Some(ref dir) = self.cache_directory { - let mut p = dir.clone(); - p.push(&s.name); - Some(p) - } else { - None - } - } - - fn cache_update(&self, src: &Source) { - let path = if let Some(path) = self.cache_path(src) { - path - } else { - return; - }; - - let data = serde_json::to_vec(&src.data).unwrap(); - match update_file(&path, &data) { - Ok(()) => {} - Err(e) => { - eprintln!("<4>Failed to cache [{}]: {}", &src.name, e); - } - } - } - - fn cache_load(&self, src: &mut Source) -> bool { - let path = if let Some(path) = self.cache_path(src) { - path - } else { - return false; - }; - - let data = match load_file(&path) { - Ok(Some(data)) => data, - Ok(None) => { - return false; - } - Err(e) => { - eprintln!("<3>Failed to read [{}] from cache: {}", &src.name, e); - return false; - } - }; - - let mut de = serde_json::Deserializer::from_slice(&data); - src.data = match serde::Deserialize::deserialize(&mut de) { - Ok(r) => r, - Err(e) => { - eprintln!("<3>Failed to load [{}] from cache: {}", &src.name, e); - return false; - } - }; - - true - } - - fn update(&self, src: &mut Source) -> (bool, Instant) { - let refresh = Duration::from_secs(u64::from(self.config.refresh_sec)); - - let r = fetch_source(&src.config.url); - let now = Instant::now(); - let r = match r { - Ok(r) => { - eprintln!("<6>Updated [{}]", &src.config.url); - src.data = r; - src.backoff = None; - src.next_update = now + refresh; - self.cache_update(src); - return (true, now); - } - Err(r) => r, - }; - - let b = src - .backoff - .unwrap_or_else(|| Duration::from_secs(10).min(refresh / 10)); - src.next_update = now + b; - src.backoff = Some((b + b / 3).min(refresh / 3)); - eprintln!( - "<3>Failed to update [{}], retrying after {:.1?}: {}", - &src.config.url, b, &r - ); - (false, now) - } -} - -pub struct Manager { - dev: wg::Device, - peer_config: config::PeerConfig, - sources: Vec, - current: model::Config, - runtime_directory: Option, - updater: Updater, -} - -impl Manager { - pub fn new(ifname: OsString, c: config::Config) -> io::Result { - let mut m = Self { - dev: wg::Device::new(ifname)?, - peer_config: c.peer_config, - sources: vec![], - current: model::Config::default(), - runtime_directory: c.runtime_directory, - updater: Updater { - config: c.update_config, - cache_directory: c.cache_directory, - }, - }; - - let _ = m.current_load(); - - for (name, cfg) in c.sources { - m.add_source(name, cfg)?; - } - - Ok(m) - } - - fn state_path(&self) -> Option { - let mut path = if let Some(ref path) = self.runtime_directory { - path.clone() - } else { - return None; - }; - path.push("state.json"); - Some(path) - } - - fn current_load(&mut self) -> bool { - let path = if let Some(path) = self.state_path() { - path - } else { - return false; - }; - - let data = match load_file(&path) { - Ok(Some(data)) => data, - Ok(None) => { - return false; - } - Err(e) => { - eprintln!("<3>Failed to read interface state: {}", e); - return false; - } - }; - - let mut de = serde_json::Deserializer::from_slice(&data); - match serde::Deserialize::deserialize(&mut de) { - Ok(c) => { - self.current = c; - true - } - Err(e) => { - eprintln!("<3>Failed to load interface state: {}", e); - false - } - } - } - - fn current_update(&mut self, c: &model::Config) { - let path = if let Some(path) = self.state_path() { - path - } else { - return; - }; - - let data = serde_json::to_vec(c).unwrap(); - match update_file(&path, &data) { - Ok(()) => {} - Err(e) => { - eprintln!("<3>Failed to persist interface state: {}", e); - } - } - } - - fn add_source(&mut self, name: String, config: config::Source) -> io::Result<()> { - let mut s = Source { - name, - config, - data: proto::Source::empty(), - next_update: Instant::now(), - backoff: None, - }; - - self.init_source(&mut s)?; - self.sources.push(s); - Ok(()) - } - - fn init_source(&mut self, s: &mut Source) -> io::Result<()> { - if self.updater.update(s).0 { - return Ok(()); - } - if self.updater.cache_load(s) { - return Ok(()); - } - if !s.config.required { - return Ok(()); - } - if self.updater.update(s).0 { - return Ok(()); - } - if self.updater.update(s).0 { - return Ok(()); - } - Err(io::Error::new( - io::ErrorKind::Other, - format!("Failed to update required source [{}]", &s.config.url), - )) - } - - fn make_config( - &self, - public_key: model::Key, - ts: SystemTime, - ) -> (model::Config, Vec, SystemTime) { - let mut t_cfg = ts + Duration::from_secs(1 << 20); - let mut sources: Vec<(&Source, &proto::SourceConfig)> = vec![]; - for src in &self.sources { - let sc = src - .data - .next - .as_ref() - .and_then(|next| { - if ts >= next.update_at { - Some(&next.config) - } else { - t_cfg = t_cfg.min(next.update_at); - None - } - }) - .unwrap_or(&src.data.config); - sources.push((src, sc)); - } - - let mut cfg = builder::ConfigBuilder::new(public_key, &self.peer_config); - - for (src, sc) in &sources { - for peer in &sc.servers { - cfg.add_server(&src.config, peer); - } - } - - for (src, sc) in &sources { - for peer in &sc.road_warriors { - cfg.add_road_warrior(&src.config, peer); - } - } - - let (cfg, errs) = cfg.build(); - (cfg, errs, t_cfg) - } - - fn refresh(&mut self) -> io::Result { - let refresh = Duration::from_secs(u64::from(self.updater.config.refresh_sec)); - let mut now = Instant::now(); - let mut t_refresh = now + refresh; - - for src in &mut self.sources { - if now >= src.next_update { - now = self.updater.update(src).1; - } - t_refresh = t_refresh.min(src.next_update); - } - - Ok(t_refresh) - } - - pub fn update(&mut self) -> io::Result { - let t_refresh = self.refresh()?; - - let public_key = self.dev.get_public_key()?; - let now = Instant::now(); - let sysnow = SystemTime::now(); - let (config, errors, t_cfg) = self.make_config(public_key, sysnow); - let time_to_cfg = t_cfg - .duration_since(sysnow) - .unwrap_or(Duration::from_secs(0)); - let t_cfg = now + time_to_cfg; - - if config != self.current { - eprintln!("<5>Applying configuration update"); - for err in &errors { - eprintln!("<{}>{}", if err.important { '4' } else { '5' }, err); - } - self.dev.apply_diff(&self.current, &config)?; - self.current_update(&config); - self.current = config; - } - - Ok(if t_cfg < t_refresh { - eprintln!("<6>Next configuration update after {:.1?}", time_to_cfg); - t_cfg - } else if t_refresh > now { - t_refresh - } else { - eprintln!("<4>Next refresh immediately?"); - now - }) - } -} - -pub fn fetch_source(url: &str) -> io::Result { - use std::env; - use std::process::{Command, Stdio}; - - let curl = match env::var_os("CURL") { - None => OsString::new(), - Some(v) => v, - }; - let mut proc = Command::new(if curl.is_empty() { - OsStr::new("curl") - } else { - curl.as_os_str() - }); - - proc.stdin(Stdio::null()); - proc.stdout(Stdio::piped()); - proc.stderr(Stdio::piped()); - proc.arg("-gsSfL"); - proc.arg("--fail-early"); - proc.arg("--max-time"); - proc.arg("10"); - proc.arg("--max-filesize"); - proc.arg("1M"); - proc.arg("--"); - proc.arg(url); - - let out = proc.output()?; - - if !out.status.success() { - let msg = String::from_utf8_lossy(&out.stderr); - let msg = msg.replace('\n', "; "); - return Err(io::Error::new(io::ErrorKind::Other, msg)); - } - - let mut de = serde_json::Deserializer::from_slice(&out.stdout); - let r = serde::Deserialize::deserialize(&mut de)?; - Ok(r) -} - -pub fn load_source(path: &OsStr) -> io::Result { - let mut data = Vec::new(); - { - use std::io::Read; - let mut f = fs::File::open(&path)?; - f.read_to_end(&mut data)?; - } - - let mut de = serde_json::Deserializer::from_slice(&data); - let r = serde::Deserialize::deserialize(&mut de)?; - Ok(r) -} diff --git a/src/manager/builder.rs b/src/manager/builder.rs new file mode 100644 index 0000000..9fc2291 --- /dev/null +++ b/src/manager/builder.rs @@ -0,0 +1,171 @@ +// Copyright 2019 Hristo Venev +// +// See COPYING. + +use crate::{config, model, proto}; +use std::collections::hash_map; +use std::{error, fmt}; + +#[derive(Debug)] +pub struct ConfigError { + pub url: String, + pub peer: model::Key, + pub important: bool, + err: &'static str, +} + +impl ConfigError { + fn new(err: &'static str, s: &config::Source, p: &proto::Peer, important: bool) -> Self { + Self { + url: s.url.clone(), + peer: p.public_key.clone(), + important, + err, + } + } +} + +impl error::Error for ConfigError {} +impl fmt::Display for ConfigError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{} [{}] from [{}]: {}", + if self.important { + "Invalid peer" + } else { + "Misconfigured peer" + }, + self.peer, + self.url, + self.err + ) + } +} + +pub struct ConfigBuilder<'a> { + c: model::Config, + err: Vec, + public_key: model::Key, + pc: &'a config::PeerConfig, +} + +impl<'a> ConfigBuilder<'a> { + #[inline] + pub fn new(public_key: model::Key, pc: &'a config::PeerConfig) -> Self { + Self { + c: model::Config::default(), + err: vec![], + public_key, + pc, + } + } + + #[inline] + pub fn build(self) -> (model::Config, Vec) { + (self.c, self.err) + } + + #[inline] + pub fn add_server(&mut self, s: &config::Source, p: &proto::Server) { + if p.peer.public_key == self.public_key { + return; + } + + let pc = self.pc; + let ent = insert_peer(&mut self.c, &mut self.err, s, &p.peer, |ent| { + ent.psk = s.psk.clone(); + ent.endpoint = Some(p.endpoint.clone()); + ent.keepalive = pc.fix_keepalive(p.keepalive); + }); + + add_peer(&mut self.err, ent, s, &p.peer) + } + + #[inline] + pub fn add_road_warrior(&mut self, s: &config::Source, p: &proto::RoadWarrior) { + if p.peer.public_key == self.public_key { + self.err.push(ConfigError::new( + "The local peer cannot be a road warrior", + s, + &p.peer, + true, + )); + return; + } + + let ent = if p.base == self.public_key { + insert_peer(&mut self.c, &mut self.err, s, &p.peer, |_| {}) + } else if let Some(ent) = self.c.peers.get_mut(&p.base) { + ent + } else { + self.err + .push(ConfigError::new("Unknown base peer", s, &p.peer, true)); + return; + }; + add_peer(&mut self.err, ent, s, &p.peer) + } +} + +#[inline] +fn insert_peer<'b>( + c: &'b mut model::Config, + err: &mut Vec, + s: &config::Source, + p: &proto::Peer, + update: impl for<'c> FnOnce(&'c mut model::Peer) -> (), +) -> &'b mut model::Peer { + match c.peers.entry(p.public_key.clone()) { + hash_map::Entry::Occupied(ent) => { + err.push(ConfigError::new("Duplicate public key", s, p, true)); + ent.into_mut() + } + hash_map::Entry::Vacant(ent) => { + let ent = ent.insert(model::Peer { + endpoint: None, + psk: None, + keepalive: 0, + ipv4: vec![], + ipv6: vec![], + }); + update(ent); + ent + } + } +} + +fn add_peer( + err: &mut Vec, + ent: &mut model::Peer, + s: &config::Source, + p: &proto::Peer, +) { + let mut added = false; + let mut removed = false; + + for i in &p.ipv4 { + if s.ipv4.contains(i) { + ent.ipv4.push(*i); + added = true; + } else { + removed = true; + } + } + for i in &p.ipv6 { + if s.ipv6.contains(i) { + ent.ipv6.push(*i); + added = true; + } else { + removed = true; + } + } + + if removed { + let msg = if added { + "Some IPs removed" + } else { + "All IPs removed" + }; + err.push(ConfigError::new(msg, s, p, !added)); + } +} diff --git a/src/manager/mod.rs b/src/manager/mod.rs new file mode 100644 index 0000000..8e8792d --- /dev/null +++ b/src/manager/mod.rs @@ -0,0 +1,423 @@ +// Copyright 2019 Hristo Venev +// +// See COPYING. + +use crate::{config, model, proto, wg}; +use std::ffi::{OsStr, OsString}; +#[cfg(unix)] +use std::os::unix::fs::OpenOptionsExt; +use std::path::{Path, PathBuf}; +use std::time::{Duration, Instant, SystemTime}; +use std::{fs, io}; + +mod builder; + +struct Source { + name: String, + config: config::Source, + data: proto::Source, + next_update: Instant, + backoff: Option, +} + +struct Updater { + config: config::UpdateConfig, + cache_directory: Option, +} + +fn update_file(path: &Path, data: &[u8]) -> io::Result<()> { + let mut tmp_path = OsString::from(path); + tmp_path.push(".tmp"); + let tmp_path = PathBuf::from(tmp_path); + + let mut file = { + let mut file = fs::OpenOptions::new(); + file.append(true); + file.create_new(true); + #[cfg(unix)] + file.mode(0o0600); + file.open(&tmp_path)? + }; + + let r = io::Write::write_all(&mut file, data) + .and_then(|_| file.sync_data()) + .and_then(|_| fs::rename(&tmp_path, &path)); + + if r.is_err() { + fs::remove_file(&tmp_path).unwrap_or_else(|e2| { + eprintln!("<3>Failed to clean up [{}]: {}", tmp_path.display(), e2); + }); + } + r +} + +fn load_file(path: &Path) -> io::Result>> { + let mut file = match fs::File::open(&path) { + Ok(file) => file, + Err(e) => { + if e.kind() == io::ErrorKind::NotFound { + return Ok(None); + } + return Err(e); + } + }; + + let mut data = Vec::new(); + io::Read::read_to_end(&mut file, &mut data)?; + Ok(Some(data)) +} + +impl Updater { + fn cache_path(&self, s: &Source) -> Option { + if let Some(ref dir) = self.cache_directory { + let mut p = dir.clone(); + p.push(&s.name); + Some(p) + } else { + None + } + } + + fn cache_update(&self, src: &Source) { + let path = if let Some(path) = self.cache_path(src) { + path + } else { + return; + }; + + let data = serde_json::to_vec(&src.data).unwrap(); + match update_file(&path, &data) { + Ok(()) => {} + Err(e) => { + eprintln!("<4>Failed to cache [{}]: {}", &src.name, e); + } + } + } + + fn cache_load(&self, src: &mut Source) -> bool { + let path = if let Some(path) = self.cache_path(src) { + path + } else { + return false; + }; + + let data = match load_file(&path) { + Ok(Some(data)) => data, + Ok(None) => { + return false; + } + Err(e) => { + eprintln!("<3>Failed to read [{}] from cache: {}", &src.name, e); + return false; + } + }; + + let mut de = serde_json::Deserializer::from_slice(&data); + src.data = match serde::Deserialize::deserialize(&mut de) { + Ok(r) => r, + Err(e) => { + eprintln!("<3>Failed to load [{}] from cache: {}", &src.name, e); + return false; + } + }; + + true + } + + fn update(&self, src: &mut Source) -> (bool, Instant) { + let refresh = Duration::from_secs(u64::from(self.config.refresh_sec)); + + let r = fetch_source(&src.config.url); + let now = Instant::now(); + let r = match r { + Ok(r) => { + eprintln!("<6>Updated [{}]", &src.config.url); + src.data = r; + src.backoff = None; + src.next_update = now + refresh; + self.cache_update(src); + return (true, now); + } + Err(r) => r, + }; + + let b = src + .backoff + .unwrap_or_else(|| Duration::from_secs(10).min(refresh / 10)); + src.next_update = now + b; + src.backoff = Some((b + b / 3).min(refresh / 3)); + eprintln!( + "<3>Failed to update [{}], retrying after {:.1?}: {}", + &src.config.url, b, &r + ); + (false, now) + } +} + +pub struct Manager { + dev: wg::Device, + peer_config: config::PeerConfig, + sources: Vec, + current: model::Config, + runtime_directory: Option, + updater: Updater, +} + +impl Manager { + pub fn new(ifname: OsString, c: config::Config) -> io::Result { + let mut m = Self { + dev: wg::Device::new(ifname)?, + peer_config: c.peer_config, + sources: vec![], + current: model::Config::default(), + runtime_directory: c.runtime_directory, + updater: Updater { + config: c.update_config, + cache_directory: c.cache_directory, + }, + }; + + let _ = m.current_load(); + + for (name, cfg) in c.sources { + m.add_source(name, cfg)?; + } + + Ok(m) + } + + fn state_path(&self) -> Option { + let mut path = if let Some(ref path) = self.runtime_directory { + path.clone() + } else { + return None; + }; + path.push("state.json"); + Some(path) + } + + fn current_load(&mut self) -> bool { + let path = if let Some(path) = self.state_path() { + path + } else { + return false; + }; + + let data = match load_file(&path) { + Ok(Some(data)) => data, + Ok(None) => { + return false; + } + Err(e) => { + eprintln!("<3>Failed to read interface state: {}", e); + return false; + } + }; + + let mut de = serde_json::Deserializer::from_slice(&data); + match serde::Deserialize::deserialize(&mut de) { + Ok(c) => { + self.current = c; + true + } + Err(e) => { + eprintln!("<3>Failed to load interface state: {}", e); + false + } + } + } + + fn current_update(&mut self, c: &model::Config) { + let path = if let Some(path) = self.state_path() { + path + } else { + return; + }; + + let data = serde_json::to_vec(c).unwrap(); + match update_file(&path, &data) { + Ok(()) => {} + Err(e) => { + eprintln!("<3>Failed to persist interface state: {}", e); + } + } + } + + fn add_source(&mut self, name: String, config: config::Source) -> io::Result<()> { + let mut s = Source { + name, + config, + data: proto::Source::empty(), + next_update: Instant::now(), + backoff: None, + }; + + self.init_source(&mut s)?; + self.sources.push(s); + Ok(()) + } + + fn init_source(&mut self, s: &mut Source) -> io::Result<()> { + if self.updater.update(s).0 { + return Ok(()); + } + if self.updater.cache_load(s) { + return Ok(()); + } + if !s.config.required { + return Ok(()); + } + if self.updater.update(s).0 { + return Ok(()); + } + if self.updater.update(s).0 { + return Ok(()); + } + Err(io::Error::new( + io::ErrorKind::Other, + format!("Failed to update required source [{}]", &s.config.url), + )) + } + + fn make_config( + &self, + public_key: model::Key, + ts: SystemTime, + ) -> (model::Config, Vec, SystemTime) { + let mut t_cfg = ts + Duration::from_secs(1 << 20); + let mut sources: Vec<(&Source, &proto::SourceConfig)> = vec![]; + for src in &self.sources { + let sc = src + .data + .next + .as_ref() + .and_then(|next| { + if ts >= next.update_at { + Some(&next.config) + } else { + t_cfg = t_cfg.min(next.update_at); + None + } + }) + .unwrap_or(&src.data.config); + sources.push((src, sc)); + } + + let mut cfg = builder::ConfigBuilder::new(public_key, &self.peer_config); + + for (src, sc) in &sources { + for peer in &sc.servers { + cfg.add_server(&src.config, peer); + } + } + + for (src, sc) in &sources { + for peer in &sc.road_warriors { + cfg.add_road_warrior(&src.config, peer); + } + } + + let (cfg, errs) = cfg.build(); + (cfg, errs, t_cfg) + } + + fn refresh(&mut self) -> io::Result { + let refresh = Duration::from_secs(u64::from(self.updater.config.refresh_sec)); + let mut now = Instant::now(); + let mut t_refresh = now + refresh; + + for src in &mut self.sources { + if now >= src.next_update { + now = self.updater.update(src).1; + } + t_refresh = t_refresh.min(src.next_update); + } + + Ok(t_refresh) + } + + pub fn update(&mut self) -> io::Result { + let t_refresh = self.refresh()?; + + let public_key = self.dev.get_public_key()?; + let now = Instant::now(); + let sysnow = SystemTime::now(); + let (config, errors, t_cfg) = self.make_config(public_key, sysnow); + let time_to_cfg = t_cfg + .duration_since(sysnow) + .unwrap_or(Duration::from_secs(0)); + let t_cfg = now + time_to_cfg; + + if config != self.current { + eprintln!("<5>Applying configuration update"); + for err in &errors { + eprintln!("<{}>{}", if err.important { '4' } else { '5' }, err); + } + self.dev.apply_diff(&self.current, &config)?; + self.current_update(&config); + self.current = config; + } + + Ok(if t_cfg < t_refresh { + eprintln!("<6>Next configuration update after {:.1?}", time_to_cfg); + t_cfg + } else if t_refresh > now { + t_refresh + } else { + eprintln!("<4>Next refresh immediately?"); + now + }) + } +} + +pub fn fetch_source(url: &str) -> io::Result { + use std::env; + use std::process::{Command, Stdio}; + + let curl = match env::var_os("CURL") { + None => OsString::new(), + Some(v) => v, + }; + let mut proc = Command::new(if curl.is_empty() { + OsStr::new("curl") + } else { + curl.as_os_str() + }); + + proc.stdin(Stdio::null()); + proc.stdout(Stdio::piped()); + proc.stderr(Stdio::piped()); + proc.arg("-gsSfL"); + proc.arg("--fail-early"); + proc.arg("--max-time"); + proc.arg("10"); + proc.arg("--max-filesize"); + proc.arg("1M"); + proc.arg("--"); + proc.arg(url); + + let out = proc.output()?; + + if !out.status.success() { + let msg = String::from_utf8_lossy(&out.stderr); + let msg = msg.replace('\n', "; "); + return Err(io::Error::new(io::ErrorKind::Other, msg)); + } + + let mut de = serde_json::Deserializer::from_slice(&out.stdout); + let r = serde::Deserialize::deserialize(&mut de)?; + Ok(r) +} + +pub fn load_source(path: &OsStr) -> io::Result { + let mut data = Vec::new(); + { + use std::io::Read; + let mut f = fs::File::open(&path)?; + f.read_to_end(&mut data)?; + } + + let mut de = serde_json::Deserializer::from_slice(&data); + let r = serde::Deserialize::deserialize(&mut de)?; + Ok(r) +} -- cgit