首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >快速排序大阵列高斯分布数据的方法

快速排序大阵列高斯分布数据的方法
EN

Code Golf用户
提问于 2011-06-07 12:58:58
回答 22查看 12.1K关注 0票数 71

继对这个问题的兴趣之后,我认为通过提出一个竞赛来使答案更加客观和定量会很有趣。

这个想法很简单:我已经生成了一个二进制文件,包含5000万个高斯分布的双倍(平均为0,stdev 1)。我们的目标是制作一个程序,尽可能快地在内存中对这些内容进行排序。python中的一个非常简单的引用实现需要1m4s来完成。我们能走多低?

规则如下:使用打开文件"gaussian.dat“并对内存中的数字进行排序(不需要输出它们)的程序回答,以及构建和运行程序的说明。该程序必须能够在我的Arch机器上工作(这意味着您可以使用任何编程语言或库,这些语言或库可以很容易地安装在这个系统上)。

该程序必须是合理的可读性,以便我可以确保它是安全的启动(没有汇编程序的解决方案,请!)。

我将在我的机器上运行答案(四核,4GB内存)。最快的解决方案将得到公认的答案和100分赏金:)

用于生成数字的程序:

代码语言:javascript
复制
#!/usr/bin/env python
import random
from array import array
from sys import argv
count=int(argv[1])
a=array('d',(random.gauss(0,1) for x in xrange(count)))
f=open("gaussian.dat","wb")
a.tofile(f)

简单的参考实现:

代码语言:javascript
复制
#!/usr/bin/env python
from array import array
from sys import argv
count=int(argv[1])
a=array('d')
a.fromfile(open("gaussian.dat"),count)
print "sorting..."
b=sorted(a)

只有4GB的RAM可用。

请注意,竞赛的目的是看我们是否可以使用有关数据的先前信息。它不应该是不同编程语言实现之间令人讨厌的匹配!

EN

回答 22

Code Golf用户

回答已采纳

发布于 2011-06-09 12:36:31

下面是C++中的一个解决方案,它首先将数字划分为具有相同期望元素数的桶,然后分别对每个桶进行排序。它根据维基百科的一些公式预先计算一个累积分布函数的表,然后从这个表中插值值,得到一个快速的近似。

在多个线程中运行几个步骤来使用这四个核心。

代码语言:javascript
复制
#include <cstdlib>
#include <math.h>
#include <stdio.h>
#include <algorithm>

#include <tbb/parallel_for.h>

using namespace std;

typedef unsigned long long ull;

double signum(double x) {
    return (x<0) ? -1 : (x>0) ? 1 : 0;
}

const double fourOverPI = 4 / M_PI;

double erf(double x) {
    double a = 0.147;
    double x2 = x*x;
    double ax2 = a*x2;
    double f1 = -x2 * (fourOverPI + ax2) / (1 + ax2);
    double s1 = sqrt(1 - exp(f1));
    return signum(x) * s1;
}

const double sqrt2 = sqrt(2);

double cdf(double x) {
    return 0.5 + erf(x / sqrt2) / 2;
}

const int cdfTableSize = 200;
const double cdfTableLimit = 5;
double* computeCdfTable(int size) {
    double* res = new double[size];
    for (int i = 0; i < size; ++i) {
        res[i] = cdf(cdfTableLimit * i / (size - 1));
    }
    return res;
}
const double* const cdfTable = computeCdfTable(cdfTableSize);

double cdfApprox(double x) {
    bool negative = (x < 0);
    if (negative) x = -x;
    if (x > cdfTableLimit) return negative ? cdf(-x) : cdf(x);
    double p = (cdfTableSize - 1) * x / cdfTableLimit;
    int below = (int) p;
    if (p == below) return negative ? -cdfTable[below] : cdfTable[below];
    int above = below + 1;
    double ret = cdfTable[below] +
            (cdfTable[above] - cdfTable[below])*(p - below);
    return negative ? 1 - ret : ret;
}

void print(const double* arr, int len) {
    for (int i = 0; i < len; ++i) {
        printf("%e; ", arr[i]);
    }
    puts("");
}

void print(const int* arr, int len) {
    for (int i = 0; i < len; ++i) {
        printf("%d; ", arr[i]);
    }
    puts("");
}

void fillBuckets(int N, int bucketCount,
        double* data, int* partitions,
        double* buckets, int* offsets) {
    for (int i = 0; i < N; ++i) {
        ++offsets[partitions[i]];
    }

    int offset = 0;
    for (int i = 0; i < bucketCount; ++i) {
        int t = offsets[i];
        offsets[i] = offset;
        offset += t;
    }
    offsets[bucketCount] = N;

    int next[bucketCount];
    memset(next, 0, sizeof(next));
    for (int i = 0; i < N; ++i) {
        int p = partitions[i];
        int j = offsets[p] + next[p];
        ++next[p];
        buckets[j] = data[i];
    }
}

class Sorter {
public:
    Sorter(double* data, int* offsets) {
        this->data = data;
        this->offsets = offsets;
    }

