首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >有效调整堆数

有效调整堆数
EN

Code Review用户
提问于 2022-03-08 16:32:49
回答 3查看 264关注 0票数 2

简介与上下文

我正在开发一个小型应用程序,它涉及将20 as文件作为操作之一进行排序。由于内存限制,这是通过将它们分解为~1GB块,在内存中排序,然后写回磁盘来完成的。然后,作为第二阶段,读取每个文件的头,选择最小/最大的文件并流到最后的输出文件。

这属于外部排序的领域。

这个CR将只关注其中的一小部分,当流到最终文件时,这部分内容涉及到“从哪个块中提取下一个元素”。结果表明,这一部分,特别是元素比较,是CPU约束和瓶颈。被排序的元素是std::array<std::byte, 20>,所以这是一个非常重要的比较,它可以编译成memcmp

编辑结果表明,通过一些非常简单的“大64位加载、字节交换和比较”,我们可以比memcmp快得多。

见下面的自我答案-编辑

因此,我们需要一个数据结构,它可以方便地选择下一个元素。通常大约有20个块,所以我的第一次尝试只是使用std::min_element,因为线性搜索对于少量元素来说是非常快的。当我发现比较是我使用std::priority_queue的CPU瓶颈时。这使比较减少了>2倍,因此这是一个很大的收获。

为了澄清,比较次数的减少,也使整个应用程序的速度提高了2倍,从20×1GB文件合并阶段的6分钟下降到3分钟。但是在3分钟时,CPU仍然是@ 100%,并且几乎全部在20字节数组的operator<中。

这个CR是关于我们是否还能做得更好,通过进一步减少比较的次数。

您不能在std::priority_queue中更改/更新元素,这正是我们需要做的。这是临界回路的算法。

代码语言:javascript
复制
  std::priority_queue<head, std::vector<head>, decltype(cmp)> heads(cmp);

  while (!heads.empty()) {
    const head& t = heads.top();
    sorted.write(t.value);
    std::size_t chunk_idx = t.idx;
    heads.pop();
    if (auto& chunk = chunks[chunk_idx]; chunk.current != chunk.end) {
      heads.push({*(chunk.current), chunk_idx});
      ++(chunk.current);
    }
  }

因此,调用queue.pop()queue.push()似乎效率很低,两者都需要在堆中移动元素。仅仅“修改”.top() (读过它并将其写入输出)、转换为新的值(实质上在该块的迭代器上调用std::next++ ),然后“重新堆”似乎更有效。

让我们躲在std::priority_queue的引擎盖下。使用c++ STL堆算法进行此操作的规范方法是:

代码语言:javascript
复制
  // Note I am using a max-heap, but the comparator is customisable below
  std::pop_heap(heap.begin(), heap.end(), comp);
  heap.pop_back();
  heap.push_back(newval);
  std::push_heap(heap.begin(), heap.end(), comp);

大多数堆实现在下面使用原语,称为“冒泡/筛选向上/向下”。我们真正需要的是bubble_down实现,并在已更改的top()元素上调用它。

c++标准不提供原语,但是实现并不困难,下面的代码中有2(一个迭代和一个递归)。

调查焦点

  1. 相对较小的堆(3个=> 100元素,但在下面的代码中很容易调整)
  2. 操作:只更新heap.top()元素
  3. 优化准则=再加热的比较次数

这是一个非常简单的案例。不存在“知道要更改的元素在堆中的位置”的问题--这是这种方法的典型关注点。所以我们肯定能做得更好对吧?

下面的代码研究了4种实现:

  1. STL方式如上(使用libstdc++-11)。这是基本情况。(请注意,我认为libc++在这方面没有那么复杂)。
  2. 递归的“气泡下降”
  3. 迭代式的“气泡下降”
  4. A libstd++ __adjust_heap__push_heap algo

我生成随机堆大小、内容和替换值,并迭代1M次。

