blob: 4eb6134f3cb29d9f3a74ce71f07cedcce987f84e [file] [log] [blame]
Marc Kupietz28cc53e2017-12-23 17:24:55 +01001// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5//
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <iostream>
#include <memory>
#include <string>
#include <typeinfo>

#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include <rocksdb/merge_operator.h>
#include "rocksdb/utilities/db_ttl.h"
#include "merge_operators.h"
18
// Reports whether the host stores the most significant byte of an integer
// first. The first byte of a 16-bit probe is inspected through an
// unsigned char pointer, which is always a permitted way to examine an
// object's bytes (unlike reading a string literal through a uint16_t*).
inline bool collocatorHostIsBigEndian() {
  const uint16_t probe = 0x00ff;
  return *reinterpret_cast<const unsigned char*>(&probe) == 0;
}
#define IS_BIG_ENDIAN (collocatorHostIsBigEndian())

// Packs a collocation into one 64-bit key:
//   bits  0..23  w1
//   bits 24..55  w2 (W2() below reads back only the low 24 bits)
//   bits 56..63  dist (two's-complement byte, so negative distances work)
// Every argument is parenthesized and masked so that compound expressions
// expand correctly and an oversized field cannot spill into its neighbour.
#define encodeCollocation(w1, w2, dist)                    \
  (((static_cast<uint64_t>(dist) & 0xffULL) << 56) |       \
   ((static_cast<uint64_t>(w2) & 0xffffffffULL) << 24) |   \
   (static_cast<uint64_t>(w1) & 0xffffffULL))

// Field extractors for a key produced by encodeCollocation().
#define W1(key) (static_cast<uint64_t>(key) & 0xffffffULL)
#define W2(key) ((static_cast<uint64_t>(key) >> 24) & 0xffffffULL)
#define DIST(key) \
  (static_cast<int8_t>((static_cast<uint64_t>(key) >> 56) & 0xffULL))
