亲亲
内存屏障/原子操作的一种常见应用
单生产单消费
KFIFO
由 Linux
内核队列改写的 c++ 版本
不需要使用任何原子变量,巧妙应用内存屏障,在读/写结束后才更新头尾指针,保证了当前线程正在读/写的部分对于对家线程不可见
template <class ValueType>
class kfifo {
private:
ValueType* buffer; /* the buffer holding the data */
unsigned int size; /* the size of the allocated buffer */
unsigned int in; /* data is added at offset (in % size) */
unsigned int out; /* data is extracted from off. (out % size) */
public:
kfifo(unsigned int sz) : size((1<<sz)), in(0), out(0) {
buffer = new ValueType[size];
}
kfifo(const kfifo& ) = delete;
kfifo(const kfifo&& ) = delete;
void operator = (const kfifo&& ) = delete;
~kfifo() {
delete[] buffer;
}
bool push(const char *item) {
/*******************两种自旋方式都是正确的************************/
// MODE1:
std::atomic_thread_fence(std::memory_order_acquire);
if (size - in + out == 0) {
return false;
} // 函数外 while 自旋
// MODE2:
// while (size - in + out == 0){
// std::atomic_thread_fence(std::memory_order_acquire);
// } // 函数内 while 自旋
std::atomic_thread_fence(std::memory_order_acq_rel);
std::memcpy(buffer+(in & (size - 1)), item, sizeof(ValueType));
std::atomic_thread_fence(std::memory_order_release);
++ in;
return true;
}
bool pop(char *item) {
// MODE1:
std::atomic_thread_fence(std::memory_order_acquire);
if (in == out) {
return false;
} // 函数外 while 自旋
// MODE2:
// while (in == out){
// std::atomic_thread_fence(std::memory_order_acquire);
// } // 函数内 while 自旋
std::atomic_thread_fence(std::memory_order_acquire);
std::memcpy(item, buffer+(out & (size - 1)), sizeof(ValueType));
std::atomic_thread_fence(std::memory_order_acq_rel);
++ out;
return true;
}
};
如果一次性拷贝的长度较大或是长度不一可以写成以下形式
unsigned int push(const char *buf, unsigned int len)
{
unsigned int l;
len = min(len, size - in + out);
std::atomic_thread_fence(std::memory_order_acq_rel);
l = min(len, size - (in & (size - 1)));
memcpy(buffer + (in & (size - 1)), buf, l);
memcpy(buffer, buf + l, len - l);
std::atomic_thread_fence(std::memory_order_release);
in += len;
return len;
}
unsigned int pop(char *buf, unsigned int len)
{
unsigned int l;
len = min(len, in - out);
std::atomic_thread_fence(std::memory_order_acquire);
l = min(len, size - (out & (size - 1)));
memcpy(buf, buffer + (out & (size - 1)), l);
memcpy(buf + l, buffer, len - l);
std::atomic_thread_fence(std::memory_order_acq_rel);
out += len;
return len;
}
SPSC
使用CAS (compare and swap) 原子操作实现
template <typename T>
class spsc {
T *data;
std::atomic<size_t> head{0}, tail{0};
size_t Cap;
public:
spsc(size_t siz): Cap(1<<siz) {
data = new T[Cap];
}
spsc(const spsc&) = delete;
spsc &operator=(const spsc&) = delete;
spsc &operator=(const spsc&) volatile = delete;
bool push(const T &val) {
/*************************注释部分为错误版本***********************************/
// size_t t;
// do {
// t = tail.load(std::memory_order_acquire);
// } while ((t + 1) & (Cap-1) == head.load(std::memory_order_acquire));
size_t t = tail.load(std::memory_order_relaxed);
if ((t + 1) % Cap == head.load(std::memory_order_acquire)) return false;
std::memcpy(data + t, &val, sizeof(T));
tail.store((t + 1) & (Cap-1), std::memory_order_release);
return true;
}
bool pop(T &val) {
// size_t h;
// do {
// h = head.load(std::memory_order_acquire);
// } while (h == tail.load(std::memory_order_acquire));
size_t h = head.load(std::memory_order_relaxed);
if (h == tail.load(std::memory_order_acquire)) return false;
std::memcpy(&val, data + h, sizeof(T));
head.store((h + 1) & (Cap-1), std::memory_order_release);
return true;
}
};
多生产多消费
Disruptor
实现原理
4 个位置变量
- lastRead 最后一个已读内容位置
- lastWrote 最后一个已写内容位置
- lastDispatch 最后一个派发给消费者的槽位序号
- writableSeq 当前可写的槽位序号
记录 readableSeq=lastDispatch+1
表示当前可读槽位序号
他们从小到大依次为:lastRead
,readableSeq
,lastWrote
,writableSeq
实现逻辑为
-
对于生产者而言,先申请
writableSeq
槽位,更新writableSeq
;写完之后,等待在它之前的槽位都被写完才更新lastWrote
-
对于消费者而言,先申请
readableSeq
槽位,更新readableSeq
;读完之后,等待在它之前的槽位都被读完才更新lastRead
由以上两点可知
lastRead
~readableSeq
为正在被消费的部分readableSeq
~lastWrote
为已经生产完的部分,对于消费者可见lastWrote
~writableSeq
为正在被生产的部分writableSeq
~lastRead
为已经消费完的部分,对于生产者可见
因此只有被更新完毕的部分才对对家线程可见
一些优化
-
首先是将常用的变量用
alignas(64)
进行内存对齐,以防交替冲突 -
加速取余,将队列大小 N 设置为 2 的次幂的编译时常量,将
x%N
优化为x & (N-1)
-
避免取余,将 4 个位置变量设置为
unsigned int
,自动处理溢出
template<class ValueType , size_t N = DefaultRingBufferSize>
class Disruptor
{
public:
Disruptor() : _lastRead(-1L) , _lastWrote(-1L), _lastDispatch(-1L), _writableSeq(0L) , _stopWorking(false){};
~Disruptor() {};
Disruptor(const Disruptor&) = delete;
Disruptor(const Disruptor&&) = delete;
void operator=(const Disruptor&) = delete;
static_assert(((N > 0) && ((N& (~N + 1)) == N)),
"RingBuffer's size must be a positive power of 2");
template<typename T>
bool push(ValueType& val)
{
const uint64_t writableSeq = _writableSeq.fetch_add(1);
while (writableSeq - _lastRead > N)
{
if (_stopWorking.load()) {
// throw std::runtime_error("writting when stopped disruptor");
return false;
}
std::this_thread::yield();
}
std::memcpy(_ringBuf + (writableSeq & (N - 1)), &val, sizeof(ValueType));
while (writableSeq - 1 != _lastWrote);
_lastWrote = writableSeq;
return true;
};
bool pop(ValueType& val) {
const uint64_t readableSeq = _lastDispatch.fetch_add(1) + 1;
while (readableSeq > _lastWrote)
{
if (_stopWorking.load())
{
// throw std::runtime_error("reading when stopped disruptor");
return false;
}
std::this_thread::yield();
}
std::memcpy(&val, _ringBuf + (readableSeq & (N-1)), sizeof(ValueType));
while (readableSeq - 1 != _lastRead);
_lastRead = readableSeq;
return true;
}
bool empty() {return _writableSeq - _lastRead == 1;}
//通知 disruptor 停止工作,调用该函数后,若 buffer 已经全部处理完,那么获取可读下标时只会获取到 -1L
void stop() {_stopWorking.store(true);}
private:
alignas(64) std::atomic_bool _stopWorking;
alignas(64) std::atomic_uint64_t _lastRead;
alignas(64) std::atomic_uint64_t _lastWrote;
alignas(64) std::atomic_uint64_t _lastDispatch;
alignas(64) std::atomic_uint64_t _writableSeq;
ValueType _ringBuf[N];
};
RingBuffer
使用CAS (compare and swap) 操作实现
实现原理
将非原子过程用do while
循环包起来,最后的判断设计为一个 CAS 操作。如果 CAS 操作失败,意味着该过程被其他线程抢占,那么重新执行该过程,不断重试直到强占到执行权。
注意,必须先读取再更新指针才能保证并发正确,但考虑到
- push 时数据拷贝必须放在循环体外,否则强占到执行权的线程写入的数据有可能被其他未强占到执行权的线程修改
- pop 时在循环体内拷贝数据不会导致错误,但没有强占到执行权的线程会反复的拷贝数据,对 CPU 资源消耗很大
因此引入两个新指针
write
,表示 push 操作写完的位置read
,表示 pop 操作读完的位置
由此可知
read
~head
部分正在被读head
~write
部分可读write
~tail
部分正在被写tail
~read
部分可写
(:其实和 Disruptor 是一样的
template <typename T>
class RingBuffer {
public:
RingBuffer(size_t siz): Cap(1<<siz){ data = new T[Cap];}
RingBuffer(const RingBuffer&) = delete;
RingBuffer &operator=(const RingBuffer&) = delete;
RingBuffer &operator=(const RingBuffer&) volatile = delete;
bool push(const T &val) {
size_t t, w;
do {
t = tail.load();
if ((t + 1) % Cap == head.load())
return false;
} while (!tail.compare_exchange_weak(t, (t + 1) % Cap));
std::memcpy(data+t, &val, sizeof(T));
do {
w = t;
} while (!write.compare_exchange_weak(w, (w + 1) % Cap));
return true;
}
bool pop(T &val) {
size_t h;
do {
h = head.load();
if (h == write.load())
return false;
std::memcpy(&val, data+h, sizeof(T));
} while (!head.compare_exchange_strong(h, (h + 1) % Cap));
return true;
}
private:
T* data;
std::atomic<size_t> head{0}, tail{0}, write{0};
uint64_t Cap;
};
测试
using ValueType = char[64]
,以 512bit 的字符数组为一单位元素进行测试
性能测试
线程均绑定隔离核
节选结果如下:
*:~/fastQ/bin$ ./test_sng --op 3e9
Sng_Queue Performance Test for Queue size 1<<10, 1 producers, 1 consumers, 3000000000 operations per producer, 0 nanoseconds per production, 0 nanoseconds per consumption
Mutex CircularQueue Elapsed time: 350.593 seconds
KFIFO Elapsed time: 34.319 seconds
SPSC Elapsed time: 32.0383 seconds
*:~/fastQ/bin$ ./test_muti --pd 3 --cs 3 --op 1e9
Muti_Queue Performance Test for Queue size 1<<10, 3 producers, 3 consumers, 1000000000 operations per producer, 0 nanoseconds per production, 0 nanoseconds per consumption
Muti-ptr Disruptor Elapsed time: 321.647 seconds
Atomic RingBuffer Elapsed time: 710.012 seconds
Mutex CircularQueue Elapsed time: 650.429 seconds
*:~/fastQ/bin$ ./test_muti --pd 3 --cs 1 --op 1e9
Muti_Queue Performance Test for Queue size 1<<10, 3 producers, 1 consumers, 1000000000 operations per producer, 0 nanoseconds per production, 0 nanoseconds per consumption
Muti-ptr Disruptor Elapsed time: 234.385 seconds
Atomic RingBuffer Elapsed time: 430.647 seconds
Mutex CircularQueue Elapsed time: 732.851 seconds
kfifo
、spsc
速度比加锁快一个数量级,spsc
更优一些
Disruptor
速度约为加锁的 2~3 倍
RingBuffer
在消费者数量相对较多时表现不佳,怀疑是 CAS 操作导致竞争过于激烈浪费了 CPU 资源
同等数据量的下,单生产单消费的 kfifo
的速度比多生产多消费的 Disruptor
快 7~8 倍
同时,多生产多消费情况存在竞争,当竞争过于激烈时,无锁的性能就相对下降了
正确性测试
开 -O3
的情况下也都能通过,除了 spsc
注释中标注的错误方法,暂时还没有解决(:
在性能测试时 spsc
的表现比 kfifo
更优秀一些,但开-O3
后,如果将自旋的 while 改到 push
或pop
函数内会出现死循环。有个现象是在自旋中sleep for 1 nanosecond
就可以正确运行,不懂 ovo
局限
- 存储智能指针时它指向的对象并不能被及时的析构,出队后对象仍然在 data 数组里,并没有立即销毁
- 为保证性能数组大小 N 应当在编译时确定,无法动态扩容。
亲亲
最近发现了 CRTP 的一个神秘应用。
考虑如下一种很常见的场景,你有一个很复杂很复杂的类 D,你尝试将其划分出以下模块:
- 共享信息 A,这部分信息在其他功能模块(B 和 C)都需要使用
- 功能模块 B
- 功能模块 C
- 功能模块 B 和 C 之间的交互和整体接口,称为 D
为了防止你把一大坨屎山拉在一个文件内,你决定每个文件各拉一坨把他们拆分到不同的文件里去实现。既然你使用了 C++,你希望这个划分满足零开销抽象原则,也就是在时间上和空间上不引入额外的开销。
我们考虑以下几种方法:
- .h 和 .cpp
C++ 可以把声明放在 .h 中,实现放在不同的 .cpp 里,只要合理拆分 .cpp 那么理论上是可行的,但是这个还是很丑陋,比如你要编译就得把所有 .cpp 丢进编译器(只 include.h 会链接失败)。以及函数声明仍然一坨摆在一个文件里。
另外分离接口和实现会阻止某些优化,你需要开启 -flto 启用跨文件优化。
- 拆成四个类然后多继承
写一个公共类 A,B 和 C 继承 A 并且 D 多继承 B 和 C。
这种方法的问题在于出现了菱形继承,于是你会发现 A 的成员变量在 D 中出现了两次,这很不好,不仅导致了歧义还浪费了空间。
- 加上虚继承
加上虚继承解决了空间上的浪费,但很不幸,引入了虚表。这导致时间上的浪费。
- 最终解决方法:CRTP
说到底,实际上你只需要拿到一个指向共有信息 x 的指针,如果你把共有信息写在 A 中并使用多继承,那么无论是时间还是空间总得有地方妥协,因为 C++ 需要保证 B 和 C 的可用性(但你实际上并不会单独使用 B 和 C)。
所以不妨把共有信息写在 D 中,然后通过 CRTP 的方式拿到 D 的指针进行操作。
既然 B 和 C 都得用 A,那肯定不是 B 或者 C 拥有 A 的所有权,而是 B 和 C 的所有权拥有者拥有 A 的所有权。