blob: d4745fdc1db0f74ace97fa6983ff8463dd24a1bf [file] [log] [blame]
Marc Kupietz28cc53e2017-12-23 17:24:55 +01001// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5//
6#include <typeinfo>
// Symbol-visibility helper macros.
// The GNU `visibility` attribute accepts only "default", "hidden",
// "internal" and "protected"; "visible" is not one of them, so any use of
// EXPORT would be rejected or ignored by the compiler. "default" is the
// value that makes a symbol exported.
#define EXPORT __attribute__((visibility("default")))
// Expands to nothing.
#define IMPORT
Marc Kupietz28cc53e2017-12-23 17:24:55 +01009#include <assert.h>
10#include <memory>
11#include <iostream>
12#include <stdint.h>
13#include "rocksdb/cache.h"
14#include "rocksdb/comparator.h"
15#include "rocksdb/db.h"
16#include "rocksdb/env.h"
17#include <rocksdb/merge_operator.h>
18#include "rocksdb/utilities/db_ttl.h"
19#include "merge_operators.h"
20
Marc Kupietz06c9a9f2018-01-02 16:56:43 +010021
// Run-time byte-order probe: reads the two bytes 0x00, 0xff as one uint16_t.
// A big-endian host sees 0x00ff (< 0x100), a little-endian host sees 0xff00.
#define IS_BIG_ENDIAN (*(uint16_t *)"\0\xff" < 0x100)