    static void radixSort(double* arr, int len) {
        ull* encoded = (ull*)arr;
        for (int i = 0; i < len; ++i) {
            ull n = encoded[i];
            if (n & signBit) {
                n ^= allBits;
            } else {
                n ^= signBit;
            }
            encoded[i] = n;
        }

        const int step = 11;
        const ull mask = (1ull << step) - 1;
        int offsets[8][1ull << step];
        memset(offsets, 0, sizeof(offsets));

        for (int i = 0; i < len; ++i) {
            for (int b = 0, j = 0; b < 64; b += step, ++j) {
                int p = (encoded[i] >> b) & mask;
                ++offsets[j][p];
            }
        }

        int sum[8] = {0};
        for (int i = 0; i <= mask; i++) {
            for (int b = 0, j = 0; b < 64; b += step, ++j) {
                int t = sum[j] + offsets[j][i];
                offsets[j][i] = sum[j];
                sum[j] = t;
            }
        }

        ull* copy = new ull[len];
        ull* current = encoded;
        for (int b = 0, j = 0; b < 64; b += step, ++j) {
            for (int i = 0; i < len; ++i) {
                int p = (current[i] >> b) & mask;
                copy[offsets[j][p]] = current[i];
                ++offsets[j][p];
            }

            ull* t = copy;
            copy = current;
            current = t;
        }

        if (current != encoded) {
            for (int i = 0; i < len; ++i) {
                encoded[i] = current[i];
            }
        }

        for (int i = 0; i < len; ++i) {
            ull n = encoded[i];
            if (n & signBit) {
                n ^= signBit;
            } else {
                n ^= allBits;
            }
            encoded[i] = n;
        }
    }

    void operator() (tbb::blocked_range<int>& range) const {
        for (int i = range.begin(); i < range.end(); ++i) {
            double* begin = &data[offsets[i]];
            double* end = &data[offsets[i+1]];
            //std::sort(begin, end);
            radixSort(begin, end-begin);
        }
    }

private:
    double* data;
    int* offsets;
    static const ull signBit = 1ull << 63;
    static const ull allBits = ~0ull;
};

void sortBuckets(int bucketCount, double* data, int* offsets) {
    Sorter sorter(data, offsets);
    tbb::blocked_range<int> range(0, bucketCount);
    tbb::parallel_for(range, sorter);
    //sorter(range);
}

class Partitioner {
public:
    Partitioner(int bucketCount, double* data, int* partitions) {
        this->data = data;
        this->partitions = partitions;
        this->bucketCount = bucketCount;
    }

    void operator() (tbb::blocked_range<int>& range) const {
        for (int i = range.begin(); i < range.end(); ++i) {
            double d = data[i];
            int p = (int) (cdfApprox(d) * bucketCount);
            partitions[i] = p;
        }
    }

private:
    double* data;
    int* partitions;
    int bucketCount;
};

const int bucketCount = 512;
int offsets[bucketCount + 1];

int main(int argc, char** argv) {
    if (argc != 2) {
        printf("Usage: %s N\n N = the size of the input\n", argv[0]);
        return 1;
    }

    puts("initializing...");
    int N = atoi(argv[1]);
    double* data = new double[N];
    double* buckets = new double[N];
    memset(offsets, 0, sizeof(offsets));
    int* partitions = new int[N];

    puts("loading data...");
    FILE* fp = fopen("gaussian.dat", "rb");
    if (fp == 0 || fread(data, sizeof(*data), N, fp) != N) {
        puts("Error reading data");
        return 1;
    }
    //print(data, N);

    puts("assigning partitions...");
    tbb::parallel_for(tbb::blocked_range<int>(0, N),
            Partitioner(bucketCount, data, partitions));

    puts("filling buckets...");
    fillBuckets(N, bucketCount, data, partitions, buckets, offsets);
    data = buckets;

    puts("sorting buckets...");
    sortBuckets(bucketCount, data, offsets);

    puts("done.");

    /*
    for (int i = 0; i < N-1; ++i) {
        if (data[i] > data[i+1]) {
            printf("error at %d: %e > %e\n", i, data[i], data[i+1]);
        }
    }
    */

    //print(data, N);

    return 0;
}

要编译和运行它,请使用以下命令:

代码语言:javascript
复制
g++ -O3 -ltbb -o gsort gsort.cpp && time ./gsort 50000000

编辑:所有桶现在都放在同一个数组中,以消除将桶复制回数组的需要。此外,具有预先计算值的表的大小也减小了,因为这些值足够精确。不过,如果我更改256以上的桶数,程序运行的时间要比这个桶数的运行时间长。

编辑:相同的算法,不同的编程语言。我使用的是C++而不是Java,在我的机器上运行时间从3.2s缩短到2.35s。最理想的桶数仍在256左右(同样,在我的计算机上)。

顺便说一下,tbb真的很棒。

