计算机求职八股文

亲亲 :smiling_face_with_three_hearts:

1 Like

九大服务架构性能优化方式 - 腾讯云开发者社区 - 腾讯云 (tencent.com)

# 无锁循环队列

内存屏障/原子操作的一种常见应用

单生产单消费

KFIFO

Linux 内核队列改写的 c++ 版本

不需要使用任何原子变量,巧妙应用内存屏障,在读/写结束后才更新头尾指针,保证了当前线程正在读/写的部分对于对家线程不可见

template <class ValueType>
class kfifo {
private:
	ValueType* buffer;	/* the buffer holding the data */
	unsigned int size;	/* the size of the allocated buffer */
	unsigned int in;	/* data is added at offset (in % size) */
	unsigned int out;	/* data is extracted from off. (out % size) */

public:
    kfifo(unsigned int sz) : size((1<<sz)), in(0), out(0) {
        buffer = new ValueType[size];
    }

    kfifo(const kfifo& ) = delete;
    kfifo(const kfifo&& ) = delete;
    void operator = (const kfifo&& ) = delete;

    ~kfifo() {
        delete[] buffer;
    }
    
    bool push(const char *item) {

/*******************两种自旋方式都是正确的************************/
        
// MODE1:        
        std::atomic_thread_fence(std::memory_order_acquire);
        if (size - in + out == 0) {
            return false;
        } // 函数外 while 自旋
// MODE2:        
        // while (size - in + out == 0){
        //     std::atomic_thread_fence(std::memory_order_acquire);

        // } // 函数内 while 自旋

        std::atomic_thread_fence(std::memory_order_acq_rel);

        std::memcpy(buffer+(in & (size - 1)), item, sizeof(ValueType));

        std::atomic_thread_fence(std::memory_order_release);

        ++ in;
        return true;
    }

    bool pop(char *item) {
// MODE1:  
        std::atomic_thread_fence(std::memory_order_acquire);
        if (in == out) {
            return false;
        } // 函数外 while 自旋
// MODE2:  
        // while (in == out){
        //     std::atomic_thread_fence(std::memory_order_acquire);
    
        // } // 函数内 while 自旋

        std::atomic_thread_fence(std::memory_order_acquire);

        std::memcpy(item, buffer+(out & (size - 1)), sizeof(ValueType));

        std::atomic_thread_fence(std::memory_order_acq_rel);

        ++ out;
        return true;
    }
};

如果一次性拷贝的长度较大或是长度不一可以写成以下形式

    unsigned int push(const char *buf, unsigned int len)
    {
        unsigned int l;

        len = min(len, size - in + out);

        std::atomic_thread_fence(std::memory_order_acq_rel);

        l = min(len, size - (in & (size - 1)));
        memcpy(buffer + (in & (size - 1)), buf, l);

        memcpy(buffer, buf + l, len - l);

        std::atomic_thread_fence(std::memory_order_release);

        in += len;

        return len;
    }

    unsigned int pop(char *buf, unsigned int len)
    {
        unsigned int l;

        len = min(len, in - out);

        std::atomic_thread_fence(std::memory_order_acquire);

        l = min(len, size - (out & (size - 1)));
        memcpy(buf, buffer + (out & (size - 1)), l);

        memcpy(buf + l, buffer, len - l);

        std::atomic_thread_fence(std::memory_order_acq_rel);

        out += len;

        return len;
    }

SPSC

使用CAS (compare and swap) 原子操作实现

template <typename T>
class spsc {
    T *data;
    std::atomic<size_t> head{0}, tail{0};
    size_t Cap;
public:
    spsc(size_t siz): Cap(1<<siz) {
        data = new T[Cap];
    }
    spsc(const spsc&) = delete;
    spsc &operator=(const spsc&) = delete;
    spsc &operator=(const spsc&) volatile = delete;

