首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >最快的内装自旋锁

最快的内装自旋锁
EN

Stack Overflow用户
提问于 2012-08-14 19:26:00
回答 5查看 13.7K关注 0票数 26

我正在用c++编写一个多线程应用程序,性能是至关重要的。在复制线程之间的小结构时,我需要使用大量的锁定,为此,我选择了使用自旋锁。

我对此进行了一些研究和速度测试,发现大多数实现的速度大致相同:

  • Microsofts CRITICAL_SECTION,SpinCount设置为1000,得分约140个时间单位。
  • 用Microsofts 这种算法实现InterlockedCompareExchange评分约95个时间单位
  • 我还尝试使用一些类似于__asm {}的内联程序集,比如这段代码,它的记分约为70个时间单位,,但我不确定是否已经创建了适当的内存屏障。

编辑:这里给出的时间是两个线程锁定和解锁自旋锁1,000,000次所需的时间。

我知道这并没有太大的区别,但由于spinlock是一个被大量使用的对象,人们可能会认为程序员已经同意了制作自旋锁的最快方法。然而,谷歌搜索导致了许多不同的方法。我认为如果使用内联程序集和使用指令上述方法 (而不是比较32位寄存器)来实现,那么CMPXCHG8B将是最快的。--此外,还必须考虑到内存障碍,这可以通过锁CMPXHG8B (我认为?)来实现,它保证内核之间共享内存的“独占权限”。最后,有人建议,对于繁忙的等待应该伴随着NOP:REP,这将使超线程处理器能够切换到另一个线程,但我不确定这是否正确?

从我对不同自旋锁的性能测试中可以看出,两者之间并没有太大的差别,但为了纯粹的学术目的,我想知道哪一个是最快的。但是,由于我在汇编语言和内存屏障方面的经验非常有限,如果有人能为我在以下模板中提供的锁CMPXCHG8B和适当的内存屏障的最后一个示例编写汇编代码,我会很高兴的:

代码语言:javascript
复制
__asm
{
     spin_lock:
         ;locking code.
     spin_unlock:
         ;unlocking code.
}
EN

回答 5

Stack Overflow用户

回答已采纳

发布于 2012-08-14 19:36:12

看看这里:使用cmpxchg的x86自旋锁

感谢科里·纳尔逊

代码语言:javascript
复制
__asm{
spin_lock:
xorl %ecx, %ecx
incl %ecx
spin_lock_retry:
xorl %eax, %eax
lock; cmpxchgl %ecx, (lock_addr)
jnz spin_lock_retry
ret

spin_unlock:
movl $0 (lock_addr)
ret
}

另一个消息来源说:http://www.geoffchappell.com/studies/windows/km/cpu/cx8.htm

代码语言:javascript
复制
       lock    cmpxchg8b qword ptr [esi]
is replaceable with the following sequence

try:
        lock    bts dword ptr [edi],0
        jnb     acquired
wait:
        test    dword ptr [edi],1
        je      try
        pause                   ; if available
        jmp     wait

acquired:
        cmp     eax,[esi]
        jne     fail
        cmp     edx,[esi+4]
        je      exchange

fail:
        mov     eax,[esi]
        mov     edx,[esi+4]
        jmp     done

exchange:
        mov     [esi],ebx
        mov     [esi+4],ecx

done:
        mov     byte ptr [edi],0

下面是关于无锁和锁实现的讨论:http://newsgroups.derkeiler.com/Archive/Comp/comp.programming.threads/2011-10/msg00009.html

票数 3
EN

Stack Overflow用户

发布于 2012-08-16 04:22:56

虽然已经有了一个被接受的答案,但是有几件事情被忽略了,可以用来改进所有的答案,摘自这篇Intel文章,以上都是快速锁实现。

  1. 在易失性读取(而不是原子指令)上旋转,这避免了不必要的总线锁定,特别是在高度竞争的锁上。
  2. 对竞争激烈的锁使用回退。
  3. 内联锁,最好是内置asm有害的编译器(基本上是MSVC)。
票数 10
EN

Stack Overflow用户

发布于 2012-10-19 17:57:10

我通常不会抱怨有人在努力实现快速代码:这通常是一个非常好的练习,它能更好地理解编程和更快的代码。