编辑:我受到Alexandru的伟大解决方案的启发,在最后一个阶段用他的基排序的修改版本取代了std::sort。我确实使用了一种不同的方法来处理正负数,尽管它需要更多地通过数组。我还决定对数组进行精确排序,并删除插入排序。稍后,我将花费一些时间测试这些更改如何影响性能,并可能恢复它们。而用基排序,时间从~2.35s缩短到~1.63s。

票数 14
EN

Code Golf用户

发布于 2011-06-07 16:52:00

在没有变得聪明的情况下,仅仅为了提供一个更快的天真排序器,这里有一个在C中的,它应该与您的Python相当:

代码语言:javascript
复制
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int cmp(const void* av, const void* bv) {
    double a = *(const double*)av;
    double b = *(const double*)bv;
    return a < b ? -1 : a > b ? 1 : 0;
}
int main(int argc, char** argv) {
    if (argc <= 1)
        return puts("No argument!");
    unsigned count = atoi(argv[1]);

    double *a = malloc(count * sizeof *a);

    FILE *f = fopen("gaussian.dat", "rb");
    if (fread(a, sizeof *a, count, f) != count)
        return puts("fread failed!");
    fclose(f);

    puts("sorting...");
    double *b = malloc(count * sizeof *b);
    memcpy(b, a, count * sizeof *b);
    qsort(b, count, sizeof *b, cmp);
    return 0;
}

gcc -O3编译,在我的机器上,这比Python少花了一分钟多: s大约11人,而S 87人。

票数 13
EN

Code Golf用户

发布于 2011-06-07 21:10:37

我根据标准偏差将其划分为几个段,最好将其分解为4个。编辑:在http://en.wikipedia.org/wiki/Error_function#Table_的_值中基于x值重写为分区

http://www.wolframalpha.com/input/?i=percentages+by++normal+distribution

我试过使用更小的桶,但一旦超出可用内核的数量,它的效果似乎就很小了。如果没有任何并行的集合,在我的盒子上需要37秒,并行的收集要花24秒。如果通过分发进行分区,则不能只使用数组,因此有更多的开销。我不清楚在scala中什么时候会对值进行装箱/取消装箱。

我使用Scala2.9作为并行集合。您可以下载它的tar.gz发行版。

编译: scalac SortFile.scala (我只是直接在scala/bin文件夹中复制它。

要运行: JAVA_OPTS="-Xmx4096M“./scala SortFile (我运行它时使用了2组ram,并获得了大约相同的时间)

编辑:删除allocateDirect,比只分配慢。删除了数组缓冲区初始大小的启动。实际上,它读取了整个50000000的值。重写以避免自动装箱问题(仍然慢于朴素的c)

代码语言:javascript
复制
import java.io.FileInputStream;
import java.nio.ByteBuffer
import java.nio.ByteOrder
import scala.collection.mutable.ArrayBuilder


object SortFile {

//used partition numbers from Damascus' solution
val partList = List(0, 0.15731, 0.31864, 0.48878, 0.67449, 0.88715, 1.1503, 1.5341)

val listSize = partList.size * 2;
val posZero = partList.size;
val neg = partList.map( _ * -1).reverse.zipWithIndex
val pos = partList.map( _ * 1).zipWithIndex.reverse

def partition(dbl:Double): Int = { 

//for each partition, i am running through the vals in order
//could make this a binary search to be more performant... but our list size is 4 (per side)

  if(dbl < 0) { return neg.find( dbl < _._1).get._2  }
  if(dbl > 0) { return posZero  + pos.find( dbl > _._1).get._2  }
      return posZero; 

}

  def main(args: Array[String])
    { 

    var l = 0
    val dbls = new Array[Double](50000000)
    val partList = new Array[Int](50000000)
    val pa = Array.fill(listSize){Array.newBuilder[Double]}
    val channel = new FileInputStream("../../gaussian.dat").getChannel()
    val bb = ByteBuffer.allocate(50000000 * 8)
    bb.order(ByteOrder.LITTLE_ENDIAN)
    channel.read(bb)
    bb.rewind
    println("Loaded" + System.currentTimeMillis())
    var dbl = 0.0
    while(bb.hasRemaining)
    { 
      dbl = bb.getDouble
      dbls.update(l,dbl) 

      l+=1
    }
    println("Beyond first load" + System.currentTimeMillis());

    for( i <- (0 to 49999999).par) { partList.update(i, partition(dbls(i)))}

    println("Partition computed" + System.currentTimeMillis() )
    for(i <- (0 to 49999999)) { pa(partList(i)) += dbls(i) }
    println("Partition completed " + System.currentTimeMillis())
    val toSort = for( i <- pa) yield i.result()
    println("Arrays Built" + System.currentTimeMillis());
    toSort.par.foreach{i:Array[Double] =>scala.util.Sorting.quickSort(i)};

    println("Read\t" + System.currentTimeMillis());

  }
}
票数 10
EN
页面原文内容由Code Golf提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codegolf.stackexchange.com/questions/2774

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档