From a66bdaee155d9012b1fd75bf31332df638d49857 Mon Sep 17 00:00:00 2001 From: Hristo Venev Date: Sat, 28 Sep 2019 16:04:53 +0000 Subject: Move builder inside manager. --- src/manager/builder.rs | 171 ++++++++++++++++++++ src/manager/mod.rs | 423 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 594 insertions(+) create mode 100644 src/manager/builder.rs create mode 100644 src/manager/mod.rs (limited to 'src/manager') diff --git a/src/manager/builder.rs b/src/manager/builder.rs new file mode 100644 index 0000000..9fc2291 --- /dev/null +++ b/src/manager/builder.rs @@ -0,0 +1,171 @@ +// Copyright 2019 Hristo Venev +// +// See COPYING. + +use crate::{config, model, proto}; +use std::collections::hash_map; +use std::{error, fmt}; + +#[derive(Debug)] +pub struct ConfigError { + pub url: String, + pub peer: model::Key, + pub important: bool, + err: &'static str, +} + +impl ConfigError { + fn new(err: &'static str, s: &config::Source, p: &proto::Peer, important: bool) -> Self { + Self { + url: s.url.clone(), + peer: p.public_key.clone(), + important, + err, + } + } +} + +impl error::Error for ConfigError {} +impl fmt::Display for ConfigError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{} [{}] from [{}]: {}", + if self.important { + "Invalid peer" + } else { + "Misconfigured peer" + }, + self.peer, + self.url, + self.err + ) + } +} + +pub struct ConfigBuilder<'a> { + c: model::Config, + err: Vec, + public_key: model::Key, + pc: &'a config::PeerConfig, +} + +impl<'a> ConfigBuilder<'a> { + #[inline] + pub fn new(public_key: model::Key, pc: &'a config::PeerConfig) -> Self { + Self { + c: model::Config::default(), + err: vec![], + public_key, + pc, + } + } + + #[inline] + pub fn build(self) -> (model::Config, Vec) { + (self.c, self.err) + } + + #[inline] + pub fn add_server(&mut self, s: &config::Source, p: &proto::Server) { + if p.peer.public_key == self.public_key { + return; + } + + let pc = self.pc; + let ent = insert_peer(&mut self.c, &mut self.err, s, &p.peer, |ent| { + ent.psk = s.psk.clone(); + ent.endpoint = Some(p.endpoint.clone()); + ent.keepalive = pc.fix_keepalive(p.keepalive); + }); + + add_peer(&mut self.err, ent, s, &p.peer) + } + + #[inline] + pub fn add_road_warrior(&mut self, s: &config::Source, p: &proto::RoadWarrior) { + if p.peer.public_key == self.public_key { + self.err.push(ConfigError::new( + "The local peer cannot be a road warrior", + s, + &p.peer, + true, + )); + return; + } + + let ent = if p.base == self.public_key { + insert_peer(&mut self.c, &mut self.err, s, &p.peer, |_| {}) + } else if let Some(ent) = self.c.peers.get_mut(&p.base) { + ent + } else { + self.err + .push(ConfigError::new("Unknown base peer", s, &p.peer, true)); + return; + }; + add_peer(&mut self.err, ent, s, &p.peer) + } +} + +#[inline] +fn insert_peer<'b>( + c: &'b mut model::Config, + err: &mut Vec, + s: &config::Source, + p: &proto::Peer, + update: impl for<'c> FnOnce(&'c mut model::Peer) -> (), +) -> &'b mut model::Peer { + match c.peers.entry(p.public_key.clone()) { + hash_map::Entry::Occupied(ent) => { + err.push(ConfigError::new("Duplicate public key", s, p, true)); + ent.into_mut() + } + hash_map::Entry::Vacant(ent) => { + let ent = ent.insert(model::Peer { + endpoint: None, + psk: None, + keepalive: 0, + ipv4: vec![], + ipv6: vec![], + }); + update(ent); + ent + } + } +} + +fn add_peer( + err: &mut Vec, + ent: &mut model::Peer, + s: &config::Source, + p: &proto::Peer, +) { + let mut added = false; + let mut removed = false; + + for i in &p.ipv4 { + if s.ipv4.contains(i) { + ent.ipv4.push(*i); + added = true; + } else { + removed = true; + } + } + for i in &p.ipv6 { + if s.ipv6.contains(i) { + ent.ipv6.push(*i); + added = true; + } else { + removed = true; + } + } + + if removed { + let msg = if added { + "Some IPs removed" + } else { + "All IPs removed" + }; + err.push(ConfigError::new(msg, s, p, !added)); + } +} diff --git a/src/manager/mod.rs b/src/manager/mod.rs new file mode 100644 index 0000000..8e8792d --- /dev/null +++ b/src/manager/mod.rs @@ -0,0 +1,423 @@ +// Copyright 2019 Hristo Venev +// +// See COPYING. + +use crate::{config, model, proto, wg}; +use std::ffi::{OsStr, OsString}; +#[cfg(unix)] +use std::os::unix::fs::OpenOptionsExt; +use std::path::{Path, PathBuf}; +use std::time::{Duration, Instant, SystemTime}; +use std::{fs, io}; + +mod builder; + +struct Source { + name: String, + config: config::Source, + data: proto::Source, + next_update: Instant, + backoff: Option, +} + +struct Updater { + config: config::UpdateConfig, + cache_directory: Option, +} + +fn update_file(path: &Path, data: &[u8]) -> io::Result<()> { + let mut tmp_path = OsString::from(path); + tmp_path.push(".tmp"); + let tmp_path = PathBuf::from(tmp_path); + + let mut file = { + let mut file = fs::OpenOptions::new(); + file.append(true); + file.create_new(true); + #[cfg(unix)] + file.mode(0o0600); + file.open(&tmp_path)? + }; + + let r = io::Write::write_all(&mut file, data) + .and_then(|_| file.sync_data()) + .and_then(|_| fs::rename(&tmp_path, &path)); + + if r.is_err() { + fs::remove_file(&tmp_path).unwrap_or_else(|e2| { + eprintln!("<3>Failed to clean up [{}]: {}", tmp_path.display(), e2); + }); + } + r +} + +fn load_file(path: &Path) -> io::Result>> { + let mut file = match fs::File::open(&path) { + Ok(file) => file, + Err(e) => { + if e.kind() == io::ErrorKind::NotFound { + return Ok(None); + } + return Err(e); + } + }; + + let mut data = Vec::new(); + io::Read::read_to_end(&mut file, &mut data)?; + Ok(Some(data)) +} + +impl Updater { + fn cache_path(&self, s: &Source) -> Option { + if let Some(ref dir) = self.cache_directory { + let mut p = dir.clone(); + p.push(&s.name); + Some(p) + } else { + None + } + } + + fn cache_update(&self, src: &Source) { + let path = if let Some(path) = self.cache_path(src) { + path + } else { + return; + }; + + let data = serde_json::to_vec(&src.data).unwrap(); + match update_file(&path, &data) { + Ok(()) => {} + Err(e) => { + eprintln!("<4>Failed to cache [{}]: {}", &src.name, e); + } + } + } + + fn cache_load(&self, src: &mut Source) -> bool { + let path = if let Some(path) = self.cache_path(src) { + path + } else { + return false; + }; + + let data = match load_file(&path) { + Ok(Some(data)) => data, + Ok(None) => { + return false; + } + Err(e) => { + eprintln!("<3>Failed to read [{}] from cache: {}", &src.name, e); + return false; + } + }; + + let mut de = serde_json::Deserializer::from_slice(&data); + src.data = match serde::Deserialize::deserialize(&mut de) { + Ok(r) => r, + Err(e) => { + eprintln!("<3>Failed to load [{}] from cache: {}", &src.name, e); + return false; + } + }; + + true + } + + fn update(&self, src: &mut Source) -> (bool, Instant) { + let refresh = Duration::from_secs(u64::from(self.config.refresh_sec)); + + let r = fetch_source(&src.config.url); + let now = Instant::now(); + let r = match r { + Ok(r) => { + eprintln!("<6>Updated [{}]", &src.config.url); + src.data = r; + src.backoff = None; + src.next_update = now + refresh; + self.cache_update(src); + return (true, now); + } + Err(r) => r, + }; + + let b = src + .backoff + .unwrap_or_else(|| Duration::from_secs(10).min(refresh / 10)); + src.next_update = now + b; + src.backoff = Some((b + b / 3).min(refresh / 3)); + eprintln!( + "<3>Failed to update [{}], retrying after {:.1?}: {}", + &src.config.url, b, &r + ); + (false, now) + } +} + +pub struct Manager { + dev: wg::Device, + peer_config: config::PeerConfig, + sources: Vec, + current: model::Config, + runtime_directory: Option, + updater: Updater, +} + +impl Manager { + pub fn new(ifname: OsString, c: config::Config) -> io::Result { + let mut m = Self { + dev: wg::Device::new(ifname)?, + peer_config: c.peer_config, + sources: vec![], + current: model::Config::default(), + runtime_directory: c.runtime_directory, + updater: Updater { + config: c.update_config, + cache_directory: c.cache_directory, + }, + }; + + let _ = m.current_load(); + + for (name, cfg) in c.sources { + m.add_source(name, cfg)?; + } + + Ok(m) + } + + fn state_path(&self) -> Option { + let mut path = if let Some(ref path) = self.runtime_directory { + path.clone() + } else { + return None; + }; + path.push("state.json"); + Some(path) + } + + fn current_load(&mut self) -> bool { + let path = if let Some(path) = self.state_path() { + path + } else { + return false; + }; + + let data = match load_file(&path) { + Ok(Some(data)) => data, + Ok(None) => { + return false; + } + Err(e) => { + eprintln!("<3>Failed to read interface state: {}", e); + return false; + } + }; + + let mut de = serde_json::Deserializer::from_slice(&data); + match serde::Deserialize::deserialize(&mut de) { + Ok(c) => { + self.current = c; + true + } + Err(e) => { + eprintln!("<3>Failed to load interface state: {}", e); + false + } + } + } + + fn current_update(&mut self, c: &model::Config) { + let path = if let Some(path) = self.state_path() { + path + } else { + return; + }; + + let data = serde_json::to_vec(c).unwrap(); + match update_file(&path, &data) { + Ok(()) => {} + Err(e) => { + eprintln!("<3>Failed to persist interface state: {}", e); + } + } + } + + fn add_source(&mut self, name: String, config: config::Source) -> io::Result<()> { + let mut s = Source { + name, + config, + data: proto::Source::empty(), + next_update: Instant::now(), + backoff: None, + }; + + self.init_source(&mut s)?; + self.sources.push(s); + Ok(()) + } + + fn init_source(&mut self, s: &mut Source) -> io::Result<()> { + if self.updater.update(s).0 { + return Ok(()); + } + if self.updater.cache_load(s) { + return Ok(()); + } + if !s.config.required { + return Ok(()); + } + if self.updater.update(s).0 { + return Ok(()); + } + if self.updater.update(s).0 { + return Ok(()); + } + Err(io::Error::new( + io::ErrorKind::Other, + format!("Failed to update required source [{}]", &s.config.url), + )) + } + + fn make_config( + &self, + public_key: model::Key, + ts: SystemTime, + ) -> (model::Config, Vec, SystemTime) { + let mut t_cfg = ts + Duration::from_secs(1 << 20); + let mut sources: Vec<(&Source, &proto::SourceConfig)> = vec![]; + for src in &self.sources { + let sc = src + .data + .next + .as_ref() + .and_then(|next| { + if ts >= next.update_at { + Some(&next.config) + } else { + t_cfg = t_cfg.min(next.update_at); + None + } + }) + .unwrap_or(&src.data.config); + sources.push((src, sc)); + } + + let mut cfg = builder::ConfigBuilder::new(public_key, &self.peer_config); + + for (src, sc) in &sources { + for peer in &sc.servers { + cfg.add_server(&src.config, peer); + } + } + + for (src, sc) in &sources { + for peer in &sc.road_warriors { + cfg.add_road_warrior(&src.config, peer); + } + } + + let (cfg, errs) = cfg.build(); + (cfg, errs, t_cfg) + } + + fn refresh(&mut self) -> io::Result { + let refresh = Duration::from_secs(u64::from(self.updater.config.refresh_sec)); + let mut now = Instant::now(); + let mut t_refresh = now + refresh; + + for src in &mut self.sources { + if now >= src.next_update { + now = self.updater.update(src).1; + } + t_refresh = t_refresh.min(src.next_update); + } + + Ok(t_refresh) + } + + pub fn update(&mut self) -> io::Result { + let t_refresh = self.refresh()?; + + let public_key = self.dev.get_public_key()?; + let now = Instant::now(); + let sysnow = SystemTime::now(); + let (config, errors, t_cfg) = self.make_config(public_key, sysnow); + let time_to_cfg = t_cfg + .duration_since(sysnow) + .unwrap_or(Duration::from_secs(0)); + let t_cfg = now + time_to_cfg; + + if config != self.current { + eprintln!("<5>Applying configuration update"); + for err in &errors { + eprintln!("<{}>{}", if err.important { '4' } else { '5' }, err); + } + self.dev.apply_diff(&self.current, &config)?; + self.current_update(&config); + self.current = config; + } + + Ok(if t_cfg < t_refresh { + eprintln!("<6>Next configuration update after {:.1?}", time_to_cfg); + t_cfg + } else if t_refresh > now { + t_refresh + } else { + eprintln!("<4>Next refresh immediately?"); + now + }) + } +} + +pub fn fetch_source(url: &str) -> io::Result { + use std::env; + use std::process::{Command, Stdio}; + + let curl = match env::var_os("CURL") { + None => OsString::new(), + Some(v) => v, + }; + let mut proc = Command::new(if curl.is_empty() { + OsStr::new("curl") + } else { + curl.as_os_str() + }); + + proc.stdin(Stdio::null()); + proc.stdout(Stdio::piped()); + proc.stderr(Stdio::piped()); + proc.arg("-gsSfL"); + proc.arg("--fail-early"); + proc.arg("--max-time"); + proc.arg("10"); + proc.arg("--max-filesize"); + proc.arg("1M"); + proc.arg("--"); + proc.arg(url); + + let out = proc.output()?; + + if !out.status.success() { + let msg = String::from_utf8_lossy(&out.stderr); + let msg = msg.replace('\n', "; "); + return Err(io::Error::new(io::ErrorKind::Other, msg)); + } + + let mut de = serde_json::Deserializer::from_slice(&out.stdout); + let r = serde::Deserialize::deserialize(&mut de)?; + Ok(r) +} + +pub fn load_source(path: &OsStr) -> io::Result { + let mut data = Vec::new(); + { + use std::io::Read; + let mut f = fs::File::open(&path)?; + f.read_to_end(&mut data)?; + } + + let mut de = serde_json::Deserializer::from_slice(&data); + let r = serde::Deserialize::deserialize(&mut de)?; + Ok(r) +} -- cgit