blob: b4c7679dc43b0892c2572f65cc3d26227a59502c [file] [log] [blame]
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
6#include <typeinfo>
7#include <assert.h>
8#include <memory>
9#include <iostream>
10#include <stdint.h>
11#include "rocksdb/cache.h"
12#include "rocksdb/comparator.h"
13#include "rocksdb/db.h"
14#include "rocksdb/env.h"
15#include <rocksdb/merge_operator.h>
16#include "rocksdb/utilities/db_ttl.h"
17#include "merge_operators.h"
18
// Reports whether the host stores the most significant byte first.
// The probe inspects the first byte of a 16-bit constant through an
// `unsigned char` pointer, which is a permitted way to examine any object and
// avoids reading a string literal through a `uint16_t*`.
static inline bool hostIsBigEndian() {
  const uint16_t probe = 0x00ff;
  return *reinterpret_cast<const unsigned char*>(&probe) == 0;
}

// Evaluates to true on big-endian hosts and false on little-endian hosts.
#define IS_BIG_ENDIAN (hostIsBigEndian())

// Packs a collocation triple into a single 64-bit key:
//   bits  0.. : w1 (OR-ed in unshifted)
//   bits 24.. : w2
//   bits 56-63: dist (only its low 8 bits survive the shift)
// No masking is applied, so the fields only stay disjoint when w1 and w2 fit
// in 24 bits each. Every argument is converted to uint64_t as a whole
// expression, so arguments such as `a | b` are grouped correctly.
#define encodeCollocation(w1, w2, dist)                                   \
  ((static_cast<uint64_t>(dist) << 56) | (static_cast<uint64_t>(w2) << 24) | \
   static_cast<uint64_t>(w1))
