首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >需要在C#中使用多线程、AVX、GPU或其他方式进行快速数据解码。

需要在C#中使用多线程、AVX、GPU或其他方式进行快速数据解码。
EN

Stack Overflow用户
提问于 2016-12-14 09:06:32
回答 2查看 655关注 0票数 9

我有一个非常简单的功能,它可以分解从板中获取的数据。因此,数据来自帧,每帧由多个信号组成,作为一个1昏暗的阵列,我需要转换成交错的阵列,每个信号一个。基本上如下:

我在C#中工作,但我在C中有一个核心函数,它可以处理单个信号:

代码语言:javascript
复制
void Functions::Demux(short*pMux, short* pDemux, int nSignals, int signalIndex, int nValuesPerSignal)
{
    short* pMuxStart = pMux + signalIndex;
    for (size_t i = 0; i < nValuesPerSignal; i++)
        *pDemux++ = *(pMuxStart + i * nSignals);
}

然后我通过C++/CLI (使用pin_ptr<short>,所以没有副本)从C#调用它,并使用并行方法:

代码语言:javascript
复制
Parallel.For(0, nSignals, (int i) =>
{
    Core.Functions.Demux(muxed, demuxed[i], nSignals, i, nFramesPerSignal);
});

所采集的数据来自16k信号( 16位分辨率),每个信号有20k个采样/s,数据速率为16k * 20k *2=640 16/s,当在2 Xeon E5-2620 v4工作站上运行代码(总共16核@2.1GHz)时,需要大约115%的解复用( 10s的数据需要11.5s)。

我至少要去一半的时间。有没有人知道某种方式,也许使用AVX技术,或更好的高性能库的这一点?或者也许有办法使用GPU?我宁愿不改进CPU硬件,因为这可能会花费更多。

编辑请考虑nSignalsnValuesPerSignal可以更改,交织数组必须在nSignals单独的数组中拆分,才能在C#中进一步处理。

编辑:进一步测试

同时,在科迪·格雷的评论之后,我用一个核心测试了:

代码语言:javascript
复制
void _Functions::_Demux(short*pMux, short** pDemux, int nSignals, int nValuesPerSignal)
{
    for (size_t i = 0; i < nSignals; i++)
    {
        for (size_t j = 0; j < nValuesPerSignal; j++)
            pDemux[i][j] = *pMux++;
    }
}

从C++/CLI调用:

代码语言:javascript
复制
int nSignals = demuxValues->Length;
int nValuesPerSignal = demuxValues[0]->Length;

pin_ptr<short> pMux = &muxValues[0];

array<GCHandle>^ pins = gcnew array<GCHandle>(nSignals);
for (size_t i = 0; i < nSignals; i++)
    pins[i] = GCHandle::Alloc(demuxValues[i], GCHandleType::Pinned);

try
{
    array<short*>^ arrays = gcnew array<short*>(nSignals);
    for (int i = 0; i < nSignals; i++)
        arrays[i] = static_cast<short*>(pins[i].AddrOfPinnedObject().ToPointer());

    pin_ptr<short*> pDemux = &arrays[0];

    _Functions::_Demux(pMux, pDemux, nSignals, nValuesPerSignal);
}
finally
{ foreach (GCHandle pin in pins) pin.Free();    }

我的计算时间约为105%,这太过了,但清楚地表明,Parallel.For不是正确的选择。根据您的答复,我想唯一可行的解决方案是SSE/AVX。我从来没有为此写过代码,你们中的一些人能为我指明正确的方向吗?我认为我们可以假设处理器将始终支持AVX2。

编辑:我的初始代码与Matt的解决方案

在我的机器上,我比较了我的初始代码(我只是使用Parallel.For并调用了一个C函数来去除单个信号)和machine提出的代码(仍然使用Parallel.For,但方式更聪明)。请参阅Parallel.For中使用的任务数量(我有32个线程)的结果( ms):

代码语言:javascript
复制
N.Taks   MyCode   MattCode
4        1649     841
8        997      740
16       884      497
32       810      290

因此性能有了很大的提高。但是,我仍然会对AVX的想法做一些测试。

EN

回答 2

Stack Overflow用户

发布于 2016-12-14 10:56:25

正如我在一条评论中提到的,您很可能在这里使用Parallel.For来射击自己。对于这个简单的操作来说,多线程的开销太大了。如果您非常需要原始速度,以致于要在C++中实现这一点,那么您就不应该在性能关键的任何事情上使用C#。

相反,您应该让C++代码一次并行地处理多个信号。一个好的C++编译器有一个比C# JIT编译器强大得多的优化器,所以它应该能够自动地向量化代码,允许您快速地编写可读的东西。编译器开关允许您轻松地指示目标计算机上可用的指令集: SSE2、SSSE3、AVX、AVX2等。编译器将自动发出适当的指令。

