OPIC
Object Persistence In C
robin_hood.c
1 /* robin_hood.c ---
2  *
3  * Filename: robin_hood.c
4  * Description:
5  * Author: Felix Chern
6  * Maintainer:
7  * Copyright: (c) 2017 Felix Chern
8  * Created: Sun Apr 2 07:16:17 2017 (-0700)
9  * Version:
10  * Package-Requires: ()
11  * Last-Updated:
12  * By:
13  * Update #: 0
14  * URL:
15  * Doc URL:
16  * Keywords:
17  * Compatibility:
18  *
19  */
20 
21 /* Commentary:
22  *
23  *
24  *
25  */
26 
27 /* Change Log:
28  *
29  *
30  */
31 
32 /* This program is free software: you can redistribute it and/or modify
33  * it under the terms of the GNU Lesser General Public License as published by
34  * the Free Software Foundation, either version 3 of the License, or (at
35  * your option) any later version.
36  *
37  * This program is distributed in the hope that it will be useful, but
38  * WITHOUT ANY WARRANTY; without even the implied warranty of
39  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
40  * Lesser General Public License for more details.
41  *
42  * You should have received a copy of the GNU Lesser General Public License
43  * along with this program. If not, see <http://www.gnu.org/licenses/>.
44  */
45 
46 /* Code: */
47 
48 #include <stdio.h> // TODO use op_log instead
49 #include <stdlib.h>
50 #include <inttypes.h>
51 #include <stdint.h>
52 #include <string.h>
53 #include <stdbool.h>
54 #include "opic/common/op_assert.h"
55 #include "opic/common/op_utils.h"
56 #include "opic/common/op_log.h"
57 #include "opic/op_malloc.h"
58 #include "robin_hood.h"
59 
/* Length of OPHashTable.stats: one counter for each probe count 0..63. */
#define PROBE_STATS_SIZE 64
/* Bucket-array size in bytes (capacity * bucket_size) at or above which
 * HTSizeUp grows the table by a fraction instead of doubling or more. */
#define DEFAULT_LARGE_DATA_THRESHOLD (1UL << 30)
/* Number of recently visited bucket indices HTUpsertPushDown remembers in
 * order to skip them and avoid displacement cycles. */
#define VISIT_IDX_CACHE 8

/* Logger used by the OP_LOG_* calls in this file. */
OP_LOGGER_FACTORY(logger, "opic.hash.robin_hood");
65 
/* Outcome of HTUpsertNewKey; tells the caller how to finish the write. */
enum upsert_result_t
  {
    /* A free bucket (empty or tombstone) was claimed: write tag, key, value. */
    UPSERT_EMPTY = 0,
    /* The key is already stored: only the value needs writing. */
    UPSERT_DUP = 1,
    /* The bucket was taken from a resident entry, which must be re-homed. */
    UPSERT_PUSHDOWN = 2,
  };
72 
73 static inline enum upsert_result_t
74 HTUpsertNewKey(OPHashTable* table, OPHash hasher,
75  uint64_t hashed_key,
76  void* key,
77  uint8_t** matched_bucket,
78  int* probe_state);
79 
80 static inline void
81 HTUpsertPushDown(OPHashTable* table, OPHash hasher,
82  uint8_t* bucket_cpy, int probe,
83  uint8_t* avoid_bucket, bool* resize);
84 
85 static bool HTSizeUp(OPHashTable* table, OPHash hasher);
86 
/* Hash table header. The bucket array is a separate allocation referenced
 * by bucket_ref (resolved with OPRef2Ptr). Each bucket is
 * 1 + keysize + valsize bytes: a tag byte (0 = empty, 1 = occupied,
 * 2 = deleted/tombstone) followed by the key and then the value.
 * Capacity is encoded as 2^(64 - capacity_clz - 4) * capacity_ms4b. */
struct OPHashTable
{
  uint64_t objcnt;               // number of live entries
  uint64_t objcnt_high;          // inserts grow the table when objcnt exceeds this
  uint64_t objcnt_low;           // deletes shrink the table when objcnt is below this
  uint64_t large_data_threshold; // bytes; at or above it HTSizeUp grows in smaller steps
  uint8_t capacity_clz; // leading zeros of capacity
  uint8_t capacity_ms4b; // most significant 4 bits
  uint16_t longest_probes;       // largest recorded probe; bounds lookups
  size_t keysize;
  size_t valsize;
  uint32_t stats[PROBE_STATS_SIZE]; // stats[p]: entries recorded at probe p
  opref_t bucket_ref;
};
101 
/* Bulk-insert buffer. Pending records -- hashed_key, key and value packed
 * back to back -- are appended to one of
 * 2^(partition_clz - capacity_clz) "tubes" chosen from the masked hash,
 * and a tube is flushed into the table when it runs out of room. */
struct HTFunnel
{
  OPHashTable* restrict table;   // table the records are inserted into
  OPHash hasher;                 // key hash function
  FunnelCB callback;             // callback passed to HTFunnelNewCustom
  size_t slotsize;               // bytes available per tube
  uint8_t capacity_clz;          // table->capacity_clz the tubes were laid
                                 // out for; 0 while no tubes exist
  uint8_t partition_clz;         // leading zeros of the per-partition
                                 // bucket count
  uint8_t* restrict tubes;       // tube_num * slotsize bytes of records
  ptrdiff_t* restrict flowheads; // per tube: offset in tubes of the next
                                 // free byte
};
113 
/* Decode a capacity stored as (leading-zero count, top four bits):
 * 2^(64 - capacity_clz - 4) * capacity_ms4b. */
static inline
uint64_t HTCapacityInternal(uint8_t capacity_clz, uint8_t capacity_ms4b);
116 
117 bool
118 HTNew(OPHeap* heap, OPHashTable** table,
119  uint64_t num_objects, double load, size_t keysize, size_t valsize)
120 {
121  uint64_t capacity;
122  uint32_t capacity_clz, capacity_ms4b, capacity_msb;
123  size_t bucket_size;
124  void* bucket_ptr;
125 
126  op_assert(load > 0.0 && load < 1.0,
127  "load %lf must within close interval (0.0, 1.0)\n", load);
128  capacity = (uint64_t)(num_objects / load);
129  if (capacity < 8)
130  capacity = 8;
131  capacity_clz = __builtin_clzl(capacity);
132  capacity_msb = 64 - capacity_clz;
133  capacity_ms4b = round_up_div(capacity, 1UL << (capacity_msb - 4));
134  capacity = HTCapacityInternal((uint8_t)capacity_clz, (uint8_t)capacity_ms4b);
135 
136  bucket_size = keysize + valsize + 1;
137 
138  *table = OPCalloc(heap, 1, sizeof(OPHashTable));
139  if (!*table)
140  return false;
141  bucket_ptr = OPCalloc(heap, 1, bucket_size * capacity);
142  if (!bucket_ptr)
143  {
144  OPDealloc(table);
145  return false;
146  }
147  (*table)->bucket_ref = OPPtr2Ref(bucket_ptr);
148  (*table)->large_data_threshold = DEFAULT_LARGE_DATA_THRESHOLD;
149  (*table)->capacity_clz = capacity_clz;
150  (*table)->capacity_ms4b = capacity_ms4b;
151  (*table)->objcnt_high = (uint64_t)(capacity * load);
152  (*table)->objcnt_low = capacity * 2 / 10;
153  (*table)->keysize = keysize;
154  (*table)->valsize = valsize;
155  return true;
156 }
157 
158 void
159 HTDestroy(OPHashTable* table)
160 {
161  OPDealloc(OPRef2Ptr(table, table->bucket_ref));
162  OPDealloc(table);
163 }
164 
165 uint64_t HTObjcnt(OPHashTable* table)
166 {
167  return table->objcnt;
168 }
169 
170 uint64_t HTCapacity(OPHashTable* table)
171 {
172  return HTCapacityInternal(table->capacity_clz, table->capacity_ms4b);
173 }
174 
175 size_t HTKeysize(OPHashTable* table)
176 {
177  return table->keysize;
178 }
179 
180 size_t HTValsize(OPHashTable* table)
181 {
182  return table->valsize;
183 }
184 
/* Decode a capacity stored as (leading-zero count, top four bits):
 * capacity = 2^(64 - capacity_clz - 4) * capacity_ms4b.
 * The shift uses a 64-bit constant: 1UL is only 32 bits wide on LLP64
 * platforms, where large shifts would be undefined behavior. */
static inline uint64_t
HTCapacityInternal(uint8_t capacity_clz, uint8_t capacity_ms4b)
{
  return (UINT64_C(1) << (64 - capacity_clz - 4)) * capacity_ms4b;
}
189 
/* Key for probe number PROBE in the quadratic sequence starting at
 * HASHED_KEY: hashed_key + probe * (probe + 1), i.e. offsets 0, 2, 6, 12... */
static inline uint64_t
quadratic_exact(uint64_t hashed_key, int probe)
{
  const int offset = probe * (probe + 1);
  return hashed_key + offset;
}
195 
/* Advance a running probe key by one step of 2 * PROBE. Applying this for
 * probes 0, 1, ..., i in turn reproduces quadratic_exact(key, i). */
