Increase writing performance
Increases write throughput
from 350kb/s to 550kb/s on compute2 and
from 560kb/s to 760kb/s on my office computer
both writing to ssd.
compute: 635kb/s
korap-worker-08: 590kb/s
Change-Id: I7c8f6c7777c811bf18b5fffdd12d1bd862107ebb
diff --git a/src/collocatordb.cc b/src/collocatordb.cc
index 57a01d9..b46d536 100644
--- a/src/collocatordb.cc
+++ b/src/collocatordb.cc
@@ -18,6 +18,7 @@
#include <rocksdb/slice_transform.h>
#include <sstream> // for ostringstream
#include <string>
+#include <thread>
#include <vector>
#define WINDOW_SIZE 5
@@ -515,44 +516,57 @@
return std::shared_ptr<DB>(db);
}
-std::shared_ptr<DB> CollocatorDB::OpenDb(const char *dbname) {
- DB *db;
- Options options;
+ std::shared_ptr<DB> CollocatorDB::OpenDb(const char *dbname) {
+ DB *db;
+ Options options;
- options.env->SetBackgroundThreads(4);
- options.create_if_missing = true;
- options.merge_operator = std::make_shared<CountMergeOperator>();
- options.max_successive_merges = 0;
- // options.prefix_extractor.reset(NewFixedPrefixTransform(8));
- options.IncreaseParallelism();
- options.OptimizeLevelStyleCompaction();
- // options.max_write_buffer_number = 48;
- // options.max_background_jobs = 48;
- // options.allow_concurrent_memtable_write=true;
- // options.memtable_factory.reset(NewHashLinkListRepFactory(200000));
- // options.enable_write_thread_adaptive_yield = 1;
- // options.allow_concurrent_memtable_write = 1;
- // options.memtable_factory.reset(new SkipListFactory);
- // options.write_buffer_size = 1 << 22;
- // options.allow_mmap_reads = true;
- // options.allow_mmap_writes = true;
- // options.max_background_compactions = 40;
- // BlockBasedTableOptions table_options;
- // table_options.filter_policy.reset(NewBloomFilterPolicy(24, false));
- // options.bloom_locality = 1;
- // std::shared_ptr<Cache> cache = NewLRUCache(512 * 1024 * 1024);
- // table_options.block_cache = cache;
- // options.table_factory.reset(NewBlockBasedTableFactory(table_options));
- Status s;
- // DestroyDB(dbname, Options());
- s = DB::Open(options, dbname, &db);
- if (!s.ok()) {
- std::cerr << s.ToString() << std::endl;
- assert(false);
+ int max_cores = static_cast<int>(std::thread::hardware_concurrency());
+
+ // options.env->SetBackgroundThreads(32, Env::Priority::HIGH); // Increase background threads for high priority
+ // options.env->SetBackgroundThreads(16, Env::Priority::LOW); // Increase background threads for low priority
+ options.create_if_missing = true;
+ options.merge_operator = std::make_shared<CountMergeOperator>();
+ //options.max_successive_merges = 0;
+ // options.IncreaseParallelism(max_cores); // Utilize all available cores
+ // options.OptimizeLevelStyleCompaction();
+
+ // Increase write buffer size and number of write buffers
+ // options.write_buffer_size = 512 * 1024 * 1024; // 512MB
+ // options.max_write_buffer_number = max_cores;
+ // options.min_write_buffer_number_to_merge = max_cores / 2;
+
+ // Enable concurrent memtable writes
+ options.allow_concurrent_memtable_write = true;
+ options.enable_write_thread_adaptive_yield = true;
+ options.allow_mmap_writes = true;
+ options.allow_mmap_reads = true;
+
+ // Optimize block cache size
+ BlockBasedTableOptions table_options;
+ table_options.block_cache = NewLRUCache(8 * 1024 * 1024 * 1024L); // 8GB block cache
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+ // Adjust compaction settings
+ options.level0_file_num_compaction_trigger = 100;
+ options.level0_slowdown_writes_trigger = 200;
+ options.level0_stop_writes_trigger = 400;
+ options.max_background_compactions = max_cores / 2;
+ options.max_background_flushes = max_cores / 4;
+ // options.disableWA
+ // Tune write options
+ merge_option_.low_pri = true; // Use low priority for compactions
+ merge_option_.disableWAL = true; // Disable Write-Ahead Logging for faster writes
+ merge_option_.sync = false; // Disable sync for faster writes
+ merge_option_.no_slowdown = true; // Disable write slowdown for faster writes
+
+ Status s = DB::Open(options, dbname, &db);
+ if (!s.ok()) {
+ std::cerr << s.ToString() << std::endl;
+ assert(false);
+ }
+ total = 1000;
+ return std::shared_ptr<DB>(db);
}
- total = 1000;
- return std::shared_ptr<DB>(db);
-}
CollocatorIterator *
CollocatorDB::SeekIterator(uint64_t w1, uint64_t w2, int8_t dist) const {