多线程---解析无锁队列的原理与实现_多线程无锁队列-程序员宅基地

技术标签: C/C++  c++  多线程  后台开发  

前言

为什么需要无锁队列呢?我们知道,多核心优化是现在游戏开发的一个重点课题,无论是工程实践也好,研究算法也罢,将工作并行化交由多线程去做是一个非常普遍的场景。对于这种场景,我们通常会采用线程池+命令队列的方式去实现,其中的命令队列就会使用互斥锁或是无锁队列。并且由于命令队列的读写是较轻量级的操作,采用无锁队列的性能要高于有锁的操作。因此,实现无锁队列等无锁数据结构,可以看作是迈入多线程编程的基石。

推荐视频:https://www.bilibili.com/video/BV1354y1b7nz/

问题描述

无锁队列的典型应用场景是同时存在单(多)线程写入与单(多)线程读取。那么,为什么原始的队列会存在问题呢?我们来看原始队列的入队与出队伪代码(省略了边界判断)

struct Node{
    
    void *data;
    Node *next;
};

void Enqueue(Node *node){
    
    m_Tail->next = node;
    m_Tail = node;
}

Node* Dequeue(){
    
    Node * res = m_Head;
    m_Head = m_Head->next;
    return res;
}

入队与出队皆有两步操作,如果存在两个以上相同的线程同时进行写入或读取,便可能会出现在完成了第一步操作后,其他线程修改了Head或Tail指针,导致完全无法预料的结果。例如下列情况,两个线程同时写入,导致Tail指针失去与队列的链接,后加的节点从Head开始就访问不到了。

//           线程A                     线程B                   实际情况
                                                       nodeC            tail = nodeC
      head->next = nodeA                               nodeC -> nodeA   tail = nodeC
                               tail->next = nodeB      nodeC -> nodeB   tail = nodeC
                               tail = nodeB            nodeC -> nodeB   tail = nodeB
      tail = nodeA                                     nodeC -> nodeB   tail = nodeA

解决方法是在读写之前加锁,确保同一时间只有一个线程在进行读写,或是使用CPU提供的原子操作(atomic operation),一次性完成对Head或Tail指针的读写,实现无锁同步。

原子操作

在质子中子发现之前,人们认为原子就是世界上最基本的粒子了,原子一词便有了“不可分割”的含义。顾名思义,原子操作就是指不可分割的操作,CPU的一个线程在执行原子操作时,不会被其他线程中断或抢占。

典型的原子操作及示意代码如下:

  • Load / Store: 读取与保存。
  • Test and Set:针对bool变量,如果为true则返回true,如果为false,则将变量置为true并返回false。
bool TestAndSet(bool * flag){
    
    bool res = *flag;
    *flag = true;
    return res;
}
  • Clear: 将bool变量设为false。
  • Exchange:将指定位置的值设置为传入值,并返回其旧值。
template <typename T>
T Exchange(T* addr, const T& newVal){
    
    T oldVal = *addr;
    *addr = newVal;
    return oldVal;
}

Compare And Swap(CAS):将指定位置的值与期望值比较,如果相等则赋值为新值,如果不等则将期望值设置为自身。返回是否设置成功。

template <typename T>
bool CompareAndSwap(T* addr, T& expected, const T& desired){
    
    if(*addr == expected){
    
        *addr = desired;
        return true;
    }
    expected = *addr;
    return false;
}
  • 注意CAS有weak和strong两种,weak版本在某些情况下即使满足条件也会返回false,所以一般只会在循环判断中使用,其他情况都使用strong版本。
  • Fetch And 加减乘除系列:对指定位置的值使用传入参数执行加减乘除,并返回旧值。

这里使用最简单的i++操作来比较说明一下为什么需要原子操作,C++中一条i++语句,会生成三条汇编指令。

int i = 0;
i++;

mov         eax,dword ptr [i]   // 将i加载到eax寄存器
add         eax,1  // eax中的值加一
mov         dword ptr [i],eax  // 将eax中的值赋值到i的地址

如果有两个线程同时对一个变量i=0执行i++操作,最终结果很可能是1而不是2,因为多线程并行时,i的值会加载到不同的寄存器,然后分别对寄存器中的值加一并取出,导致落后的线程覆盖了领先线程的结果。这种现象被称为竞争条件(Race condition)。如果我们使用windows提供的原子自增函数_InterlockedIncrement,生成的汇编代码如下