Marc Kupietz28cc53e2017-12-23 17:24:55 +010024using namespace rocksdb;
25
namespace {
// File-local call counters together with their reset helpers.
size_t num_merge_operator_calls = 0;
size_t num_partial_merge_calls = 0;

// Sets the merge-operator call counter back to zero.
void resetNumMergeOperatorCalls() {
  num_merge_operator_calls = 0;
}

// Sets the partial-merge call counter back to zero.
void resetNumPartialMergeCalls() {
  num_partial_merge_calls = 0;
}
}  // namespace
33
// Stores `value` into buf[0..7] in little-endian order (least significant
// byte first).
//
// The bytes are produced with shifts, so the result is the same on every
// host and no runtime endianness probe is needed.
//
// @param buf    destination; must have room for 8 bytes
// @param value  integer to serialize
inline void EncodeFixed64(char* buf, uint64_t value) {
  for (int i = 0; i < 8; ++i) {
    buf[i] = static_cast<char>((value >> (8 * i)) & 0xff);
  }
}
48
// Reads a 32-bit unsigned integer stored in little-endian order at
// ptr[0..3].
//
// Each byte is widened through unsigned char before shifting, so bytes with
// the high bit set are not sign-extended, and the result does not depend on
// the host byte order.
//
// @param ptr  source; must point to at least 4 readable bytes
// @return     the decoded value
inline uint32_t DecodeFixed32(const char* ptr) {
  const unsigned char* bytes = reinterpret_cast<const unsigned char*>(ptr);
  return static_cast<uint32_t>(bytes[0]) |
         (static_cast<uint32_t>(bytes[1]) << 8) |
         (static_cast<uint32_t>(bytes[2]) << 16) |
         (static_cast<uint32_t>(bytes[3]) << 24);
}
62
// Reads a 64-bit unsigned integer stored in little-endian order at
// ptr[0..7]; the inverse of EncodeFixed64.
//
// The value is assembled from the most significant byte downwards, so the
// result does not depend on the host byte order.
//
// @param ptr  source; must point to at least 8 readable bytes
// @return     the decoded value
inline uint64_t DecodeFixed64(const char* ptr) {
  uint64_t result = 0;
  for (int i = 7; i >= 0; --i) {
    // Go through unsigned char so bytes >= 0x80 are not sign-extended.
    result = (result << 8) |
             static_cast<uint64_t>(static_cast<unsigned char>(ptr[i]));
  }
  return result;
}
75
76
77class CountMergeOperator : public AssociativeMergeOperator {
78public:
79 CountMergeOperator() {
80 mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
81 }
82
83 virtual bool Merge(const Slice& key,
84 const Slice* existing_value,
85 const Slice& value,
86 std::string* new_value,
87 Logger* logger) const override {
88 assert(new_value->empty());
89 ++num_merge_operator_calls;
90 if (existing_value == nullptr) {
91 new_value->assign(value.data(), value.size());
92 return true;
93 }
94
95 return mergeOperator_->PartialMerge(
96 key,
97 *existing_value,
98 value,
99 new_value,
100 logger);
101 }
102
103 virtual const char* Name() const override {
104 return "UInt64AddOperator";
105 }
106
107private:
108 std::shared_ptr<MergeOperator> mergeOperator_;
109};
110
111namespace {
112} // namespace
113
114class CollocatorIterator : public Iterator {
Marc Kupietz18375e12017-12-24 10:11:18 +0100115private:
116 char prefixc[sizeof(uint64_t)];
117 Iterator *base_iterator_;
118
119
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100120public:
Marc Kupietz18375e12017-12-24 10:11:18 +0100121 explicit CollocatorIterator(Iterator* base_iterator)
122 : base_iterator_(base_iterator)
123 {}
124
125 void setPrefix(char *prefix) {
126 memcpy(prefixc, prefix, sizeof(uint64_t));
127 }
128
129 virtual void SeekToFirst() { base_iterator_->SeekToFirst(); }
130 virtual void SeekToLast() { base_iterator_->SeekToLast(); }
131 virtual void Seek(const rocksdb::Slice& s) { base_iterator_->Seek(s); }
132 virtual void Prev() { base_iterator_->Prev(); }
133 virtual void Next() { base_iterator_->Next(); }
134 virtual Slice key() const { return base_iterator_->key(); }
135 virtual Slice value() const { return base_iterator_->value(); }
136 virtual Status status() const { return base_iterator_->status(); }
137
138 virtual bool Valid() const override {
139 return base_iterator_->Valid() && key().starts_with(std::string(prefixc,3));
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100140 }
141};
142
Marc Kupietz18375e12017-12-24 10:11:18 +0100143
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100144class Collocators {
145private:
146 WriteOptions merge_option_; // for merge
147 char _one[sizeof(uint64_t)];
148 Slice _one_slice;
149
150protected:
151 std::shared_ptr<DB> db_;
152
153 WriteOptions put_option_;
154 ReadOptions get_option_;
155 WriteOptions delete_option_;
156
157 uint64_t default_;
158
159 std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
160 const size_t max_successive_merges = 0) {
161 DB* db;
162 Options options;
163 options.create_if_missing = true;
164 options.merge_operator = std::make_shared<CountMergeOperator>();
165 options.max_successive_merges = max_successive_merges;
166 Status s;
167 // DestroyDB(dbname, Options());
168 s = DB::Open(options, dbname, &db);
169 if (!s.ok()) {
170 std::cerr << s.ToString() << std::endl;
171 assert(false);
172 }
173 return std::shared_ptr<DB>(db);
174 }
175
176public:
177 explicit Collocators(const std::string& db_name)
178 : put_option_(),
179 get_option_(),
180 delete_option_(),
Marc Kupietz18375e12017-12-24 10:11:18 +0100181 merge_option_()
182{
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100183 db_ = OpenDb(db_name, false, 0);
184 assert(db_);
185 uint64_t one = 1;
186 EncodeFixed64(_one, one);
187 _one_slice = Slice(_one, sizeof(uint64_t));
188 }
189
190 virtual ~Collocators() {}
191
192 // public interface of Collocators.
193 // All four functions return false
194 // if the underlying level db operation failed.
195
196 // mapped to a levedb Put
197 bool set(const std::string& key, uint64_t value) {
198 // just treat the internal rep of int64 as the string
199 char buf[sizeof(value)];
200 EncodeFixed64(buf, value);
201 Slice slice(buf, sizeof(value));
202 auto s = db_->Put(put_option_, key, slice);
203
204 if (s.ok()) {
205 return true;
206 } else {
207 std::cerr << s.ToString() << std::endl;
208 return false;
209 }
210 }
211
212 DB *getDb() {
213 return db_.get();
214 }
215
216 // mapped to a rocksdb Delete
217 bool remove(const std::string& key) {
218 auto s = db_->Delete(delete_option_, key);
219
220 if (s.ok()) {
221 return true;
222 } else {
223 std::cerr << s.ToString() << std::endl;
224 return false;
225 }
226 }
227
228 // mapped to a rocksdb Get
229 bool get(const std::string& key, uint64_t* value) {
230 std::string str;
231 auto s = db_->Get(get_option_, key, &str);
232
233 if (s.IsNotFound()) {
234 // return default value if not found;
235 *value = default_;
236 return true;
237 } else if (s.ok()) {
238 // deserialization
239 if (str.size() != sizeof(uint64_t)) {
240 std::cerr << "value corruption\n";
241 return false;
242 }
243 *value = DecodeFixed64(&str[0]);
244 return true;
245 } else {
246 std::cerr << s.ToString() << std::endl;
247 return false;
248 }
249 }
250
251
252 uint64_t get(const uint32_t w1, const uint32_t w2, const int8_t dist) {
253 char encoded_key[sizeof(uint64_t)];
254 EncodeFixed64(encoded_key, encodeCollocation(w1,w2,dist));
255 uint64_t value = default_;
Marc Kupietz18375e12017-12-24 10:11:18 +0100256 get(std::string(encoded_key, 8), &value);
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100257 return value;
258 }
259
260 virtual void inc(const std::string& key) {
261 db_->Merge(merge_option_, key, _one_slice);
262 }
263
264 virtual void inc(const uint64_t key) {
265 char encoded_key[sizeof(uint64_t)];
266 EncodeFixed64(encoded_key, key);
267 db_->Merge(merge_option_, std::string(encoded_key, 8), _one_slice);
268 }
269
270 virtual void inc(const uint32_t w1, const uint32_t w2, const uint8_t dist) {
271 inc(encodeCollocation(w1, w2, dist));
272 }
273
274 // mapped to a rocksdb Merge operation
275 virtual bool add(const std::string& key, uint64_t value) {
276 char encoded[sizeof(uint64_t)];
277 EncodeFixed64(encoded, value);
278 Slice slice(encoded, sizeof(uint64_t));
279 auto s = db_->Merge(merge_option_, key, slice);
280
281 if (s.ok()) {
282 return true;
283 } else {
284 std::cerr << s.ToString() << std::endl;
285 return false;
286 }
287 }
288
Marc Kupietz18375e12017-12-24 10:11:18 +0100289 virtual CollocatorIterator* SeekIterator(uint64_t w1, uint64_t w2, int8_t dist) {
290 ReadOptions options;
291 options.prefix_same_as_start = true;
292 char prefixc[sizeof(uint64_t)];
293 EncodeFixed64(prefixc, encodeCollocation(w1, w2, dist));
294 Iterator *it = db_->NewIterator(options);
295 CollocatorIterator *cit = new CollocatorIterator(it);
296 cit->Seek(std::string(prefixc,3));// it->Valid() && it->key().starts_with(std::string(prefixc,3)); it->Next()) {
297 cit->setPrefix(prefixc);
298 return cit;
299 }
300
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100301};
302
303namespace {
Marc Kupietz18375e12017-12-24 10:11:18 +0100304 void dumpDb(Collocators counters) {
305 auto it = std::unique_ptr<CollocatorIterator>(counters.SeekIterator(1000,0,0));
306 for (; it->Valid(); it->Next()) {
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100307 uint64_t value = DecodeFixed64(it->value().data());
308 uint64_t key = DecodeFixed64(it->key().data());
Marc Kupietz18375e12017-12-24 10:11:18 +0100309 std::cout << "w1:" << W1(key) << ", w2:" << W2(key) << ", dist:" << (int32_t) DIST(key) << " - count:" << value << std::endl;
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100310 }
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100311 assert(it->status().ok()); // Check for any errors found during the scan
312 }
313
314 void testCollocators(Collocators& counters) {
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100315 counters.inc(100,200,5);
316 counters.inc(1000,2000,-5);
317 counters.inc(1000,2000,5);
318 counters.inc(1000,2500,-3);
319 counters.inc(1000,2500,4);
320 counters.inc(1000,2900,3);
321
322 counters.inc(1001,2900,3);
323
Marc Kupietz18375e12017-12-24 10:11:18 +0100324 for(int i=0; i<10000; i++)
325 counters.inc(rand()%1010,rand()%1010,rand()%10-5);
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100326
327 // dumpDb(db);
328
329 counters.inc(100,200,5);
330 counters.inc(1000,2000,5);
331 counters.inc(1000,2500,4);
332 counters.inc(1000,2900,3);
333
334 counters.inc(1001,2900,3);
335
Marc Kupietz18375e12017-12-24 10:11:18 +0100336 dumpDb(counters);
Marc Kupietz28cc53e2017-12-23 17:24:55 +0100337 }
338
339 void runTest(int argc, const std::string& dbname, const bool use_ttl = false) {
340 std::cout << "Test merge-based counters... \n";
341 Collocators counters(dbname);
342 testCollocators(counters);
343 }
344} // namespace
345
346int main(int argc, char *argv[]) {
347 runTest(argc, "/tmp/merge_testdb");
348 printf("Passed all tests!\n");
349 return 0;
350}