55 #include "opic/common/op_utils.h" 60 #define PROBE_STATS_SIZE 64 61 #define DEFAULT_LARGE_DATA_THRESHOLD (1UL << 30) 62 #define VISIT_IDX_CACHE 8 64 OP_LOGGER_FACTORY(logger,
"opic.hash.robin_hood");
/* Forward declarations for helpers defined later in this file: the new-key
 * upsert probe, the push-down relocation routine, and table growth. */
73 static inline enum upsert_result_t
74 HTUpsertNewKey(OPHashTable* table,
OPHash hasher,
77 uint8_t** matched_bucket,
81 HTUpsertPushDown(OPHashTable* table,
OPHash hasher,
82 uint8_t* bucket_cpy,
int probe,
83 uint8_t* avoid_bucket,
bool* resize);
85 static bool HTSizeUp(OPHashTable* table,
OPHash hasher);
/* Byte-size limit that HTSizeUp compares against capacity * bucket size. */
92 uint64_t large_data_threshold;
/* Multiplier half of the capacity encoding; see HTCapacityInternal. */
94 uint8_t capacity_ms4b;
/* Largest probe count recorded so far; used as the bound of probe loops. */
95 uint16_t longest_probes;
/* stats[p] is incremented/decremented as entries are stored at probe p. */
98 uint32_t stats[PROBE_STATS_SIZE];
/* Hash table that buffered operations are applied to. */
104 OPHashTable* restrict table;
/* Copy of table->capacity_clz taken when the tubes were sized; 0 means the
 * tubes have not been sized. */
108 uint8_t capacity_clz;
/* __builtin_clzll(partition_size / bucket size), set at construction. */
109 uint8_t partition_clz;
/* Single buffer holding every tube; each tube spans slotsize bytes. */
110 uint8_t* restrict tubes;
/* Per-tube write offset into tubes; tube i starts at i * slotsize. */
111 ptrdiff_t* restrict flowheads;
/* Converts the (capacity_clz, capacity_ms4b) encoding into a bucket count. */
115 uint64_t HTCapacityInternal(uint8_t capacity_clz, uint8_t capacity_ms4b);
/*
 * HTNew: allocates an OPHashTable and its bucket array from heap and stores
 * the table pointer in *table.
 *
 * The requested capacity is num_objects / load, rounded up into the
 * (capacity_clz, capacity_ms4b) encoding that HTCapacityInternal expands.
 * Each bucket occupies keysize + valsize + 1 bytes; the extra leading byte
 * is the per-bucket state byte tested elsewhere in this file.
 *
 * NOTE(review): __builtin_clzl(0) is undefined and (capacity_msb - 4) goes
 * negative for capacities below 8 -- confirm small/zero capacities are
 * clamped on the lines not shown here.
 * NOTE(review): __builtin_clzl and 1UL assume a 64-bit long.
 */
118 HTNew(
OPHeap* heap, OPHashTable** table,
119 uint64_t num_objects,
double load,
size_t keysize,
size_t valsize)
122 uint32_t capacity_clz, capacity_ms4b, capacity_msb;
126 op_assert(load > 0.0 && load < 1.0,
127 "load %lf must within close interval (0.0, 1.0)\n", load);
128 capacity = (uint64_t)(num_objects / load);
131 capacity_clz = __builtin_clzl(capacity);
132 capacity_msb = 64 - capacity_clz;
/* Round the top four significant bits of capacity upward. */
133 capacity_ms4b = round_up_div(capacity, 1UL << (capacity_msb - 4));
134 capacity = HTCapacityInternal((uint8_t)capacity_clz, (uint8_t)capacity_ms4b);
136 bucket_size = keysize + valsize + 1;
138 *table = OPCalloc(heap, 1,
sizeof(OPHashTable));
141 bucket_ptr = OPCalloc(heap, 1, bucket_size * capacity);
147 (*table)->bucket_ref =
OPPtr2Ref(bucket_ptr);
148 (*table)->large_data_threshold = DEFAULT_LARGE_DATA_THRESHOLD;
149 (*table)->capacity_clz = capacity_clz;
150 (*table)->capacity_ms4b = capacity_ms4b;
/* Object-count thresholds checked before inserts (high) and deletes (low). */
151 (*table)->objcnt_high = (uint64_t)(capacity * load);
152 (*table)->objcnt_low = capacity * 2 / 10;
153 (*table)->keysize = keysize;
154 (*table)->valsize = valsize;
/* HTDestroy: releases the bucket array that table->bucket_ref refers to. */
159 HTDestroy(OPHashTable* table)
161 OPDealloc(
OPRef2Ptr(table, table->bucket_ref));
165 uint64_t HTObjcnt(OPHashTable* table)
167 return table->objcnt;
170 uint64_t HTCapacity(OPHashTable* table)
172 return HTCapacityInternal(table->capacity_clz, table->capacity_ms4b);
175 size_t HTKeysize(OPHashTable* table)
177 return table->keysize;
180 size_t HTValsize(OPHashTable* table)
182 return table->valsize;
/*
 * HTCapacityInternal: expands the compact capacity encoding into a bucket
 * count.
 *
 * The result is 2^(64 - capacity_clz - 4) * capacity_ms4b.
 *
 * @param capacity_clz   leading-zero count of the capacity; must be <= 60 so
 *                       the shift amount stays non-negative.
 * @param capacity_ms4b  multiplier applied to the power of two.
 * @return               number of buckets.
 */
uint64_t HTCapacityInternal(uint8_t capacity_clz, uint8_t capacity_ms4b)
{
  /* UINT64_C keeps the shift 64 bits wide even where long is 32 bits. */
  const uint64_t unit = UINT64_C(1) << (64 - capacity_clz - 4);
  return unit * capacity_ms4b;
}
/*
 * quadratic_exact: returns the hash advanced to a given probe attempt.
 *
 * The offset is probe * (probe + 1), computed in 64-bit unsigned arithmetic
 * so a large probe wraps instead of overflowing a signed int.
 *
 * @param hashed_key  base hash of the key.
 * @param probe       probe attempt number (0 yields hashed_key itself).
 * @return            hashed_key + probe * (probe + 1), modulo 2^64.
 */
static inline uint64_t
quadratic_exact(uint64_t hashed_key, int probe)
{
  const uint64_t step = (uint64_t)probe;
  return hashed_key + step * (step + 1);
}
/*
 * quadratic_partial: advances a running probe key by one step.
 *
 * Adds 2 * probe to probing_key. Applying it for probe = 1..k accumulates
 * an offset of k * (k + 1). The product is formed in 64-bit unsigned
 * arithmetic to avoid signed int overflow.
 *
 * @param probing_key  key produced by the previous step (or the base hash).
 * @param probe        index of the step being taken.
 * @return             probing_key + 2 * probe, modulo 2^64.
 */
static inline uint64_t
quadratic_partial(uint64_t probing_key, int probe)
{
  const uint64_t delta = (uint64_t)probe * 2;
  return probing_key + delta;
}
/*
 * fast_mod_scale: reduces a probed hash to a bucket index without a division.
 *
 * The hash is masked, multiplied by scale, and the product is divided by 16
 * with a right shift. For mask = 2^k - 1 the result is below 2^(k-4) * scale.
 *
 * @param probed_hash  hash value after probing has been applied.
 * @param mask         bit mask applied to the hash.
 * @param scale        multiplier (callers pass capacity_ms4b).
 * @return             ((probed_hash & mask) * scale) / 16.
 */
static inline uint64_t
fast_mod_scale(uint64_t probed_hash, uint64_t mask, uint64_t scale)
{
  const uint64_t masked = probed_hash & mask;
  return (masked * scale) >> 4;
}
/*
 * hash_with_probe: maps key at probe attempt probe to a bucket index.
 * The quadratic offset is applied first, then the value is masked to
 * (64 - capacity_clz) bits and scaled by capacity_ms4b / 16.
 *
 * NOTE(review): a capacity_clz of 0 would make the mask shift 64 bits wide,
 * which is undefined -- confirm it cannot occur.
 */
215 static inline uintptr_t
216 hash_with_probe(OPHashTable* table, uint64_t key,
int probe)
218 uint64_t mask = (1ULL << (64 - table->capacity_clz)) - 1;
231 uint64_t probed_hash = quadratic_exact(key, probe);
240 return fast_mod_scale(probed_hash, mask, table->capacity_ms4b);
/*
 * findprobe: works out which probe attempt placed the entry stored in bucket
 * idx. It re-hashes the key stored in that bucket and replays the probe
 * sequence up to longest_probes, comparing each resulting index with idx.
 * An error is logged when no probe in that range lands on idx.
 */
244 findprobe(OPHashTable* table,
OPHash hasher, uintptr_t idx)
246 const size_t keysize = table->keysize;
247 const size_t valsize = table->valsize;
248 const size_t bucket_size = keysize + valsize + 1;
249 uint8_t* buckets =
OPRef2Ptr(table, table->bucket_ref);
250 uint64_t hashed_key, probing_key, probing_idx;
251 uint64_t mask = (1ULL << (64 - table->capacity_clz)) - 1;
/* The key starts one byte into the bucket, after the state byte. */
253 hashed_key = hasher(&buckets[idx*bucket_size + 1], keysize);
254 probing_key = hashed_key;
256 for (
int i = 0; i <= table->longest_probes; i++)
/* Each step adds 2*i, so after step i the total offset is i*(i+1),
 * the same offset quadratic_exact produces for probe i. */
258 probing_key = quadratic_partial(probing_key, i);
259 probing_idx = fast_mod_scale(probing_key, mask, table->capacity_ms4b);
260 if (probing_idx == idx)
263 OP_LOG_ERROR(logger,
"Didn't find any match probe!\n");
/*
 * IncreaseProbeStat: records that an entry was stored at probe attempt
 * probe. Raises longest_probes if needed and bumps stats[probe].
 *
 * NOTE(review): stats has PROBE_STATS_SIZE elements and probe is not range
 * checked here -- callers must guarantee probe < PROBE_STATS_SIZE.
 */
268 IncreaseProbeStat(OPHashTable* table,
int probe)
271 table->longest_probes = probe > table->longest_probes ?
272 probe : table->longest_probes;
273 table->stats[probe]++;
/*
 * HTUpsertNewKey: probes for the bucket that key should use and reports it
 * through *matched_bucket.
 *
 * Bucket state byte values tested here: 0, 1 and 2. State 2 is what the
 * delete path in this file writes, and state 1 is what the insert paths
 * write. The visible outcomes are:
 *   - state 0: the bucket is claimed for the new key;
 *   - state 2: later probes are scanned for an existing copy of the key,
 *     otherwise this bucket is claimed;
 *   - the stored key equals key: the bucket is reported as the match;
 *   - the resident entry sits at a smaller probe count than the new key:
 *     UPSERT_PUSHDOWN is returned and *probe_state is set to the resident's
 *     next probe attempt so the caller can relocate it.
 */
276 static inline enum upsert_result_t
277 HTUpsertNewKey(OPHashTable* table,
OPHash hasher,
280 uint8_t** matched_bucket,
int* probe_state)
282 const size_t keysize = table->keysize;
283 const size_t valsize = table->valsize;
284 const size_t bucket_size = keysize + valsize + 1;
285 uint8_t* restrict buckets;
286 int probe, old_probe;
289 buckets =
OPRef2Ptr(table, table->bucket_ref);
294 idx = hash_with_probe(table, hashed_key, probe);
/* NOTE(review): this lets probe == PROBE_STATS_SIZE through, yet
 * IncreaseProbeStat indexes stats[probe] whose length is PROBE_STATS_SIZE;
 * ">=" looks intended -- confirm. */
295 if (probe > PROBE_STATS_SIZE)
297 HTSizeUp(table, hasher);
/* The bucket array was replaced by the resize, so re-read its address. */
299 buckets =
OPRef2Ptr(table, table->bucket_ref);
303 if (buckets[idx * bucket_size] == 0)
305 IncreaseProbeStat(table, probe);
306 *matched_bucket = &buckets[idx * bucket_size];
310 else if (buckets[idx * bucket_size] == 2)
312 for (
int p = probe+1; p <= table->longest_probes; p++)
314 _idx = hash_with_probe(table, hashed_key, p);
316 if (buckets[_idx * bucket_size] == 1)
/* NOTE(review): compares bucket_size bytes, whereas the other key
 * comparison in this function compares keysize bytes -- likely should be
 * keysize. */
318 if (memeq(key, &buckets[_idx * bucket_size + 1], bucket_size))
320 *matched_bucket = &buckets[_idx * bucket_size];
324 IncreaseProbeStat(table, probe);
325 *matched_bucket = &buckets[idx * bucket_size];
328 if (memeq(key, &buckets[idx * bucket_size + 1], keysize))
330 *matched_bucket = &buckets[idx * bucket_size];
333 old_probe = findprobe(table, hasher, idx);
334 if (probe > old_probe)
336 table->longest_probes = probe > table->longest_probes ?
337 probe : table->longest_probes;
/* The resident leaves probe slot old_probe; the new key takes slot probe. */
338 table->stats[old_probe]--;
339 table->stats[probe]++;
340 *matched_bucket = &buckets[idx * bucket_size];
341 *probe_state = old_probe+1;
342 return UPSERT_PUSHDOWN;
/*
 * HTUpsertPushDown: re-homes the displaced bucket image in bucket_cpy,
 * starting from probe attempt probe.
 *
 * Visible behavior: a bucket whose state byte is not 1 receives the image
 * and ends the search. A resident with a smaller probe count is swapped out
 * through bucket_tmp and becomes the next image to place. The address in
 * avoid_bucket is compared against each candidate, and recently visited
 * indexes are remembered in a VISIT_IDX_CACHE-entry ring.
 */
349 HTUpsertPushDown(OPHashTable* table,
OPHash hasher,
350 uint8_t* bucket_cpy,
int probe, uint8_t* avoid_bucket,
353 const size_t keysize = table->keysize;
354 const size_t valsize = table->valsize;
355 const size_t bucket_size = keysize + valsize + 1;
356 uint8_t* restrict buckets;
/* Scratch space for swapping one bucket; variable-length array on the stack. */
358 uint8_t bucket_tmp[bucket_size];
360 uint64_t hashed_key, iter, capacity;
362 uintptr_t visited_idx[VISIT_IDX_CACHE];
/* The key of the displaced entry starts after its state byte. */
366 hashed_key = hasher(&bucket_cpy[1], keysize);
367 buckets =
OPRef2Ptr(table, table->bucket_ref);
368 capacity = HTCapacity(table);
375 idx = hash_with_probe(table, hashed_key, probe);
379 HTSizeUp(table, hasher);
/* Capacity and bucket address are re-read after the resize. */
380 capacity = HTCapacity(table);
383 buckets =
OPRef2Ptr(table, table->bucket_ref);
387 if (&buckets[idx * bucket_size] == avoid_bucket)
/* Until the ring has wrapped, only the first visit entries are valid. */
394 if (visit < VISIT_IDX_CACHE)
396 for (
int i = 0; i < visit; i++)
398 if (idx == visited_idx[i])
407 for (
int i = visit + 1; i < visit + VISIT_IDX_CACHE; i++)
409 if (idx == visited_idx[i % VISIT_IDX_CACHE])
417 visited_idx[visit % VISIT_IDX_CACHE] = idx;
421 if (buckets[idx * bucket_size] != 1)
423 IncreaseProbeStat(table, probe);
424 memcpy(&buckets[idx * bucket_size], bucket_cpy, bucket_size);
428 old_probe = findprobe(table, hasher, idx);
429 if (probe > old_probe)
431 table->longest_probes = probe > table->longest_probes ?
432 probe : table->longest_probes;
433 table->stats[old_probe]--;
434 table->stats[probe]++;
/* Three-way swap: resident -> tmp, image -> bucket, tmp -> image. */
435 memcpy(bucket_tmp, &buckets[idx * bucket_size], bucket_size);
436 memcpy(&buckets[idx * bucket_size], bucket_cpy, bucket_size);
437 memcpy(bucket_cpy, bucket_tmp, bucket_size);
/* Continue with the evicted entry from the probe after its old one. */
438 probe = old_probe + 1;
439 hashed_key = hasher(&bucket_cpy[1], keysize);
/*
 * HTSizeUp: replaces the bucket array with a larger one and re-inserts every
 * bucket whose state byte is 1.
 *
 * When the current array is at least large_data_threshold bytes, the new
 * capacity is chosen by a switch on capacity_ms4b (new multipliers 10, 12,
 * 14, 8, 10; the last two also lower capacity_clz by one). Otherwise the
 * multiplier is reset to 8 and capacity_clz drops by one or two.
 * Probe statistics are cleared before re-insertion, and the old array is
 * released afterwards. Callers in this file treat a false return as failure.
 */
447 HTSizeUp(OPHashTable* table,
OPHash hasher)
449 const size_t keysize = table->keysize;
450 const size_t valsize = table->valsize;
451 const size_t bucket_size = keysize + valsize + 1;
452 const size_t large_data_threshold = table->large_data_threshold;
453 uint8_t* old_buckets;
454 uint8_t* new_buckets;
455 uint8_t new_capacity_ms4b, new_capacity_clz;
456 uint64_t old_capacity, new_capacity;
459 old_capacity = HTCapacity(table);
460 old_buckets =
OPRef2Ptr(table, table->bucket_ref);
462 if (old_capacity * bucket_size >= large_data_threshold)
465 switch(table->capacity_ms4b)
468 new_capacity_ms4b = 10;
469 new_capacity_clz = table->capacity_clz;
473 new_capacity_ms4b = 12;
474 new_capacity_clz = table->capacity_clz;
478 new_capacity_ms4b = 14;
479 new_capacity_clz = table->capacity_clz;
483 new_capacity_ms4b = 8;
484 new_capacity_clz = table->capacity_clz - 1;
487 new_capacity_ms4b = 10;
488 new_capacity_clz = table->capacity_clz - 1;
490 default: op_assert(
false,
"Unknown capacity_ms4b %d\n",
491 table->capacity_ms4b);
496 new_capacity_ms4b = 8;
497 new_capacity_clz = table->capacity_ms4b == 8 ?
498 table->capacity_clz - 1 : table->capacity_clz - 2;
500 new_capacity = HTCapacityInternal(new_capacity_clz, new_capacity_ms4b);
501 OP_LOG_INFO(logger,
"Resize from %" PRIu64
" to %" PRIu64,
502 old_capacity, new_capacity);
504 new_buckets = OPCalloc(ObtainOPHeap(table), 1, bucket_size * new_capacity);
507 OP_LOG_ERROR(logger,
"Cannot obtain new bucket for size %" PRIu64,
/* After a resize the thresholds are fixed at 80% and 20% of capacity. */
513 table->objcnt_high = new_capacity * 8 / 10;
514 table->objcnt_low = new_capacity * 2 / 10;
515 table->capacity_clz = new_capacity_clz;
516 table->capacity_ms4b = new_capacity_ms4b;
517 table->longest_probes = 0;
518 memset(table->stats, 0x00,
sizeof(uint32_t) * PROBE_STATS_SIZE);
519 table->bucket_ref =
OPPtr2Ref(new_buckets);
521 for (uint64_t idx = 0; idx < old_capacity; idx++)
523 if (old_buckets[idx*bucket_size] == 1)
525 HTUpsertPushDown(table, hasher, &old_buckets[idx * bucket_size],
529 OPDealloc(old_buckets);
/*
 * HTSizeDown: replaces the bucket array with a smaller one and re-inserts
 * every bucket whose state byte is 1.
 *
 * Asserts that the current capacity is above 16. The new capacity comes from
 * a switch on capacity_ms4b; the two visible arms raise capacity_clz by one
 * and set the multiplier to 8 or 12. Probe statistics are cleared before
 * re-insertion and the old array is released afterwards. The caller in this
 * file treats a false return as failure.
 */
534 HTSizeDown(OPHashTable* table,
OPHash hasher)
536 const size_t keysize = table->keysize;
537 const size_t valsize = table->valsize;
538 const size_t bucket_size = keysize + valsize + 1;
539 uint8_t* old_buckets;
540 uint8_t* new_buckets;
541 uint8_t new_capacity_ms4b, new_capacity_clz;
542 uint64_t old_capacity, new_capacity;
545 old_capacity = HTCapacity(table);
546 old_buckets =
OPRef2Ptr(table, table->bucket_ref);
547 op_assert(old_capacity > 16,
548 "Can not resize smaller than 16, but got old_capacity %" 549 PRIu64
"\n", old_capacity);
551 switch(table->capacity_ms4b)
557 new_capacity_ms4b = 8;
558 new_capacity_clz = table->capacity_clz + 1;
564 new_capacity_ms4b = 12;
565 new_capacity_clz = table->capacity_clz + 1;
567 default: op_assert(
false,
"Unknown capacity_ms4b %d\n",
568 table->capacity_ms4b);
571 new_capacity = HTCapacityInternal(new_capacity_clz, new_capacity_ms4b);
572 OP_LOG_INFO(logger,
"Resize from %" PRIu64
" to %" PRIu64,
573 old_capacity, new_capacity);
574 new_buckets = OPCalloc(ObtainOPHeap(table), 1, bucket_size * new_capacity);
577 OP_LOG_ERROR(logger,
"Cannot obtain new bucket for size %" PRIu64,
/* After a resize the thresholds are fixed at 80% and 20% of capacity. */
583 table->objcnt_high = new_capacity * 8 / 10;
584 table->objcnt_low = new_capacity * 2 / 10;
585 table->capacity_clz = new_capacity_clz;
586 table->capacity_ms4b = new_capacity_ms4b;
587 table->longest_probes = 0;
588 memset(table->stats, 0x00,
sizeof(uint32_t) * PROBE_STATS_SIZE);
589 table->bucket_ref =
OPPtr2Ref(new_buckets);
591 for (uint64_t idx = 0; idx < old_capacity; idx++)
593 if (old_buckets[idx*bucket_size] == 1)
595 HTUpsertPushDown(table, hasher, &old_buckets[idx * bucket_size],
599 OPDealloc(old_buckets);
/*
 * HTPreHashInsertCustom: inserts key/val using a hash the caller already
 * computed.
 *
 * The table is grown first when objcnt exceeds objcnt_high. The bucket is
 * then chosen by HTUpsertNewKey. For UPSERT_PUSHDOWN the resident bucket is
 * copied aside, overwritten with the new key and value, and the copy is
 * relocated with HTUpsertPushDown.
 *
 * @param hashed_key  hash of key; expected to equal hasher(key, keysize).
 */
604 bool HTPreHashInsertCustom(OPHashTable* table,
OPHash hasher,
605 uint64_t hashed_key,
void* key,
void* val)
607 const size_t keysize = table->keysize;
608 const size_t valsize = table->valsize;
609 const size_t bucket_size = keysize + valsize + 1;
610 enum upsert_result_t upsert_result;
611 uint8_t* matched_bucket;
/* Holds the displaced bucket; variable-length array on the stack. */
613 uint8_t bucket_cpy[bucket_size];
616 if (table->objcnt > table->objcnt_high)
618 if(!HTSizeUp(table, hasher))
622 upsert_result = HTUpsertNewKey(table, hasher, hashed_key, key,
623 &matched_bucket, &probe);
625 switch (upsert_result)
/* Bucket layout: [state byte][key: keysize bytes][value: valsize bytes]. */
629 memcpy(&matched_bucket[1], key, keysize);
631 memcpy(&matched_bucket[1 + keysize], val, valsize);
633 case UPSERT_PUSHDOWN:
634 memcpy(bucket_cpy, matched_bucket, bucket_size);
635 memcpy(&matched_bucket[1], key, keysize);
636 memcpy(&matched_bucket[1 + keysize], val, valsize);
637 HTUpsertPushDown(table, hasher, bucket_cpy, probe,
638 matched_bucket, &resized);
643 bool HTInsertCustom(OPHashTable* table,
OPHash hasher,
void* key,
void* val)
646 hashed_key = hasher(key, table->keysize);
647 return HTPreHashInsertCustom(table, hasher, hashed_key, key, val);
/*
 * HTUpsertCustom: finds or creates the bucket for key and hands back a
 * pointer to its value area through *val_ref, without writing the value.
 *
 * *is_duplicate is set to true on one switch arm and false on the others,
 * including UPSERT_PUSHDOWN. The table is grown first when objcnt exceeds
 * objcnt_high.
 */
650 bool HTUpsertCustom(OPHashTable* table,
OPHash hasher,
651 void* key,
void** val_ref,
bool* is_duplicate)
653 const size_t keysize = table->keysize;
654 const size_t valsize = table->valsize;
655 const size_t bucket_size = keysize + valsize + 1;
656 enum upsert_result_t upsert_result;
658 uint8_t* matched_bucket;
660 uint8_t bucket_cpy[bucket_size];
663 if (table->objcnt > table->objcnt_high)
665 if (!HTSizeUp(table, hasher))
669 hashed_key = hasher(key, keysize);
670 upsert_result = HTUpsertNewKey(table, hasher, hashed_key, key,
671 &matched_bucket, &probe);
/* The value area begins after the state byte and the key. */
672 *val_ref = &matched_bucket[keysize + 1];
673 switch (upsert_result)
676 *is_duplicate =
false;
678 memcpy(&matched_bucket[1], key, keysize);
681 *is_duplicate =
true;
683 case UPSERT_PUSHDOWN:
684 *is_duplicate =
false;
685 memcpy(bucket_cpy, matched_bucket, bucket_size);
686 memcpy(&matched_bucket[1], key, keysize);
687 HTUpsertPushDown(table, hasher, bucket_cpy, probe,
688 matched_bucket, &resized);
/* Looks the key up again to refresh *val_ref after the push-down. */
691 *val_ref = HTGetCustom(table, hasher, key);
/*
 * HTPreHashSearchIdx: searches for key using a precomputed hash and stores
 * the bucket index under inspection in *idx.
 *
 * The probe sequence is generated incrementally with quadratic_partial, and
 * the next bucket is prefetched while the current one is examined. The loop
 * tests the state byte for 0 and 2 and compares the stored key with key.
 */
698 HTPreHashSearchIdx(OPHashTable* table, uint64_t hashed_key,
699 void* key, uintptr_t* idx)
701 const size_t keysize = table->keysize;
702 const size_t valsize = table->valsize;
703 const size_t bucket_size = keysize + valsize + 1;
704 uint8_t* buckets =
OPRef2Ptr(table, table->bucket_ref);
706 uint64_t mask = (1ULL << (64 - table->capacity_clz)) - 1;
707 uint64_t probing_key;
709 probing_key = hashed_key;
/* *idx is the index for probe 0; idx_next is computed one probe ahead. */
710 *idx = fast_mod_scale(probing_key, mask, table->capacity_ms4b);
711 probing_key = quadratic_partial(probing_key, 1);
712 idx_next = fast_mod_scale(probing_key, mask, table->capacity_ms4b);
713 for (
int probe = 2; probe <= table->longest_probes+2; probe++)
715 __builtin_prefetch(&buckets[idx_next * bucket_size], 0, 0);
716 if (buckets[*idx * bucket_size] == 0)
718 if (buckets[*idx * bucket_size] == 2)
720 if (memeq(key, &buckets[*idx*bucket_size + 1], keysize))
724 probing_key = quadratic_partial(probing_key, probe);
725 idx_next = fast_mod_scale(probing_key, mask, table->capacity_ms4b);
/* HTSearchIdx: hashes key with hasher and forwards to HTPreHashSearchIdx,
 * returning its result. */
731 HTSearchIdx(OPHashTable* table,
OPHash hasher,
void* key, uintptr_t* idx)
733 const size_t keysize = table->keysize;
735 hashed_key = hasher(key, keysize);
736 return HTPreHashSearchIdx(table, hashed_key, key, idx);
/*
 * HTGetCustom: looks key up and, when HTSearchIdx reports a hit, returns a
 * pointer to the value area inside the matching bucket (not a copy).
 */
739 void* HTGetCustom(OPHashTable* table,
OPHash hasher,
void* key)
741 const size_t keysize = table->keysize;
742 const size_t valsize = table->valsize;
743 const size_t bucket_size = keysize + valsize + 1;
744 uint8_t* buckets =
OPRef2Ptr(table, table->bucket_ref);
746 if (HTSearchIdx(table, hasher, key, &idx))
/* Skip the state byte and the key to reach the value. */
748 return &buckets[idx*bucket_size + keysize + 1];
/* HTGetProbeCustom: when key is found, returns the value findprobe
 * computes for the bucket holding it. */
753 int HTGetProbeCustom(OPHashTable* table,
OPHash hasher,
void* key)
756 if (HTSearchIdx(table, hasher, key, &idx))
758 return findprobe(table, hasher, idx);
/*
 * HTPreHashDeleteCustom: removes key using a precomputed hash.
 *
 * The table may be shrunk first when objcnt is below objcnt_low. The entry
 * is located with HTPreHashSearchIdx, its probe slot in stats is
 * decremented, longest_probes is lowered when its last entry disappears,
 * and the bucket's state byte is set to 2. The returned pointer addresses
 * the value bytes that remain in the marked bucket.
 */
764 HTPreHashDeleteCustom(OPHashTable* table,
OPHash hasher,
765 uint64_t hashed_key,
void* key)
772 const size_t keysize = table->keysize;
773 const size_t valsize = table->valsize;
774 const size_t bucket_size = keysize + valsize + 1;
775 uint8_t* restrict buckets;
779 if (table->objcnt < table->objcnt_low &&
782 if (!HTSizeDown(table, hasher))
786 if (!HTPreHashSearchIdx(table, hashed_key, key, &idx))
/* Read the bucket address only now, after any shrink replaced the array. */
789 buckets =
OPRef2Ptr(table, table->bucket_ref);
792 record_probe = findprobe(table, hasher, idx);
793 if (record_probe < PROBE_STATS_SIZE)
794 table->stats[record_probe]--;
796 OP_LOG_WARN(logger,
"Large probe: %d\n", record_probe);
798 if (record_probe == table->longest_probes &&
799 table->stats[record_probe] == 0)
/* Walk downward looking for the new longest probe. */
801 for (
int i = table->longest_probes; i >= 0; i--)
805 table->longest_probes = i;
817 buckets[idx * bucket_size] = 2;
818 return &buckets[idx * bucket_size + 1 + keysize];
821 void* HTDeleteCustom(OPHashTable* table,
OPHash hasher,
void* key)
824 hashed_key = hasher(key, table->keysize);
825 return HTPreHashDeleteCustom(table, hasher, hashed_key, key);
828 void HTIterate(OPHashTable* table,
OPHashIterator iterator,
void* context)
830 const size_t keysize = table->keysize;
831 const size_t valsize = table->valsize;
832 const size_t bucket_size = keysize + valsize + 1;
833 uint8_t* buckets =
OPRef2Ptr(table, table->bucket_ref);
834 uint64_t capacity = HTCapacity(table);
836 for (uint64_t idx = 0; idx < capacity; idx++)
838 if (buckets[idx*bucket_size] == 1)
840 iterator(&buckets[idx*bucket_size + 1],
841 &buckets[idx*bucket_size + 1 + keysize],
842 keysize, valsize, context);
847 void HTPrintStat(OPHashTable* table)
849 for (
int i = 0; i < PROBE_STATS_SIZE; i++)
851 printf(
"probe %02d: %d\n", i, table->stats[i]);
854 uint32_t HTMaxProbe(OPHashTable* table)
856 return table->longest_probes;
859 uint32_t HTProbeStat(OPHashTable* table, uint32_t idx)
861 if (idx < PROBE_STATS_SIZE)
862 return table->stats[idx];
/*
 * HTFunnelNewCustom: allocates an HTFunnel bound to table.
 *
 * partition_clz is derived from partition_size divided by the bucket size.
 * Tubes and flowheads are allocated here only when partition_clz exceeds the
 * table's capacity_clz; otherwise they stay NULL and capacity_clz stays 0.
 *
 * NOTE(review): the malloc results are not checked on the visible lines.
 * NOTE(review): flowheads holds ptrdiff_t values but is sized with
 * sizeof(ptrdiff_t*); the two sizes normally coincide -- confirm intent.
 */
866 HTFunnel* HTFunnelNewCustom(OPHashTable* table,
OPHash hasher,
868 size_t slotsize,
size_t partition_size)
874 funnel = malloc(
sizeof(HTFunnel));
875 bucketsize = table->keysize + table->valsize + 1;
876 funnel->table = table;
877 funnel->hasher = hasher;
878 funnel->callback = callback;
879 funnel->slotsize = slotsize;
880 funnel->partition_clz = __builtin_clzll(partition_size / bucketsize);
881 funnel->capacity_clz = 0;
882 funnel->tubes = NULL;
883 funnel->flowheads = NULL;
884 if (funnel->partition_clz > table->capacity_clz)
886 funnel->capacity_clz = table->capacity_clz;
887 tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
888 funnel->tubes = malloc(tube_num * slotsize);
889 funnel->flowheads = malloc(tube_num *
sizeof(ptrdiff_t*));
/* Every tube starts empty: its write offset equals its base offset. */
890 for (
int i = 0; i < tube_num; i++)
891 funnel->flowheads[i] = i * funnel->slotsize;
/* HTFunnelDestroy: frees the funnel's flowheads array when one exists;
 * the other cleanup steps are on lines not shown here. */
896 void HTFunnelDestroy(HTFunnel* funnel)
903 if (funnel->flowheads)
904 free(funnel->flowheads);
/*
 * HTFunnelPreHashInsert: queues an insert in the funnel, or applies it
 * directly when the funnel has no tubes.
 *
 * Tube record layout: [hashed_key: 8 bytes][key: keysize][value: valsize].
 *
 * Flow, as far as the visible lines show:
 *   1. If the table's capacity_clz differs from the funnel's copy, the tubes
 *      are (re)built. An unsized funnel either gets tubes or falls back to
 *      HTPreHashInsertCustom. A sized funnel replays the records of the old
 *      tubes through this function.
 *   2. The tube is chosen from hashed_key. If the new record would overflow
 *      it, every queued record is inserted into the table and the tube is
 *      reset.
 *   3. The record is appended and the tube's flowhead advanced.
 */
908 void HTFunnelPreHashInsert(HTFunnel* funnel,
910 void* key,
void* value)
912 const size_t keysize = funnel->table->keysize;
913 const size_t valsize = funnel->table->valsize;
914 const size_t bucket_size = keysize + valsize + 1;
915 const size_t trip_bundle_size =
sizeof(hashed_key) + keysize + valsize;
918 int tube_num, old_tube_num, row_idx, probe;
920 ptrdiff_t flowhead, flowbase, tubeidx;
921 ptrdiff_t* old_flowheads;
922 uint8_t *old_tubes, *tube_key, *tube_val, *matched_bucket;
923 uint64_t* tube_hashed_key;
925 enum upsert_result_t upsert_result;
926 uint8_t bucket_cpy[bucket_size];
928 table = funnel->table;
930 if (funnel->capacity_clz != table->capacity_clz)
932 if (funnel->capacity_clz == 0)
937 if (funnel->partition_clz > table->capacity_clz)
939 funnel->capacity_clz = table->capacity_clz;
940 tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
941 funnel->tubes = malloc(tube_num * funnel->slotsize);
/* NOTE(review): element type is ptrdiff_t, not ptrdiff_t*. */
942 funnel->flowheads = malloc(tube_num *
sizeof(ptrdiff_t*));
943 for (row_idx = 0; row_idx < tube_num; row_idx++)
944 funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
948 HTPreHashInsertCustom(table, funnel->hasher,
949 hashed_key, key, value);
/* Keep the old tubes so their queued records can be replayed. */
955 old_tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
956 old_tubes = funnel->tubes;
957 old_flowheads = funnel->flowheads;
959 funnel->capacity_clz = table->capacity_clz;
960 tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
961 funnel->tubes = malloc(tube_num * funnel->slotsize);
962 funnel->flowheads = malloc(tube_num *
sizeof(ptrdiff_t*));
963 for (row_idx = 0; row_idx < tube_num; row_idx++)
964 funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
966 for (row_idx = 0; row_idx < old_tube_num; row_idx++)
968 tubeidx = row_idx * funnel->slotsize;
969 flowhead = old_flowheads[row_idx];
970 while (tubeidx < flowhead)
/* NOTE(review): records are written with memcpy but read through a
 * uint64_t* cast; the address may be unaligned -- confirm this is safe
 * on the target platforms. */
972 tube_hashed_key = (uint64_t*)&old_tubes[tubeidx];
973 tubeidx +=
sizeof(uint64_t);
974 tube_key = &old_tubes[tubeidx];
976 tube_val = &old_tubes[tubeidx];
978 HTFunnelPreHashInsert(funnel,
/* Select the tube for this hash. */
988 mask = (1ULL << (64 - funnel->capacity_clz)) - 1;
989 row_idx = (hashed_key & mask) >> funnel->partition_clz;
990 flowhead = funnel->flowheads[row_idx];
991 flowbase = row_idx * funnel->slotsize;
/* Drain the tube when the new record would not fit. */
994 if (trip_bundle_size + flowhead - flowbase > funnel->slotsize)
997 while (tubeidx < flowhead)
999 tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1000 tubeidx +=
sizeof(uint64_t);
1001 tube_key = &funnel->tubes[tubeidx];
1003 tube_val = &funnel->tubes[tubeidx];
1006 upsert_result = HTUpsertNewKey(table, funnel->hasher,
1011 switch (upsert_result)
/* Mark the bucket as state 1 before filling it. */
1014 *matched_bucket = 1;
1015 memcpy(&matched_bucket[1], tube_key, keysize);
1017 memcpy(&matched_bucket[1 + keysize], tube_val, valsize);
1019 case UPSERT_PUSHDOWN:
1020 memcpy(bucket_cpy, matched_bucket, bucket_size);
1021 memcpy(&matched_bucket[1], tube_key, keysize);
1022 memcpy(&matched_bucket[1 + keysize], tube_val, valsize);
1023 HTUpsertPushDown(table, funnel->hasher, bucket_cpy, probe,
1024 matched_bucket, &resized);
1027 funnel->flowheads[row_idx] = flowbase;
1028 flowhead = flowbase;
/* Append the new record to the tube. */
1032 memcpy(&funnel->tubes[tubeidx], &hashed_key,
sizeof(uint64_t));
1033 tubeidx +=
sizeof(hashed_key);
1034 memcpy(&funnel->tubes[tubeidx], key, keysize);
1036 memcpy(&funnel->tubes[tubeidx], value, valsize);
1038 funnel->flowheads[row_idx] = tubeidx;
1041 void HTFunnelInsert(HTFunnel* funnel,
1042 void* key,
void* value)
1044 uint64_t hashed_key;
1045 hashed_key = funnel->hasher(key, funnel->table->keysize);
1046 HTFunnelPreHashInsert(funnel, hashed_key, key, value);
/*
 * HTFunnelInsertFlush: applies every queued insert record to the table and
 * resets each tube to empty. The function has an early check for a funnel
 * without tubes or without a table.
 */
1049 void HTFunnelInsertFlush(HTFunnel* funnel)
1051 const size_t keysize = funnel->table->keysize;
1052 const size_t valsize = funnel->table->valsize;
1053 const size_t bucket_size = keysize + valsize + 1;
1056 int tube_num, row_idx, probe;
1057 ptrdiff_t flowhead, tubeidx;
1058 uint8_t *tube_key, *tube_val, *matched_bucket;
1059 uint64_t* tube_hashed_key;
1061 enum upsert_result_t upsert_result;
1062 uint8_t bucket_cpy[bucket_size];
/* NOTE(review): funnel->table is already dereferenced above, so the
 * !funnel->table test cannot protect against a NULL table. */
1064 if (!funnel->tubes || !funnel->table)
1067 table = funnel->table;
1068 tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
1070 for (row_idx = 0; row_idx < tube_num; row_idx++)
1072 tubeidx = row_idx * funnel->slotsize;
1073 flowhead = funnel->flowheads[row_idx];
1074 while (tubeidx < flowhead)
1076 tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1077 tubeidx +=
sizeof(uint64_t);
1078 tube_key = &funnel->tubes[tubeidx];
1080 tube_val = &funnel->tubes[tubeidx];
1083 upsert_result = HTUpsertNewKey(table, funnel->hasher,
1088 switch (upsert_result)
1091 *matched_bucket = 1;
1092 memcpy(&matched_bucket[1], tube_key, keysize);
1094 memcpy(&matched_bucket[1 + keysize], tube_val, valsize);
1096 case UPSERT_PUSHDOWN:
1097 memcpy(bucket_cpy, matched_bucket, bucket_size);
1098 memcpy(&matched_bucket[1], tube_key, keysize);
1099 memcpy(&matched_bucket[1 + keysize], tube_val, valsize);
1100 HTUpsertPushDown(table, funnel->hasher, bucket_cpy, probe,
1101 matched_bucket, &resized);
/* Tube fully drained: rewind its write offset to the tube base. */
1104 funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
/*
 * HTFunnelPreHashUpsert: queues an upsert in the funnel, or applies it
 * directly when the funnel has no tubes.
 *
 * Tube record layout:
 *   [hashed_key: 8 bytes][ctxsize: 4 bytes][key][value][context: ctxsize].
 *
 * When a record is applied, the bucket is located with HTUpsertNewKey and
 * the funnel's upsertcb is invoked with pointers into the bucket. After an
 * UPSERT_PUSHDOWN the bucket is looked up again with HTGetCustom before the
 * callback runs. The tubes are rebuilt, and the old records replayed, when
 * the table's capacity_clz no longer matches the funnel's copy.
 */
1108 void HTFunnelPreHashUpsert(HTFunnel* funnel,
1109 uint64_t hashed_key,
1110 void* key,
void* value,
1111 void* context,
size_t ctxsize_st)
1113 const size_t keysize = funnel->table->keysize;
1114 const size_t valsize = funnel->table->valsize;
1115 const size_t bucket_size = keysize + valsize + 1;
1117 size_t trip_bundle_size;
1119 OPFunnelUpsertCB upsertcb;
1120 int tube_num, old_tube_num, row_idx, probe;
1122 ptrdiff_t flowhead, flowbase, tubeidx;
1123 ptrdiff_t* old_flowheads;
1124 uint8_t *old_tubes, *tube_key, *tube_val, *tube_ctx, *matched_bucket;
1125 uint32_t* tube_ctxsize;
1127 uint64_t* tube_hashed_key;
1129 enum upsert_result_t upsert_result;
1130 uint8_t bucket_cpy[bucket_size];
1132 table = funnel->table;
1133 upsertcb = funnel->callback.upsertcb;
/* The context size is stored in the tube as a 32-bit value. */
1134 ctxsize = (uint32_t)ctxsize_st;
1136 if (funnel->capacity_clz != table->capacity_clz)
1138 if (funnel->capacity_clz == 0)
1143 if (funnel->partition_clz > table->capacity_clz)
1145 funnel->capacity_clz = table->capacity_clz;
1146 tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
1147 funnel->tubes = malloc(tube_num * funnel->slotsize);
/* NOTE(review): element type is ptrdiff_t, not ptrdiff_t*. */
1148 funnel->flowheads = malloc(tube_num *
sizeof(ptrdiff_t*));
1149 for (row_idx = 0; row_idx < tube_num; row_idx++)
1150 funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
1154 HTInsertCustom(table, funnel->hasher, key, value);
1155 upsert_result = HTUpsertNewKey(table, funnel->hasher,
1160 switch (upsert_result)
1163 *matched_bucket = 1;
1164 memcpy(&matched_bucket[1], key, keysize);
1166 upsertcb(&matched_bucket[1],
1167 &matched_bucket[1 + keysize],
1170 keysize, valsize, ctxsize,
1175 upsertcb(&matched_bucket[1],
1176 &matched_bucket[1 + keysize],
1179 keysize, valsize, ctxsize,
1182 case UPSERT_PUSHDOWN:
1183 memcpy(bucket_cpy, matched_bucket, bucket_size);
1184 memcpy(&matched_bucket[1], key, keysize);
1185 HTUpsertPushDown(table, funnel->hasher, bucket_cpy, probe,
1186 matched_bucket, &resized);
/* HTGetCustom returns the value address; step back over the key and the
 * state byte to recover the bucket start. */
1193 matched_bucket = HTGetCustom(table, funnel->hasher, key);
1195 matched_bucket -= 1 + keysize;
1198 upsertcb(&matched_bucket[1],
1199 &matched_bucket[1 + keysize],
1202 keysize, valsize, ctxsize,
/* Keep the old tubes so their queued records can be replayed. */
1210 old_tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
1211 old_tubes = funnel->tubes;
1212 old_flowheads = funnel->flowheads;
1214 funnel->capacity_clz = table->capacity_clz;
1215 tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
1216 funnel->tubes = malloc(tube_num * funnel->slotsize);
1217 funnel->flowheads = malloc(tube_num *
sizeof(ptrdiff_t*));
1218 for (row_idx = 0; row_idx < tube_num; row_idx++)
1219 funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
1221 for (row_idx = 0; row_idx < old_tube_num; row_idx++)
1223 tubeidx = row_idx * funnel->slotsize;
1224 flowhead = old_flowheads[row_idx];
1225 while (tubeidx < flowhead)
1227 tube_hashed_key = (uint64_t*)&old_tubes[tubeidx];
1228 tubeidx +=
sizeof(uint64_t);
/* NOTE(review): this replay loop walks old_tubes, but the context size is
 * read from funnel->tubes (the freshly allocated buffer) -- looks like it
 * should read old_tubes; confirm. */
1229 tube_ctxsize = (uint32_t*)&funnel->tubes[tubeidx];
1230 tubeidx +=
sizeof(uint32_t);
1231 tube_key = &old_tubes[tubeidx];
1233 tube_val = &old_tubes[tubeidx];
/* NOTE(review): same concern -- the context is taken from funnel->tubes
 * rather than old_tubes. */
1235 tube_ctx = &funnel->tubes[tubeidx];
1236 tubeidx += *tube_ctxsize;
1237 upsert_result = HTUpsertNewKey(table, funnel->hasher,
1242 switch (upsert_result)
1245 *matched_bucket = 1;
1246 memcpy(&matched_bucket[1], tube_key, keysize);
1248 upsertcb(&matched_bucket[1],
1249 &matched_bucket[1 + keysize],
1252 keysize, valsize, *tube_ctxsize,
1257 upsertcb(&matched_bucket[1],
1258 &matched_bucket[1 + keysize],
1261 keysize, valsize, *tube_ctxsize,
1264 case UPSERT_PUSHDOWN:
1265 memcpy(bucket_cpy, matched_bucket, bucket_size);
1266 memcpy(&matched_bucket[1], tube_key, keysize);
1267 HTUpsertPushDown(table, funnel->hasher, bucket_cpy, probe,
1268 matched_bucket, &resized);
1272 matched_bucket = HTGetCustom(table, funnel->hasher,
1275 matched_bucket -= 1 + keysize;
1278 upsertcb(&matched_bucket[1],
1279 &matched_bucket[1 + keysize],
1282 keysize, valsize, *tube_ctxsize,
1287 free(old_flowheads);
/* Select the tube for this hash. */
1292 mask = (1ULL << (64 - funnel->capacity_clz)) - 1;
1293 row_idx = (hashed_key & mask) >> funnel->partition_clz;
1294 flowhead = funnel->flowheads[row_idx];
1295 flowbase = row_idx * funnel->slotsize;
1296 trip_bundle_size =
sizeof(hashed_key) +
sizeof(uint32_t)
1297 + keysize + valsize + ctxsize;
/* A single record must be smaller than one tube. */
1298 op_assert(trip_bundle_size < funnel->slotsize);
/* Drain the tube when the new record would not fit. */
1301 if (trip_bundle_size + flowhead - flowbase > funnel->slotsize)
1304 while (tubeidx < flowhead)
1306 tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1307 tubeidx +=
sizeof(uint64_t);
1308 tube_ctxsize = (uint32_t*)&funnel->tubes[tubeidx];
1309 tubeidx +=
sizeof(uint32_t);
1310 tube_key = &funnel->tubes[tubeidx];
1312 tube_val = &funnel->tubes[tubeidx];
1314 tube_ctx = &funnel->tubes[tubeidx];
1315 tubeidx += *tube_ctxsize;
1316 upsert_result = HTUpsertNewKey(table, funnel->hasher,
1321 switch (upsert_result)
1324 *matched_bucket = 1;
1325 memcpy(&matched_bucket[1], tube_key, keysize);
1327 upsertcb(&matched_bucket[1],
1328 &matched_bucket[1 + keysize],
1331 keysize, valsize, *tube_ctxsize,
1336 upsertcb(&matched_bucket[1],
1337 &matched_bucket[1 + keysize],
1340 keysize, valsize, *tube_ctxsize,
1343 case UPSERT_PUSHDOWN:
1344 memcpy(bucket_cpy, matched_bucket, bucket_size);
1345 memcpy(&matched_bucket[1], tube_key, keysize);
1346 HTUpsertPushDown(table, funnel->hasher, bucket_cpy, probe,
1347 matched_bucket, &resized);
1353 matched_bucket = HTGetCustom(table, funnel->hasher, tube_key);
1355 matched_bucket -= 1 + keysize;
1358 upsertcb(&matched_bucket[1],
1359 &matched_bucket[1 + keysize],
1362 keysize, valsize, *tube_ctxsize,
1366 funnel->flowheads[row_idx] = flowbase;
1367 flowhead = flowbase;
/* Append the new record to the tube. */
1371 memcpy(&funnel->tubes[tubeidx], &hashed_key,
sizeof(uint64_t));
1372 tubeidx +=
sizeof(hashed_key);
1373 memcpy(&funnel->tubes[tubeidx], &ctxsize,
sizeof(uint32_t));
1374 tubeidx +=
sizeof(uint32_t);
1375 memcpy(&funnel->tubes[tubeidx], key, keysize);
1377 memcpy(&funnel->tubes[tubeidx], value, valsize);
1379 memcpy(&funnel->tubes[tubeidx], context, ctxsize);
1381 funnel->flowheads[row_idx] = tubeidx;
1384 void HTFunnelUpsert(HTFunnel* funnel,
1385 void* key,
void* value,
void* context,
size_t ctxsize)
1387 uint64_t hashed_key;
1388 hashed_key = funnel->hasher(key, funnel->table->keysize);
1389 HTFunnelPreHashUpsert(funnel, hashed_key, key, value, context, ctxsize);
/*
 * HTFunnelUpsertFlush: applies every queued upsert record to the table,
 * invoking the funnel's upsertcb for each, and resets each tube to empty.
 * The function has an early check for a funnel without tubes or without a
 * table.
 */
1392 void HTFunnelUpsertFlush(HTFunnel* funnel)
1394 const size_t keysize = funnel->table->keysize;
1395 const size_t valsize = funnel->table->valsize;
1396 const size_t bucket_size = keysize + valsize + 1;
1399 OPFunnelUpsertCB upsertcb;
1400 int tube_num, row_idx, probe;
1401 ptrdiff_t flowhead, tubeidx;
1402 uint8_t *tube_key, *tube_val, *tube_ctx, *matched_bucket;
1403 uint32_t* tube_ctxsize;
1404 uint64_t* tube_hashed_key;
1406 enum upsert_result_t upsert_result;
1407 uint8_t bucket_cpy[bucket_size];
/* NOTE(review): funnel->table is already dereferenced above, so the
 * !funnel->table test cannot protect against a NULL table. */
1409 if (!funnel->tubes || !funnel->table)
1412 table = funnel->table;
1413 upsertcb = funnel->callback.upsertcb;
1414 tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
1416 for (row_idx = 0; row_idx < tube_num; row_idx++)
1418 tubeidx = row_idx * funnel->slotsize;
1419 flowhead = funnel->flowheads[row_idx];
1420 while (tubeidx < flowhead)
/* Record layout: hashed_key, ctxsize, key, value, context. */
1422 tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1423 tubeidx +=
sizeof(uint64_t);
1424 tube_ctxsize = (uint32_t*)&funnel->tubes[tubeidx];
1425 tubeidx +=
sizeof(uint32_t);
1426 tube_key = &funnel->tubes[tubeidx];
1428 tube_val = &funnel->tubes[tubeidx];
1430 tube_ctx = &funnel->tubes[tubeidx];
1431 tubeidx += *tube_ctxsize;
1433 upsert_result = HTUpsertNewKey(table, funnel->hasher,
1438 switch (upsert_result)
1441 *matched_bucket = 1;
1442 memcpy(&matched_bucket[1], tube_key, keysize);
1444 upsertcb(&matched_bucket[1],
1445 &matched_bucket[1 + keysize],
1448 keysize, valsize, *tube_ctxsize,
1453 upsertcb(&matched_bucket[1],
1454 &matched_bucket[1 + keysize],
1457 keysize, valsize, *tube_ctxsize,
1460 case UPSERT_PUSHDOWN:
1461 memcpy(bucket_cpy, matched_bucket, bucket_size);
1462 memcpy(&matched_bucket[1], tube_key, keysize);
1463 HTUpsertPushDown(table, funnel->hasher, bucket_cpy, probe,
1464 matched_bucket, &resized);
/* HTGetCustom returns the value address; step back to the bucket start. */
1470 matched_bucket = HTGetCustom(table, funnel->hasher, tube_key);
1472 matched_bucket -= 1 + keysize;
1475 upsertcb(&matched_bucket[1],
1476 &matched_bucket[1 + keysize],
1479 keysize, valsize, *tube_ctxsize,
/* Tube fully drained: rewind its write offset to the tube base. */
1483 funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
1487 void HTFunnelGet(HTFunnel* funnel,
void* key,
void* context,
size_t ctxsize)
1489 uint64_t hashed_key;
1490 hashed_key = funnel->hasher(key, funnel->table->keysize);
1491 HTFunnelPreHashGet(funnel, hashed_key, key, context, ctxsize);
/*
 * HTFunnelPreHashGet: queues a lookup in the funnel, or resolves it at once
 * on the direct path near the top of the function.
 *
 * Tube record layout:
 *   [hashed_key: 8 bytes][ctxsize: 4 bytes][key][context: ctxsize].
 *
 * A lookup is resolved with HTPreHashSearchIdx. The funnel's getcb receives
 * pointers to the stored key and value on a hit, or the queried key and a
 * NULL value on a miss. When the new record would overflow its tube, every
 * queued lookup in that tube is resolved and the tube is reset before the
 * record is appended.
 */
1494 void HTFunnelPreHashGet(HTFunnel* funnel, uint64_t hashed_key,
1495 void* key,
void* context,
size_t ctxsize_st)
1497 const size_t keysize = funnel->table->keysize;
1498 const size_t valsize = funnel->table->valsize;
1499 const size_t bucket_size = keysize + valsize + 1;
1502 OPFunnelGetCB getcb;
1505 size_t trip_bundle_size;
1506 ptrdiff_t flowhead, flowbase, tubeidx;
1507 uintptr_t bucket_idx;
1508 uint8_t *tube_key, *tube_ctx;
1509 uint32_t* tube_ctxsize;
1511 uint64_t* tube_hashed_key;
1512 uint8_t* restrict buckets;
1514 table = funnel->table;
1515 buckets =
OPRef2Ptr(table, table->bucket_ref);
1516 getcb = funnel->callback.getcb;
1521 if (HTPreHashSearchIdx(table,
1527 getcb(&buckets[bucket_idx * bucket_size + 1],
1528 &buckets[bucket_idx * bucket_size + 1 + keysize],
1529 context, keysize, valsize, ctxsize_st);
/* Miss: report the queried key with a NULL value. */
1534 getcb(key, NULL, context, keysize, valsize, ctxsize_st);
/* The context size is stored in the tube as a 32-bit value. */
1539 ctxsize = (uint32_t)ctxsize_st;
1540 mask = (1ULL << (64 - funnel->capacity_clz)) - 1;
1541 row_idx = (hashed_key & mask) >> funnel->partition_clz;
1542 flowhead = funnel->flowheads[row_idx];
1543 flowbase = row_idx * funnel->slotsize;
1544 trip_bundle_size =
sizeof(hashed_key) +
sizeof(uint32_t) + keysize + ctxsize;
/* Drain the tube when the new record would not fit. */
1547 if (trip_bundle_size + flowhead - flowbase > funnel->slotsize)
1550 while (tubeidx < flowhead)
1552 tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1553 tubeidx +=
sizeof(uint64_t);
1554 tube_ctxsize = (uint32_t*)&funnel->tubes[tubeidx];
1555 tubeidx +=
sizeof(uint32_t);
1556 tube_key = &funnel->tubes[tubeidx];
1558 tube_ctx = &funnel->tubes[tubeidx];
1559 tubeidx += *tube_ctxsize;
1560 if (HTPreHashSearchIdx(table,
1566 getcb(&buckets[bucket_idx * bucket_size + 1],
1567 &buckets[bucket_idx * bucket_size + 1 + keysize],
1568 tube_ctx, keysize, valsize, *tube_ctxsize);
1573 getcb(tube_key, NULL,
1574 tube_ctx, keysize, valsize, *tube_ctxsize);
1577 funnel->flowheads[row_idx] = flowbase;
1578 flowhead = flowbase;
/* Append the new record to the tube. */
1582 memcpy(&funnel->tubes[tubeidx], &hashed_key,
sizeof(uint64_t));
1583 tubeidx +=
sizeof(hashed_key);
1584 memcpy(&funnel->tubes[tubeidx], &ctxsize,
sizeof(uint32_t));
1585 tubeidx +=
sizeof(uint32_t);
1586 memcpy(&funnel->tubes[tubeidx], key, keysize);
1588 memcpy(&funnel->tubes[tubeidx], context, ctxsize);
1590 funnel->flowheads[row_idx] = tubeidx;
/**
 * Drain every tube row of the funnel, replaying each buffered lookup
 * against the table and reporting it through the funnel's get callback.
 * Each row's write offset is rewound to the start of its slot afterwards.
 *
 * NOTE(review): this listing omits braces and several statements (the
 * embedded line numbers jump); the comments below describe only the
 * statements that are shown.
 *
 * @param funnel Funnel whose buffered lookups are replayed.
 */
1593 void HTFunnelGetFlush(HTFunnel* funnel)
/* NOTE(review): these initialisers dereference funnel->table before the
   `!funnel->table` test further down, so that test cannot protect them --
   confirm whether a NULL table can reach this function. */
1595 const size_t keysize = funnel->table->keysize;
1596 const size_t valsize = funnel->table->valsize;
/* A bucket is one leading byte followed by the key and then the value. */
1597 const size_t bucket_size = keysize + valsize + 1;
1600 OPFunnelGetCB getcb;
1601 int tube_num, row_idx;
1602 ptrdiff_t flowhead, tubeidx;
1603 uintptr_t bucket_idx;
1604 uint8_t *tube_key, *tube_ctx;
1605 uint32_t* tube_ctxsize;
1606 uint64_t* tube_hashed_key;
1607 uint8_t* restrict buckets;
/* Guard for a funnel with no tube buffer or no table; the guarded statement
   is on an omitted line (presumably an early return). */
1609 if (!funnel->tubes || !funnel->table)
1612 table = funnel->table;
1613 buckets =
OPRef2Ptr(table, table->bucket_ref);
1614 getcb = funnel->callback.getcb;
/* Number of rows: 2^(partition_clz - capacity_clz). */
1615 tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
1617 for (row_idx = 0; row_idx < tube_num; row_idx++)
/* Walk the row from the start of its slot up to its current write offset. */
1619 tubeidx = row_idx * funnel->slotsize;
1620 flowhead = funnel->flowheads[row_idx];
1621 while (tubeidx < flowhead)
/* Decode one record: hash, 32-bit context size, key, context.
   NOTE(review): the hash and context size are read through casted pointers
   at offsets that depend on keysize and ctxsize, so the reads may be
   unaligned -- confirm. */
1623 tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1624 tubeidx +=
sizeof(uint64_t);
1625 tube_ctxsize = (uint32_t*)&funnel->tubes[tubeidx];
1626 tubeidx +=
sizeof(uint32_t);
1627 tube_key = &funnel->tubes[tubeidx];
/* The advance past the key is on an omitted line (presumably
   tubeidx += keysize). */
1629 tube_ctx = &funnel->tubes[tubeidx];
1630 tubeidx += *tube_ctxsize;
/* Replay the lookup. The first callback form passes pointers to the key and
   value inside the bucket at bucket_idx; the second passes the buffered key
   with a NULL value pointer (presumably the not-found branch). */
1631 if (HTPreHashSearchIdx(table,
1637 getcb(&buckets[bucket_idx * bucket_size + 1],
1638 &buckets[bucket_idx * bucket_size + 1 + keysize],
1639 tube_ctx, keysize, valsize, *tube_ctxsize);
1644 getcb(tube_key, NULL,
1645 tube_ctx, keysize, valsize, *tube_ctxsize);
/* Row drained: rewind its write offset to the start of the slot. */
1648 funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
1652 void HTFunnelDelete(HTFunnel* funnel,
void* key,
1653 void* context,
size_t ctxsize)
1655 uint64_t hashed_key;
1656 hashed_key = funnel->hasher(key, funnel->table->keysize);
1657 HTFunnelPreHashDelete(funnel, hashed_key, key, context, ctxsize);
/**
 * Delete a key whose hash is already known, going through the funnel, and
 * report the outcome through the funnel's delete callback.
 *
 * Two code paths are visible: an immediate delete that invokes the callback
 * with the caller's own `context`, and a buffered path that appends a record
 * (hash, context size, key, context) to the tube row selected by the hash,
 * draining that row first when the new record would not fit.
 *
 * NOTE(review): this listing omits braces and several statements (the
 * embedded line numbers jump), so the condition choosing between the two
 * paths is not visible here; the comments below describe only the
 * statements that are shown.
 *
 * @param funnel     Funnel providing the table, callback, tubes, flowheads
 *                   and row geometry.
 * @param hashed_key Hash of `key`; stored in the record and used to pick
 *                   the tube row.
 * @param key        Key to delete; `keysize` bytes are copied when buffered.
 * @param context    Caller data handed to the callback; `ctxsize_st` bytes
 *                   are copied when buffered.
 * @param ctxsize_st Size of `context` in bytes (narrowed to 32 bits for the
 *                   buffered record).
 */
1660 void HTFunnelPreHashDelete(HTFunnel* funnel, uint64_t hashed_key,
1661 void* key,
void* context,
size_t ctxsize_st)
1663 const size_t keysize = funnel->table->keysize;
1664 const size_t valsize = funnel->table->valsize;
1667 OPFunnelDeleteCB deletecb;
1670 size_t trip_bundle_size;
1671 ptrdiff_t flowhead, flowbase, tubeidx;
1672 uint8_t *tube_key, *tube_ctx, *deleted_key, *deleted_val;
1673 uint32_t* tube_ctxsize;
1675 uint64_t* tube_hashed_key;
1677 table = funnel->table;
1678 deletecb = funnel->callback.deletecb;
/* Immediate delete with the caller's key and context. The returned pointer
   is treated as the value of the removed entry, and the key is taken to lie
   keysize bytes before it. */
1683 deleted_val = HTPreHashDeleteCustom(table,
1688 deleted_key = deleted_val - keysize;
1690 deletecb(deleted_key, deleted_val,
1691 context, keysize, valsize, ctxsize_st);
/* Second callback form: the caller's key with a NULL value pointer --
   presumably used when nothing was deleted (the branch test is among the
   omitted lines). */
1696 deletecb(key, NULL, context, keysize, valsize, ctxsize_st);
/* Buffered path. The context size is stored in the record as a 32-bit
   value, so it is narrowed here. */
1701 ctxsize = (uint32_t)ctxsize_st;
/* Row selection: keep the low (64 - capacity_clz) bits of the hash, then
   shift right by partition_clz. */
1702 mask = (1ULL << (64 - funnel->capacity_clz)) - 1;
1703 row_idx = (hashed_key & mask) >> funnel->partition_clz;
/* flowhead is the row's current write offset into tubes; flowbase is the
   offset at which the row's slot begins. */
1704 flowhead = funnel->flowheads[row_idx];
1705 flowbase = row_idx * funnel->slotsize;
/* Bytes needed for one record: hash + 32-bit context size + key + context. */
1706 trip_bundle_size =
sizeof(hashed_key) +
sizeof(uint32_t) + keysize + ctxsize;
/* If the new record would overflow the slot, replay every buffered delete
   of this row first.
   NOTE(review): no visible check covers a single record larger than
   slotsize -- confirm against the full source. */
1709 if (trip_bundle_size + flowhead - flowbase > funnel->slotsize)
/* The initial value of tubeidx for this loop is set on an omitted line
   (presumably flowbase). */
1712 while (tubeidx < flowhead)
/* Decode one record in the order it was written: hash, context size, key,
   context.
   NOTE(review): the hash and context size are read through casted pointers
   although records are written with memcpy; record offsets depend on
   keysize and ctxsize, so these reads may be unaligned -- confirm. */
1714 tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1715 tubeidx +=
sizeof(uint64_t);
1716 tube_ctxsize = (uint32_t*)&funnel->tubes[tubeidx];
1717 tubeidx +=
sizeof(uint32_t);
1718 tube_key = &funnel->tubes[tubeidx];
/* The advance past the key is on an omitted line (presumably
   tubeidx += keysize). */
1720 tube_ctx = &funnel->tubes[tubeidx];
1721 tubeidx += *tube_ctxsize;
/* Replay the buffered delete, this time with the context saved in the
   record. */
1722 deleted_val = HTPreHashDeleteCustom(table,
1724 *tube_hashed_key, tube_key);
1727 deleted_key = deleted_val - keysize;
1729 deletecb(deleted_key, deleted_val,
1730 tube_ctx, keysize, valsize, *tube_ctxsize);
1735 deletecb(tube_key, NULL,
1736 tube_ctx, keysize, valsize, *tube_ctxsize);
/* The row is now empty: rewind its write offset to the slot start. */
1739 funnel->flowheads[row_idx] = flowbase;
1740 flowhead = flowbase;
/* Append the new record: hash, 32-bit context size, key, context. The
   starting value of tubeidx and the advance past the key are on omitted
   lines (presumably tubeidx = flowhead and tubeidx += keysize). */
1744 memcpy(&funnel->tubes[tubeidx], &hashed_key,
sizeof(uint64_t));
1745 tubeidx +=
sizeof(hashed_key);
1746 memcpy(&funnel->tubes[tubeidx], &ctxsize,
sizeof(uint32_t));
1747 tubeidx +=
sizeof(uint32_t);
1748 memcpy(&funnel->tubes[tubeidx], key, keysize);
1750 memcpy(&funnel->tubes[tubeidx], context, ctxsize);
/* Publish the new write offset for this row. */
1752 funnel->flowheads[row_idx] = tubeidx;
/**
 * Drain every tube row of the funnel, replaying each buffered delete
 * against the table and reporting it through the funnel's delete callback.
 * Each row's write offset is rewound to the start of its slot afterwards.
 *
 * NOTE(review): this listing omits braces and several statements (the
 * embedded line numbers jump); the comments below describe only the
 * statements that are shown.
 *
 * @param funnel Funnel whose buffered deletes are replayed.
 */
1755 void HTFunnelDeleteFlush(HTFunnel* funnel)
/* NOTE(review): these initialisers dereference funnel->table before the
   `!funnel->table` test further down, so that test cannot protect them --
   confirm whether a NULL table can reach this function. */
1757 const size_t keysize = funnel->table->keysize;
1758 const size_t valsize = funnel->table->valsize;
1761 OPFunnelDeleteCB deletecb;
1762 int tube_num, row_idx;
1763 ptrdiff_t flowhead, tubeidx;
1764 uint8_t *tube_key, *tube_ctx, *deleted_key, *deleted_val;
1765 uint32_t* tube_ctxsize;
1766 uint64_t* tube_hashed_key;
/* Guard for a funnel with no tube buffer or no table; the guarded statement
   is on an omitted line (presumably an early return). */
1768 if (!funnel->tubes || !funnel->table)
1771 table = funnel->table;
1772 deletecb = funnel->callback.deletecb;
/* Number of rows: 2^(partition_clz - capacity_clz). */
1773 tube_num = 1 << (funnel->partition_clz - funnel->capacity_clz);
1775 for (row_idx = 0; row_idx < tube_num; row_idx++)
/* Walk the row from the start of its slot up to its current write offset. */
1777 tubeidx = row_idx * funnel->slotsize;
1778 flowhead = funnel->flowheads[row_idx];
1779 while (tubeidx < flowhead)
/* Decode one record: hash, 32-bit context size, key, context.
   NOTE(review): the hash and context size are read through casted pointers
   at offsets that depend on keysize and ctxsize, so the reads may be
   unaligned -- confirm. */
1781 tube_hashed_key = (uint64_t*)&funnel->tubes[tubeidx];
1782 tubeidx +=
sizeof(uint64_t);
1783 tube_ctxsize = (uint32_t*)&funnel->tubes[tubeidx];
1784 tubeidx +=
sizeof(uint32_t);
1785 tube_key = &funnel->tubes[tubeidx];
/* The advance past the key is on an omitted line (presumably
   tubeidx += keysize). */
1787 tube_ctx = &funnel->tubes[tubeidx];
1788 tubeidx += *tube_ctxsize;
/* Replay the delete. The returned pointer is treated as the value of the
   removed entry, with its key keysize bytes before it; the second callback
   form passes the buffered key with a NULL value pointer (presumably when
   nothing was deleted). */
1789 deleted_val = HTPreHashDeleteCustom(table,
1791 *tube_hashed_key, tube_key);
1794 deleted_key = deleted_val - keysize;
1796 deletecb(deleted_key, deleted_val,
1797 tube_ctx, keysize, valsize, *tube_ctxsize);
1802 deletecb(tube_key, NULL,
1803 tube_ctx, keysize, valsize, *tube_ctxsize);
/* Row drained: rewind its write offset to the start of the slot. */
1806 funnel->flowheads[row_idx] = row_idx * funnel->slotsize;
Opaque object for memory allocation.
static opref_t OPPtr2Ref(void *addr)
Converts a pointer allocated in OPHeap to an opref_t.
A general hashmap/hashset/hashmultimap implementation using robin hood hashing.
uintptr_t opref_t
The "pointer type" used within objects created by OPHeap.
static void * OPRef2Ptr(void *ptr_in_heap, opref_t ref)
Converts an opref_t reference to a regular pointer.
void(* OPHashIterator)(void *key, void *value, size_t keysize, size_t valsize, void *context)
HashTable iterator interface.
uint64_t(* OPHash)(void *key, size_t size)
Hash function interface.