// blob: 4eb6134f3cb29d9f3a74ce71f07cedcce987f84e [file] [log] [blame]
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include <typeinfo>
#include <assert.h>
#include <memory>
#include <iostream>
#include <stdint.h>
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include <rocksdb/merge_operator.h>
#include "rocksdb/utilities/db_ttl.h"
#include "merge_operators.h"
// True when the host stores the most significant byte first.
// Prefer the compiler-provided byte-order macros (a compile-time constant on
// GCC/Clang); otherwise fall back to probing the bytes of a two-byte literal.
#if defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__)
#define IS_BIG_ENDIAN (__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__)
#else
#define IS_BIG_ENDIAN (*(const uint16_t *)"\0\xff" < 0x100)
#endif

// Packs a collocation into one 64-bit key:
//   bits  0..23  w1
//   bits 24..47  w2   (bits 48..55 stay clear when w2 fits in 24 bits)
//   bits 56..63  dist (low 8 bits of the argument)
// No masking is applied to w1/w2; callers are expected to pass values that
// fit in 24 bits. Every argument is parenthesized so that compound
// expressions such as `a | b` are packed as a whole.
#define encodeCollocation(w1, w2, dist) \
  (((uint64_t)(dist) << 56) | ((uint64_t)(w2) << 24) | (uint64_t)(w1))

// Extracts the 24-bit w1 field from a packed key.
#define W1(key) ((uint64_t)((key) & 0xffffff))

// Extracts the 24-bit w2 field from a packed key.
#define W2(key) ((uint64_t)(((key) >> 24) & 0xffffff))