// Packs a collocation into one 64-bit key:
//   bits  0..23  w1
//   bits 24..47  w2
//   bits 56..63  dist (stored as its 8-bit two's-complement pattern)
// No masking is applied here, so a w1 wider than 24 bits overlaps the w2
// field. Every macro argument is parenthesized so that compound expressions
// such as `a | b` can be passed safely.
#define encodeCollocation(w1, w2, dist) (((uint64_t)(dist) << 56) | ((uint64_t)(w2) << 24) | (uint64_t)(w1))
// Extracts the low 24 bits (w1) of a packed key.
#define W1(key) ((uint64_t)((key) & 0xffffff))
// Extracts bits 24..47 (w2) of a packed key.
#define W2(key) ((uint64_t)(((key) >> 24) & 0xffffff))
// Extracts the top byte of a packed key as a signed distance.
#define DIST(key) ((int8_t)((uint64_t)(((key) >> 56) & 0xff)))
Marc Kupietz28cc53e2017-12-23 17:24:55 +010027using namespace rocksdb;
28
Marc Kupietz4b799e92018-01-02 11:04:56 +010029namespace rocksdb {
// Counter bumped by CountMergeOperator::Merge on every invocation.
// Namespace-scope objects start at zero; the initializer only spells it out.
size_t num_merge_operator_calls = 0;

// Sets the merge-call counter back to zero.
void resetNumMergeOperatorCalls() {
  num_merge_operator_calls = 0;
}

// Counter for partial merges; nothing visible in this file increments it.
size_t num_partial_merge_calls = 0;

// Sets the partial-merge counter back to zero.
void resetNumPartialMergeCalls() {
  num_partial_merge_calls = 0;
}
Marc Kupietz28cc53e2017-12-23 17:24:55 +010035
36
// Writes `value` into the 8 bytes starting at `buf`, least-significant byte
// first (little-endian), regardless of the host byte order.
//
// The previous version copied the raw bytes with memcpy on little-endian
// hosts and shifted byte by byte on big-endian hosts; both paths produce the
// same layout. Shifting unconditionally gives identical output without
// depending on the run-time byte-order probe or on memcpy, for which this
// file includes no header.
//
// `buf` must point to at least 8 writable bytes.
inline void EncodeFixed64(char* buf, uint64_t value) {
  for (unsigned i = 0; i < sizeof(value); ++i) {
    buf[i] = static_cast<char>((value >> (8 * i)) & 0xff);
  }
}
51
// Reads 4 bytes starting at `ptr` as a little-endian unsigned 32-bit value,
// regardless of the host byte order.
//
// Each byte goes through unsigned char first so that bytes >= 0x80 are not
// sign-extended. Assembling the value with shifts gives the same result as
// the former memcpy fast path on little-endian hosts, without depending on
// the run-time byte-order probe or on memcpy (no header for it is included
// in this file).
//
// `ptr` must point to at least 4 readable bytes.
inline uint32_t DecodeFixed32(const char* ptr) {
  uint32_t result = 0;
  for (unsigned i = 0; i < sizeof(result); ++i) {
    result |= static_cast<uint32_t>(static_cast<unsigned char>(ptr[i])) << (8 * i);
  }
  return result;
}
65
// Reads 8 bytes starting at `ptr` as a little-endian unsigned 64-bit value,
// regardless of the host byte order. This is the inverse of EncodeFixed64.
//
// Each byte goes through unsigned char first so that bytes >= 0x80 are not
// sign-extended. Assembling the value with shifts gives the same result as
// the former memcpy fast path on little-endian hosts, without depending on
// the run-time byte-order probe or on memcpy (no header for it is included
// in this file).
//
// `ptr` must point to at least 8 readable bytes.
inline uint64_t DecodeFixed64(const char* ptr) {
  uint64_t result = 0;
  for (unsigned i = 0; i < sizeof(result); ++i) {
    result |= static_cast<uint64_t>(static_cast<unsigned char>(ptr[i])) << (8 * i);
  }
  return result;
}
78
79
80 class CountMergeOperator : public AssociativeMergeOperator {
81 public:
82 CountMergeOperator() {
83 mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
Marc Kupietz28cc53e2017-12-23 17:24:55 +010084 }
85
Marc Kupietz4b799e92018-01-02 11:04:56 +010086 virtual bool Merge(const Slice& key,
87 const Slice* existing_value,
88 const Slice& value,
89 std::string* new_value,
90 Logger* logger) const override {
91 assert(new_value->empty());
92 ++num_merge_operator_calls;
93 if (existing_value == nullptr) {
94 new_value->assign(value.data(), value.size());
95 return true;
96 }
Marc Kupietz28cc53e2017-12-23 17:24:55 +010097
Marc Kupietz4b799e92018-01-02 11:04:56 +010098 return mergeOperator_->PartialMerge(
99 key,
100 *existing_value,
101 value,
102 new_value,
103 logger);
104 }
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100105
Marc Kupietz4b799e92018-01-02 11:04:56 +0100106 virtual const char* Name() const override {
107 return "UInt64AddOperator";
108 }
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100109
Marc Kupietz4b799e92018-01-02 11:04:56 +0100110 private:
111 std::shared_ptr<MergeOperator> mergeOperator_;
112 };
113
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100114
Marc Kupietz4b799e92018-01-02 11:04:56 +0100115 class CollocatorIterator : public Iterator {
116 private:
117 char prefixc[sizeof(uint64_t)];
118 Iterator *base_iterator_;
119
120
121 public:
122 CollocatorIterator(Iterator* base_iterator)
123 : base_iterator_(base_iterator)
124 {}
125
126 ~CollocatorIterator();
127
128 void setPrefix(char *prefix) {
129 memcpy(prefixc, prefix, sizeof(uint64_t));
130 }
131
132 virtual void SeekToFirst() { base_iterator_->SeekToFirst(); }
133 virtual void SeekToLast() { base_iterator_->SeekToLast(); }
134 virtual void Seek(const rocksdb::Slice& s) { base_iterator_->Seek(s); }
135 virtual void Prev() { base_iterator_->Prev(); }
136 virtual void Next() { base_iterator_->Next(); }
137 virtual Slice key() const;
138 virtual Slice value() const;
139 virtual Status status() const;
140 virtual bool Valid() const;
141 bool isValid();
142 uint64_t intValue();
143 uint64_t intKey();
Marc Kupietz18375e12017-12-24 10:11:18 +0100144
Marc Kupietz4b799e92018-01-02 11:04:56 +0100145 };
Marc Kupietz18375e12017-12-24 10:11:18 +0100146
Marc Kupietz4b799e92018-01-02 11:04:56 +0100147 // rocksdb::CollocatorIterator::CollocatorIterator(Iterator* base_iterator) {}
148
149 bool rocksdb::CollocatorIterator::Valid() const {
Marc Kupietz18375e12017-12-24 10:11:18 +0100150 return base_iterator_->Valid() && key().starts_with(std::string(prefixc,3));
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100151 }
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100152
Marc Kupietz4b799e92018-01-02 11:04:56 +0100153 bool rocksdb::CollocatorIterator::isValid() {
154 return base_iterator_->Valid() && key().starts_with(std::string(prefixc,3));
155 }
Marc Kupietz18375e12017-12-24 10:11:18 +0100156
Marc Kupietz4b799e92018-01-02 11:04:56 +0100157 uint64_t rocksdb::CollocatorIterator::intKey() {
158 return DecodeFixed64(base_iterator_->key().data());
159 }
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100160
Marc Kupietz4b799e92018-01-02 11:04:56 +0100161 uint64_t rocksdb::CollocatorIterator::intValue() {
162 return DecodeFixed64(base_iterator_->value().data());
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100163 }
164
Marc Kupietz4b799e92018-01-02 11:04:56 +0100165 class Collocators {
166 private:
167 WriteOptions merge_option_; // for merge
168 char _one[sizeof(uint64_t)];
169 Slice _one_slice;
170
171 protected:
172 std::shared_ptr<DB> db_;
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100173
Marc Kupietz4b799e92018-01-02 11:04:56 +0100174 WriteOptions put_option_;
175 ReadOptions get_option_;
176 WriteOptions delete_option_;
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100177
Marc Kupietz4b799e92018-01-02 11:04:56 +0100178 uint64_t default_;
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100179
Marc Kupietz4b799e92018-01-02 11:04:56 +0100180 std::shared_ptr<DB> OpenDb(const char *dbname);
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100181
Marc Kupietz4b799e92018-01-02 11:04:56 +0100182 public:
183 Collocators(const char *db_name);
Marc Kupietz06c9a9f2018-01-02 16:56:43 +0100184
Marc Kupietz4b799e92018-01-02 11:04:56 +0100185 ~Collocators();
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100186
Marc Kupietz4b799e92018-01-02 11:04:56 +0100187 // public interface of Collocators.
188 // All four functions return false
189 // if the underlying level db operation failed.
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100190
Marc Kupietz4b799e92018-01-02 11:04:56 +0100191 // mapped to a levedb Put
192 bool set(const std::string& key, uint64_t value) {
193 // just treat the internal rep of int64 as the string
194 char buf[sizeof(value)];
195 EncodeFixed64(buf, value);
196 Slice slice(buf, sizeof(value));
197 auto s = db_->Put(put_option_, key, slice);
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100198
Marc Kupietz4b799e92018-01-02 11:04:56 +0100199 if (s.ok()) {
200 return true;
201 } else {
202 std::cerr << s.ToString() << std::endl;
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100203 return false;
204 }
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100205 }
Marc Kupietz4b799e92018-01-02 11:04:56 +0100206
207 DB *getDb() {
208 return db_.get();
209 }
210
211 // mapped to a rocksdb Delete
212 bool remove(const std::string& key) {
213 auto s = db_->Delete(delete_option_, key);
214
215 if (s.ok()) {
216 return true;
217 } else {
218 std::cerr << s.ToString() << std::endl;
219 return false;
220 }
221 }
222
223 // mapped to a rocksdb Get
224 bool get(const std::string& key, uint64_t* value) {
225 std::string str;
226 auto s = db_->Get(get_option_, key, &str);
227
228 if (s.IsNotFound()) {
229 // return default value if not found;
230 *value = default_;
231 return true;
232 } else if (s.ok()) {
233 // deserialization
234 if (str.size() != sizeof(uint64_t)) {
235 std::cerr << "value corruption\n";
236 return false;
237 }
238 *value = DecodeFixed64(&str[0]);
239 return true;
240 } else {
241 std::cerr << s.ToString() << std::endl;
242 return false;
243 }
244 }
245
246
247 uint64_t get(const uint32_t w1, const uint32_t w2, const int8_t dist) {
248 char encoded_key[sizeof(uint64_t)];
249 EncodeFixed64(encoded_key, encodeCollocation(w1,w2,dist));
250 uint64_t value = default_;
251 get(std::string(encoded_key, 8), &value);
252 return value;
253 }
254
255 virtual void inc(const std::string& key) {
256 db_->Merge(merge_option_, key, _one_slice);
257 }
258
259 void inc(const uint64_t key) {
260 char encoded_key[sizeof(uint64_t)];
261 EncodeFixed64(encoded_key, key);
262 db_->Merge(merge_option_, std::string(encoded_key, 8), _one_slice);
263 }
264
265 virtual void inc(const uint32_t w1, const uint32_t w2, const uint8_t dist);
Marc Kupietz06c9a9f2018-01-02 16:56:43 +0100266 void dump(uint32_t w1, uint32_t w2, int8_t dist);
Marc Kupietz4b799e92018-01-02 11:04:56 +0100267
268
269 // mapped to a rocksdb Merge operation
270 virtual bool add(const std::string& key, uint64_t value) {
271 char encoded[sizeof(uint64_t)];
272 EncodeFixed64(encoded, value);
273 Slice slice(encoded, sizeof(uint64_t));
274 auto s = db_->Merge(merge_option_, key, slice);
275
276 if (s.ok()) {
277 return true;
278 } else {
279 std::cerr << s.ToString() << std::endl;
280 return false;
281 }
282 }
283
284 CollocatorIterator* SeekIterator(uint64_t w1, uint64_t w2, int8_t dist);
285 };
286
287 rocksdb::Collocators::Collocators(const char *db_name) {
288 std::cout << "Test merge-based counters... " << db_name << "\n";
289 db_ = OpenDb(db_name);
290 assert(db_);
291 uint64_t one = 1;
292 EncodeFixed64(_one, one);
293 _one_slice = Slice(_one, sizeof(uint64_t));
294 }
295
296 rocksdb::CollocatorIterator::~CollocatorIterator() {
297 std::cout << "destroying itera\n";
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100298 }
299
Marc Kupietz4b799e92018-01-02 11:04:56 +0100300 rocksdb::Collocators::~Collocators() {
301 std::cout << "destroying coll\n";
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100302 }
303
Marc Kupietz4b799e92018-01-02 11:04:56 +0100304 void rocksdb::Collocators::inc(const uint32_t w1, const uint32_t w2, const uint8_t dist) {
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100305 inc(encodeCollocation(w1, w2, dist));
306 }
Marc Kupietz4b799e92018-01-02 11:04:56 +0100307
308 std::shared_ptr<DB> rocksdb::Collocators::OpenDb(const char *dbname) {
309 std::cout << "Test merge-based counters... " << dbname << "\n";
310 DB* db;
311 Options options;
312 options.create_if_missing = true;
313 options.merge_operator = std::make_shared<CountMergeOperator>();
314 options.max_successive_merges = 0;
315 Status s;
316 // DestroyDB(dbname, Options());
317 s = DB::Open(options, dbname, &db);
318 if (!s.ok()) {
319 std::cerr << s.ToString() << std::endl;
320 assert(false);
321 }
322 return std::shared_ptr<DB>(db);
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100323 }
324
Marc Kupietz4b799e92018-01-02 11:04:56 +0100325 CollocatorIterator* rocksdb::Collocators::SeekIterator(uint64_t w1, uint64_t w2, int8_t dist) {
Marc Kupietz18375e12017-12-24 10:11:18 +0100326 ReadOptions options;
327 options.prefix_same_as_start = true;
328 char prefixc[sizeof(uint64_t)];
329 EncodeFixed64(prefixc, encodeCollocation(w1, w2, dist));
330 Iterator *it = db_->NewIterator(options);
331 CollocatorIterator *cit = new CollocatorIterator(it);
332 cit->Seek(std::string(prefixc,3));// it->Valid() && it->key().starts_with(std::string(prefixc,3)); it->Next()) {
333 cit->setPrefix(prefixc);
334 return cit;
335 }
336
Marc Kupietz06c9a9f2018-01-02 16:56:43 +0100337 void rocksdb::Collocators::dump(uint32_t w1, uint32_t w2, int8_t dist) {
338 auto it = std::unique_ptr<CollocatorIterator>(SeekIterator(w1, w2, dist));
339 for (; it->isValid(); it->Next()) {
340 uint64_t value = it->intValue();
341 uint64_t key = it->intKey();
342 std::cout << "w1:" << W1(key) << ", w2:" << W2(key) << ", dist:" << (int32_t) DIST(key) << " - count:" << value << std::endl;
343 }
344 std::cout << "ready dumping\n";
345 }
346
Marc Kupietz4b799e92018-01-02 11:04:56 +0100347 rocksdb::Slice rocksdb::CollocatorIterator::key() const { return base_iterator_->key(); }
348 rocksdb::Slice rocksdb::CollocatorIterator::value() const { return base_iterator_->value(); }
349 rocksdb::Status rocksdb::CollocatorIterator::status() const { return base_iterator_->status(); }
350
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100351};
Marc Kupietz06c9a9f2018-01-02 16:56:43 +0100352
353typedef rocksdb::Collocators COLLOCATORS;
354
355extern "C" {
356 COLLOCATORS *open_collocators(char *dbname) {
357 return new rocksdb::Collocators(dbname);
358 }
359
360 void inc_collocators(COLLOCATORS *db, uint32_t w1, uint32_t w2, int8_t dist) {
361 db->inc(w1, w2, dist);
362 }
363
364 void dump_collocators(COLLOCATORS *db, uint32_t w1, uint32_t w2, int8_t dist) {
365 db->dump(w1, w2, dist);
366 }
367}