我也不会在这里抱怨,但我可以毫不含糊地声明,快速自旋锁3指令长或多几条的问题--至少在x86结构上--是徒劳的追求。

原因如下:

用典型的代码序列调用自旋锁

代码语言:javascript
复制
lock_variable DW 0    ; 0 <=> free

mov ebx,offset lock_variable
mov eax,1
xchg eax,[ebx]

; if eax contains 0 (no one owned it) you own the lock,
; if eax contains 1 (someone already does) you don't

释放自旋锁是很简单的。

代码语言:javascript
复制
mov ebx,offset lock_variable
mov dword ptr [ebx],0

xchg指令在处理器上引发锁销,这实际上意味着我希望在接下来的几个时钟周期中使用总线。这个信号通过缓存和下到最慢的总线控制设备,通常是PCI总线。当每个母线控制装置完成后,锁(锁定确认)信号被发送回来。然后进行实际的交换。问题是锁/locka序列需要很长的时间。PCI总线可以在33 may上运行,有几个延迟周期。在3.3pci CPU上,这意味着每个GHz总线周期需要100个CPU周期。

根据经验,我假设一个锁需要300到3000个CPU周期才能完成,最后我不知道我是否会拥有这个锁。因此,您可以节省的几个周期“快速”自旋锁将是一个海市蜃楼,因为没有锁是一样的下一个,它将取决于您的巴士情况在这短短的时间。

________________EDIT________________

我刚读到自旋锁是一个“大量使用的对象”。显然,您不明白spinlock在每次调用时都会消耗大量的CPU周期。或者,换句话说,每次调用它时,都会丢失大量的处理能力。

当使用自旋锁(或它们更大的兄弟姐妹,关键部分)时的诀窍是尽可能少地使用它们,同时仍然实现预期的程序功能。在整个地方使用它们是很容易的,结果你的表现会很糟糕。

这不仅仅是编写快速代码,还包括组织您的数据。当您编写“在线程之间复制小结构”时,您应该意识到锁可能需要比实际复制花费数百倍的时间完成。

________________EDIT________________

当您计算平均锁定时间时,它可能会说得很少,因为它是在您的机器上测量的,这可能不是预期的目标(这可能具有完全不同的总线使用特性)。对于您的机器,平均将由个别非常快的时间(当总线控制活动没有干扰)一直到非常慢的时候(当总线控制干扰是显着的)。

您可以引入确定最快和最慢的情况的代码,并计算商数,以了解自旋锁时间的变化有多大。

________________EDIT________________

2016年5月更新。

Peter提出了这样一种观点,即“在非竞争情况下调整锁是有意义的”,并且在现代CPU上不会出现数百个时钟周期的锁定时间,除非锁变量不对齐。我开始怀疑,我的以前的测试程序 --用32位Watcom C编写的--在64位操作系统上运行时是否会受到WOW64的阻碍: Windows 7。

所以我写了一个64位的程序,用TDM的gcc 5.3编写了它.该程序使用隐式总线锁定指令变量"XCHG,m“进行锁定,并使用简单的赋值"MOV,r”解锁。在某些锁变体中,对锁变量进行了预测试,以确定是否可以尝试锁(使用简单的比较"CMP r,m",可能不会在L3之外冒险)。下面是:

代码语言:javascript
复制
// compiler flags used:

// -O1 -m64 -mthreads -mtune=k8 -march=k8 -fwhole-program -freorder-blocks -fschedule-insns -falign-functions=32 -g3 -Wall -c -fmessage-length=0

#define CLASSIC_BUS_LOCK
#define WHILE_PRETEST
//#define SINGLE_THREAD

typedef unsigned char      u1;
typedef unsigned short     u2;
typedef unsigned long      u4;
typedef unsigned int       ud;
typedef unsigned long long u8;
typedef   signed char      i1;
typedef          short     i2;
typedef          long      i4;
typedef          int       id;
typedef          long long i8;
typedef          float     f4;
typedef          double    f8;

#define usizeof(a) ((ud)sizeof(a))

#define LOOPS 25000000

#include <stdio.h>
#include <windows.h>

#ifndef bool
typedef signed char bool;
#endif