如果这还不够快,您可以考虑使用本质来手动编写代码,以便发出所需的SIMD指令。从你的问题中还不清楚输入有多大的变数。帧数是不变的吗?每个信号的数值是多少?

假设您的输入看起来与图解完全一样,您可以利用PSHUFB指令(由SSSE3及更高版本支持)在C++中编写以下实现:

代码语言:javascript
复制
static const __m128i mask = _mm_setr_epi8(0, 1,   6,  7,  12, 13,
                                          2, 3,   8,  9,  14, 15,
                                          4, 5,  10, 11);

void Demux(short* pMuxed, short* pDemuxed, size_t count)
{
   for (size_t i = 0; i <= (count % 8); ++i)
   {
      _mm_store_si128((__m128i*)pDemuxed,
                      _mm_shuffle_epi8(_mm_load_si128((const __m128i*)pMuxed),
                                       mask));
      pMuxed   += 8;
      pDemuxed += 8;
   }
}

在128位SSE寄存器中,我们可以打包8个不同的16位short值.因此,在循环内部,这段代码从输入数组加载下8个short,重新调整它们,使它们符合所需的顺序,然后将结果序列存储回输出数组中。它必须循环足够的次数,以便对输入数组中所有8short的集合执行此操作,因此我们执行count % 8时间。

生成的程序集代码如下所示:

代码语言:javascript
复制
    mov     edx,  DWORD PTR [esp+12]       ; load parameters into registers (count)
    mov     ecx,  DWORD PTR [esp+8]        ; (pMuxed)
    mov     eax,  DWORD PTR [esp+4]        ; (pDemuxed)
    movdqa  xmm0, XMMWORD PTR [mask]       ; load 'mask' into XMM register
    and     edx,  7                        ; count % 8
    sub     ecx,  eax
    inc     edx

 Demux:
    movdqa  xmm1, XMMWORD PTR [ecx+eax]    ; load next 8 shorts from input array
    pshufb  xmm1, xmm0                     ; re-shuffle them
    movdqa  XMMWORD PTR [eax], xmm1        ; store these 8 shorts in output array

    add     eax, 16                        ; increment pointer
    dec     edx                            ; decrement counter...
    jne     Demux                          ;  and keep looping if necessary

(我编写了这段代码,假设输入和输出数组都在16字节的边界上对齐,这允许使用对齐的负载和存储。在旧处理器上,这将比未对齐负载更快;在新一代处理器上,对未对齐负载的惩罚几乎不存在。这在C/C++中很容易确保和执行,但我不确定您是如何在C#调用程序中为这些数组分配内存的。如果您控制分配,那么您应该能够控制对齐。如果不是,或者您只针对不惩罚未对齐负载的后期处理器,则可以更改代码以执行未对齐负载。使用_mm_storeu_si128_mm_loadu_si128本质,这将导致发出MOVDQU指令,而不是MOVDQA。)

循环中只有3个SIMD指令,所需的循环开销最小。这应该是相对较快的,虽然几乎可以肯定有办法使它更快。

一个重要的优化将是避免重复加载和存储数据。特别是要避免将输出存储到输出数组中。取决于您将如何处理被取消的输出,将其留在SSE寄存器中并在那里工作将更有效。但是,这与托管代码不能很好地互操作(如果有的话),因此如果必须将结果传递给C#调用方,则会受到很大的限制。

要编写真正有效的SIMD代码,您需要有一个高的计算负载/存储比。换句话说,您希望对加载和存储之间的数据进行大量操作。在这里,您只对加载和存储之间的数据执行一次操作(洗牌)。不幸的是,除非您能够插入后续的“处理”代码,否则无法避免这种情况。解调只需要一个操作,这意味着您的瓶颈将不可避免地是读取输入和写入输出所需的时间。

另一个可能的优化是手动展开循环。但这会带来许多潜在的复杂情况,并要求您了解输入的性质。如果输入数组通常是短的,展开就没有意义了。如果它们有时是短的,有时是长的,展开可能仍然没有意义,因为您必须显式地处理输入数组短的情况,提前脱离循环。如果输入数组总是相当长,那么展开可能是性能的胜利。但不一定;正如上面提到的,这里的循环开销非常小。

如果需要根据每个信号的帧数和值数进行参数化,则很可能需要编写多个例程。或者,至少是各种不同的mask。这将大大增加代码的复杂性,从而增加维护成本(以及潜在的性能,因为您需要的指令和数据不太可能出现在缓存中),所以除非您真的可以做一些比C++编译器更优化的事情,否则您应该考虑让编译器生成代码。

票数 5
EN

Stack Overflow用户

发布于 2016-12-14 15:18:58

好消息!您不需要多个核心、SIMD或花哨的包来解决这个问题。你甚至不需要打电话给C。

您的瓶颈是内存带宽,因为您正在低效率地使用它。

使用该CPU,您的内存可能足够快,可以使用一个核心来解出大于3GB/s的内存,但是每次您需要从RAM中获取一个示例时,CPU将获取64个字节来填充缓存行,而您只使用其中的2个字节。这64个字节将在缓存中停留一段时间,其他一些线程可能会使用其中的一些,但是访问模式仍然非常糟糕。

你真正需要做的就是好好利用这64个字节。有很多种方法。例如:

1)在C#中尝试一个简单的循环。从头到尾运行您的输入缓冲区,将您遇到的每个示例放在它所属的位置。这将使用所有64个字节,每次您在读取中填充缓存行,并且您的16K输出通道足够少,您正在写入的块大部分将保持缓存。这可能够快了。

