我正在开发一个小型应用程序,它涉及将20 as文件作为操作之一进行排序。由于内存限制,这是通过将它们分解为~1GB块,在内存中排序,然后写回磁盘来完成的。然后,作为第二阶段,读取每个文件的头,选择最小/最大的文件并流到最后的输出文件。
这属于外部排序的领域。
这个CR将只关注其中的一小部分,当流到最终文件时,这部分内容涉及到“从哪个块中提取下一个元素”。结果表明,这一部分,特别是元素比较,是CPU约束和瓶颈。被排序的元素是std::array<std::byte, 20>,所以这是一个非常重要的比较,它可以编译成memcmp。
编辑结果表明,通过一些非常简单的“大64位加载、字节交换和比较”,我们可以比memcmp快得多。
见下面的自我答案-编辑
因此,我们需要一个数据结构,它可以方便地选择下一个元素。通常大约有20个块,所以我的第一次尝试只是使用std::min_element,因为线性搜索对于少量元素来说是非常快的。当我发现比较是我使用std::priority_queue的CPU瓶颈时。这使比较减少了>2倍,因此这是一个很大的收获。
为了澄清,比较次数的减少,也使整个应用程序的速度提高了2倍,从20×1GB文件合并阶段的6分钟下降到3分钟。但是在3分钟时,CPU仍然是@ 100%,并且几乎全部在20字节数组的operator<中。
这个CR是关于我们是否还能做得更好,通过进一步减少比较的次数。
您不能在std::priority_queue中更改/更新元素,这正是我们需要做的。这是临界回路的算法。
std::priority_queue<head, std::vector<head>, decltype(cmp)> heads(cmp);
while (!heads.empty()) {
const head& t = heads.top();
sorted.write(t.value);
std::size_t chunk_idx = t.idx;
heads.pop();
if (auto& chunk = chunks[chunk_idx]; chunk.current != chunk.end) {
heads.push({*(chunk.current), chunk_idx});
++(chunk.current);
}
}因此,调用queue.pop()和queue.push()似乎效率很低,两者都需要在堆中移动元素。仅仅“修改”.top() (读过它并将其写入输出)、转换为新的值(实质上在该块的迭代器上调用std::next或++ ),然后“重新堆”似乎更有效。
让我们躲在std::priority_queue的引擎盖下。使用c++ STL堆算法进行此操作的规范方法是:
// Note I am using a max-heap, but the comparator is customisable below
std::pop_heap(heap.begin(), heap.end(), comp);
heap.pop_back();
heap.push_back(newval);
std::push_heap(heap.begin(), heap.end(), comp);大多数堆实现在下面使用原语,称为“冒泡/筛选向上/向下”。我们真正需要的是bubble_down实现,并在已更改的top()元素上调用它。
c++标准不提供原语,但是实现并不困难,下面的代码中有2(一个迭代和一个递归)。
heap.top()元素这是一个非常简单的案例。不存在“知道要更改的元素在堆中的位置”的问题--这是这种方法的典型关注点。所以我们肯定能做得更好对吧?
下面的代码研究了4种实现:
__adjust_heap接__push_heap algo我生成随机堆大小、内容和替换值,并迭代1M次。
-O3上“尾调用”优化了递归,而且,令人惊讶的是,即使在这种非常特殊的情况下,仍然有比STL更多的比较。所以仅仅使用堆原语并不能给你带来任何好处。典型结果:
method avg_cmp_cnt
std::heap / std::priority_queue 7.568285
bubble up recursively 8.031054
bubble up iteratively 8.047352
libstc++ __adjust_heap 6.327297守则:
#include "fmt/core.h"
#include <algorithm>
#include <cassert>
#include <concepts>
#include <cstddef>
#include <cstdlib>
#include <execution>
#include <iostream>
#include <random>
#include <stdexcept>
template <std::unsigned_integral T>
T log2(T x) {
T log = 0;
while (x >>= 1U) ++log;
return log;
}
template <typename T, typename Comp = std::less<>>
void print_heap(const std::vector<T>& heap, Comp comp = {}) {
std::size_t levels = log2(heap.size()) + 1;
unsigned width = 6 * (1U << (levels - 1U));
std::cout << "\n\n";
for (const auto& e: heap) std::cout << e << " ";
std::cout << "\n";
std::cout << fmt::format("is_heap = {:}\n\n", std::is_heap(heap.begin(), heap.end(), comp));
if (heap.empty()) {
std::cout << "<empty heap>\n";
return;
}
unsigned idx = 0;
bool done = false;
for (unsigned l = 0; l != levels; ++l) {
for (unsigned e = 0; e != 1U << l; ++e) {
std::cout << fmt::format("{:^{}}", heap[idx], width);
++idx;
if (idx == heap.size()) {
done = true;
break;
}
}
width /= 2;
std::cout << "\n\n";
if (done) break;
}
}
template <typename T, typename Comp = std::less<>>
void replace_top_using_stl(std::vector<T>& heap, T newval, Comp comp = {}) {
if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");
assert(std::is_heap(heap.begin(), heap.end(), comp));
std::pop_heap(heap.begin(), heap.end(), comp);
heap.pop_back();
heap.push_back(newval);
std::push_heap(heap.begin(), heap.end(), comp);
}
template <typename T, typename Comp = std::less<>>
// NOLINTNEXTLINE recursion is tailcall eliminated by compiler
void bubble_down_recursively(std::vector<T>& heap, std::size_t i, Comp comp = {}) {
const auto left = 2 * i + 1;
const auto right = 2 * i + 2;
const auto n = heap.size();
using std::swap; // enable ADL
if (left >= n) { // no children
return;
} else if (right >= n) { // left exists right does not. NOLINT else after return
if (comp(heap[i], heap[left])) {
swap(heap[i], heap[left]);
bubble_down_recursively(heap, left, comp);
}
} else { // both children exist
// 'larger' is only well named if comp = std::less<>{}
auto larger = comp(heap[right], heap[left]) ? left : right;
if (comp(heap[i], heap[larger])) {
swap(heap[i], heap[larger]);
bubble_down_recursively(heap, larger, comp);
}
}
}
template <typename T, typename Comp = std::less<>>
void replace_top_using_bubble_down_recursively(std::vector<T>& heap, T newval, Comp comp = {}) {
if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");
assert(std::is_heap(heap.begin(), heap.end(), comp));
heap[0] = newval;
bubble_down_recursively(heap, 0, comp);
}
template <typename T, typename Comp = std::less<>>
void bubble_down_iteratively(std::vector<T>& heap, std::size_t i, Comp comp = {}) {
const auto n = heap.size();
while (true) {
const std::size_t left = 2 * i + 1;
const std::size_t right = 2 * i + 2;
std::size_t largest = i;
if ((left < n) && comp(heap[largest], heap[left])) {
largest = left;
}
if ((right < n) && comp(heap[largest], heap[right])) {
largest = right;
}
if (largest == i) {
break;
}
using std::swap; // enable ADL
swap(heap[i], heap[largest]);
i = largest;
}
}
template <typename T, typename Comp = std::less<>>
void replace_top_using_bubble_down_iteratively(std::vector<T>& heap, T newval, Comp comp = {}) {
if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");
assert(std::is_heap(heap.begin(), heap.end(), comp));
heap[0] = newval; // stick it in anyway
bubble_down_iteratively(heap, 0, comp); // and fix the heap
}
// borrowed from libstdc++ __push_heap
template <typename RandomAccessIterator, typename Distance, typename Tp, typename Compare>
constexpr void push_heap(RandomAccessIterator first, Distance holeIndex, Distance topIndex,
Tp value, Compare& comp) {
Distance parent = (holeIndex - 1) / 2;
while (holeIndex > topIndex && comp(*(first + parent), value)) {
*(first + holeIndex) = *(first + parent);
holeIndex = parent;
parent = (holeIndex - 1) / 2;
}
*(first + holeIndex) = std::move(value);
}
// borrowed from libstdc++ __adjust_heap
template <typename RandomAccessIterator, typename Distance, typename Tp, typename Compare>
constexpr void adjust_heap(RandomAccessIterator first, Distance holeIndex, Distance len, Tp value,
Compare comp) {
const Distance topIndex = holeIndex;
Distance secondChild = holeIndex;
while (secondChild < (len - 1) / 2) {
secondChild = 2 * (secondChild + 1);
if (comp(*(first + secondChild), *(first + (secondChild - 1)))) secondChild--;
*(first + holeIndex) = *(first + secondChild);
holeIndex = secondChild;
}
if ((len & 1) == 0 && secondChild == (len - 2) / 2) {
secondChild = 2 * (secondChild + 1);
*(first + holeIndex) = *(first + (secondChild - 1));
holeIndex = secondChild - 1;
}
push_heap(first, holeIndex, topIndex, value, comp);
}
template <typename T, typename Comp = std::less<>>
void replace_top_using_adjust_heap(std::vector<T>& heap, T newval, Comp comp = {}) {
if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");
assert(std::is_heap(heap.begin(), heap.end(), comp));
heap[0] = newval;
adjust_heap(heap.begin(), 0L, heap.end() - heap.begin(), newval, comp);
}
template <typename T>
struct cmp_counter {
static std::size_t cmpcount; // NOLINT must be static because STL takes Comp by value
bool operator()(T a, T b) {
++cmpcount;
return a < b; // effectively std::less<>{};
}
static void reset() { cmpcount = 0; }
};
template <typename T>
std::size_t cmp_counter<T>::cmpcount = 0; // NOLINT global static
int main() {
using ValueType = int;
struct method {
using cb_t = void (*)(std::vector<ValueType>&, ValueType, cmp_counter<ValueType>);
std::string label;
cb_t cb;
};
auto methods = std::vector<method>{
{"std::heap / std::priority_queue", &replace_top_using_stl},
{"bubble up recursively", &replace_top_using_bubble_down_recursively},
{"bubble up iteratively", &replace_top_using_bubble_down_iteratively},
{"libstc++ __adjust_heap", &replace_top_using_adjust_heap},
};
std::cout << fmt::format("{:35s} {:s}\n", "method", "avg_cmp_cnt");
for (auto& method: methods) {
auto prng = std::mt19937_64(1); // NOLINT fixed seed for repeatability
auto heap_element_dist = std::uniform_int_distribution<>(1, 100);
auto heap_size_dist = std::uniform_int_distribution<std::size_t>(3, 100);
const std::size_t number_of_trials = 1'000'000;
std::size_t total_cmpcount = 0;
cmp_counter<ValueType> comp;
for (unsigned i = 0; i != number_of_trials; ++i) {
std::vector<int> h(heap_size_dist(prng));
std::generate(h.begin(), h.end(), [&] { return ValueType(heap_element_dist(prng)); });
std::make_heap(h.begin(), h.end(), comp);
auto newval = ValueType(heap_element_dist(prng));
cmp_counter<ValueType>::reset();
method.cb(h, newval, comp);
total_cmpcount += cmp_counter<ValueType>::cmpcount;
if (!std::is_heap(h.begin(), h.end(), comp)) {
std::cerr << method.label << "NOT A HEAP ANYMORE!!\n";
return EXIT_FAILURE;
}
}
std::cout << fmt::format("{:35s} {:f}\n", method.label,
double(total_cmpcount) / number_of_trials);
}
}发布于 2022-03-08 21:14:14
您测试过的所有算法都需要O(\log N)比较。__adjust_heap()方法似乎平均节省了一次比较。我不确定您是否能做得更好,使用堆或树排序优先级队列。
也许可以做一个空间和时间的权衡。如果您能够找到一种方法来压缩键,以便将索引压缩到一个比块数大几倍的桶数组中,那么从该数组中插入或删除一个键平均不需要进行任何比较,因为一个桶在插入时很可能是空的,或者删除时有一个元素。剩下的问题是找到第一个桶,其中有一个项目,这将有自己的成本,规模与桶的数量,但这可能是非常便宜的。然后,诀窍是找到一个平衡,所以维护这个数组比维护一个堆更便宜。
至于守则本身:
std::ranges您正在使用C++20特性,但我忽略了std::ranges的使用,这将简化一些代码。
std::bit_width()实现log2()我有点惊讶,GCC和Clang都没能优化你天真的整数基数-2对数的实现。但是,如果您使用std::bit_width() - 1,它们将设法将循环替换为x86体系结构上的一个lzcnt指令。
通过在容器的值类型和比较函数上模板化代码,您确实做出了很大的努力使代码通用化,但是您应该能够使它在std::vector以外的容器上工作。我只想复制std::ranges::pop_heap()的接口和相关的函数。
为了更多的乐趣,您可以让cmp_counter()成为一个函数适配器,默认情况下它会适应std::less,但这将允许您将它添加到任何比较操作符中。
front()和back()特别是如果您使代码更通用,您不应该假设容器是随机访问的,并且避免类似于heap[0] = ...的代码。相反,您可以编写如下内容:
heap.front() = newval;您可以将pop()和push()操作替换为replace_top_using_stl()中的单个heap.back() = newval。
static和const我已经看到了const的良好使用,但在cmp_counter之外没有static。考虑让大多数函数static。此外,您可以在更多的情况下使用const。特别是,methods可以成为一个static const数组。
fmt::format()我在几个地方看到了fmt::format(),但在其他情况下,您只是使用<<连接输出。我建议坚持使用fmt::format()。更好的是,避免使用std::cout <<编写fmt::print()。
发布于 2022-03-09 02:14:14
memcmp这并不是对上述算法聚焦CR的直接回答,然而,它成功地挑战了“我们无法击败memcmp”的隐含假设--取得了惊人的结果。
如果std::array<std::byte, 20>::operator<比较是瓶颈,那么我们可以:
以上CR及其公认的答案,集中于1。
事实证明,改进的潜力更大。
换句话说,在进行这些比较时,“从硬件中挤出更多的东西”是相对容易的。要比memcmp更快,使用64位总线和寄存器,而不是一次比较一个字节,或者您的memcmp版本所做的任何事情。
下面的代码并不漂亮,也不是完全可移植的。它很可能是可移植的,只要付出一些努力。
第一种方法是对前16个字节使用_mm_cmpestri 内禀 (可与-msse4.2一起使用),然后对最后4个字节使用uint32_t比较:
bool operator<(const pawned_pw& rhs) const {
const __m128i simd_a = _mm_loadu_si128((__m128i*)(&hash));
const __m128i simd_b = _mm_loadu_si128((__m128i*)(&rhs.hash));
auto res = static_cast<unsigned>(_mm_cmpestri(simd_a, 16, simd_b, 16,
unsigned(_SIDD_CMP_EQUAL_EACH) |
unsigned(_SIDD_NEGATIVE_POLARITY) |
unsigned(_SIDD_UBYTE_OPS)));
if (res != 16) {
return hash[res] < rhs.hash[res]; // something in first 16 bytes differs
}
// this compiles to a load and `bswap` which should be cheap
std::uint32_t tail = __builtin_bswap32(*(std::uint32_t*)(&hash[16]));
std::uint32_t rhs_tail = __builtin_bswap32(*(std::uint32_t*)(&rhs.hash[16]));
return tail < rhs_tail;
}。
一旦我们知道了如何在典型的小endian系统上进行必要的字节顺序反转,我们就可以一路走下去,并使用2x64位比较来完成前16个字节。事实证明,这比更复杂的字符串比较内在的速度要快得多。
bool operator<(const pawned_pw& rhs) const {
// this compiles to a load and `bswap` which should be cheap
// measured > 33% faster than hash < rhs.hash
std::uint64_t head = __builtin_bswap64(*(std::uint64_t*)(&hash[0]));
std::uint64_t rhs_head = __builtin_bswap64(*(std::uint64_t*)(&rhs.hash[0]));
if (head != rhs_head) return head < rhs_head;
std::uint64_t mid = __builtin_bswap64(*(std::uint64_t*)(&hash[8]));
std::uint64_t rhs_mid = __builtin_bswap64(*(std::uint64_t*)(&rhs.hash[8]));
if (mid != rhs_mid) return mid < rhs_mid;
std::uint32_t tail = __builtin_bswap32(*(std::uint32_t*)(&hash[16]));
std::uint32_t rhs_tail = __builtin_bswap32(*(std::uint32_t*)(&rhs.hash[16]));
return tail < rhs_tail;
}这个operator<的实现比调用memcmp的朴素hash < rhs.hash快50%(或2x)。
这意味着主应用程序的合并阶段现在比以前快了33%。从3分钟到1:50分钟;非常重要的收获。这仍然在使用std::priority_queue,这样我们也可以从__adjust_heap中获取收益,并且它们将是加性的。
虽然还没有完全增加,但我们现在处于合并阶段的愉快状态,不再受CPU限制。CPU现在在75-80%左右.
稍微擦亮一下,我们可以注意到,本质比嵌入式组装要好得多,因为它们使我们能够轻松地将现代c++特性与低级机器指令相结合。在这里,我们可以稍微修改上面的内容,以支持c++20 operator<=>:
std::strong_ordering operator<=>(const pawned_pw& rhs) const {
// this compiles to a load and `bswap` which should be cheap
// measured > 33% faster than hash < rhs.hash
std::uint64_t head = __builtin_bswap64(*(std::uint64_t*)(&hash[0])); // NOLINT
std::uint64_t rhs_head = __builtin_bswap64(*(std::uint64_t*)(&rhs.hash[0])); // NOLINT
if (head != rhs_head) return head <=> rhs_head;
std::uint64_t mid = __builtin_bswap64(*(std::uint64_t*)(&hash[8])); // NOLINT
std::uint64_t rhs_mid = __builtin_bswap64(*(std::uint64_t*)(&rhs.hash[8])); // NOLINT
if (mid != rhs_mid) return mid <=> rhs_mid;
std::uint32_t tail = __builtin_bswap32(*(std::uint32_t*)(&hash[16])); // NOLINT
std::uint32_t rhs_tail = __builtin_bswap32(*(std::uint32_t*)(&rhs.hash[16])); // NOLINT
return tail <=> rhs_tail;
}当然,我们也会以同样的方式来做operator==,但我们不需要为相等的比较而费心使用bswap:
bool operator==(const pawned_pw& rhs) const {
std::uint64_t head = *(std::uint64_t*)(&hash[0]); // NOLINT
std::uint64_t rhs_head = *(std::uint64_t*)(&rhs.hash[0]); // NOLINT
if (head != rhs_head) return false;
std::uint64_t mid = *(std::uint64_t*)(&hash[8]); // NOLINT
std::uint64_t rhs_mid = *(std::uint64_t*)(&rhs.hash[8]); // NOLINT
if (mid != rhs_mid) return false;
std::uint32_t tail = *(std::uint32_t*)(&hash[16]); // NOLINT
std::uint32_t rhs_tail = *(std::uint32_t*)(&rhs.hash[16]); // NOLINT
return tail == rhs_tail;
}发布于 2022-05-29 09:44:48
减少堆操作中的比较数
引入到堆中的条目中大约有一半是叶子。
一项建议是对此要实事求是和
在查看新的键之前,先确定叶子的路径。
/** re-heapify 0-based heap when heap[root] may break ordering below
* (not have priority over all its children).
* switched shall return true if its right operand has priority over its left. */
// index runs towards bottom of heap and bounces up to root, again
void
bounce_iteratively(std::vector<T>& heap, std::size_t root, Comp switched = {}) {
const auto
n = heap.size(),
first_leaf = n / 2,
last_parent_of_two = (n-3) / 2;
if (first_leaf <= root)
return;
std::size_t i = root;
while (i <= last_parent_of_two) { // a first comparison per level
const std::size_t
left = 2 * i + 1,
right = left + 1;
i = switched(heap[right], heap[left])) ? left : right;
}
if (i < first_leaf))// parent of one ...
i = 2 * i + 1; // repeated expression // left;
// all below can be replaced by call to (std::?)push_heap()??
T addition = heap[root];
using std::swap; // enable ADL
for ( ; root < i ; i = (i-1)/2)
if (switched(addition, heap[i])) { // the second comparison per level
swap(addition, heap[i]);
break;
}
for ( ; root <= i ; i = (i-1)/2)
swap(addition, heap[i]); // no more comparisons
}(这实际上接近于我预期的replace_top_using_stl()'s pop_heap;pop_back;push_back;push_heap会做的事情,尤其是比较的顺序。)
即使最初只将键空间划分为20个部分(保留其数量)或32个部分(考虑到计算part#的不均匀分布或感知到的困难),您也可以继续编写/连接,而不是合并。
alignof状态为报告4字节对齐。虽然我不知道如何破坏8字节对齐与偶数([buffered] 1000 at a time)为24(?)-byte struct pawned_pw,使用alignas(8)是显式的。(对于后期的机械大容量存储设备和系统调用来说,24K似乎有点低。)
https://codereview.stackexchange.com/questions/274756
复制相似问题