21using namespace rocksdb;
22
namespace {

// File-local tally that CountMergeOperator::Merge bumps on every call.
size_t num_merge_operator_calls = 0;

// File-local tally for partial merges; nothing in the visible part of this
// file increments it.
size_t num_partial_merge_calls = 0;

// Sets the full-merge tally back to zero.
void resetNumMergeOperatorCalls() {
  num_merge_operator_calls = 0;
}

// Sets the partial-merge tally back to zero.
void resetNumPartialMergeCalls() {
  num_partial_merge_calls = 0;
}

}  // namespace
30
// Stores `value` into buf[0..7] in little-endian byte order (least
// significant byte first), independent of the host's endianness.
//
// buf   - destination; must have room for at least 8 bytes.
// value - the 64-bit quantity to serialise.
//
// A single shift-based path is used on every platform, so the function needs
// neither a run-time byte-order probe nor memcpy.
inline void EncodeFixed64(char* buf, uint64_t value) {
  for (int i = 0; i < 8; ++i) {
    buf[i] = static_cast<char>((value >> (8 * i)) & 0xff);
  }
}
45
// Reads a 32-bit little-endian integer from ptr[0..3], independent of the
// host's endianness.
//
// ptr - source; must point at 4 or more readable bytes.
// Returns the decoded value.
//
// Bytes are widened through `unsigned char` first so that bytes >= 0x80 are
// not sign-extended on platforms where plain `char` is signed.
inline uint32_t DecodeFixed32(const char* ptr) {
  const unsigned char* bytes = reinterpret_cast<const unsigned char*>(ptr);
  return static_cast<uint32_t>(bytes[0]) |
         (static_cast<uint32_t>(bytes[1]) << 8) |
         (static_cast<uint32_t>(bytes[2]) << 16) |
         (static_cast<uint32_t>(bytes[3]) << 24);
}
59
// Reads a 64-bit little-endian integer from ptr[0..7], independent of the
// host's endianness. This is the inverse of EncodeFixed64.
//
// ptr - source; must point at 8 or more readable bytes.
// Returns the decoded value.
inline uint64_t DecodeFixed64(const char* ptr) {
  const unsigned char* bytes = reinterpret_cast<const unsigned char*>(ptr);
  uint64_t result = 0;
  // Fold from the most significant byte (index 7) down to the least.
  for (int i = 7; i >= 0; --i) {
    result = (result << 8) | bytes[i];
  }
  return result;
}
72
73
74class CountMergeOperator : public AssociativeMergeOperator {
75public:
76 CountMergeOperator() {
77 mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
78 }
79
80 virtual bool Merge(const Slice& key,
81 const Slice* existing_value,
82 const Slice& value,
83 std::string* new_value,
84 Logger* logger) const override {
85 assert(new_value->empty());
86 ++num_merge_operator_calls;
87 if (existing_value == nullptr) {
88 new_value->assign(value.data(), value.size());
89 return true;
90 }
91
92 return mergeOperator_->PartialMerge(
93 key,
94 *existing_value,
95 value,
96 new_value,
97 logger);
98 }
99
100 virtual const char* Name() const override {
101 return "UInt64AddOperator";
102 }
103
104private:
105 std::shared_ptr<MergeOperator> mergeOperator_;
106};
107
108namespace {
109} // namespace
110
111class CollocatorIterator : public Iterator {
112public:
113 explicit CollocatorIterator() {
114 }
115};
116
117class Collocators {
118private:
119 WriteOptions merge_option_; // for merge
120 char _one[sizeof(uint64_t)];
121 Slice _one_slice;
122
123protected:
124 std::shared_ptr<DB> db_;
125
126 WriteOptions put_option_;
127 ReadOptions get_option_;
128 WriteOptions delete_option_;
129
130 uint64_t default_;
131
132 std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
133 const size_t max_successive_merges = 0) {
134 DB* db;
135 Options options;
136 options.create_if_missing = true;
137 options.merge_operator = std::make_shared<CountMergeOperator>();
138 options.max_successive_merges = max_successive_merges;
139 Status s;
140 // DestroyDB(dbname, Options());
141 s = DB::Open(options, dbname, &db);
142 if (!s.ok()) {
143 std::cerr << s.ToString() << std::endl;
144 assert(false);
145 }
146 return std::shared_ptr<DB>(db);
147 }
148
149public:
150 explicit Collocators(const std::string& db_name)
151 : put_option_(),
152 get_option_(),
153 delete_option_(),
154 merge_option_() {
155 db_ = OpenDb(db_name, false, 0);
156 assert(db_);
157 uint64_t one = 1;
158 EncodeFixed64(_one, one);
159 _one_slice = Slice(_one, sizeof(uint64_t));
160 }
161
162 virtual ~Collocators() {}
163
164 // public interface of Collocators.
165 // All four functions return false
166 // if the underlying level db operation failed.
167
168 // mapped to a levedb Put
169 bool set(const std::string& key, uint64_t value) {
170 // just treat the internal rep of int64 as the string
171 char buf[sizeof(value)];
172 EncodeFixed64(buf, value);
173 Slice slice(buf, sizeof(value));
174 auto s = db_->Put(put_option_, key, slice);
175
176 if (s.ok()) {
177 return true;
178 } else {
179 std::cerr << s.ToString() << std::endl;
180 return false;
181 }
182 }
183
184 DB *getDb() {
185 return db_.get();
186 }
187
188 // mapped to a rocksdb Delete
189 bool remove(const std::string& key) {
190 auto s = db_->Delete(delete_option_, key);
191
192 if (s.ok()) {
193 return true;
194 } else {
195 std::cerr << s.ToString() << std::endl;
196 return false;
197 }
198 }
199
200 // mapped to a rocksdb Get
201 bool get(const std::string& key, uint64_t* value) {
202 std::string str;
203 auto s = db_->Get(get_option_, key, &str);
204
205 if (s.IsNotFound()) {
206 // return default value if not found;
207 *value = default_;
208 return true;
209 } else if (s.ok()) {
210 // deserialization
211 if (str.size() != sizeof(uint64_t)) {
212 std::cerr << "value corruption\n";
213 return false;
214 }
215 *value = DecodeFixed64(&str[0]);
216 return true;
217 } else {
218 std::cerr << s.ToString() << std::endl;
219 return false;
220 }
221 }
222
223
224 uint64_t get(const uint32_t w1, const uint32_t w2, const int8_t dist) {
225 char encoded_key[sizeof(uint64_t)];
226 EncodeFixed64(encoded_key, encodeCollocation(w1,w2,dist));
227 uint64_t value = default_;
228 int result = get(std::string(encoded_key, 8), &value);
229 return value;
230 }
231
232 virtual void inc(const std::string& key) {
233 db_->Merge(merge_option_, key, _one_slice);
234 }
235
236 virtual void inc(const uint64_t key) {
237 char encoded_key[sizeof(uint64_t)];
238 EncodeFixed64(encoded_key, key);
239 db_->Merge(merge_option_, std::string(encoded_key, 8), _one_slice);
240 }
241
242 virtual void inc(const uint32_t w1, const uint32_t w2, const uint8_t dist) {
243 inc(encodeCollocation(w1, w2, dist));
244 }
245
246 // mapped to a rocksdb Merge operation
247 virtual bool add(const std::string& key, uint64_t value) {
248 char encoded[sizeof(uint64_t)];
249 EncodeFixed64(encoded, value);
250 Slice slice(encoded, sizeof(uint64_t));
251 auto s = db_->Merge(merge_option_, key, slice);
252
253 if (s.ok()) {
254 return true;
255 } else {
256 std::cerr << s.ToString() << std::endl;
257 return false;
258 }
259 }
260
261};
262
263namespace {
264 void dumpDb(DB* db) {
265 char prefixc[sizeof(uint64_t)];
266 EncodeFixed64(prefixc, encodeCollocation(1000,0,0));
267 ReadOptions options;
268 options.prefix_same_as_start = true;
269 auto it = unique_ptr<Iterator>(db->NewIterator(options));
270
271 for (it->Seek(std::string(prefixc,3)); it->Valid() && it->key().starts_with(std::string(prefixc,3)); it->Next()) {
272 uint64_t value = DecodeFixed64(it->value().data());
273 uint64_t key = DecodeFixed64(it->key().data());
274 uint64_t w1 = (uint64_t)(key & 0xffffff);
275 uint64_t w2 = (uint64_t)((key >> 24) & 0xffffff);
276 int8_t dist = (uint64_t)((key >> 56) & 0xff);
277 std::cout << "w1:" << w1 << ", w2:" << w2 << ", dist:" << (int32_t) dist << " - count:" << value << std::endl;
278 }
279
280
281 assert(it->status().ok()); // Check for any errors found during the scan
282 }
283
284 void testCollocators(Collocators& counters) {
285
286 FlushOptions o;
287 DB *db = counters.getDb();
288
289 o.wait = true;
290
291 counters.inc(100,200,5);
292 counters.inc(1000,2000,-5);
293 counters.inc(1000,2000,5);
294 counters.inc(1000,2500,-3);
295 counters.inc(1000,2500,4);
296 counters.inc(1000,2900,3);
297
298 counters.inc(1001,2900,3);
299
300 for(int i=0; i<1000000; i++)
301 counters.inc(rand()%1000,rand()%1000,5);
302
303 // dumpDb(db);
304
305 counters.inc(100,200,5);
306 counters.inc(1000,2000,5);
307 counters.inc(1000,2500,4);
308 counters.inc(1000,2900,3);
309
310 counters.inc(1001,2900,3);
311
312 dumpDb(db);
313 }
314
315 void runTest(int argc, const std::string& dbname, const bool use_ttl = false) {
316 std::cout << "Test merge-based counters... \n";
317 Collocators counters(dbname);
318 testCollocators(counters);
319 }
320} // namespace
321
322int main(int argc, char *argv[]) {
323 runTest(argc, "/tmp/merge_testdb");
324 printf("Passed all tests!\n");
325 return 0;
326}