2)继续调用您的C函数,但以2MB块处理输入缓冲区,而不必费心使用多个核。这2MB块中的每一个都很小,可以一直缓存直到完成为止。这可能会比(1)快一点。

3)如果上面的速度不够快(可能很接近),那么您可以使用多线程。使用方法(2),但对块进行并行处理。这样,每个核心就可以完成整个2MB块,充分利用它的缓存,并且它们不会相互竞争。最多使用4个线程,否则您可能再次开始强调缓存。如果您确实需要使用4个以上的线程,那么将工作划分得更精细一些,在每个2MB块中分成1024个通道。但你不需要这么做。

编辑:

哦,对不起--选项(1)很难在不安全的C#中实现,因为每个fixed语句只修复几个指针,而且使用托管数组太慢了。但是,选项(2)在不安全的C#中很容易,而且仍然工作正常。我写了一个测试:

代码语言:javascript
复制
public static unsafe void doit(short[] inarray, short[][] demux, int nSignals, int nSamples)
{
    fixed (short *pin=inarray)
    {
        for(int block=0; block < nSamples; block+=64)
        {
            for(int sig=0; sig<nSignals; ++sig)
            {
                fixed(short *psig = demux[sig])
                {
                    short* s = pin + block * nSignals + sig;
                    short* d = psig + block;
                    short* e = d + Math.Min(64, nSamples - block);
                    while(d<e)
                    {
                        *d++ = *s;
                        s += nSignals;
                    }
                }
            }
        }
    }
}
public static void Main()
{
    int nSignals = 16384;
    int nSamples = 20000;

    short[][] demux = new short[nSignals][];
    for (int i = 0; i < demux.Length; ++i)
    {
        demux[i] = new short[nSamples];
    }
    short[] mux = new short[nSignals * nSamples];
    //warm up
    doit(mux, demux, nSignals, nSamples);
    doit(mux, demux, nSignals, nSamples);
    doit(mux, demux, nSignals, nSamples);
    //time it
    var watch = System.Diagnostics.Stopwatch.StartNew();
    doit(mux, demux, nSignals, nSamples);
    watch.Stop();
    Console.WriteLine("Time (ms): " + watch.ElapsedMilliseconds);
}

那是一秒的数据,在我的盒子上写着:

代码语言:javascript
复制
Time (ms): 724

嗯..。这比实时好,但速度不是实时的两倍。你的盒子看起来比我的快一点,所以也许没问题。让我们尝试并行版本(3)。主要职能是相同的:

代码语言:javascript
复制
public static unsafe void dopart(short[] inarray, short[][] demux, int offset, int nSignals, int nSamples)
{
    fixed (short* pin = inarray)
    {
        for (int block = 0; block < nSamples; block += 64)
        {
            for (int sig = 0; sig < nSignals; ++sig)
            {
                fixed (short* psig = demux[sig])
                {
                    short* s = pin + (offset + block) * nSignals + sig;
                    short* d = psig + offset + block;
                    short* e = d + Math.Min(64, nSamples - block);
                    while (d < e)
                    {
                        *d++ = *s;
                        s += nSignals;
                    }
                }
            }
        }
    }
}
public static unsafe void doit(short[] inarray, short[][] demux, int nSignals, int nSamples)
{
    int steps = (nSamples + 1023) / 1024;
    ParallelOptions options = new ParallelOptions();
    options.MaxDegreeOfParallelism = 4;
    Parallel.For(0, steps, options, step => {
        int offset = (int)(step * 1024);
        int size = Math.Min(1024, nSamples - offset);
        dopart(inarray, demux, offset, nSignals, size);
    });
}

更好的是:

代码语言:javascript
复制
Time (ms): 283

在这种情况下,读取和写入的数据量约为4.6GB/s,这比理论上的最大6.4GB/s略差一些,而且我只有4个真正的核心,所以我可能可以通过调用C来稍微降低一些数据,但是没有太多的改进余地。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41138596

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档