aboutsummaryrefslogblamecommitdiff
path: root/src/manager.rs
blob: 4b7df6a67098455c7ade23e81ec3a28464655285 (plain) (tree)
1
2
3
4
5
6
7
8
9



                              
                                               

                                

                                               

























                                                              



                                                                   

























                                                                                        



                                                                   

          



                                                                  












































                                                                                      


                                                                          

                                                         



                                                                  

                    










                                    

                                                                         









                                                   
                                      



































                                                                                      



                                                                            






                                                                 
                                                          
                                                                       



                                  














                                                                                 

                                     



                                                  

                                           












                                                                                      
                                      






















                                                                           
                                




































































                                                                                
// Copyright 2019 Hristo Venev
//
// See COPYING.

use crate::{builder, config, model, proto, wg};
use std::ffi::{OsStr, OsString};
use std::path::PathBuf;
use std::time::{Duration, Instant, SystemTime};
use std::{fs, io};

struct Source {
    name: String,
    config: config::Source,
    data: proto::Source,
    next_update: Instant,
    backoff: Option<Duration>,
}

struct Updater {
    config: config::UpdateConfig,
    cache_directory: Option<PathBuf>,
}

impl Updater {
    fn cache_path(&self, s: &Source) -> Option<PathBuf> {
        if let Some(ref dir) = self.cache_directory {
            let mut p = dir.clone();
            p.push(&s.name);
            Some(p)
        } else {
            None
        }
    }

    fn cache_update(&self, src: &Source) -> io::Result<bool> {
        let path = if let Some(path) = match self.cache_path(src) {
            path
        } else {
            return Ok(false);
        };

        let mut tmp_path = OsString::from(path.clone());
        tmp_path.push(".tmp");
        let tmp_path = PathBuf::from(tmp_path);

        let data = serde_json::to_vec(&src.data).unwrap();

        let mut file = fs::File::create(&tmp_path)?;
        match io::Write::write_all(&mut file, &data)
            .and_then(|_| file.sync_data())
            .and_then(|_| fs::rename(&tmp_path, &path))
        {
            Ok(()) => {}
            Err(e) => {
                fs::remove_file(&tmp_path).unwrap_or_else(|e2| {
                    eprintln!("<3>Failed to clean up [{}]: {}", tmp_path.display(), e2);
                });
                return Err(e);
            }
        }

        Ok(true)
    }

    fn cache_load(&self, src: &mut Source) -> bool {
        let path = if let Some(path) = match self.cache_path(src) {
            path
        } else {
            return false;
        };

        let mut file = if let Some(file) = fs::File::open(&path) {
            file
        } else {
            return false;
        };

        let mut data = Vec::new();
        match io::Read::read_to_end(&mut file, &mut data) {
            Ok(_) => {}
            Err(e) => {
                eprintln!("<3>Failed to read [{}] from cache: {}", src.config.url, e);
                return false;
            }
        };

        let mut de = serde_json::Deserializer::from_slice(&data);
        src.data = match serde::Deserialize::deserialize(&mut de) {
            Ok(r) => r,
            Err(e) => {
                eprintln!("<3>Failed to load [{}] from cache: {}", src.config.url, e);
                return false;
            }
        };

        true
    }

    fn update(&self, src: &mut Source) -> (bool, Instant) {
        let refresh = Duration::from_secs(u64::from(self.config.refresh_sec));

        let r = fetch_source(&src.config.url);
        let now = Instant::now();
        let r = match r {
            Ok(r) => {
                eprintln!("<6>Updated [{}]", &src.config.url);
                src.data = r;
                src.backoff = None;
                src.next_update = now + refresh;
                match self.cache_update(src) {
                    Ok(_) => {}
                    Err(e) => {
                        eprintln!("<4>Failed to cache [{}]: {}", &src.config.url, e);
                    }
                }
                return (true, now);
            }
            Err(r) => r,
        };

        let b = src
            .backoff
            .unwrap_or_else(|| Duration::from_secs(10).min(refresh / 10));
        src.next_update = now + b;
        src.backoff = Some((b + b / 3).min(refresh / 3));
        eprintln!(
            "<3>Failed to update [{}], retrying after {:.1?}: {}",
            &src.config.url, b, &r
        );
        (false, now)
    }
}

pub struct Manager {
    dev: wg::Device,
    peer_config: config::PeerConfig,
    sources: Vec<Source>,
    current: model::Config,
    updater: Updater,
}

impl Manager {
    pub fn new(ifname: OsString, c: config::Config) -> io::Result<Self> {
        let mut m = Self {
            dev: wg::Device::new(ifname)?,
            peer_config: c.peer_config,
            sources: vec![],
            current: model::Config::default(),
            updater: Updater {
                config: c.update_config,
                cache_directory: c.cache_directory,
            },
        };

        for (name, cfg) in c.sources {
            m.add_source(name, cfg)?;
        }

        Ok(m)
    }