发现

  1. STL是好的
  2. 递归和迭代的冒泡向下流几乎是相同的(因为编译器在-O3上“尾调用”优化了递归,而且,令人惊讶的是,即使在这种非常特殊的情况下,仍然有比STL更多的比较。所以仅仅使用堆原语并不能给你带来任何好处。
  3. 我们可以用.STL通过复制未发布函数的代码,“清理它们”(下划线等),并以一种专门的方式使用它们来解决这个有限的、专门的问题。收益在10-20%之间.一个适度的,但意义重大的收益?

典型结果:

代码语言:javascript
复制
method                              avg_cmp_cnt
std::heap / std::priority_queue     7.568285
bubble up recursively               8.031054
bubble up iteratively               8.047352
libstc++ __adjust_heap              6.327297

守则:

代码语言:javascript
复制
#include "fmt/core.h"
#include <algorithm>
#include <cassert>
#include <concepts>
#include <cstddef>
#include <cstdlib>
#include <execution>
#include <iostream>
#include <random>
#include <stdexcept>

template <std::unsigned_integral T>
T log2(T x) {
  T log = 0;
  while (x >>= 1U) ++log;
  return log;
}

template <typename T, typename Comp = std::less<>>
void print_heap(const std::vector<T>& heap, Comp comp = {}) {
  std::size_t levels = log2(heap.size()) + 1;
  unsigned    width  = 6 * (1U << (levels - 1U));
  std::cout << "\n\n";
  for (const auto& e: heap) std::cout << e << " ";
  std::cout << "\n";
  std::cout << fmt::format("is_heap = {:}\n\n", std::is_heap(heap.begin(), heap.end(), comp));
  if (heap.empty()) {
    std::cout << "<empty heap>\n";
    return;
  }
  unsigned idx  = 0;
  bool     done = false;
  for (unsigned l = 0; l != levels; ++l) {
    for (unsigned e = 0; e != 1U << l; ++e) {
      std::cout << fmt::format("{:^{}}", heap[idx], width);
      ++idx;
      if (idx == heap.size()) {
        done = true;
        break;
      }
    }
    width /= 2;
    std::cout << "\n\n";
    if (done) break;
  }
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_stl(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  std::pop_heap(heap.begin(), heap.end(), comp);
  heap.pop_back();
  heap.push_back(newval);
  std::push_heap(heap.begin(), heap.end(), comp);
}

template <typename T, typename Comp = std::less<>>
// NOLINTNEXTLINE recursion is tailcall eliminated by compiler
void bubble_down_recursively(std::vector<T>& heap, std::size_t i, Comp comp = {}) {
  const auto left  = 2 * i + 1;
  const auto right = 2 * i + 2;
  const auto n     = heap.size();

  using std::swap; // enable ADL

  if (left >= n) { // no children
    return;
  } else if (right >= n) { // left exists right does not.  NOLINT else after return
    if (comp(heap[i], heap[left])) {
      swap(heap[i], heap[left]);
      bubble_down_recursively(heap, left, comp);
    }
  } else { // both children exist
    // 'larger' is only well named if comp = std::less<>{}
    auto larger = comp(heap[right], heap[left]) ? left : right;
    if (comp(heap[i], heap[larger])) {
      swap(heap[i], heap[larger]);
      bubble_down_recursively(heap, larger, comp);
    }
  }
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_bubble_down_recursively(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  heap[0] = newval;
  bubble_down_recursively(heap, 0, comp);
}

template <typename T, typename Comp = std::less<>>
void bubble_down_iteratively(std::vector<T>& heap, std::size_t i, Comp comp = {}) {
  const auto n = heap.size();

  while (true) {
    const std::size_t left  = 2 * i + 1;
    const std::size_t right = 2 * i + 2;

    std::size_t largest = i;

    if ((left < n) && comp(heap[largest], heap[left])) {
      largest = left;
    }
    if ((right < n) && comp(heap[largest], heap[right])) {
      largest = right;
    }

    if (largest == i) {
      break;
    }

    using std::swap; // enable ADL
    swap(heap[i], heap[largest]);
    i = largest;
  }
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_bubble_down_iteratively(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  heap[0] = newval;                       // stick it in anyway
  bubble_down_iteratively(heap, 0, comp); // and fix the heap
}

// borrowed from libstdc++ __push_heap
template <typename RandomAccessIterator, typename Distance, typename Tp, typename Compare>
constexpr void push_heap(RandomAccessIterator first, Distance holeIndex, Distance topIndex,
                         Tp value, Compare& comp) {
  Distance parent = (holeIndex - 1) / 2;
  while (holeIndex > topIndex && comp(*(first + parent), value)) {
    *(first + holeIndex) = *(first + parent);
    holeIndex            = parent;
    parent               = (holeIndex - 1) / 2;
  }
  *(first + holeIndex) = std::move(value);
}

// borrowed from libstdc++ __adjust_heap
template <typename RandomAccessIterator, typename Distance, typename Tp, typename Compare>
constexpr void adjust_heap(RandomAccessIterator first, Distance holeIndex, Distance len, Tp value,
                           Compare comp) {
  const Distance topIndex    = holeIndex;
  Distance       secondChild = holeIndex;
  while (secondChild < (len - 1) / 2) {
    secondChild = 2 * (secondChild + 1);
    if (comp(*(first + secondChild), *(first + (secondChild - 1)))) secondChild--;
    *(first + holeIndex) = *(first + secondChild);
    holeIndex            = secondChild;
  }
  if ((len & 1) == 0 && secondChild == (len - 2) / 2) {
    secondChild          = 2 * (secondChild + 1);
    *(first + holeIndex) = *(first + (secondChild - 1));
    holeIndex            = secondChild - 1;
  }
  push_heap(first, holeIndex, topIndex, value, comp);
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_adjust_heap(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  heap[0] = newval;
  adjust_heap(heap.begin(), 0L, heap.end() - heap.begin(), newval, comp);
}

template <typename T>
struct cmp_counter {
  static std::size_t cmpcount; // NOLINT must be static because STL takes Comp by value
  bool               operator()(T a, T b) {
    ++cmpcount;
    return a < b; // effectively std::less<>{};
  }
  static void reset() { cmpcount = 0; }
};
template <typename T>
std::size_t cmp_counter<T>::cmpcount = 0; // NOLINT global static

int main() {

  using ValueType = int;
  
  struct method {
    using cb_t = void (*)(std::vector<ValueType>&, ValueType, cmp_counter<ValueType>);
    std::string label;
    cb_t        cb;
  };
  auto methods = std::vector<method>{
      {"std::heap / std::priority_queue", &replace_top_using_stl},
      {"bubble up recursively", &replace_top_using_bubble_down_recursively},
      {"bubble up iteratively", &replace_top_using_bubble_down_iteratively},
      {"libstc++ __adjust_heap", &replace_top_using_adjust_heap},
  };

  std::cout << fmt::format("{:35s} {:s}\n", "method", "avg_cmp_cnt");
  for (auto& method: methods) {
    auto prng              = std::mt19937_64(1); // NOLINT fixed seed for repeatability
    auto heap_element_dist = std::uniform_int_distribution<>(1, 100);
    auto heap_size_dist    = std::uniform_int_distribution<std::size_t>(3, 100);

    const std::size_t number_of_trials = 1'000'000;

    std::size_t total_cmpcount = 0;
    cmp_counter<ValueType> comp;
    for (unsigned i = 0; i != number_of_trials; ++i) {
      std::vector<int> h(heap_size_dist(prng));
      std::generate(h.begin(), h.end(), [&] { return ValueType(heap_element_dist(prng)); });

      std::make_heap(h.begin(), h.end(), comp);

      auto newval = ValueType(heap_element_dist(prng));
      cmp_counter<ValueType>::reset();
      method.cb(h, newval, comp);
      total_cmpcount += cmp_counter<ValueType>::cmpcount;

      if (!std::is_heap(h.begin(), h.end(), comp)) {
        std::cerr << method.label << "NOT A HEAP ANYMORE!!\n";
        return EXIT_FAILURE;
      }
    }
    std::cout << fmt::format("{:35s} {:f}\n", method.label,
                             double(total_cmpcount) / number_of_trials);
  }
}
EN

回答 3

Code Review用户

回答已采纳

发布于 2022-03-08 21:14:14

最小化比较

您测试过的所有算法都需要O(\log N)比较。__adjust_heap()方法似乎平均节省了一次比较。我不确定您是否能做得更好,使用堆或树排序优先级队列。

也许可以做一个空间和时间的权衡。如果您能够找到一种方法来压缩键,以便将索引压缩到一个比块数大几倍的桶数组中,那么从该数组中插入或删除一个键平均不需要进行任何比较,因为一个桶在插入时很可能是空的,或者删除时有一个元素。剩下的问题是找到第一个桶,其中有一个项目,这将有自己的成本,规模与桶的数量,但这可能是非常便宜的。然后,诀窍是找到一个平衡,所以维护这个数组比维护一个堆更便宜。

至于守则本身:

使用std::ranges

您正在使用C++20特性,但我忽略了std::ranges的使用,这将简化一些代码。

使用std::bit_width()实现log2()

我有点惊讶,GCC和Clang都没能优化你天真的整数基数-2对数的实现。但是,如果您使用std::bit_width() - 1,它们将设法将循环替换为x86体系结构上的一个lzcnt指令。

使您的代码更加通用的

通过在容器的值类型和比较函数上模板化代码,您确实做出了很大的努力使代码通用化,但是您应该能够使它在std::vector以外的容器上工作。我只想复制std::ranges::pop_heap()的接口和相关的函数。

为了更多的乐趣,您可以让cmp_counter()成为一个函数适配器,默认情况下它会适应std::less,但这将允许您将它添加到任何比较操作符中。

利用front()back()

特别是如果您使代码更通用,您不应该假设容器是随机访问的,并且避免类似于heap[0] = ...的代码。相反,您可以编写如下内容:

代码语言:javascript
复制
heap.front() = newval;

您可以将pop()push()操作替换为replace_top_using_stl()中的单个heap.back() = newval

添加更多的staticconst

我已经看到了const的良好使用,但在cmp_counter之外没有static。考虑让大多数函数static。此外,您可以在更多的情况下使用const。特别是,methods可以成为一个static const数组。

不一致使用fmt::format()

我在几个地方看到了fmt::format(),但在其他情况下,您只是使用<<连接输出。我建议坚持使用fmt::format()。更好的是,避免使用std::cout <<编写fmt::print()

票数 2
EN

Code Review用户

发布于 2022-03-09 02:14:14

不要以为你不能打败memcmp

这并不是对上述算法聚焦CR的直接回答,然而,它成功地挑战了“我们无法击败memcmp”的隐含假设--取得了惊人的结果。

如果std::array<std::byte, 20>::operator<比较是瓶颈,那么我们可以:

  1. 减少比较次数
  2. 使这些比较更快

以上CR及其公认的答案,集中于1。

事实证明,改进的潜力更大。

换句话说,在进行这些比较时,“从硬件中挤出更多的东西”是相对容易的。要比memcmp更快,使用64位总线和寄存器,而不是一次比较一个字节,或者您的memcmp版本所做的任何事情。

字符串Intrinsics是快速

下面的代码并不漂亮,也不是完全可移植的。它很可能是可移植的,只要付出一些努力。

第一种方法是对前16个字节使用_mm_cmpestri 内禀 (可与-msse4.2一起使用),然后对最后4个字节使用uint32_t比较:

代码语言:javascript
复制
  bool operator<(const pawned_pw& rhs) const {

    const __m128i simd_a = _mm_loadu_si128((__m128i*)(&hash));
    const __m128i simd_b = _mm_loadu_si128((__m128i*)(&rhs.hash));

    auto res = static_cast<unsigned>(_mm_cmpestri(simd_a, 16, simd_b, 16,
                                                  unsigned(_SIDD_CMP_EQUAL_EACH) |
                                                      unsigned(_SIDD_NEGATIVE_POLARITY) |
                                                      unsigned(_SIDD_UBYTE_OPS)));

    if (res != 16) {
      return hash[res] < rhs.hash[res]; // something in first 16 bytes differs
    }

    // this compiles to a load and `bswap` which should be cheap
    std::uint32_t tail     = __builtin_bswap32(*(std::uint32_t*)(&hash[16]));
    std::uint32_t rhs_tail = __builtin_bswap32(*(std::uint32_t*)(&rhs.hash[16]));
    return tail < rhs_tail;
  }

简单,64位无符号int是更快的

一旦我们知道了如何在典型的小endian系统上进行必要的字节顺序反转,我们就可以一路走下去,并使用2x64位比较来完成前16个字节。事实证明,这比更复杂的字符串比较内在的速度要快得多。

代码语言:javascript
复制
  bool operator<(const pawned_pw& rhs) const {

    // this compiles to a load and `bswap` which should be cheap
    // measured > 33% faster than hash < rhs.hash
    std::uint64_t head     = __builtin_bswap64(*(std::uint64_t*)(&hash[0]));
    std::uint64_t rhs_head = __builtin_bswap64(*(std::uint64_t*)(&rhs.hash[0]));
    if (head != rhs_head) return head < rhs_head;

    std::uint64_t mid     = __builtin_bswap64(*(std::uint64_t*)(&hash[8]));
    std::uint64_t rhs_mid = __builtin_bswap64(*(std::uint64_t*)(&rhs.hash[8]));
    if (mid != rhs_mid) return mid < rhs_mid;

    std::uint32_t tail     = __builtin_bswap32(*(std::uint32_t*)(&hash[16]));
    std::uint32_t rhs_tail = __builtin_bswap32(*(std::uint32_t*)(&rhs.hash[16]));
    return tail < rhs_tail;
  }

这个operator<的实现比调用memcmp的朴素hash < rhs.hash快50%(或2x)。

这意味着主应用程序的合并阶段现在比以前快了33%。从3分钟到1:50分钟;非常重要的收获。这仍然在使用std::priority_queue,这样我们也可以从__adjust_heap中获取收益,并且它们将是加性的。

虽然还没有完全增加,但我们现在处于合并阶段的愉快状态,不再受CPU限制。CPU现在在75-80%左右.

保持灵活性

稍微擦亮一下,我们可以注意到,本质比嵌入式组装要好得多,因为它们使我们能够轻松地将现代c++特性与低级机器指令相结合。在这里,我们可以稍微修改上面的内容,以支持c++20 operator<=>

代码语言:javascript
复制
  std::strong_ordering operator<=>(const pawned_pw& rhs) const {

    // this compiles to a load and `bswap` which should be cheap
    // measured > 33% faster than hash < rhs.hash
    std::uint64_t head     = __builtin_bswap64(*(std::uint64_t*)(&hash[0]));     // NOLINT
    std::uint64_t rhs_head = __builtin_bswap64(*(std::uint64_t*)(&rhs.hash[0])); // NOLINT
    if (head != rhs_head) return head <=> rhs_head;

    std::uint64_t mid     = __builtin_bswap64(*(std::uint64_t*)(&hash[8]));     // NOLINT
    std::uint64_t rhs_mid = __builtin_bswap64(*(std::uint64_t*)(&rhs.hash[8])); // NOLINT
    if (mid != rhs_mid) return mid <=> rhs_mid;

    std::uint32_t tail     = __builtin_bswap32(*(std::uint32_t*)(&hash[16]));     // NOLINT
    std::uint32_t rhs_tail = __builtin_bswap32(*(std::uint32_t*)(&rhs.hash[16])); // NOLINT
    return tail <=> rhs_tail;
  }

当然,我们也会以同样的方式来做operator==,但我们不需要为相等的比较而费心使用bswap

代码语言:javascript
复制
  bool operator==(const pawned_pw& rhs) const {
    std::uint64_t head     = *(std::uint64_t*)(&hash[0]);     // NOLINT
    std::uint64_t rhs_head = *(std::uint64_t*)(&rhs.hash[0]); // NOLINT
    if (head != rhs_head) return false;

    std::uint64_t mid     = *(std::uint64_t*)(&hash[8]);     // NOLINT
    std::uint64_t rhs_mid = *(std::uint64_t*)(&rhs.hash[8]); // NOLINT
    if (mid != rhs_mid) return false;

    std::uint32_t tail     = *(std::uint32_t*)(&hash[16]);     // NOLINT
    std::uint32_t rhs_tail = *(std::uint32_t*)(&rhs.hash[16]); // NOLINT
    return tail == rhs_tail;
  }
票数 3
EN

Code Review用户

发布于 2022-05-29 09:44:48

减少堆操作中的比较数

引入到堆中的条目中大约有一半是叶子。

一项建议是对此要实事求是

在查看新的键之前,先确定叶子的路径。

代码语言:javascript
复制
/** re-heapify 0-based heap when heap[root] may break ordering below
 *    (not have priority over all its children). 
 * switched shall return true if its right operand has priority over its left. */
// index runs towards bottom of heap and bounces up to root, again
void 
bounce_iteratively(std::vector<T>& heap, std::size_t root, Comp switched = {}) {
    const auto
        n = heap.size(),
        first_leaf = n / 2,
        last_parent_of_two = (n-3) / 2;
    if (first_leaf <= root)
        return;
    std::size_t i = root;
    while (i <= last_parent_of_two) { // a first comparison per level
        const std::size_t
            left  = 2 * i + 1,
            right = left + 1;
        i = switched(heap[right], heap[left])) ? left : right;
    }
    if (i < first_leaf))// parent of one ...
        i = 2 * i + 1;  // repeated expression  // left;
  // all below can be replaced by call to (std::?)push_heap()??
    T addition = heap[root];
    using std::swap; // enable ADL
    for ( ; root < i ; i = (i-1)/2)
        if (switched(addition, heap[i])) {  // the second comparison per level
            swap(addition, heap[i]);
            break;
        }
    for ( ; root <= i ; i = (i-1)/2)
        swap(addition, heap[i]);  // no more comparisons
}

(这实际上接近于我预期的replace_top_using_stl()'s pop_heap;pop_back;push_back;push_heap会做的事情,尤其是比较的顺序。)

  • 减少比较的最明显的方法是使用非比较排序。

即使最初只将键空间划分为20个部分(保留其数量)或32个部分(考虑到计算part#的不均匀分布或感知到的困难),您也可以继续编写/连接,而不是合并。

  • 对于n= 20,我将在有序数组中计算二进制搜索和插入。

  • 您可以将alignof状态为报告4字节对齐。

虽然我不知道如何破坏8字节对齐与偶数([buffered] 1000 at a time)为24(?)-byte struct pawned_pw,使用alignas(8)是显式的。(对于后期的机械大容量存储设备和系统调用来说,24K似乎有点低。)

票数 1
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/274756

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档