int i = 0;
_InterlockedIncrement((volatile long *)&i);

mov         eax,1     // eax加载1
lock xadd   dword ptr [i],eax   // xadd的作用是交换两个操作数的值,并将相加结果保存到前者
// 完成对i所在地址数据的自增操作

一条指令便完成了i++的操作,并且lock指令还会锁定操作的内存地址,避免了可能存在的竞争条件。(同时也可以看出,原子操作在CPU内部的实现还是加锁,不过是硬件层面的加锁,开销较小)

使用CAS实现无锁队列

现在我们开始实现无锁队列吧。先定义数据结构

#pragma once
#include <windows.h>
#include <windef.h>
#include <intrin.h>
#include <emmintrin.h>

using AtomicWord = intptr_t;

struct AtomicNode
{
    
    volatile AtomicWord _next;
    void* data;
};

class AtomicQueue
{
    
    volatile AtomicWord _tail;
    volatile AtomicWord _head;
public:
    AtomicQueue();
    ~AtomcQueue();
    void Enqueue(AtomicNode* node);
    AtomicNode* Dequeue();
}

使用intptr_t保存我们的指针变量,每个节点使用指针保存数据,队列包含一个头指针和一个尾指针。会用到两种原子操作(PS:本文基于64位环境,指针长度为64位,如需在32位环境下使用请自行替换为32位版本函数):

static inline AtomicWord AtomicExchangeExplicit(volatile AtomicWord* p, AtomicWord val)
{
    
    return (AtomicWord)_InterlockedExchange64((volatile LONGLONG*)p, (LONGLONG)val);
}

static inline bool AtomicCompareExchangeStrongExplicit(volatile AtomicWord* p, AtomicWord* oldval, AtomicWord newval)
{
    
    return _InterlockedCompareExchange64((volatile LONGLONG*)p, (LONGLONG)newval, (LONGLONG)*oldval) != 0;
}

我们使用一个dummy节点,这样可以省去许多的边界判断

AtomicQueue() {
    
        AtomicNode* dummy = new AtomicNode();
        dummy->_next = 0;
        _tail = (AtomicWord)dummy;
        _head = (AtomicWord)dummy;
    }
    ~AtomicQueue() {
    
        AtomicNode* dummy = (AtomicNode*)_head;
        delete dummy;
    }

入队使用Exchange,出队使用CAS自旋

 void Enqueue(AtomicNode* node) {
    
        AtomicNode* prev;
        node->_next = 0;
        prev = (AtomicNode*)AtomicExchangeExplicit(&_tail, (AtomicWord)node);
        prev->_next = (AtomicWord)node;
    }

    AtomicNode* Dequeue() {
    
        AtomicNode* res, * next;
        void* data;
        AtomicWord head = _head;
        AtomicWord newHead;
        do
        {
    
            // 出队的时候最后剩下的是最后一次入队的节点
            res = (AtomicNode*)head;
            next = (AtomicNode*)res->_next;
            if (next == nullptr)
                return nullptr;
            data = next->data;
            newHead = (AtomicWord)next;
            //  比较_head指针是否是我们之前获取的,成功则设置newHead,失败则自旋
        } while (!AtomicCompareExchangeStrongExplicit(&_head, &head, newHead));

        res->data = data;
        return res;
    }

从原理上来说并不难理解,入队就是使用新的节点与原来的尾节点交换,出队就是使用CAS判断我们缓存的头节点是否与队列头节点相同(不同的话说明被其他线程修改了)。

不过我们的Head与Tail指针都加了volatile关键字,这是什么意思呢?

volatile与常量优化

C++编译器会为我们做许多优化,但某些时候这些优化会造成意外的结果。我们在出队时使用了循环判断

do{
    
    res = head;
    newHead = res->next;
}
while(!CAS(_head, head, newHead));

也就是说我们一直在判断_head 是否等于head,而head最初也是等于_head的。编译器并不知道可能有另外的线程在修改_head的值,因此可能会将_head与head的比较优化掉,只从内存中读取一次_head的值存放进寄存器,随后便一直使用寄存器中的数据,使得我们的自旋等待失效。这便是常量优化。