    fn add_source(&mut self, name: String, config: config::Source) -> io::Result<()> {
        let mut s = Source {
            name,
            config,
            data: proto::Source::empty(),
            next_update: Instant::now(),
            backoff: None,
        };

        self.init_source(&mut s)?;
        self.sources.push(s);
        Ok(())
    }

    fn init_source(&mut self, s: &mut Source) -> io::Result<()> {
        if self.updater.update(s).0 {
            return Ok(());
        }
        if self.updater.cache_load(s) {
            return Ok(());
        }
        if !s.config.required {
            return Ok(());
        }
        if self.updater.update(s).0 {
            return Ok(());
        }
        if self.updater.update(s).0 {
            return Ok(());
        }
        Err(io::Error::new(
            io::ErrorKind::Other,
            format!("Failed to update required source [{}]", &s.config.url),
        ))
    }

    fn make_config(
        &self,
        public_key: model::Key,
        ts: SystemTime,
    ) -> (model::Config, Vec<builder::ConfigError>, SystemTime) {
        let mut t_cfg = ts + Duration::from_secs(1 << 20);
        let mut sources: Vec<(&Source, &proto::SourceConfig)> = vec![];
        for src in &self.sources {
            let sc = src
                .data
                .next
                .as_ref()
                .and_then(|next| {
                    if ts >= next.update_at {
                        Some(&next.config)
                    } else {
                        t_cfg = t_cfg.min(next.update_at);
                        None
                    }
                })
                .unwrap_or(&src.data.config);
            sources.push((src, sc));
        }

        let mut cfg = builder::ConfigBuilder::new(public_key, &self.peer_config);

        for (src, sc) in &sources {
            for peer in &sc.servers {
                cfg.add_server(&src.config, peer);
            }
        }

        for (src, sc) in &sources {
            for peer in &sc.road_warriors {
                cfg.add_road_warrior(&src.config, peer);
            }
        }

        let (cfg, errs) = cfg.build();
        (cfg, errs, t_cfg)
    }

    fn refresh(&mut self) -> io::Result<Instant> {
        let refresh = Duration::from_secs(u64::from(self.updater.config.refresh_sec));
        let mut now = Instant::now();
        let mut t_refresh = now + refresh;

        for src in &mut self.sources {
            if now >= src.next_update {
                now = self.updater.update(src).1;
            }
            t_refresh = t_refresh.min(src.next_update);
        }

        Ok(t_refresh)
    }

    pub fn update(&mut self) -> io::Result<Instant> {
        let t_refresh = self.refresh()?;

        let public_key = self.dev.get_public_key()?;
        let now = Instant::now();
        let sysnow = SystemTime::now();
        let (config, errors, t_cfg) = self.make_config(public_key, sysnow);
        let time_to_cfg = t_cfg
            .duration_since(sysnow)
            .unwrap_or(Duration::from_secs(0));
        let t_cfg = now + time_to_cfg;

        if config != self.current {
            eprintln!("<5>Applying configuration update");
            for err in &errors {
                eprintln!("<{}>{}", if err.important { '4' } else { '5' }, err);
            }
            self.dev.apply_diff(&self.current, &config)?;
            self.current = config;
        }

        Ok(if t_cfg < t_refresh {
            eprintln!("<6>Next configuration update after {:.1?}", time_to_cfg);
            t_cfg
        } else if t_refresh > now {
            t_refresh
        } else {
            eprintln!("<4>Next refresh immediately?");
            now
        })
    }
}

pub fn fetch_source(url: &str) -> io::Result<proto::Source> {
    use std::env;
    use std::process::{Command, Stdio};

    let curl = match env::var_os("CURL") {
        None => OsString::new(),
        Some(v) => v,
    };
    let mut proc = Command::new(if curl.is_empty() {
        OsStr::new("curl")
    } else {
        curl.as_os_str()
    });

    proc.stdin(Stdio::null());
    proc.stdout(Stdio::piped());
    proc.stderr(Stdio::piped());
    proc.arg("-gsSfL");
    proc.arg("--fail-early");
    proc.arg("--max-time");
    proc.arg("10");
    proc.arg("--max-filesize");
    proc.arg("1M");
    proc.arg("--");
    proc.arg(url);

    let out = proc.output()?;

    if !out.status.success() {
        let msg = String::from_utf8_lossy(&out.stderr);
        let msg = msg.replace('\n', "; ");
        return Err(io::Error::new(io::ErrorKind::Other, msg));
    }

    let mut de = serde_json::Deserializer::from_slice(&out.stdout);
    let r = serde::Deserialize::deserialize(&mut de)?;
    Ok(r)
}

pub fn load_source(path: &OsStr) -> io::Result<proto::Source> {
    let mut data = Vec::new();
    {
        use std::io::Read;
        let mut f = fs::File::open(&path)?;
        f.read_to_end(&mut data)?;
    }

    let mut de = serde_json::Deserializer::from_slice(&data);
    let r = serde::Deserialize::deserialize(&mut de)?;
    Ok(r)
}