首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >优化未对齐的SSE2/AVX2异或

优化未对齐的SSE2/AVX2异或
EN

Stack Overflow用户
提问于 2013-07-25 00:26:02
回答 1查看 1.3K关注 0票数 5

在我的代码中,我必须处理websocket数据包的“去屏蔽”,这本质上意味着对任意长度的未对齐数据进行XOR运算。多亏了SO (Websocket data unmasking / multi byte xor),我已经发现了如何(希望)使用SSE2/AVX2扩展来加速这一过程,但现在看起来,我对未对齐数据的处理完全是次优的。有没有什么方法可以优化我的代码,或者至少在性能相同的情况下让它变得更简单,或者我的代码已经是性能最好的了?

以下是代码的重要部分(对于这个问题,我假设数据总是至少足够运行一次AVX2周期,但同时它最多只运行几次):

代码语言:javascript
复制
// circular shift left for uint32
int cshiftl_u32(uint32_t num, uint8_t shift) {
   return (num << shift) | (num >> (32 - shift));                                                                       
}                                                                                                                     

// circular shift right for uint32
int cshiftr_u32(uint32_t num, uint8_t shift) {
   return (num >> shift) | (num << (32 - shift));                                                                       
}                                                                                                                     

void optimized_xor_32( uint32_t mask, uint8_t *ds, uint8_t *de ) {
   if (ds == de) return; // zero data len -> nothing to do

   uint8_t maskOffset = 0;

// process single bytes till 4 byte alignment ( <= 3 )
   for (; ds < de && ( (uint64_t)ds & (uint64_t)3 ); ds++) {
      *ds ^= *((uint8_t *)(&mask) + maskOffset);
      maskOffset = (maskOffset + 1) & (uint8_t)3;
   }

   if (ds == de) return; // done, return

   if (maskOffset != 0) { // circular left-shift mask around so it works for other instructions
      mask = cshiftl_u32(mask, maskOffset);

      maskOffset = 0;
   }

// process 4 byte block till 8 byte alignment ( <= 1 )
   uint8_t *de32 = (uint8_t *)((uint64_t)de & ~((uint64_t)31));

   if ( ds < de32 && ( (uint64_t)de & (uint64_t)7 ) ) {
      *(uint32_t *)ds ^= mask; // mask is uint32_t

      if (++ds == de) return;
   }

// process 8 byte block till 16 byte alignment ( <= 1 )
   uint64_t mask64 = mask | (mask << 4);
   uint8_t *de64 = (uint8_t *)((uint64_t)de & ~((uint64_t)63));

   if ( ds < de64 && ( (uint64_t)ds & (uint64_t)15 ) ) {
      *(uint64_t *)ds ^= mask64;

      if (++ds == de) return; // done, return
   }


// process 16 byte block till 32 byte alignment ( <= 1) (if supported)
#ifdef CPU_SSE2 
   __m128i v128, v128_mask;
   v128_mask = _mm_set1_epi32(mask);

   uint8_t *de128 = (uint8_t *)((uint64_t)de & ~((uint64_t)127));

   if ( ds < de128 && ( (uint64_t)ds & (uint64_t)31 ) ) {
      v128 = _mm_load_si128((__m128i *)ds);
      v128 = _mm_xor_si128(v128, v128_mask);
      _mm_store_si128((__m128i *)ds, v128);

      if (++ds == de) return; // done, return
   }

#endif
#ifdef CPU_AVX2 // process 32 byte blocks (if supported -> haswell upwards)
   __m256i v256, v256_mask;
   v256_mask = _mm256_set1_epi32(mask);

   uint8_t *de256 = (uint8_t *)((uint64_t)de & ~((uint64_t)255));

   for (; ds < de256; ds+=32) {
      v256 = _mm256_load_si256((__m256i *)ds);
      v256 = _mm256_xor_si256(v256, v256_mask);
      _mm256_store_si256((__m256i *)ds, v256);
   }

   if (ds == de) return; // done, return
#endif
#ifdef CPU_SSE2 // process remaining 16 byte blocks (if supported)
   for (; ds < de128; ds+=16) {
      v128 = _mm_load_si128((__m128i *)ds);
      v128 = _mm_xor_si128(v128, v128_mask);
      _mm_store_si128((__m128i *)ds, v128);
   }

   if (ds == de) return; // done, return

#endif
   // process remaining 8 byte blocks 
   // this should always be supported, so remaining can be assumed to be executed <= 1 times
   for (; ds < de64; ds += 8) {
      *(uint64_t *)ds ^= mask64;
   }

   if (ds == de) return; // done, return

   // process remaining 4 byte blocks ( <= 1)
   if (ds < de32) {
      *(uint32_t *)ds ^= mask;

      if (++ds == de) return; // done, return
   }


   // process remaining bytes ( <= 3)

   for (; ds < de; ds ++) {
      *ds ^= *((uint8_t *)(&mask) + maskOffset);
      maskOffset = (maskOffset + 1) & (uint8_t)3;
   }

}