// Extracts the top byte of a packed key as a signed 8-bit distance.
#define DIST(key) ((int8_t)((uint64_t)(((key) >> 56) & 0xff)))
using namespace rocksdb;
namespace {

// Running count of merge-operator invocations (file-local).
size_t num_merge_operator_calls = 0;

// Sets the merge-operator call counter back to zero.
void resetNumMergeOperatorCalls() {
  num_merge_operator_calls = 0;
}

// Running count of partial-merge invocations (file-local).
size_t num_partial_merge_calls = 0;

// Sets the partial-merge call counter back to zero.
void resetNumPartialMergeCalls() {
  num_partial_merge_calls = 0;
}

}  // namespace
// Writes `value` into the 8 bytes at `buf`, least significant byte first.
// `buf` must have room for sizeof(uint64_t) bytes.
inline void EncodeFixed64(char* buf, uint64_t value) {
  if (IS_BIG_ENDIAN) {
    // Host order differs from the stored order: emit one byte at a time,
    // starting with the lowest eight bits.
    for (int index = 0; index < 8; ++index) {
      buf[index] = (value >> (8 * index)) & 0xff;
    }
    return;
  }
  // Host order already matches the stored order: copy the raw bytes.
  memcpy(buf, &value, sizeof(value));
}
// Reads a 32-bit value stored least significant byte first in the 4 bytes
// at `ptr`.
inline uint32_t DecodeFixed32(const char* ptr) {
  if (IS_BIG_ENDIAN) {
    // Assemble the value from individual bytes, highest byte first, so each
    // shift pushes the earlier bytes toward the top.
    const unsigned char* bytes = reinterpret_cast<const unsigned char*>(ptr);
    uint32_t assembled = 0;
    for (int index = 3; index >= 0; --index) {
      assembled = (assembled << 8) | bytes[index];
    }
    return assembled;
  }
  // Host order matches the stored order: load the raw bytes.
  uint32_t raw;
  memcpy(&raw, ptr, sizeof(raw));  // gcc optimizes this to a plain load
  return raw;
}
// Reads a 64-bit value stored least significant byte first in the 8 bytes
// at `ptr`.
inline uint64_t DecodeFixed64(const char* ptr) {
  if (IS_BIG_ENDIAN) {
    // Decode the two 32-bit halves separately and glue them together.
    const uint64_t low_word = DecodeFixed32(ptr);
    const uint64_t high_word = DecodeFixed32(ptr + 4);
    return low_word | (high_word << 32);
  }
  // Host order matches the stored order: load the raw bytes.
  uint64_t raw;
  memcpy(&raw, ptr, sizeof(raw));  // gcc optimizes this to a plain load
  return raw;
}
class CountMergeOperator : public AssociativeMergeOperator {
public:
CountMergeOperator() {
mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
}
virtual bool Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const override {
assert(new_value->empty());
++num_merge_operator_calls;
if (existing_value == nullptr) {
new_value->assign(value.data(), value.size());
return true;
}
return mergeOperator_->PartialMerge(
key,
*existing_value,
value,
new_value,
logger);
}
virtual const char* Name() const override {
return "UInt64AddOperator";
}
private:
std::shared_ptr<MergeOperator> mergeOperator_;
};
// Empty anonymous namespace: it declares nothing.
namespace {
} // namespace
class CollocatorIterator : public Iterator {
private:
char prefixc[sizeof(uint64_t)];
Iterator *base_iterator_;
public:
explicit CollocatorIterator(Iterator* base_iterator)
: base_iterator_(base_iterator)
{}
void setPrefix(char *prefix) {
memcpy(prefixc, prefix, sizeof(uint64_t));
}
virtual void SeekToFirst() { base_iterator_->SeekToFirst(); }
virtual void SeekToLast() { base_iterator_->SeekToLast(); }
virtual void Seek(const rocksdb::Slice& s) { base_iterator_->Seek(s); }
virtual void Prev() { base_iterator_->Prev(); }
virtual void Next() { base_iterator_->Next(); }
virtual Slice key() const { return base_iterator_->key(); }
virtual Slice value() const { return base_iterator_->value(); }
virtual Status status() const { return base_iterator_->status(); }
virtual bool Valid() const override {
return base_iterator_->Valid() && key().starts_with(std::string(prefixc,3));
}
};
class Collocators {
private:
WriteOptions merge_option_; // for merge
char _one[sizeof(uint64_t)];
Slice _one_slice;
protected:
std::shared_ptr<DB> db_;
WriteOptions put_option_;
ReadOptions get_option_;
WriteOptions delete_option_;
uint64_t default_;
std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
const size_t max_successive_merges = 0) {
DB* db;
Options options;
options.create_if_missing = true;
options.merge_operator = std::make_shared<CountMergeOperator>();
options.max_successive_merges = max_successive_merges;
Status s;
// DestroyDB(dbname, Options());
s = DB::Open(options, dbname, &db);
if (!s.ok()) {
std::cerr << s.ToString() << std::endl;
assert(false);
}
return std::shared_ptr<DB>(db);
}
public:
explicit Collocators(const std::string& db_name)
: put_option_(),
get_option_(),
delete_option_(),
merge_option_()
{
db_ = OpenDb(db_name, false, 0);
assert(db_);
uint64_t one = 1;
EncodeFixed64(_one, one);
_one_slice = Slice(_one, sizeof(uint64_t));
}
virtual ~Collocators() {}
// public interface of Collocators.
// All four functions return false
// if the underlying level db operation failed.
// mapped to a levedb Put
bool set(const std::string& key, uint64_t value) {
// just treat the internal rep of int64 as the string
char buf[sizeof(value)];
EncodeFixed64(buf, value);
Slice slice(buf, sizeof(value));
auto s = db_->Put(put_option_, key, slice);
if (s.ok()) {
return true;
} else {
std::cerr << s.ToString() << std::endl;
return false;
}
}
DB *getDb() {
return db_.get();
}
// mapped to a rocksdb Delete
bool remove(const std::string& key) {
auto s = db_->Delete(delete_option_, key);
if (s.ok()) {
return true;
} else {
std::cerr << s.ToString() << std::endl;
return false;
}
}
// mapped to a rocksdb Get
bool get(const std::string& key, uint64_t* value) {
std::string str;
auto s = db_->Get(get_option_, key, &str);
if (s.IsNotFound()) {
// return default value if not found;
*value = default_;
return true;
} else if (s.ok()) {
// deserialization
if (str.size() != sizeof(uint64_t)) {
std::cerr << "value corruption\n";
return false;
}
*value = DecodeFixed64(&str[0]);
return true;
} else {
std::cerr << s.ToString() << std::endl;
return false;
}
}
uint64_t get(const uint32_t w1, const uint32_t w2, const int8_t dist) {
char encoded_key[sizeof(uint64_t)];
EncodeFixed64(encoded_key, encodeCollocation(w1,w2,dist));
uint64_t value = default_;
get(std::string(encoded_key, 8), &value);
return value;
}
virtual void inc(const std::string& key) {
db_->Merge(merge_option_, key, _one_slice);
}
virtual void inc(const uint64_t key) {
char encoded_key[sizeof(uint64_t)];
EncodeFixed64(encoded_key, key);
db_->Merge(merge_option_, std::string(encoded_key, 8), _one_slice);
}
virtual void inc(const uint32_t w1, const uint32_t w2, const uint8_t dist) {
inc(encodeCollocation(w1, w2, dist));
}
// mapped to a rocksdb Merge operation
virtual bool add(const std::string& key, uint64_t value) {
char encoded[sizeof(uint64_t)];
EncodeFixed64(encoded, value);
Slice slice(encoded, sizeof(uint64_t));
auto s = db_->Merge(merge_option_, key, slice);
if (s.ok()) {
return true;
} else {
std::cerr << s.ToString() << std::endl;
return false;
}
}
virtual CollocatorIterator* SeekIterator(uint64_t w1, uint64_t w2, int8_t dist) {
ReadOptions options;
options.prefix_same_as_start = true;
char prefixc[sizeof(uint64_t)];
EncodeFixed64(prefixc, encodeCollocation(w1, w2, dist));
Iterator *it = db_->NewIterator(options);
CollocatorIterator *cit = new CollocatorIterator(it);
cit->Seek(std::string(prefixc,3));// it->Valid() && it->key().starts_with(std::string(prefixc,3)); it->Next()) {
cit->setPrefix(prefixc);
return cit;
}
};
namespace {
void dumpDb(Collocators counters) {
auto it = std::unique_ptr<CollocatorIterator>(counters.SeekIterator(1000,0,0));
for (; it->Valid(); it->Next()) {
uint64_t value = DecodeFixed64(it->value().data());
uint64_t key = DecodeFixed64(it->key().data());
std::cout << "w1:" << W1(key) << ", w2:" << W2(key) << ", dist:" << (int32_t) DIST(key) << " - count:" << value << std::endl;
}
assert(it->status().ok()); // Check for any errors found during the scan
}
// Exercises the counter store: increments a fixed set of collocations,
// adds 10000 pseudo-random increments, increments some of the fixed
// collocations a second time, and finally dumps the store via dumpDb().
void testCollocators(Collocators& counters) {
  struct Collocation {
    uint32_t w1;
    uint32_t w2;
    int dist;
  };

  // First pass over the hand-picked collocations.
  const Collocation first_pass[] = {
      {100, 200, 5},   {1000, 2000, -5}, {1000, 2000, 5}, {1000, 2500, -3},
      {1000, 2500, 4}, {1000, 2900, 3},  {1001, 2900, 3},
  };
  for (const Collocation& c : first_pass) {
    counters.inc(c.w1, c.w2, c.dist);
  }

  // Pseudo-random background noise (rand() is left unseeded).
  for (int round = 0; round != 10000; ++round) {
    counters.inc(rand()%1010,rand()%1010,rand()%10-5);
  }

  // dumpDb(db);

  // Second pass: bump a subset of the hand-picked collocations again.
  const Collocation second_pass[] = {
      {100, 200, 5}, {1000, 2000, 5}, {1000, 2500, 4},
      {1000, 2900, 3}, {1001, 2900, 3},
  };
  for (const Collocation& c : second_pass) {
    counters.inc(c.w1, c.w2, c.dist);
  }

  dumpDb(counters);
}
void runTest(int argc, const std::string& dbname, const bool use_ttl = false) {
std::cout << "Test merge-based counters... \n";
Collocators counters(dbname);
testCollocators(counters);
}
} // namespace
// Program entry point: runs the counter test against a database under /tmp
// and prints a confirmation line once it returns.
int main(int argc, char *argv[]) {
  static_cast<void>(argv);

  const std::string db_path("/tmp/merge_testdb");
  runTest(argc, db_path);

  printf("Passed all tests!\n");
  return 0;
}