    bool push(const T &val) { 
        
/*************************注释部分为错误版本***********************************/
        // size_t t;
        // do {
        //     t = tail.load(std::memory_order_acquire);
        // } while ((t + 1) & (Cap-1) == head.load(std::memory_order_acquire));

        size_t t = tail.load(std::memory_order_relaxed);
        if ((t + 1) % Cap == head.load(std::memory_order_acquire)) return false;
        std::memcpy(data + t, &val, sizeof(T));
        tail.store((t + 1) & (Cap-1), std::memory_order_release);
        return true;
    }

    bool pop(T &val) { 
        // size_t h;
        // do {
        //     h = head.load(std::memory_order_acquire);
        // } while (h == tail.load(std::memory_order_acquire));
        size_t h = head.load(std::memory_order_relaxed);
        if (h == tail.load(std::memory_order_acquire)) return false;
        std::memcpy(&val, data + h, sizeof(T));
        head.store((h + 1) & (Cap-1), std::memory_order_release);
        return true;
    }
};

多生产多消费

Disruptor

实现原理

4 个位置变量

  • lastRead 最后一个已读内容位置
  • lastWrote 最后一个已写内容位置
  • lastDispatch 最后一个派发给消费者的槽位序号
  • writableSeq 当前可写的槽位序号

记录 readableSeq=lastDispatch+1 表示当前可读槽位序号

他们从小到大依次为:lastRead,readableSeq,lastWrote,writableSeq

实现逻辑为

  • 对于生产者而言,先申请 writableSeq 槽位,更新 writableSeq;写完之后,等待在它之前的槽位都被写完才更新lastWrote

  • 对于消费者而言,先申请 readableSeq 槽位,更新 readableSeq;读完之后,等待在它之前的槽位都被读完才更新lastRead

由以上两点可知

  • lastRead~readableSeq 为正在被消费的部分
  • readableSeq~lastWrote 为已经生产完的部分,对于消费者可见
  • lastWrote~writableSeq 为正在被生产的部分
  • writableSeq~lastRead 为已经消费完的部分,对于生产者可见

因此只有被更新完毕的部分才对对家线程可见

一些优化

  • 首先是将常用的变量用 alignas(64) 进行内存对齐,以防交替冲突

  • 加速取余,将队列大小 N 设置为 2 的次幂的编译时常量,将 x%N 优化为x & (N-1)

  • 避免取余,将 4 个位置变量设置为unsigned int,自动处理溢出

template<class ValueType , size_t N = DefaultRingBufferSize>
class Disruptor
{
public:
    Disruptor() : _lastRead(-1L) , _lastWrote(-1L), _lastDispatch(-1L), _writableSeq(0L) , _stopWorking(false){};
    ~Disruptor() {};

    Disruptor(const Disruptor&) = delete;
    Disruptor(const Disruptor&&) = delete;
    void operator=(const Disruptor&) = delete;

    static_assert(((N > 0) && ((N& (~N + 1)) == N)),
        "RingBuffer's size must be a positive power of 2");

    template<typename T>
    bool push(ValueType& val)
    {
        const uint64_t writableSeq = _writableSeq.fetch_add(1);
        while (writableSeq - _lastRead > N)
        {
            if (_stopWorking.load()) {
                // throw std::runtime_error("writting when stopped disruptor");
                return false;
            }
            std::this_thread::yield();
        }
        std::memcpy(_ringBuf + (writableSeq & (N - 1)), &val, sizeof(ValueType));

        while (writableSeq - 1 != _lastWrote);
        _lastWrote = writableSeq;
        return true;
    };

    bool pop(ValueType& val) {
        const uint64_t readableSeq = _lastDispatch.fetch_add(1) + 1;
        while (readableSeq > _lastWrote)
        {
            if (_stopWorking.load())
            {
                // throw std::runtime_error("reading when stopped disruptor");
                return false; 
            }
            std::this_thread::yield();
        }
        std::memcpy(&val, _ringBuf + (readableSeq & (N-1)), sizeof(ValueType));
        while (readableSeq - 1 != _lastRead);
        _lastRead = readableSeq;
        return true;
    }

