mod settings;

#[macro_use]
extern crate log;

use ammonia::clean;
use chrono::prelude::*;
use chrono::Duration;
use scraper::{Html, Selector};
use serde::{Deserialize, Serialize};
use settings::Settings;
use sqlx::sqlite::SqlitePool;
use std::env;
use url::Url;

/// A fetched page, as serialized into the on-disk cache.
#[derive(Debug, Deserialize, Serialize)]
struct Page {
    url: Url,
    status: u16,
    last_fetched: DateTime<Utc>,
    body: String,
}

#[tokio::main]
async fn main() {
    env_logger::init();
    let settings = Settings::new().unwrap();
    let pool = SqlitePool::connect(&env::var("DATABASE_URL").unwrap())
        .await
        .unwrap();

    // Breadth-first crawl: start from the configured sitemap, spawn one task
    // per URL, and feed the links each page yields into the next pass.
    let mut to_crawl = settings.sitemap;
    let mut crawled = 0;
    loop {
        let mut handles = Vec::new();
        for url in to_crawl {
            let job = tokio::spawn(crawl_url(url, settings.allowlist.clone()));
            handles.push(job);
            crawled += 1;
        }

        let mut results = Vec::new();
        for job in handles {
            results.push(job.await);
        }

        // Collect the discovered links; iterating over a Result silently
        // skips any task that panicked.
        to_crawl = Vec::new();
        for res in results {
            for t in res {
                for url in t {
                    info!("pushing {}", &url.as_str());
                    to_crawl.push(url);
                }
            }
        }

        if to_crawl.is_empty() {
            break;
        }
    }
    info!("Successfully crawled {} pages!", crawled);

    // Persist every cached page into the database.
    let mut cache_dir = dirs::cache_dir().unwrap();
    cache_dir.push("ferret");
    for i in cacache::list_sync(cache_dir.as_path()) {
        let data = cacache::read(cache_dir.as_path(), &i.unwrap().key)
            .await
            .unwrap();
        let decoded_page: Page = bincode::deserialize(&data).unwrap();
        insert_db(&pool, &decoded_page).await;
        info!(
            "Found page: {} {}",
            &decoded_page.url.as_str(),
            &decoded_page.last_fetched.format("%Y-%m-%d %H:%M:%S")
        );
    }
}

/// Fetch a single URL, cache the response, and return the allowlisted links
/// found on the page.
async fn crawl_url(url: Url, allow: Vec<Url>) -> Vec<Url> {
    let mut cache_dir = dirs::cache_dir().unwrap();
    cache_dir.push("ferret");

    // Skip URLs that are already in the cache and less than an hour old.
    for i in cacache::list_sync(cache_dir.as_path()) {
        match i {
            Ok(entry) => {
                if entry.key == url.as_str() {
                    let now = Utc::now();
                    let timestamp = DateTime::<Utc>::from_utc(
                        NaiveDateTime::from_timestamp_opt(entry.time as i64, 0).unwrap(),
                        Utc,
                    );
                    let diff = now - timestamp;
                    if diff <= Duration::hours(1) {
                        error!("Already crawled {}", &url.as_str());
                        return vec![];
                    }
                }
            }
            Err(e) => error!("{}", e),
        }
    }

    let resp = reqwest::get(url).await;
    match resp {
        Ok(v) => {
            let page = Page {
                url: v.url().to_owned(),
                status: v.status().as_u16(),
                last_fetched: Utc::now(),
                body: v.text().await.unwrap(),
            };
            if page.status >= 400 {
                error!("{:?} Error for {}", &page.status, &page.url);
                vec![]
            } else {
                info!("Crawled {:?}: {:?}", &page.url.as_str(), &page.status);
                // Cache the serialized page, keyed by its URL.
                let encoded_page: Vec<u8> = bincode::serialize(&page).unwrap();
                cacache::write(cache_dir.as_path(), page.url.as_str(), encoded_page)
                    .await
                    .unwrap();
                find_links(&page.body, &page.url, allow).await
                // let data = cacache::read(cache_dir, &page.url.as_str()).await.unwrap();
                // let decoded_page: Page = bincode::deserialize(&data).unwrap();
            }
        }
        Err(e) => {
            error!("Could not get url: {}", e);
            vec![]
        }
    }
}

/// Parse an HTML body and return every link whose domain matches one of the
/// allowlisted URLs, resolved against the page's own URL.
async fn find_links(html: &str, base: &Url, allow: Vec<Url>) -> Vec<Url> {
    let document = Html::parse_document(html);
    let selector = Selector::parse("a").unwrap();
    let mut links: Vec<Url> = Vec::default();
    for element in document.select(&selector) {
        let href = element.value().attr("href").unwrap();
        let url = base.join(href).unwrap();
        // if &url == base {
        //     break;
        // }
        info!("Found url: {}", &url.as_str());
        for x in &allow {
            if let Some(d) = url.domain() {
                if x.domain().unwrap() == d {
                    links.push(url);
                    break;
                }
            }
        }
    }
    links
}

/// Upsert a crawled page into the `cached_urls` table, sanitizing the HTML
/// body with ammonia first.
async fn insert_db(pool: &SqlitePool, page: &Page) {
    let mut conn = pool.acquire().await.unwrap();
    let url = page.url.to_string();
    let timestamp = page.last_fetched.timestamp();
    let safe_html = clean(&page.body);
    let _id = sqlx::query!(
        r#"
        REPLACE INTO cached_urls ( last_fetched, url, body )
        VALUES ( ?1, ?2, ?3 )
        "#,
        timestamp,
        url,
        safe_html,
    )
    .execute(&mut *conn)
    .await
    .unwrap()
    .last_insert_rowid();
}
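// The `cached_urls` table itself is created outside this file, and because
// sqlx::query! checks the statement against DATABASE_URL at compile time it
// must already exist when the crate is built. A plausible migration, inferred
// from the query above rather than taken from the project, would be:
//
//   CREATE TABLE IF NOT EXISTS cached_urls (
//       url          TEXT PRIMARY KEY,
//       last_fetched INTEGER NOT NULL,
//       body         TEXT NOT NULL
//   );
//
// REPLACE INTO depends on a PRIMARY KEY or UNIQUE constraint on `url` so that
// a re-crawl overwrites the previously cached row instead of adding a new one.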