aboutsummaryrefslogtreecommitdiff
path: root/src/manager
diff options
context:
space:
mode:
Diffstat (limited to 'src/manager')
-rw-r--r--src/manager/builder.rs171
-rw-r--r--src/manager/mod.rs423
2 files changed, 594 insertions, 0 deletions
diff --git a/src/manager/builder.rs b/src/manager/builder.rs
new file mode 100644
index 0000000..9fc2291
--- /dev/null
+++ b/src/manager/builder.rs
@@ -0,0 +1,171 @@
+// Copyright 2019 Hristo Venev
+//
+// See COPYING.
+
+use crate::{config, model, proto};
+use std::collections::hash_map;
+use std::{error, fmt};
+
+#[derive(Debug)]
+pub struct ConfigError {
+ pub url: String,
+ pub peer: model::Key,
+ pub important: bool,
+ err: &'static str,
+}
+
+impl ConfigError {
+ fn new(err: &'static str, s: &config::Source, p: &proto::Peer, important: bool) -> Self {
+ Self {
+ url: s.url.clone(),
+ peer: p.public_key.clone(),
+ important,
+ err,
+ }
+ }
+}
+
+impl error::Error for ConfigError {}
+impl fmt::Display for ConfigError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ f,
+ "{} [{}] from [{}]: {}",
+ if self.important {
+ "Invalid peer"
+ } else {
+ "Misconfigured peer"
+ },
+ self.peer,
+ self.url,
+ self.err
+ )
+ }
+}
+
+pub struct ConfigBuilder<'a> {
+ c: model::Config,
+ err: Vec<ConfigError>,
+ public_key: model::Key,
+ pc: &'a config::PeerConfig,
+}
+
+impl<'a> ConfigBuilder<'a> {
+ #[inline]
+ pub fn new(public_key: model::Key, pc: &'a config::PeerConfig) -> Self {
+ Self {
+ c: model::Config::default(),
+ err: vec![],
+ public_key,
+ pc,
+ }
+ }
+
+ #[inline]
+ pub fn build(self) -> (model::Config, Vec<ConfigError>) {
+ (self.c, self.err)
+ }
+
+ #[inline]
+ pub fn add_server(&mut self, s: &config::Source, p: &proto::Server) {
+ if p.peer.public_key == self.public_key {
+ return;
+ }
+
+ let pc = self.pc;
+ let ent = insert_peer(&mut self.c, &mut self.err, s, &p.peer, |ent| {
+ ent.psk = s.psk.clone();
+ ent.endpoint = Some(p.endpoint.clone());
+ ent.keepalive = pc.fix_keepalive(p.keepalive);
+ });
+
+ add_peer(&mut self.err, ent, s, &p.peer)
+ }
+
+ #[inline]
+ pub fn add_road_warrior(&mut self, s: &config::Source, p: &proto::RoadWarrior) {
+ if p.peer.public_key == self.public_key {
+ self.err.push(ConfigError::new(
+ "The local peer cannot be a road warrior",
+ s,
+ &p.peer,
+ true,
+ ));
+ return;
+ }
+
+ let ent = if p.base == self.public_key {
+ insert_peer(&mut self.c, &mut self.err, s, &p.peer, |_| {})
+ } else if let Some(ent) = self.c.peers.get_mut(&p.base) {
+ ent
+ } else {
+ self.err
+ .push(ConfigError::new("Unknown base peer", s, &p.peer, true));
+ return;
+ };
+ add_peer(&mut self.err, ent, s, &p.peer)
+ }
+}
+
+#[inline]
+fn insert_peer<'b>(
+ c: &'b mut model::Config,
+ err: &mut Vec<ConfigError>,
+ s: &config::Source,
+ p: &proto::Peer,
+ update: impl for<'c> FnOnce(&'c mut model::Peer) -> (),
+) -> &'b mut model::Peer {
+ match c.peers.entry(p.public_key.clone()) {
+ hash_map::Entry::Occupied(ent) => {
+ err.push(ConfigError::new("Duplicate public key", s, p, true));
+ ent.into_mut()
+ }
+ hash_map::Entry::Vacant(ent) => {
+ let ent = ent.insert(model::Peer {
+ endpoint: None,
+ psk: None,
+ keepalive: 0,
+ ipv4: vec![],
+ ipv6: vec![],
+ });
+ update(ent);
+ ent
+ }
+ }
+}
+
+fn add_peer(
+ err: &mut Vec<ConfigError>,
+ ent: &mut model::Peer,
+ s: &config::Source,
+ p: &proto::Peer,
+) {
+ let mut added = false;
+ let mut removed = false;
+
+ for i in &p.ipv4 {
+ if s.ipv4.contains(i) {
+ ent.ipv4.push(*i);
+ added = true;
+ } else {
+ removed = true;
+ }
+ }
+ for i in &p.ipv6 {
+ if s.ipv6.contains(i) {
+ ent.ipv6.push(*i);
+ added = true;
+ } else {
+ removed = true;
+ }
+ }
+
+ if removed {
+ let msg = if added {
+ "Some IPs removed"
+ } else {
+ "All IPs removed"
+ };
+ err.push(ConfigError::new(msg, s, p, !added));
+ }
+}
diff --git a/src/manager/mod.rs b/src/manager/mod.rs
new file mode 100644
index 0000000..8e8792d
--- /dev/null
+++ b/src/manager/mod.rs
@@ -0,0 +1,423 @@
+// Copyright 2019 Hristo Venev
+//
+// See COPYING.
+
+use crate::{config, model, proto, wg};
+use std::ffi::{OsStr, OsString};
+#[cfg(unix)]
+use std::os::unix::fs::OpenOptionsExt;
+use std::path::{Path, PathBuf};
+use std::time::{Duration, Instant, SystemTime};
+use std::{fs, io};
+
+mod builder;
+
+struct Source {
+ name: String,
+ config: config::Source,
+ data: proto::Source,
+ next_update: Instant,
+ backoff: Option<Duration>,
+}
+
+struct Updater {
+ config: config::UpdateConfig,
+ cache_directory: Option<PathBuf>,
+}
+
+fn update_file(path: &Path, data: &[u8]) -> io::Result<()> {
+ let mut tmp_path = OsString::from(path);
+ tmp_path.push(".tmp");
+ let tmp_path = PathBuf::from(tmp_path);
+
+ let mut file = {
+ let mut file = fs::OpenOptions::new();
+ file.append(true);
+ file.create_new(true);
+ #[cfg(unix)]
+ file.mode(0o0600);
+ file.open(&tmp_path)?
+ };
+
+ let r = io::Write::write_all(&mut file, data)
+ .and_then(|_| file.sync_data())
+ .and_then(|_| fs::rename(&tmp_path, &path));
+
+ if r.is_err() {
+ fs::remove_file(&tmp_path).unwrap_or_else(|e2| {
+ eprintln!("<3>Failed to clean up [{}]: {}", tmp_path.display(), e2);
+ });
+ }
+ r
+}
+
+fn load_file(path: &Path) -> io::Result<Option<Vec<u8>>> {
+ let mut file = match fs::File::open(&path) {
+ Ok(file) => file,
+ Err(e) => {
+ if e.kind() == io::ErrorKind::NotFound {
+ return Ok(None);
+ }
+ return Err(e);
+ }
+ };
+
+ let mut data = Vec::new();
+ io::Read::read_to_end(&mut file, &mut data)?;
+ Ok(Some(data))
+}
+
+impl Updater {
+ fn cache_path(&self, s: &Source) -> Option<PathBuf> {
+ if let Some(ref dir) = self.cache_directory {
+ let mut p = dir.clone();
+ p.push(&s.name);
+ Some(p)
+ } else {
+ None
+ }
+ }
+
+ fn cache_update(&self, src: &Source) {
+ let path = if let Some(path) = self.cache_path(src) {
+ path
+ } else {
+ return;
+ };
+
+ let data = serde_json::to_vec(&src.data).unwrap();
+ match update_file(&path, &data) {
+ Ok(()) => {}
+ Err(e) => {
+ eprintln!("<4>Failed to cache [{}]: {}", &src.name, e);
+ }
+ }
+ }
+
+ fn cache_load(&self, src: &mut Source) -> bool {
+ let path = if let Some(path) = self.cache_path(src) {
+ path
+ } else {
+ return false;
+ };
+
+ let data = match load_file(&path) {
+ Ok(Some(data)) => data,
+ Ok(None) => {
+ return false;
+ }
+ Err(e) => {
+ eprintln!("<3>Failed to read [{}] from cache: {}", &src.name, e);
+ return false;
+ }
+ };
+
+ let mut de = serde_json::Deserializer::from_slice(&data);
+ src.data = match serde::Deserialize::deserialize(&mut de) {
+ Ok(r) => r,
+ Err(e) => {
+ eprintln!("<3>Failed to load [{}] from cache: {}", &src.name, e);
+ return false;
+ }
+ };
+
+ true
+ }
+
+ fn update(&self, src: &mut Source) -> (bool, Instant) {
+ let refresh = Duration::from_secs(u64::from(self.config.refresh_sec));
+
+ let r = fetch_source(&src.config.url);
+ let now = Instant::now();
+ let r = match r {
+ Ok(r) => {
+ eprintln!("<6>Updated [{}]", &src.config.url);
+ src.data = r;
+ src.backoff = None;
+ src.next_update = now + refresh;
+ self.cache_update(src);
+ return (true, now);
+ }
+ Err(r) => r,
+ };
+
+ let b = src
+ .backoff
+ .unwrap_or_else(|| Duration::from_secs(10).min(refresh / 10));
+ src.next_update = now + b;
+ src.backoff = Some((b + b / 3).min(refresh / 3));
+ eprintln!(
+ "<3>Failed to update [{}], retrying after {:.1?}: {}",
+ &src.config.url, b, &r
+ );
+ (false, now)
+ }
+}
+
+pub struct Manager {
+ dev: wg::Device,
+ peer_config: config::PeerConfig,
+ sources: Vec<Source>,
+ current: model::Config,
+ runtime_directory: Option<PathBuf>,
+ updater: Updater,
+}
+
+impl Manager {
+ pub fn new(ifname: OsString, c: config::Config) -> io::Result<Self> {
+ let mut m = Self {
+ dev: wg::Device::new(ifname)?,
+ peer_config: c.peer_config,
+ sources: vec![],
+ current: model::Config::default(),
+ runtime_directory: c.runtime_directory,
+ updater: Updater {
+ config: c.update_config,
+ cache_directory: c.cache_directory,
+ },
+ };
+
+ let _ = m.current_load();
+
+ for (name, cfg) in c.sources {
+ m.add_source(name, cfg)?;
+ }
+
+ Ok(m)
+ }
+
+ fn state_path(&self) -> Option<PathBuf> {
+ let mut path = if let Some(ref path) = self.runtime_directory {
+ path.clone()
+ } else {
+ return None;
+ };
+ path.push("state.json");
+ Some(path)
+ }
+
+ fn current_load(&mut self) -> bool {
+ let path = if let Some(path) = self.state_path() {
+ path
+ } else {
+ return false;
+ };
+
+ let data = match load_file(&path) {
+ Ok(Some(data)) => data,
+ Ok(None) => {
+ return false;
+ }
+ Err(e) => {
+ eprintln!("<3>Failed to read interface state: {}", e);
+ return false;
+ }
+ };
+
+ let mut de = serde_json::Deserializer::from_slice(&data);
+ match serde::Deserialize::deserialize(&mut de) {
+ Ok(c) => {
+ self.current = c;
+ true
+ }
+ Err(e) => {
+ eprintln!("<3>Failed to load interface state: {}", e);
+ false
+ }
+ }
+ }
+
+ fn current_update(&mut self, c: &model::Config) {
+ let path = if let Some(path) = self.state_path() {
+ path
+ } else {
+ return;
+ };
+
+ let data = serde_json::to_vec(c).unwrap();
+ match update_file(&path, &data) {
+ Ok(()) => {}
+ Err(e) => {
+ eprintln!("<3>Failed to persist interface state: {}", e);
+ }
+ }
+ }
+
+ fn add_source(&mut self, name: String, config: config::Source) -> io::Result<()> {
+ let mut s = Source {
+ name,
+ config,
+ data: proto::Source::empty(),
+ next_update: Instant::now(),
+ backoff: None,
+ };
+
+ self.init_source(&mut s)?;
+ self.sources.push(s);
+ Ok(())
+ }
+
+ fn init_source(&mut self, s: &mut Source) -> io::Result<()> {
+ if self.updater.update(s).0 {
+ return Ok(());
+ }
+ if self.updater.cache_load(s) {
+ return Ok(());
+ }
+ if !s.config.required {
+ return Ok(());
+ }
+ if self.updater.update(s).0 {
+ return Ok(());
+ }
+ if self.updater.update(s).0 {
+ return Ok(());
+ }
+ Err(io::Error::new(
+ io::ErrorKind::Other,
+ format!("Failed to update required source [{}]", &s.config.url),
+ ))
+ }
+
+ fn make_config(
+ &self,
+ public_key: model::Key,
+ ts: SystemTime,
+ ) -> (model::Config, Vec<builder::ConfigError>, SystemTime) {
+ let mut t_cfg = ts + Duration::from_secs(1 << 20);
+ let mut sources: Vec<(&Source, &proto::SourceConfig)> = vec![];
+ for src in &self.sources {
+ let sc = src
+ .data
+ .next
+ .as_ref()
+ .and_then(|next| {
+ if ts >= next.update_at {
+ Some(&next.config)
+ } else {
+ t_cfg = t_cfg.min(next.update_at);
+ None
+ }
+ })
+ .unwrap_or(&src.data.config);
+ sources.push((src, sc));
+ }
+
+ let mut cfg = builder::ConfigBuilder::new(public_key, &self.peer_config);
+
+ for (src, sc) in &sources {
+ for peer in &sc.servers {
+ cfg.add_server(&src.config, peer);
+ }
+ }
+
+ for (src, sc) in &sources {
+ for peer in &sc.road_warriors {
+ cfg.add_road_warrior(&src.config, peer);
+ }
+ }
+
+ let (cfg, errs) = cfg.build();
+ (cfg, errs, t_cfg)
+ }
+
+ fn refresh(&mut self) -> io::Result<Instant> {
+ let refresh = Duration::from_secs(u64::from(self.updater.config.refresh_sec));
+ let mut now = Instant::now();
+ let mut t_refresh = now + refresh;
+
+ for src in &mut self.sources {
+ if now >= src.next_update {
+ now = self.updater.update(src).1;
+ }
+ t_refresh = t_refresh.min(src.next_update);
+ }
+
+ Ok(t_refresh)
+ }
+
+ pub fn update(&mut self) -> io::Result<Instant> {
+ let t_refresh = self.refresh()?;
+
+ let public_key = self.dev.get_public_key()?;
+ let now = Instant::now();
+ let sysnow = SystemTime::now();
+ let (config, errors, t_cfg) = self.make_config(public_key, sysnow);
+ let time_to_cfg = t_cfg
+ .duration_since(sysnow)
+ .unwrap_or(Duration::from_secs(0));
+ let t_cfg = now + time_to_cfg;
+
+ if config != self.current {
+ eprintln!("<5>Applying configuration update");
+ for err in &errors {
+ eprintln!("<{}>{}", if err.important { '4' } else { '5' }, err);
+ }
+ self.dev.apply_diff(&self.current, &config)?;
+ self.current_update(&config);
+ self.current = config;
+ }
+
+ Ok(if t_cfg < t_refresh {
+ eprintln!("<6>Next configuration update after {:.1?}", time_to_cfg);
+ t_cfg
+ } else if t_refresh > now {
+ t_refresh
+ } else {
+ eprintln!("<4>Next refresh immediately?");
+ now
+ })
+ }
+}
+
+pub fn fetch_source(url: &str) -> io::Result<proto::Source> {
+ use std::env;
+ use std::process::{Command, Stdio};
+
+ let curl = match env::var_os("CURL") {
+ None => OsString::new(),
+ Some(v) => v,
+ };
+ let mut proc = Command::new(if curl.is_empty() {
+ OsStr::new("curl")
+ } else {
+ curl.as_os_str()
+ });
+
+ proc.stdin(Stdio::null());
+ proc.stdout(Stdio::piped());
+ proc.stderr(Stdio::piped());
+ proc.arg("-gsSfL");
+ proc.arg("--fail-early");
+ proc.arg("--max-time");
+ proc.arg("10");
+ proc.arg("--max-filesize");
+ proc.arg("1M");
+ proc.arg("--");
+ proc.arg(url);
+
+ let out = proc.output()?;
+
+ if !out.status.success() {
+ let msg = String::from_utf8_lossy(&out.stderr);
+ let msg = msg.replace('\n', "; ");
+ return Err(io::Error::new(io::ErrorKind::Other, msg));
+ }
+
+ let mut de = serde_json::Deserializer::from_slice(&out.stdout);
+ let r = serde::Deserialize::deserialize(&mut de)?;
+ Ok(r)
+}
+
+pub fn load_source(path: &OsStr) -> io::Result<proto::Source> {
+ let mut data = Vec::new();
+ {
+ use std::io::Read;
+ let mut f = fs::File::open(&path)?;
+ f.read_to_end(&mut data)?;
+ }
+
+ let mut de = serde_json::Deserializer::from_slice(&data);
+ let r = serde::Deserialize::deserialize(&mut de)?;
+ Ok(r)
+}