aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHristo Venev <hristo@venev.name>2019-09-28 16:12:03 +0000
committerHristo Venev <hristo@venev.name>2019-09-28 16:12:03 +0000
commit167a766f488091a9e6d833bb64e7e7cf8f90111f (patch)
tree568621ba7309be674821aae0684ca2378d063e0e
parenta66bdaee155d9012b1fd75bf31332df638d49857 (diff)
Split updater into module.
-rw-r--r--src/config.rs10
-rw-r--r--src/main.rs4
-rw-r--r--src/manager/mod.rs160
-rw-r--r--src/manager/updater.rs161
4 files changed, 177 insertions, 158 deletions
diff --git a/src/config.rs b/src/config.rs
index 9972830..98a795f 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -39,16 +39,19 @@ impl Default for PeerConfig {
#[serde(deny_unknown_fields)]
#[derive(serde_derive::Serialize, serde_derive::Deserialize, Clone, PartialEq, Eq, Debug)]
-pub struct UpdateConfig {
+pub struct UpdaterConfig {
+ pub cache_directory: Option<PathBuf>,
+
// Number of seconds between regular updates.
#[serde(default = "default_refresh_sec")]
pub refresh_sec: u32,
}
-impl Default for UpdateConfig {
+impl Default for UpdaterConfig {
#[inline]
fn default() -> Self {
Self {
+ cache_directory: None,
refresh_sec: default_refresh_sec(),
}
}
@@ -57,14 +60,13 @@ impl Default for UpdateConfig {
#[serde(deny_unknown_fields)]
#[derive(serde_derive::Serialize, serde_derive::Deserialize, Default, Clone, Debug)]
pub struct Config {
- pub cache_directory: Option<PathBuf>,
pub runtime_directory: Option<PathBuf>,
#[serde(flatten)]
pub peer_config: PeerConfig,
#[serde(flatten)]
- pub update_config: UpdateConfig,
+ pub updater: UpdaterConfig,
#[serde(rename = "source")]
pub sources: HashMap<String, Source>,
diff --git a/src/main.rs b/src/main.rs
index 4ae628e..cfd9c82 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -89,7 +89,7 @@ fn cli_config(args: &mut impl Iterator<Item = OsString>) -> Option<config::Confi
if key == "refresh_sec" {
arg = args.next()?;
let arg = arg.to_str()?;
- cfg.update_config.refresh_sec = u32::from_str(arg).ok()?;
+ cfg.updater.refresh_sec = u32::from_str(arg).ok()?;
continue;
}
if key == "source" {
@@ -187,7 +187,7 @@ fn run_with_cmdline(argv0: &str, args: &mut impl Iterator<Item = OsString>) -> i
}
fn run_daemon(ifname: OsString, mut config: config::Config) -> i32 {
- maybe_get_var(&mut config.cache_directory, "CACHE_DIRECTORY");
+ maybe_get_var(&mut config.updater.cache_directory, "CACHE_DIRECTORY");
maybe_get_var(&mut config.runtime_directory, "RUNTIME_DIRECTORY");
let mut m = match manager::Manager::new(ifname, config) {
diff --git a/src/manager/mod.rs b/src/manager/mod.rs
index 8e8792d..2efa370 100644
--- a/src/manager/mod.rs
+++ b/src/manager/mod.rs
@@ -3,15 +3,13 @@
// See COPYING.
use crate::{config, model, proto, wg};
-use std::ffi::{OsStr, OsString};
+use std::ffi::{OsString};
#[cfg(unix)]
use std::os::unix::fs::OpenOptionsExt;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant, SystemTime};
use std::{fs, io};
-mod builder;
-
struct Source {
name: String,
config: config::Source,
@@ -20,10 +18,10 @@ struct Source {
backoff: Option<Duration>,
}
-struct Updater {
- config: config::UpdateConfig,
- cache_directory: Option<PathBuf>,
-}
+mod builder;
+
+mod updater;
+pub use updater::load_source;
fn update_file(path: &Path, data: &[u8]) -> io::Result<()> {
let mut tmp_path = OsString::from(path);
@@ -67,100 +65,13 @@ fn load_file(path: &Path) -> io::Result<Option<Vec<u8>>> {
Ok(Some(data))
}
-impl Updater {
- fn cache_path(&self, s: &Source) -> Option<PathBuf> {
- if let Some(ref dir) = self.cache_directory {
- let mut p = dir.clone();
- p.push(&s.name);
- Some(p)
- } else {
- None
- }
- }
-
- fn cache_update(&self, src: &Source) {
- let path = if let Some(path) = self.cache_path(src) {
- path
- } else {
- return;
- };
-
- let data = serde_json::to_vec(&src.data).unwrap();
- match update_file(&path, &data) {
- Ok(()) => {}
- Err(e) => {
- eprintln!("<4>Failed to cache [{}]: {}", &src.name, e);
- }
- }
- }
-
- fn cache_load(&self, src: &mut Source) -> bool {
- let path = if let Some(path) = self.cache_path(src) {
- path
- } else {
- return false;
- };
-
- let data = match load_file(&path) {
- Ok(Some(data)) => data,
- Ok(None) => {
- return false;
- }
- Err(e) => {
- eprintln!("<3>Failed to read [{}] from cache: {}", &src.name, e);
- return false;
- }
- };
-
- let mut de = serde_json::Deserializer::from_slice(&data);
- src.data = match serde::Deserialize::deserialize(&mut de) {
- Ok(r) => r,
- Err(e) => {
- eprintln!("<3>Failed to load [{}] from cache: {}", &src.name, e);
- return false;
- }
- };
-
- true
- }
-
- fn update(&self, src: &mut Source) -> (bool, Instant) {
- let refresh = Duration::from_secs(u64::from(self.config.refresh_sec));
-
- let r = fetch_source(&src.config.url);
- let now = Instant::now();
- let r = match r {
- Ok(r) => {
- eprintln!("<6>Updated [{}]", &src.config.url);
- src.data = r;
- src.backoff = None;
- src.next_update = now + refresh;
- self.cache_update(src);
- return (true, now);
- }
- Err(r) => r,
- };
-
- let b = src
- .backoff
- .unwrap_or_else(|| Duration::from_secs(10).min(refresh / 10));
- src.next_update = now + b;
- src.backoff = Some((b + b / 3).min(refresh / 3));
- eprintln!(
- "<3>Failed to update [{}], retrying after {:.1?}: {}",
- &src.config.url, b, &r
- );
- (false, now)
- }
-}
-
pub struct Manager {
dev: wg::Device,
peer_config: config::PeerConfig,
sources: Vec<Source>,
current: model::Config,
runtime_directory: Option<PathBuf>,
- updater: Updater,
+ updater: updater::Updater,
}
impl Manager {
@@ -171,10 +82,7 @@ impl Manager {
sources: vec![],
current: model::Config::default(),
runtime_directory: c.runtime_directory,
- updater: Updater {
- config: c.update_config,
- cache_directory: c.cache_directory,
- },
+ updater: updater::Updater::new(c.updater),
};
let _ = m.current_load();
@@ -322,7 +230,7 @@ impl Manager {
}
fn refresh(&mut self) -> io::Result<Instant> {
- let refresh = Duration::from_secs(u64::from(self.updater.config.refresh_sec));
+ let refresh = self.updater.refresh_time();
let mut now = Instant::now();
let mut t_refresh = now + refresh;
@@ -369,55 +277,3 @@ impl Manager {
})
}
}
-
-pub fn fetch_source(url: &str) -> io::Result<proto::Source> {
- use std::env;
- use std::process::{Command, Stdio};
-
- let curl = match env::var_os("CURL") {
- None => OsString::new(),
- Some(v) => v,
- };
- let mut proc = Command::new(if curl.is_empty() {
- OsStr::new("curl")
- } else {
- curl.as_os_str()
- });
-
- proc.stdin(Stdio::null());
- proc.stdout(Stdio::piped());
- proc.stderr(Stdio::piped());
- proc.arg("-gsSfL");
- proc.arg("--fail-early");
- proc.arg("--max-time");
- proc.arg("10");
- proc.arg("--max-filesize");
- proc.arg("1M");
- proc.arg("--");
- proc.arg(url);
-
- let out = proc.output()?;
-
- if !out.status.success() {
- let msg = String::from_utf8_lossy(&out.stderr);
- let msg = msg.replace('\n', "; ");
- return Err(io::Error::new(io::ErrorKind::Other, msg));
- }
-
- let mut de = serde_json::Deserializer::from_slice(&out.stdout);
- let r = serde::Deserialize::deserialize(&mut de)?;
- Ok(r)
-}
-
-pub fn load_source(path: &OsStr) -> io::Result<proto::Source> {
- let mut data = Vec::new();
- {
- use std::io::Read;
- let mut f = fs::File::open(&path)?;
- f.read_to_end(&mut data)?;
- }
-
- let mut de = serde_json::Deserializer::from_slice(&data);
- let r = serde::Deserialize::deserialize(&mut de)?;
- Ok(r)
-}
diff --git a/src/manager/updater.rs b/src/manager/updater.rs
new file mode 100644
index 0000000..0de675c
--- /dev/null
+++ b/src/manager/updater.rs
@@ -0,0 +1,161 @@
+// Copyright 2019 Hristo Venev
+//
+// See COPYING.
+
+use super::{load_file, update_file, Source};
+use crate::{config, proto};
+use std::ffi::{OsStr, OsString};
+use std::path::PathBuf;
+use std::time::{Duration, Instant};
+use std::{fs, io};
+
+pub(super) struct Updater {
+ config: config::UpdaterConfig,
+}
+
+impl Updater {
+ pub fn new(config: config::UpdaterConfig) -> Self {
+ Self { config }
+ }
+
+ fn cache_path(&self, s: &Source) -> Option<PathBuf> {
+ if let Some(ref dir) = self.config.cache_directory {
+ let mut p = dir.clone();
+ p.push(&s.name);
+ Some(p)
+ } else {
+ None
+ }
+ }
+
+ fn cache_update(&self, src: &Source) {
+ let path = if let Some(path) = self.cache_path(src) {
+ path
+ } else {
+ return;
+ };
+
+ let data = serde_json::to_vec(&src.data).unwrap();
+ match update_file(&path, &data) {
+ Ok(()) => {}
+ Err(e) => {
+ eprintln!("<4>Failed to cache [{}]: {}", &src.name, e);
+ }
+ }
+ }
+
+ pub fn cache_load(&self, src: &mut Source) -> bool {
+ let path = if let Some(path) = self.cache_path(src) {
+ path
+ } else {
+ return false;
+ };
+
+ let data = match load_file(&path) {
+ Ok(Some(data)) => data,
+ Ok(None) => {
+ return false;
+ }
+ Err(e) => {
+ eprintln!("<3>Failed to read [{}] from cache: {}", &src.name, e);
+ return false;
+ }
+ };
+
+ let mut de = serde_json::Deserializer::from_slice(&data);
+ src.data = match serde::Deserialize::deserialize(&mut de) {
+ Ok(r) => r,
+ Err(e) => {
+ eprintln!("<3>Failed to load [{}] from cache: {}", &src.name, e);
+ return false;
+ }
+ };
+
+ true
+ }
+
+ pub fn update(&self, src: &mut Source) -> (bool, Instant) {
+ let refresh = self.refresh_time();
+
+ let r = fetch_source(&src.config.url);
+ let now = Instant::now();
+ let r = match r {
+ Ok(r) => {
+ eprintln!("<6>Updated [{}]", &src.config.url);
+ src.data = r;
+ src.backoff = None;
+ src.next_update = now + refresh;
+ self.cache_update(src);
+ return (true, now);
+ }
+ Err(r) => r,
+ };
+
+ let b = src
+ .backoff
+ .unwrap_or_else(|| Duration::from_secs(10).min(refresh / 10));
+ src.next_update = now + b;
+ src.backoff = Some((b + b / 3).min(refresh / 3));
+ eprintln!(
+ "<3>Failed to update [{}], retrying after {:.1?}: {}",
+ &src.config.url, b, &r
+ );
+ (false, now)
+ }
+
+ pub fn refresh_time(&self) -> Duration {
+ Duration::from_secs(u64::from(self.config.refresh_sec))
+ }
+}
+
+fn fetch_source(url: &str) -> io::Result<proto::Source> {
+ use std::env;
+ use std::process::{Command, Stdio};
+
+ let curl = match env::var_os("CURL") {
+ None => OsString::new(),
+ Some(v) => v,
+ };
+ let mut proc = Command::new(if curl.is_empty() {
+ OsStr::new("curl")
+ } else {
+ curl.as_os_str()
+ });
+
+ proc.stdin(Stdio::null());
+ proc.stdout(Stdio::piped());
+ proc.stderr(Stdio::piped());
+ proc.arg("-gsSfL");
+ proc.arg("--fail-early");
+ proc.arg("--max-time");
+ proc.arg("10");
+ proc.arg("--max-filesize");
+ proc.arg("1M");
+ proc.arg("--");
+ proc.arg(url);
+
+ let out = proc.output()?;
+
+ if !out.status.success() {
+ let msg = String::from_utf8_lossy(&out.stderr);
+ let msg = msg.replace('\n', "; ");
+ return Err(io::Error::new(io::ErrorKind::Other, msg));
+ }
+
+ let mut de = serde_json::Deserializer::from_slice(&out.stdout);
+ let r = serde::Deserialize::deserialize(&mut de)?;
+ Ok(r)
+}
+
+pub fn load_source(path: &OsStr) -> io::Result<proto::Source> {
+ let mut data = Vec::new();
+ {
+ use std::io::Read;
+ let mut f = fs::File::open(&path)?;
+ f.read_to_end(&mut data)?;
+ }
+
+ let mut de = serde_json::Deserializer::from_slice(&data);
+ let r = serde::Deserialize::deserialize(&mut de)?;
+ Ok(r)
+}