From 69df9a0145ed5743b37ce44ece87bbbb69eb6a1c Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Mon, 30 Aug 2021 20:18:48 +0200 Subject: [PATCH] add tls --- Cargo.lock | 1 + Cargo.toml | 1 + src/database/abstraction/sqlite.rs | 44 ++++++++++-------------------- 3 files changed, 17 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 880829a..03b1731 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -267,6 +267,7 @@ dependencies = [ "serde_yaml", "sled", "thiserror", + "thread_local", "threadpool", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 034f94b..bb44d08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,6 +80,7 @@ crossbeam = { version = "0.8.1", optional = true } num_cpus = "1.13.0" threadpool = "1.8.1" heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d", optional = true } +thread_local = "1.1.3" [features] default = ["conduit_bin", "backend_sqlite"] diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index f17eac9..99deeba 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -10,6 +10,7 @@ use std::{ pin::Pin, sync::Arc, }; +use thread_local::ThreadLocal; use tokio::sync::oneshot::Sender; use tracing::debug; @@ -40,6 +41,8 @@ impl Drop for NonAliasingBox { pub struct Engine { writer: Mutex, + read_conn_tls: ThreadLocal, + read_iterator_conn_tls: ThreadLocal, path: PathBuf, cache_size_per_thread: u32, @@ -62,34 +65,14 @@ impl Engine { self.writer.lock() } - fn read_lock(&self) -> &'static Connection { - READ_CONNECTION.with(|cell| { - let connection = &mut cell.borrow_mut(); - - if (*connection).is_none() { - let c = Box::leak(Box::new( - Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap(), - )); - **connection = Some(c); - } - - connection.unwrap() - }) + fn read_lock<'a>(&'a self) -> &'a Connection { + self.read_conn_tls + .get_or(|| Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap()) } - fn read_lock_iterator(&self) -> &'static Connection { - READ_CONNECTION_ITERATOR.with(|cell| { - let connection = &mut cell.borrow_mut(); - - if (*connection).is_none() { - let c = Box::leak(Box::new( - Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap(), - )); - **connection = Some(c); - } - - connection.unwrap() - }) + fn read_lock_iterator<'a>(&'a self) -> &'a Connection { + self.read_iterator_conn_tls + .get_or(|| Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap()) } pub fn flush_wal(self: &Arc) -> Result<()> { @@ -105,15 +88,18 @@ impl DatabaseEngine for Engine { // calculates cache-size per permanent connection // 1. convert MB to KiB - // 2. divide by permanent connections + // 2. divide by permanent connections + permanent iter connections + write connection // 3. round down to nearest integer - let cache_size_per_thread: u32 = - ((config.db_cache_capacity_mb * 1024.0) / (num_cpus::get().max(1) + 1) as f64) as u32; + let cache_size_per_thread: u32 = ((config.db_cache_capacity_mb * 1024.0) + / ((num_cpus::get().max(1) * 2) + 1) as f64) + as u32; let writer = Mutex::new(Self::prepare_conn(&path, cache_size_per_thread)?); let arc = Arc::new(Engine { writer, + read_conn_tls: ThreadLocal::new(), + read_iterator_conn_tls: ThreadLocal::new(), path, cache_size_per_thread, });