    bool empty() {return _writableSeq - _lastRead == 1;}

    //通知 disruptor 停止工作,调用该函数后,若 buffer 已经全部处理完,那么获取可读下标时只会获取到 -1L
    void stop() {_stopWorking.store(true);}

private:
    
    alignas(64) std::atomic_bool _stopWorking;
    
    alignas(64) std::atomic_uint64_t _lastRead;
    
    alignas(64) std::atomic_uint64_t _lastWrote;
    
    alignas(64) std::atomic_uint64_t _lastDispatch;
    
    alignas(64) std::atomic_uint64_t _writableSeq;
    
    ValueType _ringBuf[N];
};

RingBuffer

使用CAS (compare and swap) 操作实现

实现原理

将非原子过程用do while循环包起来,最后的判断设计为一个 CAS 操作。如果 CAS 操作失败,意味着该过程被其他线程抢占,那么重新执行该过程,不断重试直到强占到执行权。

注意,必须先读取再更新指针才能保证并发正确,但考虑到

  • push 时数据拷贝必须放在循环体外,否则强占到执行权的线程写入的数据有可能被其他未强占到执行权的线程修改
  • pop 时在循环体内拷贝数据不会导致错误,但没有强占到执行权的线程会反复的拷贝数据,对 CPU 资源消耗很大

因此引入两个新指针

  • write,表示 push 操作写完的位置
  • read,表示 pop 操作读完的位置

由此可知

  • read~head 部分正在被读
  • head~write 部分可读
  • write~tail 部分正在被写
  • tail~read 部分可写

