高性能队列:Disruptor
高性能队列:Disruptor
0、文章导读
- 了解传统队列存在的一些问题
- 了解内存屏障,更详细可以了解:浅谈 "内存屏障"- 开发者角度
1、传统队列问题
以 ArrayBlockingQueue 为例
通过加锁来保证线程安全
但是加锁会严重的影响性能。
变量出现伪共享问题(占用相同的 cache line)
- takeIndex、putIndex、count 很容易被放到同一个缓存行,但是之间修改并没太多关联,修改其中一个元素,导致整个缓存行失效,从而出现伪共享。
- 注:这种无法充分使用缓存行特性的现象,称为伪共享。
加锁可能会存在死锁问题,但原子变量不会存在 死锁
问题
2、内存屏障和缓存问题
内存屏障指令
内存屏障(英语:Memory barrier),也称内存栅栏,内存栅障,屏障指令等,是一类同步屏障指令,它使得 CPU 或编译器在对内存进行操作的时候, 严格按照一定的顺序来执行, 也就是说在内存屏障之前的指令和之后的指令不会由于系统优化等原因而导致乱序。
- 完全内存屏障(full memory barrier)保障了早于屏障的内存读写操作的结果提交到内存之后,再执行晚于屏障的读写操作。
- 内存读屏障(read memory barrier)仅确保了内存读操作;
- 内存写屏障(write memory barrier)仅保证了内存写操作。
缓存
这对数据意味着,任何值的最新版本在被写入后的任何阶段都可以位于寄存器、存储缓冲区、L1/L2/L3 缓存之一或主内存中。如果线程要共享此值,则需要以有序的方式使其可见,这是通过协调缓存一致性消息的交换来实现的。这些信息的及时产生可以通过内存屏障来控制。
举例
比如:变量:private volatile int state
,被线程 A 用 CAS 操作赋值为 3,那线程 B 是如何一定拿到的值是 3 呢?
由
Happens-before
原则保证,其中的 volatile 变量规则:对一个 volatile 变量的写操作,Happens-Before 于后续对这个变量的读操作。从底层实现来看:从底层实现来看,volatile 写操作会通过汇编中的 lock 前缀指令,对这块内存区域的缓存行进行锁定(相当于加入了
内存屏障
,从而保证程序执行顺序),这样,如果此时正在对 volatile 变量时行写操作,那么其他线程所有的读操作都需要等待写操作完成,这就是 volatile 写操作 happens-before 对这个字段读操作的原因
3、Disruptor
Disruptor 通过以下设计来解决队列速度慢的问题:
- 类似于对象池机制(
RingBuffer
)
启动时,将预先分配环形缓冲区的所有内存,这些 entry 中的每一个通常不是传递的数据本身,而是类似 对象池机制
,是的实际数据 的 容器
。这种 entry 的预分配消除了支持垃圾回收的语言中的问题,因为 entry 将被重用,并在整个 Disruptor 实例存活期间都有效。
// 预分配源码
private void fill(EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD+i] = eventFactory.newInstance();
}
}
// 比如 E 为 ELement
class Element {
private int value; // 实际数据
private int get() {
return value;
}
private void set(int value) {
this.value = value;
}
}
- 环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。由于环形的存储方式,且不存在元素的删除操作,只有覆盖操作,没有扩容的概念。
- 元素位置定位
数组长度 2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心 index 溢出的问题。index 是 long 类型,即使 100 万 QPS 的处理速度,也需要 30 万年才能用完。
- 无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
如何实现无锁设计?整个过程通过原子变量 CAS,保证操作的线程安全。
4、实际应用
Apache Log4j 2 的性能提升
总结
Disruptor 通过精巧的无锁设计实现了在高并发情形下的高性能。
整体上来看 Disruptor 在提高吞吐量、减少并发执行损耗上做出了很大贡献,通过贴合硬件机制的方式进行设计,消除写争用,最小化读争用,并确保代码与现代处理器使用的 Cache 特性良好配合。
很多的高并发场景都可以借鉴 Disruptor 的设计,减少竞争的强度。其设计思想也可以扩展到分布式场景,通过无锁设计,来提升服务性能。
案例
使用 Disruptor 比使用 ArrayBlockingQueue 略微复杂,为方便读者上手,增加代码样例。
代码实现的功能:每 10ms 向 disruptor 中插入一个元素,消费者读取数据,并打印到终端。详细逻辑请细读代码。
以下代码基于 3.3.4 版本的 Disruptor 包。
public class App {
public static void main(String[] args) throws InterruptedException {
// 队列中的元素
class Element {
private int value;
private int get() {
return value;
}
private void set(int value) {
this.value = value;
}
}
// 生产者的线程工厂
ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r, "simpleThread");
}
};
// RingBuffer生产工厂,初始化RingBuffer的时候使用
EventFactory<Element> factory = new EventFactory<Element>() {
public Element newInstance() {
return new Element();
}
};
// 处理Event的handler
EventHandler<Element> handler = new EventHandler<Element>() {
public void onEvent(Element event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("Element: " + event.get());
}
};
// 阻塞策略
BlockingWaitStrategy strategy = new BlockingWaitStrategy();
// 指定RingBuffer的大小
int bufferSize = 16;
// 创建disruptor,采用单生产者模式
Disruptor<Element> disruptor = new Disruptor<>(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
// 设置 EventHandler
disruptor.handleEventsWith(handler).and();
// 启动disruptor的线程
disruptor.start();
RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();
for(int l = 0; true; l ++) {
// 获取下一个可用位置的下标
long sequence = ringBuffer.next();
System.out.println("sequence: " + sequence);
try {
// 返回可用位置的元素
Element event = ringBuffer.get(sequence);
// 设置该位置元素的值
event.set(l);
} finally {
ringBuffer.publish(sequence);
}
Thread.sleep(10);
}
}
}
扩展
如何测试 cache line 的影响?
public class Main {
// 考虑一般缓存行大小是 64 字节,一个 long 类型占 8 字节
static long[][] arr;
public static void main(String[] args) {
arr = new long[1024 * 1024][];
for (int i = 0; i < 1024 * 1024; i++) {
arr[i] = new long[8];
for (int j = 0; j < 8; j++) {
arr[i][j] = 0L;
}
}
long sum = 0L;
long marked = System.currentTimeMillis();
for (int i = 0; i < 1024 * 1024; i++) {
for (int j = 0; j < 8; j++) {
sum += arr[i][j];
}
}
System.out.println("Time taken: " + (System.currentTimeMillis() - marked) + "ms");
marked = System.currentTimeMillis();
sum = 0L;
for (int i = 0; i < 8; i++) {
for (int j = 0; j < 1024 * 1024; j++) {
sum += arr[j][i];
}
}
System.out.println("Time taken: " + (System.currentTimeMillis() - marked) + "ms");
}
}
// 2.3 GHz 八核I、32 GB 内存、jdk 17
// Time taken: 15ms
// Time taken: 54ms
参考文章
当前页面是本站的「Google AMP」版。查看和发表评论请点击:完整版 »