u8 CPU_rdtsc (void)
{
  ud tickl, tickh;
  __asm__ __volatile__("rdtsc":"=a"(tickl),"=d"(tickh));
  return ((u8)tickh << 32)|tickl;
}

volatile u8 bus_lock (volatile u8 * block, u8 value)
{
  __asm__ __volatile__( "xchgq %1,%0" : "=r" (value) : "m" (*block), "0" (value) : "memory");

  return value;
}

void bus_unlock (volatile u8 * block, u8 value)
{
  __asm__ __volatile__( "movq %0,%1" : "=r" (value) : "m" (*block), "0" (value) : "memory");
}

void rfence (void)
{
  __asm__ __volatile__( "lfence" : : : "memory");
}

void rwfence (void)
{
  __asm__ __volatile__( "mfence" : : : "memory");
}

void wfence (void)
{
  __asm__ __volatile__( "sfence" : : : "memory");
}

volatile bool LOCK_spinlockPreTestIfFree (const volatile u8 *lockVariablePointer)
{
  return (bool)(*lockVariablePointer == 0ull);
}

volatile bool LOCK_spinlockFailed (volatile u8 *lockVariablePointer)
{
  return (bool)(bus_lock (lockVariablePointer, 1ull) != 0ull);
}

void LOCK_spinlockLeave (volatile u8 *lockVariablePointer)
{
  *lockVariablePointer = 0ull;
}

static volatile u8 lockVariable = 0ull,
                   lockCounter =  0ull;

static volatile i8 threadHold = 1;

static u8 tstr[4][32];    /* 32*8=256 bytes for each thread's parameters should result in them residing in different cache lines */

struct LOCKING_THREAD_STRUCTURE
{
  u8 numberOfFailures, numberOfPreTests;
  f8 clocksPerLock, failuresPerLock, preTestsPerLock;
  u8 threadId;
  HANDLE threadHandle;
  ud idx;
} *lts[4] = {(void *)tstr[0], (void *)tstr[1], (void *)tstr[2], (void *)tstr[3]};

DWORD WINAPI locking_thread (struct LOCKING_THREAD_STRUCTURE *ltsp)
{
  ud n = LOOPS;
  u8 clockCycles;

  SetThreadAffinityMask (ltsp->threadHandle, 1ull<<ltsp->idx);

  while (threadHold) {}

  clockCycles = CPU_rdtsc ();
  while (n)
  {
    Sleep (0);

#ifdef CLASSIC_BUS_LOCK
    while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
#else
#ifdef WHILE_PRETEST
    while (1)
    {
      do
      {
        ++ltsp->numberOfPreTests;
      } while (!LOCK_spinlockPreTestIfFree (&lockVariable));

      if (!LOCK_spinlockFailed (&lockVariable)) break;
      ++ltsp->numberOfFailures;
    }
#else
    while (1)
    {
      ++ltsp->numberOfPreTests;
      if (LOCK_spinlockPreTestIfFree (&lockVariable))
      {
        if (!LOCK_spinlockFailed (&lockVariable)) break;
        ++ltsp->numberOfFailures;
      }
    }
#endif
#endif
    ++lockCounter;
    LOCK_spinlockLeave (&lockVariable);

#ifdef CLASSIC_BUS_LOCK
    while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
#else
#ifdef WHILE_PRETEST
    while (1)
    {
      do
      {
        ++ltsp->numberOfPreTests;
      } while (!LOCK_spinlockPreTestIfFree (&lockVariable));

      if (!LOCK_spinlockFailed (&lockVariable)) break;
      ++ltsp->numberOfFailures;
    }
#else
    while (1)
    {
      ++ltsp->numberOfPreTests;
      if (LOCK_spinlockPreTestIfFree (&lockVariable))
      {
        if (!LOCK_spinlockFailed (&lockVariable)) break;
        ++ltsp->numberOfFailures;
      }
    }
#endif
#endif
    --lockCounter;
    LOCK_spinlockLeave (&lockVariable);

    n-=2;
  }
  clockCycles = CPU_rdtsc ()-clockCycles;

  ltsp->clocksPerLock =   (f8)clockCycles/           (f8)LOOPS;
  ltsp->failuresPerLock = (f8)ltsp->numberOfFailures/(f8)LOOPS;
  ltsp->preTestsPerLock = (f8)ltsp->numberOfPreTests/(f8)LOOPS;

//rwfence ();

  ltsp->idx = 4u;

  ExitThread (0);
  return 0;
}

