aboutsummaryrefslogtreecommitdiff
path: root/src/manager.rs
diff options
context:
space:
mode:
authorHristo Venev <hristo@venev.name>2019-09-28 16:04:53 +0000
committerHristo Venev <hristo@venev.name>2019-09-28 16:04:53 +0000
commita66bdaee155d9012b1fd75bf31332df638d49857 (patch)
treeb75058baf614a6fac894123ba21143382fb5e546 /src/manager.rs
parent5bcc0210923e8cb50fc97b318309d1b62c5cf62e (diff)
Move builder inside manager.
Diffstat (limited to 'src/manager.rs')
-rw-r--r--src/manager.rs421
1 files changed, 0 insertions, 421 deletions
diff --git a/src/manager.rs b/src/manager.rs
deleted file mode 100644
index 3f487bf..0000000
--- a/src/manager.rs
+++ /dev/null
@@ -1,421 +0,0 @@
-// Copyright 2019 Hristo Venev
-//
-// See COPYING.
-
-use crate::{builder, config, model, proto, wg};
-use std::ffi::{OsStr, OsString};
-#[cfg(unix)]
-use std::os::unix::fs::OpenOptionsExt;
-use std::path::{Path, PathBuf};
-use std::time::{Duration, Instant, SystemTime};
-use std::{fs, io};
-
-struct Source {
- name: String,
- config: config::Source,
- data: proto::Source,
- next_update: Instant,
- backoff: Option<Duration>,
-}
-
-struct Updater {
- config: config::UpdateConfig,
- cache_directory: Option<PathBuf>,
-}
-
-fn update_file(path: &Path, data: &[u8]) -> io::Result<()> {
- let mut tmp_path = OsString::from(path);
- tmp_path.push(".tmp");
- let tmp_path = PathBuf::from(tmp_path);
-
- let mut file = {
- let mut file = fs::OpenOptions::new();
- file.append(true);
- file.create_new(true);
- #[cfg(unix)]
- file.mode(0o0600);
- file.open(&tmp_path)?
- };
-
- let r = io::Write::write_all(&mut file, data)
- .and_then(|_| file.sync_data())
- .and_then(|_| fs::rename(&tmp_path, &path));
-
- if r.is_err() {
- fs::remove_file(&tmp_path).unwrap_or_else(|e2| {
- eprintln!("<3>Failed to clean up [{}]: {}", tmp_path.display(), e2);
- });
- }
- r
-}
-
-fn load_file(path: &Path) -> io::Result<Option<Vec<u8>>> {
- let mut file = match fs::File::open(&path) {
- Ok(file) => file,
- Err(e) => {
- if e.kind() == io::ErrorKind::NotFound {
- return Ok(None);
- }
- return Err(e);
- }
- };
-
- let mut data = Vec::new();
- io::Read::read_to_end(&mut file, &mut data)?;
- Ok(Some(data))
-}
-
-impl Updater {
- fn cache_path(&self, s: &Source) -> Option<PathBuf> {
- if let Some(ref dir) = self.cache_directory {
- let mut p = dir.clone();
- p.push(&s.name);
- Some(p)
- } else {
- None
- }
- }
-
- fn cache_update(&self, src: &Source) {
- let path = if let Some(path) = self.cache_path(src) {
- path
- } else {
- return;
- };
-
- let data = serde_json::to_vec(&src.data).unwrap();
- match update_file(&path, &data) {
- Ok(()) => {}
- Err(e) => {
- eprintln!("<4>Failed to cache [{}]: {}", &src.name, e);
- }
- }
- }
-
- fn cache_load(&self, src: &mut Source) -> bool {
- let path = if let Some(path) = self.cache_path(src) {
- path
- } else {
- return false;
- };
-
- let data = match load_file(&path) {
- Ok(Some(data)) => data,
- Ok(None) => {
- return false;
- }
- Err(e) => {
- eprintln!("<3>Failed to read [{}] from cache: {}", &src.name, e);
- return false;
- }
- };
-
- let mut de = serde_json::Deserializer::from_slice(&data);
- src.data = match serde::Deserialize::deserialize(&mut de) {
- Ok(r) => r,
- Err(e) => {
- eprintln!("<3>Failed to load [{}] from cache: {}", &src.name, e);
- return false;
- }
- };
-
- true
- }
-
- fn update(&self, src: &mut Source) -> (bool, Instant) {
- let refresh = Duration::from_secs(u64::from(self.config.refresh_sec));
-
- let r = fetch_source(&src.config.url);
- let now = Instant::now();
- let r = match r {
- Ok(r) => {
- eprintln!("<6>Updated [{}]", &src.config.url);
- src.data = r;
- src.backoff = None;
- src.next_update = now + refresh;
- self.cache_update(src);
- return (true, now);
- }
- Err(r) => r,
- };
-
- let b = src
- .backoff
- .unwrap_or_else(|| Duration::from_secs(10).min(refresh / 10));
- src.next_update = now + b;
- src.backoff = Some((b + b / 3).min(refresh / 3));
- eprintln!(
- "<3>Failed to update [{}], retrying after {:.1?}: {}",
- &src.config.url, b, &r
- );
- (false, now)
- }
-}
-
-pub struct Manager {
- dev: wg::Device,
- peer_config: config::PeerConfig,
- sources: Vec<Source>,
- current: model::Config,
- runtime_directory: Option<PathBuf>,
- updater: Updater,
-}
-
-impl Manager {
- pub fn new(ifname: OsString, c: config::Config) -> io::Result<Self> {
- let mut m = Self {
- dev: wg::Device::new(ifname)?,
- peer_config: c.peer_config,
- sources: vec![],
- current: model::Config::default(),
- runtime_directory: c.runtime_directory,
- updater: Updater {
- config: c.update_config,
- cache_directory: c.cache_directory,
- },
- };
-
- let _ = m.current_load();
-
- for (name, cfg) in c.sources {
- m.add_source(name, cfg)?;
- }
-
- Ok(m)
- }
-
- fn state_path(&self) -> Option<PathBuf> {
- let mut path = if let Some(ref path) = self.runtime_directory {
- path.clone()
- } else {
- return None;
- };
- path.push("state.json");
- Some(path)
- }
-
- fn current_load(&mut self) -> bool {
- let path = if let Some(path) = self.state_path() {
- path
- } else {
- return false;
- };
-
- let data = match load_file(&path) {
- Ok(Some(data)) => data,
- Ok(None) => {
- return false;
- }
- Err(e) => {
- eprintln!("<3>Failed to read interface state: {}", e);
- return false;
- }
- };
-
- let mut de = serde_json::Deserializer::from_slice(&data);
- match serde::Deserialize::deserialize(&mut de) {
- Ok(c) => {
- self.current = c;
- true
- }
- Err(e) => {
- eprintln!("<3>Failed to load interface state: {}", e);
- false
- }
- }
- }
-
- fn current_update(&mut self, c: &model::Config) {
- let path = if let Some(path) = self.state_path() {
- path
- } else {
- return;
- };
-
- let data = serde_json::to_vec(c).unwrap();
- match update_file(&path, &data) {
- Ok(()) => {}
- Err(e) => {
- eprintln!("<3>Failed to persist interface state: {}", e);
- }
- }
- }
-
- fn add_source(&mut self, name: String, config: config::Source) -> io::Result<()> {
- let mut s = Source {
- name,
- config,
- data: proto::Source::empty(),
- next_update: Instant::now(),
- backoff: None,
- };
-
- self.init_source(&mut s)?;
- self.sources.push(s);
- Ok(())
- }
-
- fn init_source(&mut self, s: &mut Source) -> io::Result<()> {
- if self.updater.update(s).0 {
- return Ok(());
- }
- if self.updater.cache_load(s) {
- return Ok(());
- }
- if !s.config.required {
- return Ok(());
- }
- if self.updater.update(s).0 {
- return Ok(());
- }
- if self.updater.update(s).0 {
- return Ok(());
- }
- Err(io::Error::new(
- io::ErrorKind::Other,
- format!("Failed to update required source [{}]", &s.config.url),
- ))
- }
-
- fn make_config(
- &self,
- public_key: model::Key,
- ts: SystemTime,
- ) -> (model::Config, Vec<builder::ConfigError>, SystemTime) {
- let mut t_cfg = ts + Duration::from_secs(1 << 20);
- let mut sources: Vec<(&Source, &proto::SourceConfig)> = vec![];
- for src in &self.sources {
- let sc = src
- .data
- .next
- .as_ref()
- .and_then(|next| {
- if ts >= next.update_at {
- Some(&next.config)
- } else {
- t_cfg = t_cfg.min(next.update_at);
- None
- }
- })
- .unwrap_or(&src.data.config);
- sources.push((src, sc));
- }
-
- let mut cfg = builder::ConfigBuilder::new(public_key, &self.peer_config);
-
- for (src, sc) in &sources {
- for peer in &sc.servers {
- cfg.add_server(&src.config, peer);
- }
- }
-
- for (src, sc) in &sources {
- for peer in &sc.road_warriors {
- cfg.add_road_warrior(&src.config, peer);
- }
- }
-
- let (cfg, errs) = cfg.build();
- (cfg, errs, t_cfg)
- }
-
- fn refresh(&mut self) -> io::Result<Instant> {
- let refresh = Duration::from_secs(u64::from(self.updater.config.refresh_sec));
- let mut now = Instant::now();
- let mut t_refresh = now + refresh;
-
- for src in &mut self.sources {
- if now >= src.next_update {
- now = self.updater.update(src).1;
- }
- t_refresh = t_refresh.min(src.next_update);
- }
-
- Ok(t_refresh)
- }
-
- pub fn update(&mut self) -> io::Result<Instant> {
- let t_refresh = self.refresh()?;
-
- let public_key = self.dev.get_public_key()?;
- let now = Instant::now();
- let sysnow = SystemTime::now();
- let (config, errors, t_cfg) = self.make_config(public_key, sysnow);
- let time_to_cfg = t_cfg
- .duration_since(sysnow)
- .unwrap_or(Duration::from_secs(0));
- let t_cfg = now + time_to_cfg;
-
- if config != self.current {
- eprintln!("<5>Applying configuration update");
- for err in &errors {
- eprintln!("<{}>{}", if err.important { '4' } else { '5' }, err);
- }
- self.dev.apply_diff(&self.current, &config)?;
- self.current_update(&config);
- self.current = config;
- }
-
- Ok(if t_cfg < t_refresh {
- eprintln!("<6>Next configuration update after {:.1?}", time_to_cfg);
- t_cfg
- } else if t_refresh > now {
- t_refresh
- } else {
- eprintln!("<4>Next refresh immediately?");
- now
- })
- }
-}
-
-pub fn fetch_source(url: &str) -> io::Result<proto::Source> {
- use std::env;
- use std::process::{Command, Stdio};
-
- let curl = match env::var_os("CURL") {
- None => OsString::new(),
- Some(v) => v,
- };
- let mut proc = Command::new(if curl.is_empty() {
- OsStr::new("curl")
- } else {
- curl.as_os_str()
- });
-
- proc.stdin(Stdio::null());
- proc.stdout(Stdio::piped());
- proc.stderr(Stdio::piped());
- proc.arg("-gsSfL");
- proc.arg("--fail-early");
- proc.arg("--max-time");
- proc.arg("10");
- proc.arg("--max-filesize");
- proc.arg("1M");
- proc.arg("--");
- proc.arg(url);
-
- let out = proc.output()?;
-
- if !out.status.success() {
- let msg = String::from_utf8_lossy(&out.stderr);
- let msg = msg.replace('\n', "; ");
- return Err(io::Error::new(io::ErrorKind::Other, msg));
- }
-
- let mut de = serde_json::Deserializer::from_slice(&out.stdout);
- let r = serde::Deserialize::deserialize(&mut de)?;
- Ok(r)
-}
-
-pub fn load_source(path: &OsStr) -> io::Result<proto::Source> {
- let mut data = Vec::new();
- {
- use std::io::Read;
- let mut f = fs::File::open(&path)?;
- f.read_to_end(&mut data)?;
- }
-
- let mut de = serde_json::Deserializer::from_slice(&data);
- let r = serde::Deserialize::deserialize(&mut de)?;
- Ok(r)
-}