附注:请忽略使用#ifdef而不是cpuid或类似的cpu标志检测。

EN

回答 1

Stack Overflow用户

发布于 2013-09-14 19:38:02

与手册中所说的不同,大多数英特尔处理器实际上非常擅长处理未对齐的数据。由于您使用的是英特尔的编译器内建来进行矢量处理,因此我假设您能够访问相当新版本的icc

如果您不能自然地对齐您的数据,那么我担心您所做的就是尽可能接近最大性能。就使代码在Xeon Phi(64字节向量寄存器)/Future较长的向量处理器上更具可读性和可部署性而言,我建议您开始使用Intel Cilk Plus

示例:

代码语言:javascript
复制
void intel_cilk_xor(uint32_t mask, uint8_t *d, size_t length) {
    while (length & 0x3) {
        *(d++) ^= mask;
        asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left
        length--;
    }

    // switch to 4 bytes per block
    uint32_t _d = d;
    length >>= 2;

    // Intel Cilk Plus Array Notation
    // Should expand automatically to the best possible SIMD instructions
    // you are compiling for
    _d[0:length] ^= mask;
}

请注意,我没有测试此代码,因为我目前无法访问英特尔编译器。如果你遇到问题,我可以在下周回到办公室时再复习一下。

如果你更喜欢内部函数,那么正确使用预处理器宏可以大大简化你的生活:

代码语言:javascript
复制
#if defined(__MIC__)
// intel Xeon Phi
#define VECTOR_BLOCKSIZE 64
// I do not remember the correct types/instructions right now
#error "TODO: MIC handling"
#elif defined(CPU_AVX2)
#define VECTOR_BLOCKSIZE 32
typedef __m256i my_vector_t;
#define VECTOR_LOAD_MASK _mm256_set1_epi32
#define VECTOR_XOR(d, mask) _mm_store_si256(d, _mm256_set1_epi32(_mm256_load_si256(d), mask))
#elif defined(CPU_SSE2) 
#define VECTOR_BLOCKSIZE 16
typedef __m128i my_vector_t;
#define VECTOR_LOAD_MASK _mm128_set1_epi32
#define VECTOR_XOR(d, mask) _mm_store_si128(d, _mm128_set1_epi32(_mm128_load_si128(d), mask))
#else
#define VECTOR_BLOCKSIZE 8
#define VECTOR_LOAD_MASK(mask) ((mask) << 32 | (mask))
#define VECTOR_XOR(d, mask) (*(d)) ^= (mask)
typedef uint64_t my_vector_t;
#fi

void optimized_xor_32( uint32_t mask, uint8_t *d, size_t length ) {
    size_t i;

    // there really is no point in having extra
    // branches for different vector lengths if they are
    // executed at most once
    // branch prediction is your friend here
    // so we do one byte at a time until the block size
    // is reached

    while (length && (d & (VECTOR_BLOCKSIZE - 1))) {
        *(d++) ^= mask;
        asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left
        length--;
    }

    my_vector_t * d_vector = (my_vector_t *)d;
    my_vector_t vector_mask = VECTOR_LOAD_MASK(mask);

    size_t vector_legth = length / VECTOR_BLOCKSIZE; // compiler will optimise this to a bitshift
    length &= VECTOR_BLOCKSIZE -1; // remaining length

    for (i = 0; i < vector_legth; i++) {
      VECTOR_XOR(d_vector + i, vector_mask);
    }

    // process the tail
    d = (uint8_t*)(d_vector + i);
    for (i = 0; i < length; i++) {
      d[i] ^= mask;
      asm ("rold $8, %0" : "+g" (mask) :: "cc");
    }

}

注意:您可能希望使用x86 rotate指令而不是位移位来旋转mask

代码语言:javascript
复制
#define asm_rol(var, bits) asm ("rol %1, %0" : "+r" (var) : "c" ((uint8_t)bits) : "cc")
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/17839685

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档