int main (int argc, char *argv[])
{
  u8 processAffinityMask, systemAffinityMask;

  memset (tstr, 0u, usizeof(tstr));

  lts[0]->idx = 3;
  lts[1]->idx = 2;
  lts[2]->idx = 1;
  lts[3]->idx = 0;

  GetProcessAffinityMask (GetCurrentProcess(), &processAffinityMask, &systemAffinityMask);

  SetPriorityClass (GetCurrentProcess(), HIGH_PRIORITY_CLASS);
  SetThreadAffinityMask (GetCurrentThread (), 1ull);

  lts[0]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[0], 0, (void *)&lts[0]->threadId);
#ifndef SINGLE_THREAD
  lts[1]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[1], 0, (void *)&lts[1]->threadId);
  lts[2]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[2], 0, (void *)&lts[2]->threadId);
  lts[3]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[3], 0, (void *)&lts[3]->threadId);
#endif

  SetThreadAffinityMask (GetCurrentThread (), processAffinityMask);

  threadHold = 0;

#ifdef SINGLE_THREAD
  while (lts[0]->idx<4u) {Sleep (1);}
#else
  while (lts[0]->idx+lts[1]->idx+lts[2]->idx+lts[3]->idx<16u) {Sleep (1);}
#endif

  printf ("T0:%1.1f,%1.1f,%1.1f\n", lts[0]->clocksPerLock, lts[0]->failuresPerLock, lts[0]->preTestsPerLock);
  printf ("T1:%1.1f,%1.1f,%1.1f\n", lts[1]->clocksPerLock, lts[1]->failuresPerLock, lts[1]->preTestsPerLock);
  printf ("T2:%1.1f,%1.1f,%1.1f\n", lts[2]->clocksPerLock, lts[2]->failuresPerLock, lts[2]->preTestsPerLock);
  printf ("T3:%1.1f,%1.1f,%1.1f\n", lts[3]->clocksPerLock, lts[3]->failuresPerLock, lts[3]->preTestsPerLock);

  printf ("T*:%1.1f,%1.1f,%1.1f\n", (lts[0]->clocksPerLock+  lts[1]->clocksPerLock+  lts[2]->clocksPerLock+  lts[3]->clocksPerLock)/  4.,
                                    (lts[0]->failuresPerLock+lts[1]->failuresPerLock+lts[2]->failuresPerLock+lts[3]->failuresPerLock)/4.,
                                    (lts[0]->preTestsPerLock+lts[1]->preTestsPerLock+lts[2]->preTestsPerLock+lts[3]->preTestsPerLock)/4.);

  printf ("LC:%u\n", (ud)lockCounter);

  return 0;
}

该程序运行在一台基于DELL i5-4310U的计算机上,DDR3-800,2核/2 HTs,2.7GHz和一个普通的L3缓存。

首先,WOW64的影响似乎可以忽略不计。

执行非竞争锁/解锁的单个线程能够每110周期执行一次。调优未竞争的锁是无用的:为增强单个XCHG指令而添加的任何代码只会使其速度变慢。

随着四个HTs轰击锁变量的锁尝试,情况发生了根本的变化。成功实现锁跳转到994个周期所需的时间,其中很大一部分可归因于2.2失败的锁尝试。换句话说,在高争用情况下,必须尝试平均3.2锁才能获得成功的锁。显然,110周期没有变成110*3.2,而是更接近110*9。因此,其他机制在这里发挥作用,就像在旧机器上的测试一样。此外,平均994个周期包括716至1157个周期。

实现预测试的锁变体需要大约95%的周期由最简单的变体(XCHG)更新。他们平均将执行17个CMPs,以发现尝试1.75个锁是可行的,其中1个成功了。我建议使用预测试,这不仅是因为它更快:它对总线锁定机制施加的压力更小(3.2-1.75=1.45 ),尽管它稍微增加了复杂性。

票数 8
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/11959374

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档