(:其实和 Disruptor 是一样的

template <typename T>
class RingBuffer {
public:
    RingBuffer(size_t siz): Cap(1<<siz){ data = new T[Cap];}
    RingBuffer(const RingBuffer&) = delete;
    RingBuffer &operator=(const RingBuffer&) = delete;
    RingBuffer &operator=(const RingBuffer&) volatile = delete;

    bool push(const T &val) {
        size_t t, w;
        do {
            t = tail.load();
            if ((t + 1) % Cap == head.load())
                return false;
        } while (!tail.compare_exchange_weak(t, (t + 1) % Cap));
        std::memcpy(data+t, &val, sizeof(T));
        do {
            w = t;
        } while (!write.compare_exchange_weak(w, (w + 1) % Cap));
        return true;
    }

    bool pop(T &val) {
        size_t h;
        do {
            h = head.load();
            if (h == write.load()) 
                return false;
            std::memcpy(&val, data+h, sizeof(T));
        } while (!head.compare_exchange_strong(h, (h + 1) % Cap));
        return true;
    }
private:
    T* data;
    std::atomic<size_t> head{0}, tail{0}, write{0};
    uint64_t Cap;
};

测试

using ValueType = char[64],以 512bit 的字符数组为一单位元素进行测试

性能测试

线程均绑定隔离核

节选结果如下:

*:~/fastQ/bin$ ./test_sng --op 3e9
Sng_Queue Performance Test for Queue size 1<<10, 1 producers, 1 consumers, 3000000000 operations per producer, 0 nanoseconds per production, 0 nanoseconds per consumption
Mutex CircularQueue Elapsed time: 350.593 seconds
KFIFO Elapsed time: 34.319 seconds
SPSC Elapsed time: 32.0383 seconds

*:~/fastQ/bin$ ./test_muti --pd 3 --cs 3 --op 1e9
Muti_Queue Performance Test for Queue size 1<<10, 3 producers, 3 consumers, 1000000000 operations per producer, 0 nanoseconds per production, 0 nanoseconds per consumption
Muti-ptr Disruptor Elapsed time: 321.647 seconds
Atomic RingBuffer Elapsed time: 710.012 seconds
Mutex CircularQueue Elapsed time: 650.429 seconds

*:~/fastQ/bin$ ./test_muti --pd 3 --cs 1 --op 1e9
Muti_Queue Performance Test for Queue size 1<<10, 3 producers, 1 consumers, 1000000000 operations per producer, 0 nanoseconds per production, 0 nanoseconds per consumption
Muti-ptr Disruptor Elapsed time: 234.385 seconds
Atomic RingBuffer Elapsed time: 430.647 seconds
Mutex CircularQueue Elapsed time: 732.851 seconds

kfifospsc 速度比加锁快一个数量级,spsc 更优一些

Disruptor 速度约为加锁的 2~3 倍

RingBuffer 在消费者数量相对较多时表现不佳,怀疑是 CAS 操作导致竞争过于激烈浪费了 CPU 资源

同等数据量的下,单生产单消费的 kfifo 的速度比多生产多消费的 Disruptor 快 7~8 倍

同时,多生产多消费情况存在竞争,当竞争过于激烈时,无锁的性能就相对下降了

正确性测试

-O3 的情况下也都能通过,除了 spsc 注释中标注的错误方法,暂时还没有解决(:

在性能测试时 spsc的表现比 kfifo更优秀一些,但开-O3 后,如果将自旋的 while 改到 pushpop 函数内会出现死循环。有个现象是在自旋中sleep for 1 nanosecond就可以正确运行,不懂 ovo

局限

  • 存储智能指针时它指向的对象并不能被及时的析构,出队后对象仍然在 data 数组里,并没有立即销毁
  • 为保证性能数组大小 N 应当在编译时确定,无法动态扩容。
2 Likes

亲亲:kissing_heart:

最近发现了 CRTP 的一个神秘应用。

考虑如下一种很常见的场景,你有一个很复杂很复杂的类 D,你尝试将其划分出以下模块:

  • 共享信息 A,这部分信息在其他功能模块(B 和 C)都需要使用
  • 功能模块 B
  • 功能模块 C
  • 功能模块 B 和 C 之间的交互和整体接口,称为 D

为了防止你把一大坨屎山拉在一个文件内,你决定每个文件各拉一坨把他们拆分到不同的文件里去实现。既然你使用了 C++,你希望这个划分满足零开销抽象原则,也就是在时间上和空间上不引入额外的开销。

我们考虑以下几种方法:

  1. .h 和 .cpp

C++ 可以把声明放在 .h 中,实现放在不同的 .cpp 里,只要合理拆分 .cpp 那么理论上是可行的,但是这个还是很丑陋,比如你要编译就得把所有 .cpp 丢进编译器(只 include.h 会链接失败)。以及函数声明仍然一坨摆在一个文件里。

另外分离接口和实现会阻止某些优化,你需要开启 -flto 启用跨文件优化。

  1. 拆成四个类然后多继承

写一个公共类 A,B 和 C 继承 A 并且 D 多继承 B 和 C。

这种方法的问题在于出现了菱形继承,于是你会发现 A 的成员变量在 D 中出现了两次,这很不好,不仅导致了歧义还浪费了空间。

  1. 加上虚继承

加上虚继承解决了空间上的浪费,但很不幸,引入了虚表。这导致时间上的浪费。

  1. 最终解决方法:CRTP

说到底,实际上你只需要拿到一个指向共有信息 x 的指针,如果你把共有信息写在 A 中并使用多继承,那么无论是时间还是空间总得有地方妥协,因为 C++ 需要保证 B 和 C 的可用性(但你实际上并不会单独使用 B 和 C)。

所以不妨把共有信息写在 D 中,然后通过 CRTP 的方式拿到 D 的指针进行操作。

1 Like

既然 B 和 C 都得用 A,那肯定不是 B 或者 C 拥有 A 的所有权,而是 B 和 C 的所有权拥有者拥有 A 的所有权。

1 Like