improvement: better config, better logs
parent
df16b2ba98
commit
edfd3c1f34
|
@ -78,10 +78,10 @@ where
|
||||||
|
|
||||||
if status != 200 {
|
if status != 200 {
|
||||||
warn!(
|
warn!(
|
||||||
"Server returned bad response {} ({}): {} {:?}",
|
"Appservice returned bad response {} {}\n{}\n{:?}",
|
||||||
destination,
|
destination,
|
||||||
url,
|
|
||||||
status,
|
status,
|
||||||
|
url,
|
||||||
utils::string_from_bytes(&body)
|
utils::string_from_bytes(&body)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -93,7 +93,7 @@ where
|
||||||
);
|
);
|
||||||
response.map_err(|_| {
|
response.map_err(|_| {
|
||||||
warn!(
|
warn!(
|
||||||
"Server returned invalid response bytes {} ({})",
|
"Appservice returned invalid response bytes {}\n{}",
|
||||||
destination, url
|
destination, url
|
||||||
);
|
);
|
||||||
Error::BadServerResponse("Server returned bad response.")
|
Error::BadServerResponse("Server returned bad response.")
|
||||||
|
|
|
@ -77,7 +77,7 @@ pub async fn search_events_route(
|
||||||
|
|
||||||
Ok(search_events::Response::new(ResultCategories {
|
Ok(search_events::Response::new(ResultCategories {
|
||||||
room_events: ResultRoomEvents {
|
room_events: ResultRoomEvents {
|
||||||
count: None, // TODO? maybe not
|
count: Some((results.len() as u32).into()), // TODO: set this to none. Element shouldn't depend on it
|
||||||
groups: BTreeMap::new(), // TODO
|
groups: BTreeMap::new(), // TODO
|
||||||
next_batch,
|
next_batch,
|
||||||
results,
|
results,
|
||||||
|
|
|
@ -18,15 +18,14 @@ use rocket::futures::{self, channel::mpsc};
|
||||||
use ruma::{DeviceId, ServerName, UserId};
|
use ruma::{DeviceId, ServerName, UserId};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::fs::remove_dir_all;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::{convert::TryInto, fs::remove_dir_all};
|
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
|
|
||||||
#[derive(Clone, Deserialize)]
|
#[derive(Clone, Deserialize)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
#[serde(default = "default_server_name")]
|
|
||||||
server_name: Box<ServerName>,
|
server_name: Box<ServerName>,
|
||||||
database_path: Option<String>,
|
database_path: String,
|
||||||
#[serde(default = "default_cache_capacity")]
|
#[serde(default = "default_cache_capacity")]
|
||||||
cache_capacity: u64,
|
cache_capacity: u64,
|
||||||
#[serde(default = "default_max_request_size")]
|
#[serde(default = "default_max_request_size")]
|
||||||
|
@ -41,12 +40,6 @@ pub struct Config {
|
||||||
federation_enabled: bool,
|
federation_enabled: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_server_name() -> Box<ServerName> {
|
|
||||||
"localhost"
|
|
||||||
.try_into()
|
|
||||||
.expect("localhost is valid servername")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn default_cache_capacity() -> u64 {
|
fn default_cache_capacity() -> u64 {
|
||||||
1024 * 1024 * 1024
|
1024 * 1024 * 1024
|
||||||
}
|
}
|
||||||
|
@ -90,31 +83,13 @@ impl Database {
|
||||||
|
|
||||||
/// Load an existing database or create a new one.
|
/// Load an existing database or create a new one.
|
||||||
pub async fn load_or_create(config: Config) -> Result<Self> {
|
pub async fn load_or_create(config: Config) -> Result<Self> {
|
||||||
let path = config
|
|
||||||
.database_path
|
|
||||||
.clone()
|
|
||||||
.map(Ok::<_, Error>)
|
|
||||||
.unwrap_or_else(|| {
|
|
||||||
let path = ProjectDirs::from("xyz", "koesters", "conduit")
|
|
||||||
.ok_or_else(|| {
|
|
||||||
Error::bad_config("The OS didn't return a valid home directory path.")
|
|
||||||
})?
|
|
||||||
.data_dir()
|
|
||||||
.join(config.server_name.as_str());
|
|
||||||
|
|
||||||
Ok(path
|
|
||||||
.to_str()
|
|
||||||
.ok_or_else(|| Error::bad_config("Database path contains invalid unicode."))?
|
|
||||||
.to_owned())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let db = sled::Config::default()
|
let db = sled::Config::default()
|
||||||
.path(&path)
|
.path(&config.database_path)
|
||||||
.cache_capacity(config.cache_capacity)
|
.cache_capacity(config.cache_capacity)
|
||||||
.print_profile_on_drop(true)
|
.print_profile_on_drop(false)
|
||||||
.open()?;
|
.open()?;
|
||||||
|
|
||||||
info!("Opened sled database at {}", path);
|
info!("Opened sled database at {}", config.database_path);
|
||||||
|
|
||||||
let (admin_sender, admin_receiver) = mpsc::unbounded();
|
let (admin_sender, admin_receiver) = mpsc::unbounded();
|
||||||
|
|
||||||
|
|
|
@ -43,7 +43,7 @@ impl Admin {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
if conduit_room.is_none() {
|
if conduit_room.is_none() {
|
||||||
warn!("Conduit instance does not have an #admins room. Logging to that room will not work.");
|
warn!("Conduit instance does not have an #admins room. Logging to that room will not work. Restart Conduit after creating a user to fix this.");
|
||||||
}
|
}
|
||||||
|
|
||||||
let send_message = |message: message::MessageEventContent| {
|
let send_message = |message: message::MessageEventContent| {
|
||||||
|
|
|
@ -8,7 +8,7 @@ use std::{
|
||||||
|
|
||||||
use crate::{appservice_server, server_server, utils, Error, PduEvent, Result};
|
use crate::{appservice_server, server_server, utils, Error, PduEvent, Result};
|
||||||
use federation::transactions::send_transaction_message;
|
use federation::transactions::send_transaction_message;
|
||||||
use log::warn;
|
use log::info;
|
||||||
use rocket::futures::stream::{FuturesUnordered, StreamExt};
|
use rocket::futures::stream::{FuturesUnordered, StreamExt};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::{appservice, federation, OutgoingRequest},
|
api::{appservice, federation, OutgoingRequest},
|
||||||
|
@ -123,7 +123,7 @@ impl Sending {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err((server, is_appservice, e)) => {
|
Err((server, is_appservice, e)) => {
|
||||||
warn!("Couldn't send transaction to {}: {}", server, e);
|
info!("Couldn't send transaction to {}\n{}", server, e);
|
||||||
let mut prefix = if is_appservice {
|
let mut prefix = if is_appservice {
|
||||||
"+".as_bytes().to_vec()
|
"+".as_bytes().to_vec()
|
||||||
} else {
|
} else {
|
||||||
|
|
30
src/error.rs
30
src/error.rs
|
@ -121,29 +121,45 @@ impl log::Log for ConduitLogger {
|
||||||
fn log(&self, record: &log::Record<'_>) {
|
fn log(&self, record: &log::Record<'_>) {
|
||||||
let output = format!("{} - {}", record.level(), record.args());
|
let output = format!("{} - {}", record.level(), record.args());
|
||||||
|
|
||||||
eprintln!("{}", output);
|
|
||||||
|
|
||||||
if self.enabled(record.metadata())
|
if self.enabled(record.metadata())
|
||||||
&& record
|
&& (record
|
||||||
.module_path()
|
.module_path()
|
||||||
.map_or(false, |path| path.starts_with("conduit::"))
|
.map_or(false, |path| path.starts_with("conduit::"))
|
||||||
|
|| record
|
||||||
|
.module_path()
|
||||||
|
.map_or(true, |path| !path.starts_with("rocket::")) // Rockets logs are annoying
|
||||||
|
&& record.metadata().level() <= log::Level::Warn)
|
||||||
{
|
{
|
||||||
|
let first_line = output
|
||||||
|
.lines()
|
||||||
|
.next()
|
||||||
|
.expect("lines always returns one item");
|
||||||
|
|
||||||
|
eprintln!("{}", output);
|
||||||
|
|
||||||
|
let mute_duration = match record.metadata().level() {
|
||||||
|
log::Level::Error => Duration::from_secs(60 * 5), // 5 minutes
|
||||||
|
log::Level::Warn => Duration::from_secs(60 * 60 * 24), // A day
|
||||||
|
_ => Duration::from_secs(60 * 60 * 24 * 7), // A week
|
||||||
|
};
|
||||||
|
|
||||||
if self
|
if self
|
||||||
.last_logs
|
.last_logs
|
||||||
.read()
|
.read()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.get(&output)
|
.get(first_line)
|
||||||
.map_or(false, |i| i.elapsed() < Duration::from_secs(60 * 30))
|
.map_or(false, |i| i.elapsed() < mute_duration)
|
||||||
|
// Don't post this log again for some time
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Ok(mut_last_logs) = &mut self.last_logs.try_write() {
|
if let Ok(mut_last_logs) = &mut self.last_logs.try_write() {
|
||||||
mut_last_logs.insert(output.clone(), Instant::now());
|
mut_last_logs.insert(first_line.to_owned(), Instant::now());
|
||||||
}
|
}
|
||||||
|
|
||||||
self.db.admin.send(AdminCommand::SendMessage(
|
self.db.admin.send(AdminCommand::SendMessage(
|
||||||
message::MessageEventContent::text_plain(output),
|
message::MessageEventContent::notice_plain(output),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
27
src/main.rs
27
src/main.rs
|
@ -13,18 +13,32 @@ mod utils;
|
||||||
|
|
||||||
pub use database::Database;
|
pub use database::Database;
|
||||||
pub use error::{ConduitLogger, Error, Result};
|
pub use error::{ConduitLogger, Error, Result};
|
||||||
use log::LevelFilter;
|
|
||||||
pub use pdu::PduEvent;
|
pub use pdu::PduEvent;
|
||||||
pub use rocket::State;
|
pub use rocket::State;
|
||||||
pub use ruma_wrapper::{ConduitResult, Ruma, RumaResponse};
|
pub use ruma_wrapper::{ConduitResult, Ruma, RumaResponse};
|
||||||
|
|
||||||
|
use log::LevelFilter;
|
||||||
|
use rocket::figment::{
|
||||||
|
providers::{Env, Format, Toml},
|
||||||
|
Figment,
|
||||||
|
};
|
||||||
use rocket::{catch, catchers, fairing::AdHoc, routes, Request};
|
use rocket::{catch, catchers, fairing::AdHoc, routes, Request};
|
||||||
|
|
||||||
fn setup_rocket() -> rocket::Rocket {
|
fn setup_rocket() -> rocket::Rocket {
|
||||||
// Force log level off, so we can use our own logger
|
// Force log level off, so we can use our own logger
|
||||||
std::env::set_var("ROCKET_LOG_LEVEL", "off");
|
std::env::set_var("CONDUIT_LOG_LEVEL", "off");
|
||||||
|
|
||||||
rocket::ignite()
|
let config =
|
||||||
|
Figment::from(rocket::Config::release_default())
|
||||||
|
.merge(
|
||||||
|
Toml::file(Env::var("CONDUIT_CONFIG").expect(
|
||||||
|
"The CONDUIT_CONFIG env var needs to be set. Example: /etc/conduit.toml",
|
||||||
|
))
|
||||||
|
.nested(),
|
||||||
|
)
|
||||||
|
.merge(Env::prefixed("CONDUIT_").global());
|
||||||
|
|
||||||
|
rocket::custom(config)
|
||||||
.mount(
|
.mount(
|
||||||
"/",
|
"/",
|
||||||
routes![
|
routes![
|
||||||
|
@ -137,8 +151,11 @@ fn setup_rocket() -> rocket::Rocket {
|
||||||
)
|
)
|
||||||
.register(catchers![not_found_catcher])
|
.register(catchers![not_found_catcher])
|
||||||
.attach(AdHoc::on_attach("Config", |rocket| async {
|
.attach(AdHoc::on_attach("Config", |rocket| async {
|
||||||
let data =
|
let config = rocket
|
||||||
Database::load_or_create(rocket.figment().extract().expect("config is valid"))
|
.figment()
|
||||||
|
.extract()
|
||||||
|
.expect("It looks like your config is invalid. Please take a look at the error");
|
||||||
|
let data = Database::load_or_create(config)
|
||||||
.await
|
.await
|
||||||
.expect("config is valid");
|
.expect("config is valid");
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma};
|
use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma};
|
||||||
use get_profile_information::v1::ProfileField;
|
use get_profile_information::v1::ProfileField;
|
||||||
use http::header::{HeaderValue, AUTHORIZATION, HOST};
|
use http::header::{HeaderValue, AUTHORIZATION, HOST};
|
||||||
use log::warn;
|
use log::{info, warn};
|
||||||
use rocket::{get, post, put, response::content::Json, State};
|
use rocket::{get, post, put, response::content::Json, State};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::{
|
api::{
|
||||||
|
@ -95,7 +95,7 @@ where
|
||||||
ruma::signatures::sign_json(
|
ruma::signatures::sign_json(
|
||||||
globals.server_name().as_str(),
|
globals.server_name().as_str(),
|
||||||
globals.keypair(),
|
globals.keypair(),
|
||||||
dbg!(&mut request_json),
|
&mut request_json,
|
||||||
)
|
)
|
||||||
.expect("our request json is what ruma expects");
|
.expect("our request json is what ruma expects");
|
||||||
|
|
||||||
|
@ -161,18 +161,18 @@ where
|
||||||
.bytes()
|
.bytes()
|
||||||
.await
|
.await
|
||||||
.unwrap_or_else(|e| {
|
.unwrap_or_else(|e| {
|
||||||
warn!("server error: {}", e);
|
warn!("server error {}", e);
|
||||||
Vec::new().into()
|
Vec::new().into()
|
||||||
}) // TODO: handle timeout
|
}) // TODO: handle timeout
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
if status != 200 {
|
if status != 200 {
|
||||||
warn!(
|
info!(
|
||||||
"Server returned bad response {} ({}): {} {:?}",
|
"Server returned bad response {} {}\n{}\n{:?}",
|
||||||
destination,
|
destination,
|
||||||
url,
|
|
||||||
status,
|
status,
|
||||||
|
url,
|
||||||
utils::string_from_bytes(&body)
|
utils::string_from_bytes(&body)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -183,8 +183,8 @@ where
|
||||||
.expect("reqwest body is valid http body"),
|
.expect("reqwest body is valid http body"),
|
||||||
);
|
);
|
||||||
response.map_err(|_| {
|
response.map_err(|_| {
|
||||||
warn!(
|
info!(
|
||||||
"Server returned invalid response bytes {} ({})",
|
"Server returned invalid response bytes {}\n{}",
|
||||||
destination, url
|
destination, url
|
||||||
);
|
);
|
||||||
Error::BadServerResponse("Server returned bad response.")
|
Error::BadServerResponse("Server returned bad response.")
|
||||||
|
|
Loading…
Reference in New Issue