常量优化的原因是寄存器的读写速度远高于内存,编译器会减少从内存读取数据的次数。而volatile关键字就是告诉编译器,不要对这个变量进行常量优化,每次都去内存中读取。

ABA问题

虽然上面我们已经实现了可用的无锁队列,但还有一个潜在的问题没有解决,那就是ABA问题。什么是ABA问题呢,照样用一个例子来说明

//         线程A                    线程B                        队列
                                                       A(0x114)->B(0x514)  Head = B
   head = B, newhead = C                            
                                出队B, delete B         
                                出队A, delete A
                                new D(0x514), 入队D
                                new E(0x810), 入队E    E(0x810)->D(0x514)  Head = D
CAS(Head, head, newhead)通过                           E(0x810)->D(0x514)  Head = C

因为我们使用CAS比较的是地址,而使用new和delete管理内存,内存地址在删除后可能被操作系统回收后重新分配,这样就会出现先获取了Head地址,在CAS的时候由于新的Head是回收后重新分配出来的相同地址,比较通过,但指向地址的内容却不同,因而出错的情况。

解决ABA问题有两种思路,一种是使用环形缓冲,其实就是预先分配了内存的数组,将Head和Tail指针替换为下标来移动,避免内存重复分配。另一种则是使用Double CAS。

Double CAS

Double CAS的思想是为地址增加引用计数,使用双倍大小的头指针,在原指针后附加一个计数器,每次出队时将计数器加一。这样即使出现ABA问题,由于计数器对不上,CAS也就不会通过了。

//         线程A                         线程B                        队列
                                                            A(0x114)->B(0x514)  Head = (B,0)
   head = (B,0), newhead = (C,1)                            
                                     出队B, delete B         
                                     出队A, delete A
                                     new D(0x514), 入队D
                                     new E(0x810), 入队E    E(0x810)->D(0x514)  Head = (D,2)
   CAS(Head, head, newhead) 失败   

Double CAS 无锁队列

PS:下面实现基于64位环境,如需在32位环境使用请替换为64位版函数

// 必须16字节对齐,否则_InterlockedCompareExchange128会报错
struct alignas(16) AtomicWord2
{
    
    AtomicWord lo, hi;
};

static inline bool AtomicCompareExchangeStrongExplicit(volatile AtomicWord2* p, AtomicWord2* oldval, AtomicWord2 newval)
{
    
    return _InterlockedCompareExchange128((volatile LONGLONG*)p, (LONGLONG)newval.hi, (LONGLONG)newval.lo, (LONGLONG*)oldval) != 0;
}

static inline AtomicWord2 AtomicExchangeExplicit(volatile AtomicWord2* p, AtomicWord2 newval)
{
    AtomicWord2 oldval;
    oldval.lo = 0;
    oldval.hi = newval.hi - 1;
    // 没有128位的Exchange函数了,只能用CAS封装一下
    while (!AtomicCompareExchangeStrongExplicit(p, &oldval, newval));
    return oldval;
}

修改后的队列操作

class AtomicQueue
{
    
    volatile AtomicWord _tail;
    volatile AtomicWord2 _head;
    ......
}

    AtomicQueue() {
    
        AtomicNode* dummy = new AtomicNode();
        AtomicWord2 w;
        w.lo = (AtomicWord)dummy;
        w.hi = 0;
        dummy->_next = 0;
        _head = w;
        _tail = (AtomicWord)dummy;
    }

    void Enqueue(AtomicNode* node) {
    
        AtomicNode* prev;
        node->_next = 0;
        prev = (AtomicNode*)AtomicExchangeExplicit(&_tail, (AtomicWord)node);
        prev->_next = (AtomicWord)node;
    }

    AtomicNode* Dequeue() {
    
        AtomicNode* res, * next;
        void* data;
        AtomicWord2 head = _head;
        AtomicWord2 newHead;
        do
        {
    
            res = (AtomicNode*)head.lo;
            next = (AtomicNode*)res->_next;
            if (next == nullptr)
                return nullptr;
            data = next->data;
            newHead.lo = (AtomicWord)next;
            newHead.hi = head.hi + 1;

        } while (!AtomicCompareExchangeStrongExplicit(&_head, &head, newHead));

        res->data = data;
        return res;
    }

