collocatordb: start for a rocksdb-based collocator db
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..99086b1
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,20 @@
+PLATFORM_CCFLAGS= -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DOS_LINUX -fno-builtin-memcmp -DROCKSDB_FALLOCATE_PRESENT -DSNAPPY -DGFLAGS=1 -DZLIB -DBZIP2 -DLZ4 -DZSTD -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_PTHREAD_ADAPTIVE_MUTEX -DROCKSDB_BACKTRACE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -march=native -DROCKSDB_SUPPORT_THREAD_LOCAL
+PLATFORM_CXXFLAGS=-std=c++11 -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DOS_LINUX -fno-builtin-memcmp -DROCKSDB_FALLOCATE_PRESENT -DSNAPPY -DGFLAGS=1 -DZLIB -DBZIP2 -DLZ4 -DZSTD -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_PTHREAD_ADAPTIVE_MUTEX -DROCKSDB_BACKTRACE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -march=native -DROCKSDB_SUPPORT_THREAD_LOCAL
+PLATFORM=OS_LINUX
+PLATFORM_LDFLAGS= -lpthread -lrt -lsnappy -lgflags -lz -lbz2 -llz4 -lzstd
+# The PLATFORM_* values above select a Linux/POSIX RocksDB configuration. RTTI is disabled unless USE_RTTI=1 is passed.
+ifneq ($(USE_RTTI), 1)
+ CXXFLAGS += -fno-rtti
+endif
+LIB_SOURCES = collocatordb.cc
+# One object file per library source (foo.cc -> foo.o).
+LIBOBJECTS = $(LIB_SOURCES:.cc=.o)
+# Test executable. merge_operators.h is listed so that editing the header triggers a rebuild.
+collocatordb: collocatordb.cc merge_operators.h
+	$(CXX) $(CXXFLAGS) -L/usr/local/lib $< -o$@ -lrocksdb -I/usr/local/include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
+# Static library assembled from LIBOBJECTS.
+libcollocatordb.a: $(LIBOBJECTS)
+	$(AM_V_at)$(AR) $(ARFLAGS) $@ $(LIBOBJECTS)
+# Pattern rule (a .cc.o suffix rule cannot carry prerequisites): objects also depend on the header.
+%.o: %.cc merge_operators.h
+	$(CXX) $(CXXFLAGS) -c $< -o$@ -I/usr/local/include -O2 -std=c++11 $(PLATFORM_CXXFLAGS)
diff --git a/collocatordb.cc b/collocatordb.cc
new file mode 100644
index 0000000..b4c7679
--- /dev/null
+++ b/collocatordb.cc
@@ -0,0 +1,326 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#include <typeinfo>
+#include <assert.h>
+#include <memory>
+#include <iostream>
+#include <stdint.h>
+#include "rocksdb/cache.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include <rocksdb/merge_operator.h>
+#include "rocksdb/utilities/db_ttl.h"
+#include "merge_operators.h"
+
+#define IS_BIG_ENDIAN (*(const uint16_t *)"\0\xff" < 0x100)  // true on hosts that store the most significant byte first
+#define encodeCollocation(w1, w2, dist) ((static_cast<uint64_t>(dist) << 56) | (static_cast<uint64_t>(w2) << 24) | (w1))  // 64-bit key: dist from bit 56, w2 from bit 24, w1 from bit 0; arguments are parenthesised so any expression may be passed
+using namespace rocksdb;
+
+namespace {
+  // File-local diagnostics counters (static storage, so both start at zero).
+  size_t num_merge_operator_calls = 0;  // incremented by CountMergeOperator::Merge
+  size_t num_partial_merge_calls = 0;   // only ever reset in this file
+  void resetNumMergeOperatorCalls() { num_merge_operator_calls = 0; }
+  void resetNumPartialMergeCalls() { num_partial_merge_calls = 0; }
+}
+
+// Writes `value` to buf[0..7] in little-endian order (least significant byte
+// first), whatever the byte order of the host.
+//
+// buf must point to at least sizeof(uint64_t) writable bytes.
+//
+// The bytes are produced with shifts rather than memcpy, so the result does
+// not depend on probing the host's endianness and needs no extra header.
+// On a little-endian host this is byte-for-byte what a raw copy would give.
+inline void EncodeFixed64(char* buf, uint64_t value) {
+  for (size_t i = 0; i < sizeof(value); ++i) {
+    // Shift byte i down to the low position, then keep just those 8 bits.
+    buf[i] = static_cast<char>((value >> (8 * i)) & 0xff);
+  }
+}
+
+// Reads the four bytes at ptr as a little-endian unsigned 32-bit integer
+// (ptr[0] is the least significant byte), independent of host byte order.
+//
+// ptr must reference at least sizeof(uint32_t) readable bytes.
+inline uint32_t DecodeFixed32(const char* ptr) {
+  uint32_t result = 0;
+  for (size_t i = 0; i < sizeof(result); ++i) {
+    // Convert via unsigned char so bytes >= 0x80 are not sign-extended.
+    const uint32_t byte = static_cast<unsigned char>(ptr[i]);
+    result |= byte << (8 * i);
+  }
+  return result;
+}
+
+// Reads the eight bytes at ptr as a little-endian unsigned 64-bit integer;
+// the inverse of EncodeFixed64.  ptr must reference at least
+// sizeof(uint64_t) readable bytes.
+inline uint64_t DecodeFixed64(const char* ptr) {
+  uint64_t result = 0;
+  for (size_t i = 0; i < sizeof(result); ++i) {
+    // Widen to 64 bits before shifting; shift counts reach 56 here.
+    const uint64_t byte = static_cast<unsigned char>(ptr[i]);
+    result |= byte << (8 * i);
+  }
+  return result;
+}
+
+
+// Associative merge operator that forwards the actual combining step to the
+// operator returned by MergeOperators::CreateUInt64AddOperator() and counts
+// its own invocations in num_merge_operator_calls.
+class CountMergeOperator : public AssociativeMergeOperator {
+public:
+  CountMergeOperator()
+      : mergeOperator_(MergeOperators::CreateUInt64AddOperator()) {}
+
+  // Merges `value` into `existing_value` for `key`, writing to `new_value`
+  // (asserted empty on entry).  With no existing value the operand is copied
+  // verbatim; otherwise the delegate's PartialMerge result is returned.
+  virtual bool Merge(const Slice& key,
+                     const Slice* existing_value,
+                     const Slice& value,
+                     std::string* new_value,
+                     Logger* logger) const override {
+    assert(new_value->empty());
+    ++num_merge_operator_calls;
+    if (existing_value != nullptr) {
+      return mergeOperator_->PartialMerge(key, *existing_value, value,
+                                          new_value, logger);
+    }
+    // Nothing stored yet: the operand itself becomes the value.
+    new_value->assign(value.data(), value.size());
+    return true;
+  }
+
+  // Identifier reported for this operator.
+  virtual const char* Name() const override { return "UInt64AddOperator"; }
+
+private:
+  std::shared_ptr<MergeOperator> mergeOperator_;  // delegate set up in the constructor
+};
+
+namespace {  // currently empty
+} // namespace
+
+// Stub subclass of Iterator: it only declares an empty default constructor.
+class CollocatorIterator : public Iterator {
+public:
+  explicit CollocatorIterator() {}
+};
+
+// Persistent counters in a RocksDB database, keyed by arbitrary strings or
+// by packed (w1, w2, dist) collocation keys.
+//
+// Values are the 8 bytes written by EncodeFixed64; increments are submitted
+// as Merge operations that the CountMergeOperator installed by OpenDb
+// resolves.  Members returning bool yield false when the RocksDB call
+// failed, after printing the status to stderr.
+class Collocators {
+private:
+  WriteOptions merge_option_; // for merge
+  char _one[sizeof(uint64_t)];  // EncodeFixed64 of 1: the operand merged by inc()
+
+  // Slice over _one, built on demand so that no stored member points into
+  // another instance's buffer after a copy.
+  Slice one() const { return Slice(_one, sizeof(_one)); }
+
+  // Returns s.ok(); a failed status is written to stderr first.
+  static bool CheckStatus(const Status& s) {
+    if (!s.ok()) std::cerr << s.ToString() << std::endl;
+    return s.ok();
+  }
+
+protected:
+  std::shared_ptr<DB> db_;
+
+  WriteOptions put_option_;
+  ReadOptions get_option_;
+  WriteOptions delete_option_;
+
+  uint64_t default_;  // what get() reports for keys that are not stored
+
+  // Opens the database at `dbname`, creating it if missing, with
+  // CountMergeOperator as merge operator and the given
+  // max_successive_merges.  `ttl` is accepted but unused.
+  //
+  // On failure the status is printed and assert(false) fires; when
+  // assertions are compiled out an empty shared_ptr is returned.
+  std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
+                             const size_t max_successive_merges = 0) {
+    DB* db = nullptr;  // stays null (never indeterminate) if Open fails
+    Options options;
+    options.create_if_missing = true;
+    options.merge_operator = std::make_shared<CountMergeOperator>();
+    options.max_successive_merges = max_successive_merges;
+    // DestroyDB(dbname, Options());
+    const Status s = DB::Open(options, dbname, &db);
+    if (!s.ok()) {
+      std::cerr << s.ToString() << std::endl;
+      assert(false);
+    }
+    return std::shared_ptr<DB>(db);
+  }
+
+public:
+  // Opens the database at `db_name` and caches the encoded operand 1.
+  // Initialisers follow declaration order; default_ starts at 0 so that
+  // get() on an absent key yields 0 rather than an indeterminate value.
+  explicit Collocators(const std::string& db_name)
+      : merge_option_(),
+        put_option_(),
+        get_option_(),
+        delete_option_(),
+        default_(0) {
+    db_ = OpenDb(db_name, false, 0);
+    assert(db_);
+    EncodeFixed64(_one, 1);
+  }
+
+  virtual ~Collocators() {}
+
+  // Stores `value` under `key`, overwriting any previous value
+  // (RocksDB Put).  Returns false if the write failed.
+  bool set(const std::string& key, uint64_t value) {
+    // The stored representation is the 8-byte encoding of the integer.
+    char buf[sizeof(value)];
+    EncodeFixed64(buf, value);
+    return CheckStatus(db_->Put(put_option_, key, Slice(buf, sizeof(buf))));
+  }
+
+  // Borrowed pointer to the underlying database; ownership stays here.
+  DB *getDb() { return db_.get(); }
+
+  // Deletes `key` (RocksDB Delete).  Returns false if the call failed.
+  bool remove(const std::string& key) {
+    return CheckStatus(db_->Delete(delete_option_, key));
+  }
+
+  // Reads the counter stored under `key` into *value (RocksDB Get).
+  // An absent key is not an error: *value becomes default_ and true is
+  // returned.  Returns false, leaving *value untouched, when the read
+  // failed or the stored value is not exactly 8 bytes long.
+  bool get(const std::string& key, uint64_t* value) {
+    std::string str;
+    const Status s = db_->Get(get_option_, key, &str);
+    if (s.IsNotFound()) {
+      *value = default_;
+      return true;
+    }
+    if (!CheckStatus(s)) return false;
+    if (str.size() != sizeof(uint64_t)) {
+      std::cerr << "value corruption\n";
+      return false;
+    }
+    *value = DecodeFixed64(str.data());
+    return true;
+  }
+
+  // Counter for the collocation (w1, w2, dist).  Returns default_ when the
+  // key is absent and also when the lookup fails (the failure is logged).
+  uint64_t get(const uint32_t w1, const uint32_t w2, const int8_t dist) {
+    char encoded_key[sizeof(uint64_t)];
+    EncodeFixed64(encoded_key, encodeCollocation(w1,w2,dist));
+    uint64_t value = default_;
+    get(std::string(encoded_key, sizeof(encoded_key)), &value);
+    return value;
+  }
+
+  // Increments the counter under `key` by merging the encoded value 1.
+  // A failed Merge is logged to stderr instead of being dropped silently.
+  virtual void inc(const std::string& key) {
+    CheckStatus(db_->Merge(merge_option_, key, one()));
+  }
+
+  // Same, for a key given as a packed 64-bit collocation code.
+  virtual void inc(const uint64_t key) {
+    char encoded_key[sizeof(uint64_t)];
+    EncodeFixed64(encoded_key, key);
+    CheckStatus(db_->Merge(merge_option_,
+                           std::string(encoded_key, sizeof(encoded_key)), one()));
+  }
+
+  // Same, for the collocation (w1, w2, dist).
+  virtual void inc(const uint32_t w1, const uint32_t w2, const uint8_t dist) {
+    inc(encodeCollocation(w1, w2, dist));
+  }
+
+  // Adds `value` to the counter under `key` (RocksDB Merge).
+  // Returns false if the merge failed.
+  virtual bool add(const std::string& key, uint64_t value) {
+    char encoded[sizeof(uint64_t)];
+    EncodeFixed64(encoded, value);
+    return CheckStatus(
+        db_->Merge(merge_option_, key, Slice(encoded, sizeof(encoded))));
+  }
+};
+
+namespace {
+  // Prints every entry whose key starts with the first three bytes of the
+  // encoded key for (1000, 0, 0), i.e. whose low 24 key bits (w1) equal 1000.
+  // Entries with a key or value that is not 8 bytes long are skipped, not decoded.
+  void dumpDb(DB* db) {
+    char prefixc[sizeof(uint64_t)];
+    EncodeFixed64(prefixc, encodeCollocation(1000,0,0));
+    const std::string prefix(prefixc, 3);
+    ReadOptions options;
+    options.prefix_same_as_start = true;
+    std::unique_ptr<Iterator> it(db->NewIterator(options));  // std:: spelled out so the name always resolves
+    for (it->Seek(prefix); it->Valid() && it->key().starts_with(prefix); it->Next()) {
+      if (it->key().size() != sizeof(uint64_t) || it->value().size() != sizeof(uint64_t)) {
+        std::cerr << "skipping entry with unexpected key/value size\n";
+        continue;
+      }
+      const uint64_t value = DecodeFixed64(it->value().data());
+      const uint64_t key = DecodeFixed64(it->key().data());
+      const uint64_t w1 = key & 0xffffff;
+      const uint64_t w2 = (key >> 24) & 0xffffff;
+      const int8_t dist = static_cast<int8_t>((key >> 56) & 0xff);  // top byte, read as signed
+      std::cout << "w1:" << w1 << ", w2:" << w2 << ", dist:" << (int32_t) dist << " - count:" << value << std::endl;
+    }
+    assert(it->status().ok()); // Check for any errors found during the scan
+  }
+
+  // Exercises the three-argument inc() with a handful of fixed collocations
+  // (including negative distances) plus one million random ones, then dumps
+  // the entries selected by dumpDb.
+  void testCollocators(Collocators& counters) {
+    DB *db = counters.getDb();
+    counters.inc(100,200,5);
+    counters.inc(1000,2000,-5);
+    counters.inc(1000,2000,5);
+    counters.inc(1000,2500,-3);
+    counters.inc(1000,2500,4);
+    counters.inc(1000,2900,3);
+    counters.inc(1001,2900,3);
+
+    for(int i=0; i<1000000; i++)
+      counters.inc(rand()%1000,rand()%1000,5);
+    // dumpDb(db);
+    counters.inc(100,200,5);
+    counters.inc(1000,2000,5);
+    counters.inc(1000,2500,4);
+    counters.inc(1000,2900,3);
+    counters.inc(1001,2900,3);
+
+    dumpDb(db);
+  }
+
+  // Opens the counters at `dbname` and runs testCollocators; `argc` and `use_ttl` are unused.
+  void runTest(int argc, const std::string& dbname, const bool use_ttl = false) {
+    std::cout << "Test merge-based counters... \n";
+    Collocators counters(dbname);
+    testCollocators(counters);
+  }
+} // namespace
+
+int main(int argc, char *argv[]) {  // argv is unused; argc is only forwarded
+  runTest(argc, std::string("/tmp/merge_testdb"));  // location of the test database
+  fputs("Passed all tests!\n", stdout);              // reached once runTest has returned
+  return 0;
+}
diff --git a/merge_operators.h b/merge_operators.h
new file mode 100644
index 0000000..602a4d0
--- /dev/null
+++ b/merge_operators.h
@@ -0,0 +1,51 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#ifndef MERGE_OPERATORS_H
+#define MERGE_OPERATORS_H
+
+#include <memory>
+#include <stdio.h>
+
+#include "rocksdb/merge_operator.h"
+
+namespace rocksdb {
+
+// Static factories for merge operators.  The Create*Operator functions are
+// only declared in this header.
+class MergeOperators {
+ public:
+  static std::shared_ptr<MergeOperator> CreatePutOperator();
+  static std::shared_ptr<MergeOperator> CreateDeprecatedPutOperator();
+  static std::shared_ptr<MergeOperator> CreateUInt64AddOperator();
+  static std::shared_ptr<MergeOperator> CreateStringAppendOperator();
+  static std::shared_ptr<MergeOperator> CreateStringAppendTESTOperator();
+  static std::shared_ptr<MergeOperator> CreateMaxOperator();
+
+  // Returns the operator produced by the factory registered for `name`:
+  //   "put", "put_v1", "uint64add", "stringappend", "stringappendtest", "max".
+  // Matching is exact; an empty or unknown name yields nullptr.
+  // TODO: Hook the "name" up to the actual Name() of the MergeOperators?
+  static std::shared_ptr<MergeOperator> CreateFromStringId(
+      const std::string& name) {
+    typedef std::shared_ptr<MergeOperator> (*Factory)();
+    static const struct { const char* id; Factory make; } kFactories[] = {
+      {"put", &MergeOperators::CreatePutOperator},
+      {"put_v1", &MergeOperators::CreateDeprecatedPutOperator},
+      {"uint64add", &MergeOperators::CreateUInt64AddOperator},
+      {"stringappend", &MergeOperators::CreateStringAppendOperator},
+      {"stringappendtest", &MergeOperators::CreateStringAppendTESTOperator},
+      {"max", &MergeOperators::CreateMaxOperator},
+    };
+    for (const auto& entry : kFactories) {
+      if (name == entry.id) return entry.make();
+    }
+    return nullptr;
+  }
+};
+
+} // namespace rocksdb
+
+#endif