这是Go编程语言里LMAXDisruptor的接口。它保留了Disruptor的本质和原理,并利用了很多相同的抽象概念和理论,但不会保持同样的API。
简述:
在我的MacBookPro(IntelCorei7-4960HQCPU@2.60GHz)中,我使用了Go1.4.2,此版本使我能在一秒内发送9亿多份邮件(是的,你没有听错),从一个goroutine到另一个goroutine.讯息在两台CPU间的传递很简单。请注意,您的里程可能会有所不同,通过控制CPU并清除其缓存,不同的操作系统可以添加特定的“jitter”到App中。Linux和Windows系统有给定的进程分配给特定的CPU内核它通过将所有的CPU缓存热显著降低“jitter”的能力。顺便,当Disruptor代码被编译并在Nexus5上运行,它可以每秒可推送约15-20万条信息。
一旦被初始化,在运行时,Disruptor杰出设计的考虑因素之一,就是以一个恒定的速率来处理消息。为此,它使用两个主要技术:
1.它避免了在所有costs上使用锁,costs通常会引起CPU内核间的排斥,影响可测量性。
2.它允许应用程序预先在一个环形缓冲区分配连续的空间,不产生垃圾。
通过避免垃圾,垃圾清理站和应用程序暂停的功能可以免去。
示例代码:
Wireup
runtime.GOMAXPROCS(2) // make sure we have enough cores available to executeconst RingBufferCapacity = 1024 // must be a power of 2const RingBufferMask = RingBufferCapacity - 1// this instance will be shared among producers and consumers of this applicationvar ringBuffer = [RingBufferCapacity]MyStruct{}myDisruptor := disruptor. Configure(RingBufferCapacity). WithConsumerGroup(MyConsumer{}). // we can have a set of concurrent consumers run first // WithConsumerGroup(MyConsumer{}). // and then run this/these consumers after the first set of consumers BuildShared() // Build() = single producer vs BuildShared() = multiple producersmyDisruptor.Start()defer myDisruptor.Stop() // clean shutdown which stops all idling consumers after all published items have been consumed// application code here, e.g. listen to HTTP, read from a network socket, etc.生产者
Producerwriter := myDisruptor.Writer()// for each item received from a network socket, e.g. UDP packets, HTTP request, etc. etc.sequence := writer.Reserve(1) // reserve 1 slot on the ring buffer and give me the upper-most sequence of the reservation// this could be written like this: ringBuffer[sequence%RingBufferCapacity] but the Mask and & operator is faster.ringBuffer[sequence&RingBufferMask].MyImportStructData = ... // data from network streamwriter.Commit(sequence, sequence) // the item is ready to be consumed消费者
type MyConsumer struct{}func (m MyConsumer) Consume(lowerSequence, upperSequence int64) { for sequence := lowerSequence; sequence <= upperSequence; sequence++ { message := ringBuffer[sequence&RingBufferMask] // see performance note on producer sample above // handle the incoming message with your application code }}
评论