static inline uint64_t
quadratic_partial(uint64_t probing_key, int probe)
{
  const int step = probe + probe;
  return probing_key + step;
}
201 
// Fast mod and scale.
// Keep the bits of PROBED_HASH selected by MASK (a power-of-two modulus),
// multiply by SCALE (a number between 8 and 15) and divide by 16. This
// maps a hash onto a table whose size is not a power of two without a
// real division.
// The scaling can merge neighbouring hashes (it may drop the lowest bit),
// so both linear and quadratic probe sequences are spaced by two.
static inline uint64_t
fast_mod_scale(uint64_t probed_hash, uint64_t mask, uint64_t scale)
{
  const uint64_t reduced = probed_hash & mask;
  const uint64_t scaled = reduced * scale;
  return scaled >> 4;
}
214 
215 static inline uintptr_t
216 hash_with_probe(OPHashTable* table, uint64_t key, int probe)
217 {
218  uint64_t mask = (1ULL << (64 - table->capacity_clz)) - 1;
219 
220  // These probing methods are just for experiments.
221  // They work on insertion and querying, but not deletion.
222  // Only the quadratic probing is supported for all operation.
223  // It is also the fastest probing strategry I find so far.
224 
225  // Linear probing
226  // Under high load, linear probe can increase up to 50.
227  // The high probe number makes both insert and query slow.
228  // uint64_t probed_hash = key + probe * 2;
229 
230  // Quadratic probing
231  uint64_t probed_hash = quadratic_exact(key, probe);
232 
233  // Double hashing
234  // double hashing gives good probe distribution, but lacking
235  // the cache locality makes it slower than both quadratic probing
236  // and linear probing.
237  // uint64_t up32key = key >> 32;
238  // uint64_t probed_hash = key + up32key * probe;
239 
240  return fast_mod_scale(probed_hash, mask, table->capacity_ms4b);
241 }
242 
243 static inline int
244 findprobe(OPHashTable* table, OPHash hasher, uintptr_t idx)
245 {
246  const size_t keysize = table->keysize;
247  const size_t valsize = table->valsize;
248  const size_t bucket_size = keysize + valsize + 1;
249  uint8_t* buckets = OPRef2Ptr(table, table->bucket_ref);
250  uint64_t hashed_key, probing_key, probing_idx;
251  uint64_t mask = (1ULL << (64 - table->capacity_clz)) - 1;
252 
253  hashed_key = hasher(&buckets[idx*bucket_size + 1], keysize);
254  probing_key = hashed_key;
255 
256  for (int i = 0; i <= table->longest_probes; i++)
257  {
258  probing_key = quadratic_partial(probing_key, i);
259  probing_idx = fast_mod_scale(probing_key, mask, table->capacity_ms4b);
260  if (probing_idx == idx)
261  return i;
262  }
263  OP_LOG_ERROR(logger, "Didn't find any match probe!\n");
264  return -1;
265 }
266 
267 static inline void
268 IncreaseProbeStat(OPHashTable* table, int probe)
269 {
270  table->objcnt++;
271  table->longest_probes = probe > table->longest_probes ?
272  probe : table->longest_probes;
273  table->stats[probe]++;
274 }
275 
276 static inline enum upsert_result_t
277 HTUpsertNewKey(OPHashTable* table, OPHash hasher,
278  uint64_t hashed_key,
279  void* key,
280  uint8_t** matched_bucket, int* probe_state)
281 {
282  const size_t keysize = table->keysize;
283  const size_t valsize = table->valsize;
284  const size_t bucket_size = keysize + valsize + 1;
285  uint8_t* restrict buckets;
286  int probe, old_probe;
287  uintptr_t idx, _idx;
288 
289  buckets = OPRef2Ptr(table, table->bucket_ref);
290  probe = 0;
291 
292  while (true)
293  {
294  idx = hash_with_probe(table, hashed_key, probe);
295  if (probe > PROBE_STATS_SIZE)
296  {
297  HTSizeUp(table, hasher);
298  probe = 0;
299  buckets = OPRef2Ptr(table, table->bucket_ref);
300  continue;
301  }
302  // empty bucket
303  if (buckets[idx * bucket_size] == 0)
304  {
305  IncreaseProbeStat(table, probe);
306  *matched_bucket = &buckets[idx * bucket_size];
307  return UPSERT_EMPTY;
308  }
309  // deleted bucket
310  else if (buckets[idx * bucket_size] == 2)
311  {
312  for (int p = probe+1; p <= table->longest_probes; p++)
313  {
314  _idx = hash_with_probe(table, hashed_key, p);
315  // if is empty or deleted, skip and look for next one
316  if (buckets[_idx * bucket_size] == 1)
317  continue;
318  if (memeq(key, &buckets[_idx * bucket_size + 1], bucket_size))
319  {
320  *matched_bucket = &buckets[_idx * bucket_size];
321  return UPSERT_DUP;
322  }
323  }
324  IncreaseProbeStat(table, probe);
325  *matched_bucket = &buckets[idx * bucket_size];
326  return UPSERT_EMPTY;
327  }
328  if (memeq(key, &buckets[idx * bucket_size + 1], keysize))
329  {
330  *matched_bucket = &buckets[idx * bucket_size];
331  return UPSERT_DUP;
332  }
333  old_probe = findprobe(table, hasher, idx);
334  if (probe > old_probe)
335  {
336  table->longest_probes = probe > table->longest_probes ?
337  probe : table->longest_probes;
338  table->stats[old_probe]--;
339  table->stats[probe]++;
340  *matched_bucket = &buckets[idx * bucket_size];
341  *probe_state = old_probe+1;
342  return UPSERT_PUSHDOWN;
343  }
344  probe++;
345  }
346 }
347 
348 static inline void
349 HTUpsertPushDown(OPHashTable* table, OPHash hasher,
350  uint8_t* bucket_cpy, int probe, uint8_t* avoid_bucket,
351  bool* resized)
352 {
353  const size_t keysize = table->keysize;
354  const size_t valsize = table->valsize;
355  const size_t bucket_size = keysize + valsize + 1;
356  uint8_t* restrict buckets;
357  int old_probe;
358  uint8_t bucket_tmp[bucket_size];
359  uintptr_t idx;
360  uint64_t hashed_key, iter, capacity;
361  int visit;
362  uintptr_t visited_idx[VISIT_IDX_CACHE];
363 
364  visit = 0;
365  *resized = false;
366  hashed_key = hasher(&bucket_cpy[1], keysize);
367  buckets = OPRef2Ptr(table, table->bucket_ref);
368  capacity = HTCapacity(table);
369  iter = 0;
370 
371  while (true)
372  {
373  next_iter:
374  iter++;
375  idx = hash_with_probe(table, hashed_key, probe);
376 
377  if (iter > capacity)
378  {
379  HTSizeUp(table, hasher);
380  capacity = HTCapacity(table);
381  iter = 0;
382  probe = 0;
383  buckets = OPRef2Ptr(table, table->bucket_ref);
384  *resized = true;
385  continue;
386  }
387  if (&buckets[idx * bucket_size] == avoid_bucket)
388  {
389  probe++;
390  continue;
391  }
392  if (visit > 2) // cycle can happen only after two visit
393  {
394  if (visit < VISIT_IDX_CACHE)
395  {
396  for (int i = 0; i < visit; i++)
397  {
398  if (idx == visited_idx[i])
399  {
400  probe++;
401  goto next_iter;
402  }
403  }
404  }
405  else
406  {
407  for (int i = visit + 1; i < visit + VISIT_IDX_CACHE; i++)
408  {
409  if (idx == visited_idx[i % VISIT_IDX_CACHE])
410  {
411  probe++;
412  goto next_iter;
413  }
414  }
415  }
416  }
417  visited_idx[visit % VISIT_IDX_CACHE] = idx;
418  visit++;
419 
420  // empty bucket or tombstone bucket
421  if (buckets[idx * bucket_size] != 1)
422  {
423  IncreaseProbeStat(table, probe);
424  memcpy(&buckets[idx * bucket_size], bucket_cpy, bucket_size);
425  return;
426  }
427 
428  old_probe = findprobe(table, hasher, idx);
429  if (probe > old_probe)
430  {
431  table->longest_probes = probe > table->longest_probes ?
432  probe : table->longest_probes;
433  table->stats[old_probe]--;
434  table->stats[probe]++;
435  memcpy(bucket_tmp, &buckets[idx * bucket_size], bucket_size);
436  memcpy(&buckets[idx * bucket_size], bucket_cpy, bucket_size);
437  memcpy(bucket_cpy, bucket_tmp, bucket_size);
438  probe = old_probe + 1;
439  hashed_key = hasher(&bucket_cpy[1], keysize);
440  continue;
441  }
442  probe++;
443  }
444 }
445 
446 static bool
447 HTSizeUp(OPHashTable* table, OPHash hasher)
448 {
449  const size_t keysize = table->keysize;
450  const size_t valsize = table->valsize;
451  const size_t bucket_size = keysize + valsize + 1;
452  const size_t large_data_threshold = table->large_data_threshold;
453  uint8_t* old_buckets;
454  uint8_t* new_buckets;
455  uint8_t new_capacity_ms4b, new_capacity_clz;
456  uint64_t old_capacity, new_capacity;
457  bool resized;
458 
459  old_capacity = HTCapacity(table);
460  old_buckets = OPRef2Ptr(table, table->bucket_ref);
461 
462  if (old_capacity * bucket_size >= large_data_threshold)
463  {
464  // increase size by 20% ~ 33%
465  switch(table->capacity_ms4b)
466  {
467  case 8:
468  new_capacity_ms4b = 10;
469  new_capacity_clz = table->capacity_clz;
470  break;
471  case 9:
472  case 10:
473  new_capacity_ms4b = 12;
474  new_capacity_clz = table->capacity_clz;
475  break;
476  case 11:
477  case 12:
478  new_capacity_ms4b = 14;
479  new_capacity_clz = table->capacity_clz;
480  break;
481  case 13:
482  case 14:
483  new_capacity_ms4b = 8;
484  new_capacity_clz = table->capacity_clz - 1;
485  break;
486  case 15:
487  new_capacity_ms4b = 10;
488  new_capacity_clz = table->capacity_clz - 1;
489  break;
490  default: op_assert(false, "Unknown capacity_ms4b %d\n",
491  table->capacity_ms4b);
492  }
493  }
494  else
495  {
496  new_capacity_ms4b = 8;
497  new_capacity_clz = table->capacity_ms4b == 8 ?
498  table->capacity_clz - 1 : table->capacity_clz - 2;
499  }
500  new_capacity = HTCapacityInternal(new_capacity_clz, new_capacity_ms4b);
501  OP_LOG_INFO(logger, "Resize from %" PRIu64 " to %" PRIu64,
502  old_capacity, new_capacity);
503 
504  new_buckets = OPCalloc(ObtainOPHeap(table), 1, bucket_size * new_capacity);
505  if (!new_buckets)
506  {
507  OP_LOG_ERROR(logger, "Cannot obtain new bucket for size %" PRIu64,
508  new_capacity);
509  return false;
510  }
511 
512  table->objcnt = 0;
513  table->objcnt_high = new_capacity * 8 / 10;
514  table->objcnt_low = new_capacity * 2 / 10;
515  table->capacity_clz = new_capacity_clz;
516  table->capacity_ms4b = new_capacity_ms4b;
517  table->longest_probes = 0;
518  memset(table->stats, 0x00, sizeof(uint32_t) * PROBE_STATS_SIZE);
519  table->bucket_ref = OPPtr2Ref(new_buckets);
520 
521  for (uint64_t idx = 0; idx < old_capacity; idx++)
522  {
523  if (old_buckets[idx*bucket_size] == 1)
524  {
525  HTUpsertPushDown(table, hasher, &old_buckets[idx * bucket_size],
526  0, NULL, &resized);
527  }
528  }
529  OPDealloc(old_buckets);
530  return true;
531 }
532 
533 static bool
534 HTSizeDown(OPHashTable* table, OPHash hasher)
535 {
536  const size_t keysize = table->keysize;
537  const size_t valsize = table->valsize;
538  const size_t bucket_size = keysize + valsize + 1;
539  uint8_t* old_buckets;
540  uint8_t* new_buckets;
541  uint8_t new_capacity_ms4b, new_capacity_clz;
542  uint64_t old_capacity, new_capacity;
543  bool resized;
544 
545  old_capacity = HTCapacity(table);
546  old_buckets = OPRef2Ptr(table, table->bucket_ref);
547  op_assert(old_capacity > 16,
548  "Can not resize smaller than 16, but got old_capacity %"
549  PRIu64 "\n", old_capacity);
550 
551  switch(table->capacity_ms4b)
552  {
553  case 8: // new load 0.45
554  case 9: // new load 0.50
555  case 10: // new load 0.55
556  case 11: // new load 0.60
557  new_capacity_ms4b = 8;
558  new_capacity_clz = table->capacity_clz + 1;
559  break;
560  case 12: // new load 0.40
561  case 13: // new load 0.43
562  case 14: // new load 0.46
563  case 15: // new load 0.50
564  new_capacity_ms4b = 12;
565  new_capacity_clz = table->capacity_clz + 1;
566  break;
567  default: op_assert(false, "Unknown capacity_ms4b %d\n",
568  table->capacity_ms4b);
569  }
570 
571  new_capacity = HTCapacityInternal(new_capacity_clz, new_capacity_ms4b);
572  OP_LOG_INFO(logger, "Resize from %" PRIu64 " to %" PRIu64,
573  old_capacity, new_capacity);
574  new_buckets = OPCalloc(ObtainOPHeap(table), 1, bucket_size * new_capacity);
575  if (!new_buckets)
576  {
577  OP_LOG_ERROR(logger, "Cannot obtain new bucket for size %" PRIu64,
578  new_capacity);
579  return false;
580  }
581 
582  table->objcnt = 0;
583  table->objcnt_high = new_capacity * 8 / 10;
584  table->objcnt_low = new_capacity * 2 / 10;
585  table->capacity_clz = new_capacity_clz;
586  table->capacity_ms4b = new_capacity_ms4b;
587  table->longest_probes = 0;
588  memset(table->stats, 0x00, sizeof(uint32_t) * PROBE_STATS_SIZE);
589  table->bucket_ref = OPPtr2Ref(new_buckets);
590 
591  for (uint64_t idx = 0; idx < old_capacity; idx++)
592  {
593  if (old_buckets[idx*bucket_size] == 1)
594  {
595  HTUpsertPushDown(table, hasher, &old_buckets[idx * bucket_size],
596  0, NULL, &resized);
597  }
598  }
599  OPDealloc(old_buckets);
600  return true;
601 }
602 
603 static inline
604 bool HTPreHashInsertCustom(OPHashTable* table, OPHash hasher,
605  uint64_t hashed_key, void* key, void* val)
606 {
607  const size_t keysize = table->keysize;
608  const size_t valsize = table->valsize;
609  const size_t bucket_size = keysize + valsize + 1;
610  enum upsert_result_t upsert_result;
611  uint8_t* matched_bucket;
612  int probe;
613  uint8_t bucket_cpy[bucket_size];
614  bool resized;
615 
616  if (table->objcnt > table->objcnt_high)
617  {
618  if(!HTSizeUp(table, hasher))
619  return false;
620  }
621 
622  upsert_result = HTUpsertNewKey(table, hasher, hashed_key, key,
623  &matched_bucket, &probe);
624 
625  switch (upsert_result)
626  {
627  case UPSERT_EMPTY:
628  *matched_bucket = 1;
629  memcpy(&matched_bucket[1], key, keysize);
630  case UPSERT_DUP:
631  memcpy(&matched_bucket[1 + keysize], val, valsize);
632  break;
633  case UPSERT_PUSHDOWN:
634  memcpy(bucket_cpy, matched_bucket, bucket_size);
635  memcpy(&matched_bucket[1], key, keysize);
636  memcpy(&matched_bucket[1 + keysize], val, valsize);
637  HTUpsertPushDown(table, hasher, bucket_cpy, probe,
638  matched_bucket, &resized);
639  }
640  return true;
641 }
642 
643 bool HTInsertCustom(OPHashTable* table, OPHash hasher, void* key, void* val)
644 {
645  uint64_t hashed_key;
646  hashed_key = hasher(key, table->keysize);
647  return HTPreHashInsertCustom(table, hasher, hashed_key, key, val);
648 }
649 
650 bool HTUpsertCustom(OPHashTable* table, OPHash hasher,
651  void* key, void** val_ref, bool* is_duplicate)
652 {
653  const size_t keysize = table->keysize;
654  const size_t valsize = table->valsize;
655  const size_t bucket_size = keysize + valsize + 1;
656  enum upsert_result_t upsert_result;
657  uint64_t hashed_key;
658  uint8_t* matched_bucket;
659  int probe;
660  uint8_t bucket_cpy[bucket_size];
661  bool resized;
662 
663  if (table->objcnt > table->objcnt_high)
664  {
665  if (!HTSizeUp(table, hasher))
666  return false;
667  }
668 
669  hashed_key = hasher(key, keysize);
670  upsert_result = HTUpsertNewKey(table, hasher, hashed_key, key,
671  &matched_bucket, &probe);
672  *val_ref = &matched_bucket[keysize + 1];
673  switch (upsert_result)
674  {
675  case UPSERT_EMPTY:
676  *is_duplicate = false;
677  *matched_bucket = 1;
678  memcpy(&matched_bucket[1], key, keysize);
679  break;
680  case UPSERT_DUP:
681  *is_duplicate = true;
682  break;
683  case UPSERT_PUSHDOWN:
684  *is_duplicate = false;
685  memcpy(bucket_cpy, matched_bucket, bucket_size);
686  memcpy(&matched_bucket[1], key, keysize);
687  HTUpsertPushDown(table, hasher, bucket_cpy, probe,
688  matched_bucket, &resized);
689  if (resized)
690  {
691  *val_ref = HTGetCustom(table, hasher, key);
692  }
693  }
694  return true;
695 }
696 
697 static inline bool
698 HTPreHashSearchIdx(OPHashTable* table, uint64_t hashed_key,
699  void* key, uintptr_t* idx)
700 {
701  const size_t keysize = table->keysize;
702  const size_t valsize = table->valsize;
703  const size_t bucket_size = keysize + valsize + 1;
704  uint8_t* buckets = OPRef2Ptr(table, table->bucket_ref);
705  uintptr_t idx_next;
706  uint64_t mask = (1ULL << (64 - table->capacity_clz)) - 1;
707  uint64_t probing_key;
708 
709  probing_key = hashed_key;
710  *idx = fast_mod_scale(probing_key, mask, table->capacity_ms4b);
711  probing_key = quadratic_partial(probing_key, 1);
712  idx_next = fast_mod_scale(probing_key, mask, table->capacity_ms4b);
713  for (int probe = 2; probe <= table->longest_probes+2; probe++)
714  {
715  __builtin_prefetch(&buckets[idx_next * bucket_size], 0, 0);
716  if (buckets[*idx * bucket_size] == 0)
717  return false;
718  if (buckets[*idx * bucket_size] == 2)
719  goto next_iter;
720  if (memeq(key, &buckets[*idx*bucket_size + 1], keysize))
721  return true;
722  next_iter:
723  *idx = idx_next;
724  probing_key = quadratic_partial(probing_key, probe);
725  idx_next = fast_mod_scale(probing_key, mask, table->capacity_ms4b);
726  }
727  return false;
728 }
729 
730 static inline bool
731 HTSearchIdx(OPHashTable* table, OPHash hasher, void* key, uintptr_t* idx)
732 {
733  const size_t keysize = table->keysize;
734  uint64_t hashed_key;
735  hashed_key = hasher(key, keysize);
736  return HTPreHashSearchIdx(table, hashed_key, key, idx);
737 }
738 
739 void* HTGetCustom(OPHashTable* table, OPHash hasher, void* key)
740 {
741  const size_t keysize = table->keysize;
742  const size_t valsize = table->valsize;
743  const size_t bucket_size = keysize + valsize + 1;
744  uint8_t* buckets = OPRef2Ptr(table, table->bucket_ref);
745  uintptr_t idx;
746  if (HTSearchIdx(table, hasher, key, &idx))
747  {
748  return &buckets[idx*bucket_size + keysize + 1];
749  }
750  return NULL;
751 }
752 
753 int HTGetProbeCustom(OPHashTable* table, OPHash hasher, void* key)
754 {
755  uintptr_t idx;
756  if (HTSearchIdx(table, hasher, key, &idx))
757  {
758  return findprobe(table, hasher, idx);
759  }
760  return -1;
761 }
762 
763 static inline void*
764 HTPreHashDeleteCustom(OPHashTable* table, OPHash hasher,
765  uint64_t hashed_key, void* key)
766 {
767  /*
768  * This works for load that is not super high, i.e. < 0.9.
769  * It slows down the growth for both E[psl] and Var[psl], but only
770  * slows down, not bounding it.
771  */
772  const size_t keysize = table->keysize;
773  const size_t valsize = table->valsize;
774  const size_t bucket_size = keysize + valsize + 1;
775  uint8_t* restrict buckets;
776  uintptr_t idx;
777  int record_probe;
778 
779  if (table->objcnt < table->objcnt_low &&
780  table->objcnt > 16)
781  {
782  if (!HTSizeDown(table, hasher))
783  return NULL;
784  }
785 
786  if (!HTPreHashSearchIdx(table, hashed_key, key, &idx))
787  return NULL;
788 
789  buckets = OPRef2Ptr(table, table->bucket_ref);
790 
791  table->objcnt--;
792  record_probe = findprobe(table, hasher, idx);
793  if (record_probe < PROBE_STATS_SIZE)
794  table->stats[record_probe]--;
795  else
796  OP_LOG_WARN(logger, "Large probe: %d\n", record_probe);
797 
798  if (record_probe == table->longest_probes &&
799  table->stats[record_probe] == 0)
800  {
801  for (int i = table->longest_probes; i >= 0; i--)
802  {
803  if (table->stats[i])
804  {
805  table->longest_probes = i;
806  break;
807  }
808  }
809  }
810  // Turns out marking tombstone is good enough.
811  // I used a have a implementation which lookup candidates
812  // with high probing count that can fullfill the deleted spot, and
813  // thought this would reduce both the mean and variance of the probe
814  // counts like robin hood insertion does.
815  // After some experiments I found this strategy gives similar probe
816  // distribution like the one without the rebalancing strategy.
817  buckets[idx * bucket_size] = 2;
818  return &buckets[idx * bucket_size + 1 + keysize];
819 }
820 
821 void* HTDeleteCustom(OPHashTable* table, OPHash hasher, void* key)
822 {
823  uint64_t hashed_key;
824  hashed_key = hasher(key, table->keysize);
825  return HTPreHashDeleteCustom(table, hasher, hashed_key, key);
826 }
827 
828 void HTIterate(OPHashTable* table, OPHashIterator iterator, void* context)
829 {
830  const size_t keysize = table->keysize;
831  const size_t valsize = table->valsize;
832  const size_t bucket_size = keysize + valsize + 1;
833  uint8_t* buckets = OPRef2Ptr(table, table->bucket_ref);
834  uint64_t capacity = HTCapacity(table);
835 
836  for (uint64_t idx = 0; idx < capacity; idx++)
837  {
838  if (buckets[idx*bucket_size] == 1)
839  {
840  iterator(&buckets[idx*bucket_size + 1],
841  &buckets[idx*bucket_size + 1 + keysize],
842  keysize, valsize, context);
843  }
844  }
845 }
846 
847 void HTPrintStat(OPHashTable* table)
848 {
849  for (int i = 0; i < PROBE_STATS_SIZE; i++)
850  if (table->stats[i])
851  printf("probe %02d: %d\n", i, table->stats[i]);
852 }
853 
854 uint32_t HTMaxProbe(OPHashTable* table)
855 {
856  return table->longest_probes;
857 }
858 
859 uint32_t HTProbeStat(OPHashTable* table, uint32_t idx)
860 {
861  if (idx < PROBE_STATS_SIZE)
862  return table->stats[idx];
863  return 0;
864 }
865 
866 HTFunnel* HTFunnelNewCustom(OPHashTable* table, OPHash hasher,
867  FunnelCB callback,
868  size_t slotsize, size_t partition_size)
869 {
870  HTFunnel* funnel;
871  size_t bucketsize;
872  int tube_num;
873 
874  funnel = malloc(sizeof(HTFunnel));
875  bucketsize = table->keysize + table->valsize + 1;
876  funnel->table = table;
877  funnel->hasher = hasher;
878  funnel->callback = callback;
879  funnel->slotsize = slotsize;
880  funnel->partition_clz = __builtin_clzll(partition_size / bucketsize);
881  funnel->capacity_clz = 0;
882  funnel->tubes = NULL;
883  funnel->flowheads = NULL;
884  if (funnel->partition_clz > table->capacity_clz)
885  {
886  funnel->capacity_clz = table->capacity_clz;
887  tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
888  funnel->tubes = malloc(tube_num * slotsize);
889  funnel->flowheads = malloc(tube_num * sizeof(ptrdiff_t*));
890  for (int i = 0; i < tube_num; i++)
891  funnel->flowheads[i] = i * funnel->slotsize;
892  }
893  return funnel;
894 }
895 
896 void HTFunnelDestroy(HTFunnel* funnel)
897 {
898  if (!funnel)
899  return;
900 
901  if (funnel->tubes)
902  free(funnel->tubes);
903  if (funnel->flowheads)
904  free(funnel->flowheads);
905  free(funnel);
906 }
907 
908 void HTFunnelPreHashInsert(HTFunnel* funnel,
909  uint64_t hashed_key,
910  void* key, void* value)
911 {
912  const size_t keysize = funnel->table->keysize;
913  const size_t valsize = funnel->table->valsize;
914  const size_t bucket_size = keysize + valsize + 1;
915  const size_t trip_bundle_size = sizeof(hashed_key) + keysize + valsize;
916 
917  OPHashTable* table;
918  int tube_num, old_tube_num, row_idx, probe;
919  uint64_t mask;
920  ptrdiff_t flowhead, flowbase, tubeidx;
921  ptrdiff_t* old_flowheads;
922  uint8_t *old_tubes, *tube_key, *tube_val, *matched_bucket;
923  uint64_t* tube_hashed_key;
924  bool resized;
925  enum upsert_result_t upsert_result;
926  uint8_t bucket_cpy[bucket_size];
927 
928  table = funnel->table;
929 
930  if (funnel->capacity_clz != table->capacity_clz)
931  {
932  if (funnel->capacity_clz == 0)
933  {
934  // If the capacity of the hash table is smaller than
935  // partitions, simply insert the items into the hash table.
936  // Otherwise, initialize the funnels.
937  if (funnel->partition_clz > table->capacity_clz)
938  {
939  funnel->capacity_clz = table->capacity_clz;
940  tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
941  funnel->tubes = malloc(tube_num * funnel->slotsize);
942  funnel->flowheads = malloc(tube_num * sizeof(ptrdiff_t*));
943  for (row_idx = 0; row_idx < tube_num; row_idx++)
944  funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
945  }
946  else
947  {
948  HTPreHashInsertCustom(table, funnel->hasher,
949  hashed_key, key, value);
950  return;
951  }
952  }
953  else
954  {
955  old_tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
956  old_tubes = funnel->tubes;
957  old_flowheads = funnel->flowheads;
958 
959  funnel->capacity_clz = table->capacity_clz;
960  tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
961  funnel->tubes = malloc(tube_num * funnel->slotsize);
962  funnel->flowheads = malloc(tube_num * sizeof(ptrdiff_t*));
963  for (row_idx = 0; row_idx < tube_num; row_idx++)
964  funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
965 
966  for (row_idx = 0; row_idx < old_tube_num; row_idx++)
967  {
968  tubeidx = row_idx * funnel->slotsize;
969  flowhead = old_flowheads[row_idx];
970  while (tubeidx < flowhead)
971  {
972  tube_hashed_key = (uint64_t*)&old_tubes[tubeidx];
973  tubeidx += sizeof(uint64_t);
974  tube_key = &old_tubes[tubeidx];
975  tubeidx += keysize;
976  tube_val = &old_tubes[tubeidx];
977  tubeidx += valsize;
978  HTFunnelPreHashInsert(funnel,
979  *tube_hashed_key,
980  tube_key, tube_val);
981  }
982  }
983  free(old_flowheads);
984  free(old_tubes);
985  }
986  }
987 
988  mask = (1ULL << (64 - funnel->capacity_clz)) - 1;
989  row_idx = (hashed_key & mask) >> funnel->partition_clz;
990  flowhead = funnel->flowheads[row_idx];
991  flowbase = row_idx * funnel->slotsize;
992 
993  // flush funnel into hash table
994  if (trip_bundle_size + flowhead - flowbase > funnel->slotsize)
995  {
996  tubeidx = flowbase;
997  while (tubeidx < flowhead)
998  {
999  tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1000  tubeidx += sizeof(uint64_t);
1001  tube_key = &funnel->tubes[tubeidx];
1002  tubeidx += keysize;
1003  tube_val = &funnel->tubes[tubeidx];
1004  tubeidx += valsize;
1005 
1006  upsert_result = HTUpsertNewKey(table, funnel->hasher,
1007  *tube_hashed_key,
1008  tube_key,
1009  &matched_bucket,
1010  &probe);
1011  switch (upsert_result)
1012  {
1013  case UPSERT_EMPTY:
1014  *matched_bucket = 1;
1015  memcpy(&matched_bucket[1], tube_key, keysize);
1016  case UPSERT_DUP:
1017  memcpy(&matched_bucket[1 + keysize], tube_val, valsize);
1018  break;
1019  case UPSERT_PUSHDOWN:
1020  memcpy(bucket_cpy, matched_bucket, bucket_size);
1021  memcpy(&matched_bucket[1], tube_key, keysize);
1022  memcpy(&matched_bucket[1 + keysize], tube_val, valsize);
1023  HTUpsertPushDown(table, funnel->hasher, bucket_cpy, probe,
1024  matched_bucket, &resized);
1025  }
1026  }
1027  funnel->flowheads[row_idx] = flowbase;
1028  flowhead = flowbase;
1029  }
1030 
1031  tubeidx = flowhead;
1032  memcpy(&funnel->tubes[tubeidx], &hashed_key, sizeof(uint64_t));
1033  tubeidx += sizeof(hashed_key);
1034  memcpy(&funnel->tubes[tubeidx], key, keysize);
1035  tubeidx += keysize;
1036  memcpy(&funnel->tubes[tubeidx], value, valsize);
1037  tubeidx += valsize;
1038  funnel->flowheads[row_idx] = tubeidx;
1039 }
1040 
1041 void HTFunnelInsert(HTFunnel* funnel,
1042  void* key, void* value)
1043 {
1044  uint64_t hashed_key;
1045  hashed_key = funnel->hasher(key, funnel->table->keysize);
1046  HTFunnelPreHashInsert(funnel, hashed_key, key, value);
1047 }
1048 
1049 void HTFunnelInsertFlush(HTFunnel* funnel)
1050 {
1051  const size_t keysize = funnel->table->keysize;
1052  const size_t valsize = funnel->table->valsize;
1053  const size_t bucket_size = keysize + valsize + 1;
1054 
1055  OPHashTable* table;
1056  int tube_num, row_idx, probe;
1057  ptrdiff_t flowhead, tubeidx;
1058  uint8_t *tube_key, *tube_val, *matched_bucket;
1059  uint64_t* tube_hashed_key;
1060  bool resized;
1061  enum upsert_result_t upsert_result;
1062  uint8_t bucket_cpy[bucket_size];
1063 
1064  if (!funnel->tubes || !funnel->table)
1065  return;
1066 
1067  table = funnel->table;
1068  tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
1069 
1070  for (row_idx = 0; row_idx < tube_num; row_idx++)
1071  {
1072  tubeidx = row_idx * funnel->slotsize;
1073  flowhead = funnel->flowheads[row_idx];
1074  while (tubeidx < flowhead)
1075  {
1076  tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1077  tubeidx += sizeof(uint64_t);
1078  tube_key = &funnel->tubes[tubeidx];
1079  tubeidx += keysize;
1080  tube_val = &funnel->tubes[tubeidx];
1081  tubeidx += valsize;
1082 
1083  upsert_result = HTUpsertNewKey(table, funnel->hasher,
1084  *tube_hashed_key,
1085  tube_key,
1086  &matched_bucket,
1087  &probe);
1088  switch (upsert_result)
1089  {
1090  case UPSERT_EMPTY:
1091  *matched_bucket = 1;
1092  memcpy(&matched_bucket[1], tube_key, keysize);
1093  case UPSERT_DUP:
1094  memcpy(&matched_bucket[1 + keysize], tube_val, valsize);
1095  break;
1096  case UPSERT_PUSHDOWN:
1097  memcpy(bucket_cpy, matched_bucket, bucket_size);
1098  memcpy(&matched_bucket[1], tube_key, keysize);
1099  memcpy(&matched_bucket[1 + keysize], tube_val, valsize);
1100  HTUpsertPushDown(table, funnel->hasher, bucket_cpy, probe,
1101  matched_bucket, &resized);
1102  }
1103  }
1104  funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
1105  }
1106 }
1107 
1108 void HTFunnelPreHashUpsert(HTFunnel* funnel,
1109  uint64_t hashed_key,
1110  void* key, void* value,
1111  void* context, size_t ctxsize_st)
1112 {
1113  const size_t keysize = funnel->table->keysize;
1114  const size_t valsize = funnel->table->valsize;
1115  const size_t bucket_size = keysize + valsize + 1;
1116 
1117  size_t trip_bundle_size;
1118  OPHashTable* table;
1119  OPFunnelUpsertCB upsertcb;
1120  int tube_num, old_tube_num, row_idx, probe;
1121  uint64_t mask;
1122  ptrdiff_t flowhead, flowbase, tubeidx;
1123  ptrdiff_t* old_flowheads;
1124  uint8_t *old_tubes, *tube_key, *tube_val, *tube_ctx, *matched_bucket;
1125  uint32_t* tube_ctxsize;
1126  uint32_t ctxsize;
1127  uint64_t* tube_hashed_key;
1128  bool resized;
1129  enum upsert_result_t upsert_result;
1130  uint8_t bucket_cpy[bucket_size];
1131 
1132  table = funnel->table;
1133  upsertcb = funnel->callback.upsertcb;
1134  ctxsize = (uint32_t)ctxsize_st;
1135 
1136  if (funnel->capacity_clz != table->capacity_clz)
1137  {
1138  if (funnel->capacity_clz == 0)
1139  {
1140  // If the capacity of the hash table is smaller than
1141  // partitions, simply insert the items into the hash table.
1142  // Otherwise, initialize the funnels.
1143  if (funnel->partition_clz > table->capacity_clz)
1144  {
1145  funnel->capacity_clz = table->capacity_clz;
1146  tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
1147  funnel->tubes = malloc(tube_num * funnel->slotsize);
1148  funnel->flowheads = malloc(tube_num * sizeof(ptrdiff_t*));
1149  for (row_idx = 0; row_idx < tube_num; row_idx++)
1150  funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
1151  }
1152  else
1153  {
1154  HTInsertCustom(table, funnel->hasher, key, value);
1155  upsert_result = HTUpsertNewKey(table, funnel->hasher,
1156  hashed_key,
1157  key,
1158  &matched_bucket,
1159  &probe);
1160  switch (upsert_result)
1161  {
1162  case UPSERT_EMPTY:
1163  *matched_bucket = 1;
1164  memcpy(&matched_bucket[1], key, keysize);
1165  if (upsertcb)
1166  upsertcb(&matched_bucket[1], // key
1167  &matched_bucket[1 + keysize], // table_value
1168  value, // upsert_value
1169  context, // context
1170  keysize, valsize, ctxsize,
1171  false); // is_duplicate = false
1172  break;
1173  case UPSERT_DUP:
1174  if (upsertcb)
1175  upsertcb(&matched_bucket[1], // key
1176  &matched_bucket[1 + keysize], // table_value
1177  value, // upsert_value
1178  context, // context
1179  keysize, valsize, ctxsize,
1180  true); // is_duplicate = true
1181  break;
1182  case UPSERT_PUSHDOWN:
1183  memcpy(bucket_cpy, matched_bucket, bucket_size);
1184  memcpy(&matched_bucket[1], key, keysize);
1185  HTUpsertPushDown(table, funnel->hasher, bucket_cpy, probe,
1186  matched_bucket, &resized);
1187  // if resized, the matched bucket no longer point to correct
1188  // address of the inserted bucket, we need to search for it
1189  // again.
1190  if (resized)
1191  {
1192  // reference to value
1193  matched_bucket = HTGetCustom(table, funnel->hasher, key);
1194  // change it to regular matched bucket
1195  matched_bucket -= 1 + keysize;
1196  }
1197  if (upsertcb)
1198  upsertcb(&matched_bucket[1], // key
1199  &matched_bucket[1 + keysize], // table_value
1200  value, // upsert_value
1201  context, // context
1202  keysize, valsize, ctxsize,
1203  false); // is_duplicate = false
1204  }
1205  return;
1206  }
1207  }
1208  else
1209  {
1210  old_tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
1211  old_tubes = funnel->tubes;
1212  old_flowheads = funnel->flowheads;
1213 
1214  funnel->capacity_clz = table->capacity_clz;
1215  tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
1216  funnel->tubes = malloc(tube_num * funnel->slotsize);
1217  funnel->flowheads = malloc(tube_num * sizeof(ptrdiff_t*));
1218  for (row_idx = 0; row_idx < tube_num; row_idx++)
1219  funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
1220 
1221  for (row_idx = 0; row_idx < old_tube_num; row_idx++)
1222  {
1223  tubeidx = row_idx * funnel->slotsize;
1224  flowhead = old_flowheads[row_idx];
1225  while (tubeidx < flowhead)
1226  {
1227  tube_hashed_key = (uint64_t*)&old_tubes[tubeidx];
1228  tubeidx += sizeof(uint64_t);
1229  tube_ctxsize = (uint32_t*)&funnel->tubes[tubeidx];
1230  tubeidx += sizeof(uint32_t);
1231  tube_key = &old_tubes[tubeidx];
1232  tubeidx += keysize;
1233  tube_val = &old_tubes[tubeidx];
1234  tubeidx += valsize;
1235  tube_ctx = &funnel->tubes[tubeidx];
1236  tubeidx += *tube_ctxsize;
1237  upsert_result = HTUpsertNewKey(table, funnel->hasher,
1238  *tube_hashed_key,
1239  tube_key,
1240  &matched_bucket,
1241  &probe);
1242  switch (upsert_result)
1243  {
1244  case UPSERT_EMPTY:
1245  *matched_bucket = 1;
1246  memcpy(&matched_bucket[1], tube_key, keysize);
1247  if (upsertcb)
1248  upsertcb(&matched_bucket[1], // key
1249  &matched_bucket[1 + keysize], // table_value
1250  tube_val, // funnel_value
1251  tube_ctx, // context
1252  keysize, valsize, *tube_ctxsize,
1253  false); // is_duplicate = false
1254  break;
1255  case UPSERT_DUP:
1256  if (upsertcb)
1257  upsertcb(&matched_bucket[1], // key
1258  &matched_bucket[1 + keysize], // table_value
1259  tube_val, // funnel_value
1260  tube_ctx, // context
1261  keysize, valsize, *tube_ctxsize,
1262  true); // is_duplicate = true
1263  break;
1264  case UPSERT_PUSHDOWN:
1265  memcpy(bucket_cpy, matched_bucket, bucket_size);
1266  memcpy(&matched_bucket[1], tube_key, keysize);
1267  HTUpsertPushDown(table, funnel->hasher, bucket_cpy, probe,
1268  matched_bucket, &resized);
1269  if (resized)
1270  {
1271  // reference to value
1272  matched_bucket = HTGetCustom(table, funnel->hasher,
1273  tube_key);
1274  // change it to regular matched bucket
1275  matched_bucket -= 1 + keysize;
1276  }
1277  if (upsertcb)
1278  upsertcb(&matched_bucket[1], // key
1279  &matched_bucket[1 + keysize], // table_value
1280  tube_val, // funnel_value
1281  tube_ctx, // context
1282  keysize, valsize, *tube_ctxsize,
1283  false); // is_duplicate = false
1284  }
1285  }
1286  }
1287  free(old_flowheads);
1288  free(old_tubes);
1289  }
1290  }
1291 
1292  mask = (1ULL << (64 - funnel->capacity_clz)) - 1;
1293  row_idx = (hashed_key & mask) >> funnel->partition_clz;
1294  flowhead = funnel->flowheads[row_idx];
1295  flowbase = row_idx * funnel->slotsize;
1296  trip_bundle_size = sizeof(hashed_key) + sizeof(uint32_t)
1297  + keysize + valsize + ctxsize;
1298  op_assert(trip_bundle_size < funnel->slotsize);
1299 
1300  // flush funnel into hash table
1301  if (trip_bundle_size + flowhead - flowbase > funnel->slotsize)
1302  {
1303  tubeidx = flowbase;
1304  while (tubeidx < flowhead)
1305  {
1306  tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1307  tubeidx += sizeof(uint64_t);
1308  tube_ctxsize = (uint32_t*)&funnel->tubes[tubeidx];
1309  tubeidx += sizeof(uint32_t);
1310  tube_key = &funnel->tubes[tubeidx];
1311  tubeidx += keysize;
1312  tube_val = &funnel->tubes[tubeidx];
1313  tubeidx += valsize;
1314  tube_ctx = &funnel->tubes[tubeidx];
1315  tubeidx += *tube_ctxsize;
1316  upsert_result = HTUpsertNewKey(table, funnel->hasher,
1317  *tube_hashed_key,
1318  tube_key,
1319  &matched_bucket,
1320  &probe);
1321  switch (upsert_result)
1322  {
1323  case UPSERT_EMPTY:
1324  *matched_bucket = 1;
1325  memcpy(&matched_bucket[1], tube_key, keysize);
1326  if (upsertcb)
1327  upsertcb(&matched_bucket[1], // key
1328  &matched_bucket[1 + keysize], // table_value
1329  tube_val, // funnel_value
1330  tube_ctx, // context
1331  keysize, valsize, *tube_ctxsize,
1332  false); // is_duplicate = false
1333  break;
1334  case UPSERT_DUP:
1335  if (upsertcb)
1336  upsertcb(&matched_bucket[1], // key
1337  &matched_bucket[1 + keysize], // table_value
1338  tube_val, // funnel_value
1339  tube_ctx, // context
1340  keysize, valsize, *tube_ctxsize,
1341  true); // is_duplicate = true
1342  break;
1343  case UPSERT_PUSHDOWN:
1344  memcpy(bucket_cpy, matched_bucket, bucket_size);
1345  memcpy(&matched_bucket[1], tube_key, keysize);
1346  HTUpsertPushDown(table, funnel->hasher, bucket_cpy, probe,
1347  matched_bucket, &resized);
1348  // if resized, the matched bucket no longer point to correct
1349  // address of the inserted bucket, we need to search for it again.
1350  if (resized)
1351  {
1352  // reference to value
1353  matched_bucket = HTGetCustom(table, funnel->hasher, tube_key);
1354  // change it to regular matched bucket
1355  matched_bucket -= 1 + keysize;
1356  }
1357  if (upsertcb)
1358  upsertcb(&matched_bucket[1], // key
1359  &matched_bucket[1 + keysize], // table_value
1360  tube_val, // funnel_value
1361  tube_ctx, // context
1362  keysize, valsize, *tube_ctxsize,
1363  false); // is_duplicate = false
1364  }
1365  }
1366  funnel->flowheads[row_idx] = flowbase;
1367  flowhead = flowbase;
1368  }
1369 
1370  tubeidx = flowhead;
1371  memcpy(&funnel->tubes[tubeidx], &hashed_key, sizeof(uint64_t));
1372  tubeidx += sizeof(hashed_key);
1373  memcpy(&funnel->tubes[tubeidx], &ctxsize, sizeof(uint32_t));
1374  tubeidx += sizeof(uint32_t);
1375  memcpy(&funnel->tubes[tubeidx], key, keysize);
1376  tubeidx += keysize;
1377  memcpy(&funnel->tubes[tubeidx], value, valsize);
1378  tubeidx += valsize;
1379  memcpy(&funnel->tubes[tubeidx], context, ctxsize);
1380  tubeidx += ctxsize;
1381  funnel->flowheads[row_idx] = tubeidx;
1382 }
1383 
1384 void HTFunnelUpsert(HTFunnel* funnel,
1385  void* key, void* value, void* context, size_t ctxsize)
1386 {
1387  uint64_t hashed_key;
1388  hashed_key = funnel->hasher(key, funnel->table->keysize);
1389  HTFunnelPreHashUpsert(funnel, hashed_key, key, value, context, ctxsize);
1390 }
1391 
1392 void HTFunnelUpsertFlush(HTFunnel* funnel)
1393 {
1394  const size_t keysize = funnel->table->keysize;
1395  const size_t valsize = funnel->table->valsize;
1396  const size_t bucket_size = keysize + valsize + 1;
1397 
1398  OPHashTable* table;
1399  OPFunnelUpsertCB upsertcb;
1400  int tube_num, row_idx, probe;
1401  ptrdiff_t flowhead, tubeidx;
1402  uint8_t *tube_key, *tube_val, *tube_ctx, *matched_bucket;
1403  uint32_t* tube_ctxsize;
1404  uint64_t* tube_hashed_key;
1405  bool resized;
1406  enum upsert_result_t upsert_result;
1407  uint8_t bucket_cpy[bucket_size];
1408 
1409  if (!funnel->tubes || !funnel->table)
1410  return;
1411 
1412  table = funnel->table;
1413  upsertcb = funnel->callback.upsertcb;
1414  tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
1415 
1416  for (row_idx = 0; row_idx < tube_num; row_idx++)
1417  {
1418  tubeidx = row_idx * funnel->slotsize;
1419  flowhead = funnel->flowheads[row_idx];
1420  while (tubeidx < flowhead)
1421  {
1422  tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1423  tubeidx += sizeof(uint64_t);
1424  tube_ctxsize = (uint32_t*)&funnel->tubes[tubeidx];
1425  tubeidx += sizeof(uint32_t);
1426  tube_key = &funnel->tubes[tubeidx];
1427  tubeidx += keysize;
1428  tube_val = &funnel->tubes[tubeidx];
1429  tubeidx += valsize;
1430  tube_ctx = &funnel->tubes[tubeidx];
1431  tubeidx += *tube_ctxsize;
1432 
1433  upsert_result = HTUpsertNewKey(table, funnel->hasher,
1434  *tube_hashed_key,
1435  tube_key,
1436  &matched_bucket,
1437  &probe);
1438  switch (upsert_result)
1439  {
1440  case UPSERT_EMPTY:
1441  *matched_bucket = 1;
1442  memcpy(&matched_bucket[1], tube_key, keysize);
1443  if (upsertcb)
1444  upsertcb(&matched_bucket[1], // key
1445  &matched_bucket[1 + keysize], // table_value
1446  tube_val, // funnel_value
1447  tube_ctx, // context
1448  keysize, valsize, *tube_ctxsize,
1449  false); // is_duplicate = false
1450  break;
1451  case UPSERT_DUP:
1452  if (upsertcb)
1453  upsertcb(&matched_bucket[1], // key
1454  &matched_bucket[1 + keysize], // table_value
1455  tube_val, // funnel_value
1456  tube_ctx, // context
1457  keysize, valsize, *tube_ctxsize,
1458  true); // is_duplicate = true
1459  break;
1460  case UPSERT_PUSHDOWN:
1461  memcpy(bucket_cpy, matched_bucket, bucket_size);
1462  memcpy(&matched_bucket[1], tube_key, keysize);
1463  HTUpsertPushDown(table, funnel->hasher, bucket_cpy, probe,
1464  matched_bucket, &resized);
1465  // if resized, the matched bucket no longer point to correct
1466  // address of the inserted bucket, we need to search for it again.
1467  if (resized)
1468  {
1469  // reference to value
1470  matched_bucket = HTGetCustom(table, funnel->hasher, tube_key);
1471  // change it to regular matched bucket
1472  matched_bucket -= 1 + keysize;
1473  }
1474  if (upsertcb)
1475  upsertcb(&matched_bucket[1], // key
1476  &matched_bucket[1 + keysize], // table_value
1477  tube_val, // funnel_value
1478  tube_ctx, // context
1479  keysize, valsize, *tube_ctxsize,
1480  false); // is_duplicate = false
1481  }
1482  }
1483  funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
1484  }
1485 }
1486 
1487 void HTFunnelGet(HTFunnel* funnel, void* key, void* context, size_t ctxsize)
1488 {
1489  uint64_t hashed_key;
1490  hashed_key = funnel->hasher(key, funnel->table->keysize);
1491  HTFunnelPreHashGet(funnel, hashed_key, key, context, ctxsize);
1492 }
1493 
1494 void HTFunnelPreHashGet(HTFunnel* funnel, uint64_t hashed_key,
1495  void* key, void* context, size_t ctxsize_st)
1496 {
1497  const size_t keysize = funnel->table->keysize;
1498  const size_t valsize = funnel->table->valsize;
1499  const size_t bucket_size = keysize + valsize + 1;
1500 
1501  OPHashTable* table;
1502  OPFunnelGetCB getcb;
1503  int row_idx;
1504  uint64_t mask;
1505  size_t trip_bundle_size;
1506  ptrdiff_t flowhead, flowbase, tubeidx;
1507  uintptr_t bucket_idx;
1508  uint8_t *tube_key, *tube_ctx;
1509  uint32_t* tube_ctxsize;
1510  uint32_t ctxsize;
1511  uint64_t* tube_hashed_key;
1512  uint8_t* restrict buckets;
1513 
1514  table = funnel->table;
1515  buckets = OPRef2Ptr(table, table->bucket_ref);
1516  getcb = funnel->callback.getcb;
1517 
1518  // hash table is too small for using funnel
1519  if (!funnel->tubes)
1520  {
1521  if (HTPreHashSearchIdx(table,
1522  hashed_key,
1523  key,
1524  &bucket_idx))
1525  {
1526  if (getcb)
1527  getcb(&buckets[bucket_idx * bucket_size + 1],
1528  &buckets[bucket_idx * bucket_size + 1 + keysize],
1529  context, keysize, valsize, ctxsize_st);
1530  }
1531  else
1532  {
1533  if (getcb)
1534  getcb(key, NULL, context, keysize, valsize, ctxsize_st);
1535  }
1536  return;
1537  }
1538 
1539  ctxsize = (uint32_t)ctxsize_st;
1540  mask = (1ULL << (64 - funnel->capacity_clz)) - 1;
1541  row_idx = (hashed_key & mask) >> funnel->partition_clz;
1542  flowhead = funnel->flowheads[row_idx];
1543  flowbase = row_idx * funnel->slotsize;
1544  trip_bundle_size = sizeof(hashed_key) + sizeof(uint32_t) + keysize + ctxsize;
1545 
1546  // flush funnel and call callback
1547  if (trip_bundle_size + flowhead - flowbase > funnel->slotsize)
1548  {
1549  tubeidx = flowbase;
1550  while (tubeidx < flowhead)
1551  {
1552  tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1553  tubeidx += sizeof(uint64_t);
1554  tube_ctxsize = (uint32_t*)&funnel->tubes[tubeidx];
1555  tubeidx += sizeof(uint32_t);
1556  tube_key = &funnel->tubes[tubeidx];
1557  tubeidx += keysize;
1558  tube_ctx = &funnel->tubes[tubeidx];
1559  tubeidx += *tube_ctxsize;
1560  if (HTPreHashSearchIdx(table,
1561  *tube_hashed_key,
1562  tube_key,
1563  &bucket_idx))
1564  {
1565  if (getcb)
1566  getcb(&buckets[bucket_idx * bucket_size + 1],
1567  &buckets[bucket_idx * bucket_size + 1 + keysize],
1568  tube_ctx, keysize, valsize, *tube_ctxsize);
1569  }
1570  else
1571  {
1572  if (getcb)
1573  getcb(tube_key, NULL,
1574  tube_ctx, keysize, valsize, *tube_ctxsize);
1575  }
1576  }
1577  funnel->flowheads[row_idx] = flowbase;
1578  flowhead = flowbase;
1579  }
1580 
1581  tubeidx = flowhead;
1582  memcpy(&funnel->tubes[tubeidx], &hashed_key, sizeof(uint64_t));
1583  tubeidx += sizeof(hashed_key);
1584  memcpy(&funnel->tubes[tubeidx], &ctxsize, sizeof(uint32_t));
1585  tubeidx += sizeof(uint32_t);
1586  memcpy(&funnel->tubes[tubeidx], key, keysize);
1587  tubeidx += keysize;
1588  memcpy(&funnel->tubes[tubeidx], context, ctxsize);
1589  tubeidx += ctxsize;
1590  funnel->flowheads[row_idx] = tubeidx;
1591 }
1592 
1593 void HTFunnelGetFlush(HTFunnel* funnel)
1594 {
1595  const size_t keysize = funnel->table->keysize;
1596  const size_t valsize = funnel->table->valsize;
1597  const size_t bucket_size = keysize + valsize + 1;
1598 
1599  OPHashTable* table;
1600  OPFunnelGetCB getcb;
1601  int tube_num, row_idx;
1602  ptrdiff_t flowhead, tubeidx;
1603  uintptr_t bucket_idx;
1604  uint8_t *tube_key, *tube_ctx;
1605  uint32_t* tube_ctxsize;
1606  uint64_t* tube_hashed_key;
1607  uint8_t* restrict buckets;
1608 
1609  if (!funnel->tubes || !funnel->table)
1610  return;
1611 
1612  table = funnel->table;
1613  buckets = OPRef2Ptr(table, table->bucket_ref);
1614  getcb = funnel->callback.getcb;
1615  tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
1616 
1617  for (row_idx = 0; row_idx < tube_num; row_idx++)
1618  {
1619  tubeidx = row_idx * funnel->slotsize;
1620  flowhead = funnel->flowheads[row_idx];
1621  while (tubeidx < flowhead)
1622  {
1623  tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1624  tubeidx += sizeof(uint64_t);
1625  tube_ctxsize = (uint32_t*)&funnel->tubes[tubeidx];
1626  tubeidx += sizeof(uint32_t);
1627  tube_key = &funnel->tubes[tubeidx];
1628  tubeidx += keysize;
1629  tube_ctx = &funnel->tubes[tubeidx];
1630  tubeidx += *tube_ctxsize;
1631  if (HTPreHashSearchIdx(table,
1632  *tube_hashed_key,
1633  tube_key,
1634  &bucket_idx))
1635  {
1636  if (getcb)
1637  getcb(&buckets[bucket_idx * bucket_size + 1],
1638  &buckets[bucket_idx * bucket_size + 1 + keysize],
1639  tube_ctx, keysize, valsize, *tube_ctxsize);
1640  }
1641  else
1642  {
1643  if (getcb)
1644  getcb(tube_key, NULL,
1645  tube_ctx, keysize, valsize, *tube_ctxsize);
1646  }
1647  }
1648  funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
1649  }
1650 }
1651 
1652 void HTFunnelDelete(HTFunnel* funnel, void* key,
1653  void* context, size_t ctxsize)
1654 {
1655  uint64_t hashed_key;
1656  hashed_key = funnel->hasher(key, funnel->table->keysize);
1657  HTFunnelPreHashDelete(funnel, hashed_key, key, context, ctxsize);
1658 }
1659 
1660 void HTFunnelPreHashDelete(HTFunnel* funnel, uint64_t hashed_key,
1661  void* key, void* context, size_t ctxsize_st)
1662 {
1663  const size_t keysize = funnel->table->keysize;
1664  const size_t valsize = funnel->table->valsize;
1665 
1666  OPHashTable* table;
1667  OPFunnelDeleteCB deletecb;
1668  int row_idx;
1669  uint64_t mask;
1670  size_t trip_bundle_size;
1671  ptrdiff_t flowhead, flowbase, tubeidx;
1672  uint8_t *tube_key, *tube_ctx, *deleted_key, *deleted_val;
1673  uint32_t* tube_ctxsize;
1674  uint32_t ctxsize;
1675  uint64_t* tube_hashed_key;
1676 
1677  table = funnel->table;
1678  deletecb = funnel->callback.deletecb;
1679 
1680  // hash table is too small for using funnel
1681  if (!funnel->tubes)
1682  {
1683  deleted_val = HTPreHashDeleteCustom(table,
1684  funnel->hasher,
1685  hashed_key, key);
1686  if (deleted_val)
1687  {
1688  deleted_key = deleted_val - keysize;
1689  if (deletecb)
1690  deletecb(deleted_key, deleted_val,
1691  context, keysize, valsize, ctxsize_st);
1692  }
1693  else
1694  {
1695  if (deletecb)
1696  deletecb(key, NULL, context, keysize, valsize, ctxsize_st);
1697  }
1698  return;
1699  }
1700 
1701  ctxsize = (uint32_t)ctxsize_st;
1702  mask = (1ULL << (64 - funnel->capacity_clz)) - 1;
1703  row_idx = (hashed_key & mask) >> funnel->partition_clz;
1704  flowhead = funnel->flowheads[row_idx];
1705  flowbase = row_idx * funnel->slotsize;
1706  trip_bundle_size = sizeof(hashed_key) + sizeof(uint32_t) + keysize + ctxsize;
1707 
1708  // flush funnel and call callback
1709  if (trip_bundle_size + flowhead - flowbase > funnel->slotsize)
1710  {
1711  tubeidx = flowbase;
1712  while (tubeidx < flowhead)
1713  {
1714  tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1715  tubeidx += sizeof(uint64_t);
1716  tube_ctxsize = (uint32_t*)&funnel->tubes[tubeidx];
1717  tubeidx += sizeof(uint32_t);
1718  tube_key = &funnel->tubes[tubeidx];
1719  tubeidx += keysize;
1720  tube_ctx = &funnel->tubes[tubeidx];
1721  tubeidx += *tube_ctxsize;
1722  deleted_val = HTPreHashDeleteCustom(table,
1723  funnel->hasher,
1724  *tube_hashed_key, tube_key);
1725  if (deleted_val)
1726  {
1727  deleted_key = deleted_val - keysize;
1728  if (deletecb)
1729  deletecb(deleted_key, deleted_val,
1730  tube_ctx, keysize, valsize, *tube_ctxsize);
1731  }
1732  else
1733  {
1734  if (deletecb)
1735  deletecb(tube_key, NULL,
1736  tube_ctx, keysize, valsize, *tube_ctxsize);
1737  }
1738  }
1739  funnel->flowheads[row_idx] = flowbase;
1740  flowhead = flowbase;
1741  }
1742 
1743  tubeidx = flowhead;
1744  memcpy(&funnel->tubes[tubeidx], &hashed_key, sizeof(uint64_t));
1745  tubeidx += sizeof(hashed_key);
1746  memcpy(&funnel->tubes[tubeidx], &ctxsize, sizeof(uint32_t));
1747  tubeidx += sizeof(uint32_t);
1748  memcpy(&funnel->tubes[tubeidx], key, keysize);
1749  tubeidx += keysize;
1750  memcpy(&funnel->tubes[tubeidx], context, ctxsize);
1751  tubeidx += ctxsize;
1752  funnel->flowheads[row_idx] = tubeidx;
1753 }
1754 
1755 void HTFunnelDeleteFlush(HTFunnel* funnel)
1756 {
1757  const size_t keysize = funnel->table->keysize;
1758  const size_t valsize = funnel->table->valsize;
1759 
1760  OPHashTable* table;
1761  OPFunnelDeleteCB deletecb;
1762  int tube_num, row_idx;
1763  ptrdiff_t flowhead, tubeidx;
1764  uint8_t *tube_key, *tube_ctx, *deleted_key, *deleted_val;
1765  uint32_t* tube_ctxsize;
1766  uint64_t* tube_hashed_key;
1767 
1768  if (!funnel->tubes || !funnel->table)
1769  return;
1770 
1771  table = funnel->table;
1772  deletecb = funnel->callback.deletecb;
1773  tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
1774 
1775  for (row_idx = 0; row_idx < tube_num; row_idx++)
1776  {
1777  tubeidx = row_idx * funnel->slotsize;
1778  flowhead = funnel->flowheads[row_idx];
1779  while (tubeidx < flowhead)
1780  {
1781  tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1782  tubeidx += sizeof(uint64_t);
1783  tube_ctxsize = (uint32_t*)&funnel->tubes[tubeidx];
1784  tubeidx += sizeof(uint32_t);
1785  tube_key = &funnel->tubes[tubeidx];
1786  tubeidx += keysize;
1787  tube_ctx = &funnel->tubes[tubeidx];
1788  tubeidx += *tube_ctxsize;
1789  deleted_val = HTPreHashDeleteCustom(table,
1790  funnel->hasher,
1791  *tube_hashed_key, tube_key);
1792  if (deleted_val)
1793  {
1794  deleted_key = deleted_val - keysize;
1795  if (deletecb)
1796  deletecb(deleted_key, deleted_val,
1797  tube_ctx, keysize, valsize, *tube_ctxsize);
1798  }
1799  else
1800  {
1801  if (deletecb)
1802  deletecb(tube_key, NULL,
1803  tube_ctx, keysize, valsize, *tube_ctxsize);
1804  }
1805  }
1806  funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
1807  }
1808 }
1809 
1810 /* robin_hood.c ends here */
Opaque object for memory allocation.
static opref_t OPPtr2Ref(void *addr)
Converts a pointer allocated in OPHeap to an opref_t.
Definition: op_malloc.h:295
A general hashmap/hashset/hashmultimap implementation using robin hood hashing.
uintptr_t opref_t
The "pointer type" used within objects created by OPHeap.
Definition: op_malloc.h:109
static void * OPRef2Ptr(void *ptr_in_heap, opref_t ref)
Converts an opref_t reference to a regular pointer.
Definition: op_malloc.h:309
void(* OPHashIterator)(void *key, void *value, size_t keysize, size_t valsize, void *context)
HashTable iterator interface.
Definition: op_hash.h:59
uint64_t(* OPHash)(void *key, size_t size)
Hash function interface.
Definition: op_hash.h:45