大功告成!

仍然存在的一些问题

我们使用了dummy节点,这样可以省去一些边界条件的判断,但代价是每次出队的节点并不是入队的那一个,而是数据指针装在前一个入队的节点里出队。因此我们不能太快回收节点使用的内存,否则就会造成访问冲突。比较好的方法是使用对象池来缓存节点,不够用的时候就申请新节点,每次出队使用完成后将旧节点放回池中等待下一次使用。

结语

实现了无锁队列,只是迈向多线程开发的第一步。一通研究下来,最大的感触就是计算机的基础知识真的是非常重要,尤其是计算机体系结构、操作系统等方向的知识。

整理了一些最新LinuxC/C++服务器开发/架构师面试题、学习资料、教学视频和学习路线脑图(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享有需要的可以自行添加学习交流群960994558或VX:lingsheng_1314领取!~

在这里插入图片描述

希望这篇文章能够帮到你,如果有写的不对的地方,也欢迎在评论区指出和交流。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_52622200/article/details/115487219

智能推荐

什么是内部类?成员内部类、静态内部类、局部内部类和匿名内部类的区别及作用?_成员内部类和局部内部类的区别-程序员宅基地

文章浏览阅读3.4k次,点赞8次,收藏42次。一、什么是内部类?or 内部类的概念内部类是定义在另一个类中的类;下面类TestB是类TestA的内部类。即内部类对象引用了实例化该内部对象的外围类对象。public class TestA{ class TestB {}}二、 为什么需要内部类?or 内部类有什么作用?1、 内部类方法可以访问该类定义所在的作用域中的数据,包括私有数据。2、内部类可以对同一个包中的其他类隐藏起来。3、 当想要定义一个回调函数且不想编写大量代码时,使用匿名内部类比较便捷。三、 内部类的分类成员内部_成员内部类和局部内部类的区别

分布式系统_分布式系统运维工具-程序员宅基地

文章浏览阅读118次。分布式系统要求拆分分布式思想的实质搭配要求分布式系统要求按照某些特定的规则将项目进行拆分。如果将一个项目的所有模板功能都写到一起,当某个模块出现问题时将直接导致整个服务器出现问题。拆分按照业务拆分为不同的服务器,有效的降低系统架构的耦合性在业务拆分的基础上可按照代码层级进行拆分(view、controller、service、pojo)分布式思想的实质分布式思想的实质是为了系统的..._分布式系统运维工具

用Exce分析l数据极简入门_exce l趋势分析数据量-程序员宅基地

文章浏览阅读174次。1.数据源准备2.数据处理step1:数据表处理应用函数:①VLOOKUP函数; ② CONCATENATE函数终表:step2:数据透视表统计分析(1) 透视表汇总不同渠道用户数, 金额(2)透视表汇总不同日期购买用户数,金额(3)透视表汇总不同用户购买订单数,金额step3:讲第二步结果可视化, 比如, 柱形图(1)不同渠道用户数, 金额(2)不同日期..._exce l趋势分析数据量

宁盾堡垒机双因素认证方案_horizon宁盾双因素配置-程序员宅基地

文章浏览阅读3.3k次。堡垒机可以为企业实现服务器、网络设备、数据库、安全设备等的集中管控和安全可靠运行,帮助IT运维人员提高工作效率。通俗来说,就是用来控制哪些人可以登录哪些资产(事先防范和事中控制),以及录像记录登录资产后做了什么事情(事后溯源)。由于堡垒机内部保存着企业所有的设备资产和权限关系,是企业内部信息安全的重要一环。但目前出现的以下问题产生了很大安全隐患:密码设置过于简单,容易被暴力破解;为方便记忆,设置统一的密码,一旦单点被破,极易引发全面危机。在单一的静态密码验证机制下,登录密码是堡垒机安全的唯一_horizon宁盾双因素配置

谷歌浏览器安装(Win、Linux、离线安装)_chrome linux debian离线安装依赖-程序员宅基地

文章浏览阅读7.7k次,点赞4次,收藏16次。Chrome作为一款挺不错的浏览器,其有着诸多的优良特性,并且支持跨平台。其支持(Windows、Linux、Mac OS X、BSD、Android),在绝大多数情况下,其的安装都很简单,但有时会由于网络原因,无法安装,所以在这里总结下Chrome的安装。Windows下的安装:在线安装:离线安装:Linux下的安装:在线安装:离线安装:..._chrome linux debian离线安装依赖

烤仔TVの尚书房 | 逃离北上广?不如押宝越南“北上广”-程序员宅基地

文章浏览阅读153次。中国发达城市榜单每天都在刷新,但无非是北上广轮流坐庄。北京拥有最顶尖的文化资源,上海是“摩登”的国际化大都市,广州是活力四射的千年商都。GDP和发展潜力是衡量城市的数字指...

随便推点

java spark的使用和配置_使用java调用spark注册进去的程序-程序员宅基地

文章浏览阅读3.3k次。前言spark在java使用比较少,多是scala的用法,我这里介绍一下我在项目中使用的代码配置详细算法的使用请点击我主页列表查看版本jar版本说明spark3.0.1scala2.12这个版本注意和spark版本对应,只是为了引jar包springboot版本2.3.2.RELEASEmaven<!-- spark --> <dependency> <gro_使用java调用spark注册进去的程序

汽车零部件开发工具巨头V公司全套bootloader中UDS协议栈源代码,自己完成底层外设驱动开发后,集成即可使用_uds协议栈 源代码-程序员宅基地

文章浏览阅读4.8k次。汽车零部件开发工具巨头V公司全套bootloader中UDS协议栈源代码,自己完成底层外设驱动开发后,集成即可使用,代码精简高效,大厂出品有量产保证。:139800617636213023darcy169_uds协议栈 源代码

AUTOSAR基础篇之OS(下)_autosar 定义了 5 种多核支持类型-程序员宅基地

文章浏览阅读4.6k次,点赞20次,收藏148次。AUTOSAR基础篇之OS(下)前言首先,请问大家几个小小的问题,你清楚:你知道多核OS在什么场景下使用吗?多核系统OS又是如何协同启动或者关闭的呢?AUTOSAR OS存在哪些功能安全等方面的要求呢?多核OS之间的启动关闭与单核相比又存在哪些异同呢?。。。。。。今天,我们来一起探索并回答这些问题。为了便于大家理解,以下是本文的主题大纲:[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JCXrdI0k-1636287756923)(https://gite_autosar 定义了 5 种多核支持类型

VS报错无法打开自己写的头文件_vs2013打不开自己定义的头文件-程序员宅基地

文章浏览阅读2.2k次,点赞6次,收藏14次。原因:自己写的头文件没有被加入到方案的包含目录中去,无法被检索到,也就无法打开。将自己写的头文件都放入header files。然后在VS界面上,右键方案名,点击属性。将自己头文件夹的目录添加进去。_vs2013打不开自己定义的头文件

【Redis】Redis基础命令集详解_redis命令-程序员宅基地

文章浏览阅读3.3w次,点赞80次,收藏342次。此时,可以将系统中所有用户的 Session 数据全部保存到 Redis 中,用户在提交新的请求后,系统先从Redis 中查找相应的Session 数据,如果存在,则再进行相关操作,否则跳转到登录页面。此时,可以将系统中所有用户的 Session 数据全部保存到 Redis 中,用户在提交新的请求后,系统先从Redis 中查找相应的Session 数据,如果存在,则再进行相关操作,否则跳转到登录页面。当数据量很大时,count 的数量的指定可能会不起作用,Redis 会自动调整每次的遍历数目。_redis命令

URP渲染管线简介-程序员宅基地

文章浏览阅读449次,点赞3次,收藏3次。URP的设计目标是在保持高性能的同时,提供更多的渲染功能和自定义选项。与普通项目相比,会多出Presets文件夹,里面包含着一些设置,包括本色,声音,法线,贴图等设置。全局只有主光源和附加光源,主光源只支持平行光,附加光源数量有限制,主光源和附加光源在一次Pass中可以一起着色。URP:全局只有主光源和附加光源,主光源只支持平行光,附加光源数量有限制,一次Pass可以计算多个光源。可编程渲染管线:渲染策略是可以供程序员定制的,可以定制的有:光照计算和光源,深度测试,摄像机光照烘焙,后期处理策略等等。_urp渲染管线

推荐文章

热门文章

相关标签