From 167a766f488091a9e6d833bb64e7e7cf8f90111f Mon Sep 17 00:00:00 2001 From: Hristo Venev Date: Sat, 28 Sep 2019 16:12:03 +0000 Subject: Split updater into module. --- src/config.rs | 10 +-- src/main.rs | 4 +- src/manager/mod.rs | 160 +++--------------------------------------------- src/manager/updater.rs | 161 +++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 177 insertions(+), 158 deletions(-) create mode 100644 src/manager/updater.rs diff --git a/src/config.rs b/src/config.rs index 9972830..98a795f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -39,16 +39,19 @@ impl Default for PeerConfig { #[serde(deny_unknown_fields)] #[derive(serde_derive::Serialize, serde_derive::Deserialize, Clone, PartialEq, Eq, Debug)] -pub struct UpdateConfig { +pub struct UpdaterConfig { + pub cache_directory: Option, + // Number of seconds between regular updates. #[serde(default = "default_refresh_sec")] pub refresh_sec: u32, } -impl Default for UpdateConfig { +impl Default for UpdaterConfig { #[inline] fn default() -> Self { Self { + cache_directory: None, refresh_sec: default_refresh_sec(), } } @@ -57,14 +60,13 @@ impl Default for UpdateConfig { #[serde(deny_unknown_fields)] #[derive(serde_derive::Serialize, serde_derive::Deserialize, Default, Clone, Debug)] pub struct Config { - pub cache_directory: Option, pub runtime_directory: Option, #[serde(flatten)] pub peer_config: PeerConfig, #[serde(flatten)] - pub update_config: UpdateConfig, + pub updater: UpdaterConfig, #[serde(rename = "source")] pub sources: HashMap, diff --git a/src/main.rs b/src/main.rs index 4ae628e..cfd9c82 100644 --- a/src/main.rs +++ b/src/main.rs @@ -89,7 +89,7 @@ fn cli_config(args: &mut impl Iterator) -> Option) -> i } fn run_daemon(ifname: OsString, mut config: config::Config) -> i32 { - maybe_get_var(&mut config.cache_directory, "CACHE_DIRECTORY"); + maybe_get_var(&mut config.updater.cache_directory, "CACHE_DIRECTORY"); maybe_get_var(&mut config.runtime_directory, "RUNTIME_DIRECTORY"); let mut m = match manager::Manager::new(ifname, config) { diff --git a/src/manager/mod.rs b/src/manager/mod.rs index 8e8792d..2efa370 100644 --- a/src/manager/mod.rs +++ b/src/manager/mod.rs @@ -3,15 +3,13 @@ // See COPYING. use crate::{config, model, proto, wg}; -use std::ffi::{OsStr, OsString}; +use std::ffi::{OsString}; #[cfg(unix)] use std::os::unix::fs::OpenOptionsExt; use std::path::{Path, PathBuf}; use std::time::{Duration, Instant, SystemTime}; use std::{fs, io}; -mod builder; - struct Source { name: String, config: config::Source, @@ -20,10 +18,10 @@ struct Source { backoff: Option, } -struct Updater { - config: config::UpdateConfig, - cache_directory: Option, -} +mod builder; + +mod updater; +pub use updater::load_source; fn update_file(path: &Path, data: &[u8]) -> io::Result<()> { let mut tmp_path = OsString::from(path); @@ -67,100 +65,13 @@ fn load_file(path: &Path) -> io::Result>> { Ok(Some(data)) } -impl Updater { - fn cache_path(&self, s: &Source) -> Option { - if let Some(ref dir) = self.cache_directory { - let mut p = dir.clone(); - p.push(&s.name); - Some(p) - } else { - None - } - } - - fn cache_update(&self, src: &Source) { - let path = if let Some(path) = self.cache_path(src) { - path - } else { - return; - }; - - let data = serde_json::to_vec(&src.data).unwrap(); - match update_file(&path, &data) { - Ok(()) => {} - Err(e) => { - eprintln!("<4>Failed to cache [{}]: {}", &src.name, e); - } - } - } - - fn cache_load(&self, src: &mut Source) -> bool { - let path = if let Some(path) = self.cache_path(src) { - path - } else { - return false; - }; - - let data = match load_file(&path) { - Ok(Some(data)) => data, - Ok(None) => { - return false; - } - Err(e) => { - eprintln!("<3>Failed to read [{}] from cache: {}", &src.name, e); - return false; - } - }; - - let mut de = serde_json::Deserializer::from_slice(&data); - src.data = match serde::Deserialize::deserialize(&mut de) { - Ok(r) => r, - Err(e) => { - eprintln!("<3>Failed to load [{}] from cache: {}", &src.name, e); - return false; - } - }; - - true - } - - fn update(&self, src: &mut Source) -> (bool, Instant) { - let refresh = Duration::from_secs(u64::from(self.config.refresh_sec)); - - let r = fetch_source(&src.config.url); - let now = Instant::now(); - let r = match r { - Ok(r) => { - eprintln!("<6>Updated [{}]", &src.config.url); - src.data = r; - src.backoff = None; - src.next_update = now + refresh; - self.cache_update(src); - return (true, now); - } - Err(r) => r, - }; - - let b = src - .backoff - .unwrap_or_else(|| Duration::from_secs(10).min(refresh / 10)); - src.next_update = now + b; - src.backoff = Some((b + b / 3).min(refresh / 3)); - eprintln!( - "<3>Failed to update [{}], retrying after {:.1?}: {}", - &src.config.url, b, &r - ); - (false, now) - } -} - pub struct Manager { dev: wg::Device, peer_config: config::PeerConfig, sources: Vec, current: model::Config, runtime_directory: Option, - updater: Updater, + updater: updater::Updater, } impl Manager { @@ -171,10 +82,7 @@ impl Manager { sources: vec![], current: model::Config::default(), runtime_directory: c.runtime_directory, - updater: Updater { - config: c.update_config, - cache_directory: c.cache_directory, - }, + updater: updater::Updater::new(c.updater), }; let _ = m.current_load(); @@ -322,7 +230,7 @@ impl Manager { } fn refresh(&mut self) -> io::Result { - let refresh = Duration::from_secs(u64::from(self.updater.config.refresh_sec)); + let refresh = self.updater.refresh_time(); let mut now = Instant::now(); let mut t_refresh = now + refresh; @@ -369,55 +277,3 @@ impl Manager { }) } } - -pub fn fetch_source(url: &str) -> io::Result { - use std::env; - use std::process::{Command, Stdio}; - - let curl = match env::var_os("CURL") { - None => OsString::new(), - Some(v) => v, - }; - let mut proc = Command::new(if curl.is_empty() { - OsStr::new("curl") - } else { - curl.as_os_str() - }); - - proc.stdin(Stdio::null()); - proc.stdout(Stdio::piped()); - proc.stderr(Stdio::piped()); - proc.arg("-gsSfL"); - proc.arg("--fail-early"); - proc.arg("--max-time"); - proc.arg("10"); - proc.arg("--max-filesize"); - proc.arg("1M"); - proc.arg("--"); - proc.arg(url); - - let out = proc.output()?; - - if !out.status.success() { - let msg = String::from_utf8_lossy(&out.stderr); - let msg = msg.replace('\n', "; "); - return Err(io::Error::new(io::ErrorKind::Other, msg)); - } - - let mut de = serde_json::Deserializer::from_slice(&out.stdout); - let r = serde::Deserialize::deserialize(&mut de)?; - Ok(r) -} - -pub fn load_source(path: &OsStr) -> io::Result { - let mut data = Vec::new(); - { - use std::io::Read; - let mut f = fs::File::open(&path)?; - f.read_to_end(&mut data)?; - } - - let mut de = serde_json::Deserializer::from_slice(&data); - let r = serde::Deserialize::deserialize(&mut de)?; - Ok(r) -} diff --git a/src/manager/updater.rs b/src/manager/updater.rs new file mode 100644 index 0000000..0de675c --- /dev/null +++ b/src/manager/updater.rs @@ -0,0 +1,161 @@ +// Copyright 2019 Hristo Venev +// +// See COPYING. + +use super::{load_file, update_file, Source}; +use crate::{config, proto}; +use std::ffi::{OsStr, OsString}; +use std::path::PathBuf; +use std::time::{Duration, Instant}; +use std::{fs, io}; + +pub(super) struct Updater { + config: config::UpdaterConfig, +} + +impl Updater { + pub fn new(config: config::UpdaterConfig) -> Self { + Self { config } + } + + fn cache_path(&self, s: &Source) -> Option { + if let Some(ref dir) = self.config.cache_directory { + let mut p = dir.clone(); + p.push(&s.name); + Some(p) + } else { + None + } + } + + fn cache_update(&self, src: &Source) { + let path = if let Some(path) = self.cache_path(src) { + path + } else { + return; + }; + + let data = serde_json::to_vec(&src.data).unwrap(); + match update_file(&path, &data) { + Ok(()) => {} + Err(e) => { + eprintln!("<4>Failed to cache [{}]: {}", &src.name, e); + } + } + } + + pub fn cache_load(&self, src: &mut Source) -> bool { + let path = if let Some(path) = self.cache_path(src) { + path + } else { + return false; + }; + + let data = match load_file(&path) { + Ok(Some(data)) => data, + Ok(None) => { + return false; + } + Err(e) => { + eprintln!("<3>Failed to read [{}] from cache: {}", &src.name, e); + return false; + } + }; + + let mut de = serde_json::Deserializer::from_slice(&data); + src.data = match serde::Deserialize::deserialize(&mut de) { + Ok(r) => r, + Err(e) => { + eprintln!("<3>Failed to load [{}] from cache: {}", &src.name, e); + return false; + } + }; + + true + } + + pub fn update(&self, src: &mut Source) -> (bool, Instant) { + let refresh = self.refresh_time(); + + let r = fetch_source(&src.config.url); + let now = Instant::now(); + let r = match r { + Ok(r) => { + eprintln!("<6>Updated [{}]", &src.config.url); + src.data = r; + src.backoff = None; + src.next_update = now + refresh; + self.cache_update(src); + return (true, now); + } + Err(r) => r, + }; + + let b = src + .backoff + .unwrap_or_else(|| Duration::from_secs(10).min(refresh / 10)); + src.next_update = now + b; + src.backoff = Some((b + b / 3).min(refresh / 3)); + eprintln!( + "<3>Failed to update [{}], retrying after {:.1?}: {}", + &src.config.url, b, &r + ); + (false, now) + } + + pub fn refresh_time(&self) -> Duration { + Duration::from_secs(u64::from(self.config.refresh_sec)) + } +} + +fn fetch_source(url: &str) -> io::Result { + use std::env; + use std::process::{Command, Stdio}; + + let curl = match env::var_os("CURL") { + None => OsString::new(), + Some(v) => v, + }; + let mut proc = Command::new(if curl.is_empty() { + OsStr::new("curl") + } else { + curl.as_os_str() + }); + + proc.stdin(Stdio::null()); + proc.stdout(Stdio::piped()); + proc.stderr(Stdio::piped()); + proc.arg("-gsSfL"); + proc.arg("--fail-early"); + proc.arg("--max-time"); + proc.arg("10"); + proc.arg("--max-filesize"); + proc.arg("1M"); + proc.arg("--"); + proc.arg(url); + + let out = proc.output()?; + + if !out.status.success() { + let msg = String::from_utf8_lossy(&out.stderr); + let msg = msg.replace('\n', "; "); + return Err(io::Error::new(io::ErrorKind::Other, msg)); + } + + let mut de = serde_json::Deserializer::from_slice(&out.stdout); + let r = serde::Deserialize::deserialize(&mut de)?; + Ok(r) +} + +pub fn load_source(path: &OsStr) -> io::Result { + let mut data = Vec::new(); + { + use std::io::Read; + let mut f = fs::File::open(&path)?; + f.read_to_end(&mut data)?; + } + + let mut de = serde_json::Deserializer::from_slice(&data); + let r = serde::Deserialize::deserialize(&mut de)?; + Ok(r) +} -- cgit