中心化交易所都是基于CLOB (中央统一订单薄)进行撮合交易, 撮合交易的交易原则是价格优先,时间优先。撮合引擎对于交易所来说,是基石,是重中之重。撮合引擎需要稳定,高效,可扩展,且能够容灾,同时,要保证在极端行情下和故障的情况下,可以迅速恢复或者回滚。
基于消息驱动的并行撮合
这里提出一种基于消息驱动的并行撮合引擎,首先,基于消息驱动是指,撮合引擎的输入有且只有消息队列(例如kafka);其次,什么是 并行撮合
?
并行撮合是指,多台机器对订单同时撮合,不是1主N从的方式,也不是 Round-Robin,而是 同时撮合. 那么,一个很明显的问题,这样不会出现数据混乱吗?答案是在持久层(包括redis,数据库),我们需要做可重入处理。也就是说,同一个订单(同一个消息id标识),即使被撮合引擎撮合无数次,在持久层的状态也只会变化一次。
实现并行撮合,最为重要的有两点:一是状态机;一是可重入。
状态机
并行撮合的原理是,如果我们把撮合引擎看做一个状态机,输入是 kafka 消息,输出是下一个状态。该状态机依赖于上一个状态和输入消息,用数学表达如下:
f(0) = 初始状态
f(t) = f(m(t), f(t-1))
其中:
f(t) 是消息 t 对应的状态;
m(t) 是消息 t
对于撮合引擎而言,当系统载入初始数据后,撮合过程不再依赖环境变量,也不依赖任何随机数据,对于数据的处理,结果是完全确定的。因此,从理论上说,撮合系统,在相同的初始条件,相同的输入,必然产生相同的输出,这就是并行撮合的原理。也就是说,我们不需要像传统的做法那样,在多台撮合引擎之间通过多播协议来同步数据,而是靠相同的初始条件,相同的输入数据,相同的处理流程,以及定时检查,来保证在 同一消息点 上,各个撮合引擎之间的数据的一致性。
Ok,撮合引擎之间的数据一致性可以保证了,这只是整个系统容灾的一小部分,对于整个系统来说,每个环节都是需要确保正确,可容灾。
可重入
可重入的数学表达式:
f(x) = f(f(x))
即,对同样的输入x, 无论执行多少次,最终的结果是一样的。
对于持久存储,例如redis
和数据库,我们通过 kafka sequece id
来保证不可重入。
首先,对于一个订单生命周期的每一次变动,都是由kafka消息触发的。例如,订单的创建,由用户发送买入或卖出请求触发;订单的撮合,是由新订单的进入触发;订单的结束,是撮合的结果,自然也是新的订单触发; 其次,用户的资产变动,也可以认为是由消息触发。用户的资产变动,主要由以下几个方面:
- 充币
- 提币
- OTC
- 各种交易
最后,从更大的层面来说,不单是订单,资产,我们可以将所有的输入,都抽象为消息,然后,整个系统的任何变动,都是由基于消息触发。通过在持久层实现可重入,那么我们的整个系统就可以通过
问题来了,如何实现,如何保证正确性,如何保证一致性,如何容灾?
sequence id
作为状态同步的尺度
先是架构图:
架构图对应的消息流程如下:
- 消息进入kafka队列;
- 各台撮合引擎分别从kafka中读取数据,并独立撮合;
- 撮合引擎将撮合结果写入redis HA, redis HA 通过消息id来保证数据的可重入;
- 撮合引擎 写入redis成功 后,将需要修改的数据发送给kafka;
- 由其他模块结束步骤3的消息,延迟写入数据库。
kafka
是我们的核心单元,kafka
的每一条消息都有一个sequence id
,这是一个64位自增的正整数。sequence id
是我们的衡量状态的尺度, 我们依赖 kafka
的 sequence id
来保证数据的一致性。具体来讲,对于订单数据,我们保存在三个地方,撮合引擎的内存中,redis中,数据库中。我们需要保证的是,三者对于同一个 sequence id, 这三个地方对应的订单状态是一致的。
如何保证撮合引擎的数据一致性
理论上,每台撮合引擎初始数据一致,处理相同的消息,并且是按照相同的顺序处理,因此,在同一个消息点上,所有撮合引擎的内存中的orderbook
是应该完全一致的。
当然,这只是理论上的。为了实际保证撮合的一致性,并能够检查验证这一结果,我们可以在kafka中设计一种消息,当撮合引擎收到此类消息时,计算内存中所有orderbook
的hash值,然后发送给kafka,撮合引擎接收其他撮合引擎的hash值,并与自己的对比,如果hash值相同,则证明数据一致;如果不同,则需要撮合引擎对各个价位取hash,并继续比较,直到找到不同之处,然后由开发工程师分析为何会出现不一致。
我们可以非常频繁的对比撮合结果,例如,每10秒钟执行一次,这样,一旦出现不一致,我们可以快速根据当时的数据来分析定位问题。
上述是检查的措施,如果发生不一致的情况,如何处理?例如,我们已经确定了问题所在,并打算以某台撮合的数据为基准。这时,我们可以通过定义同步消息,让正确的撮合引擎把完整的orderbook发送给kafka,其他撮合引擎使用这个 orderbook 来覆盖自己原有的 orderbook 即可,跟启动时同步数据一样的流程。
其他撮合引擎启动时如何同步数据
由于数据在三个地方都有,因此,撮合引擎数据的同步可以从三个地方获取:
- 其他撮合引擎的内存中
- redis中
- 数据库中
从1中获取最好,因为这是最新的数据,而且在撮合引擎中,具有更好的一致性,而从redis和数据库中,如果是集群的话,需要考虑是否数据的存储方式。具体来讲,如果我们需要同步一个交易对的 orderbook,在redis中,如果 orderbook 保存在cluster的多台机器上,这时,我们去读取orderbook的状态时,就很难保证在多台redis上读取到的数据对应的 sequence id 相同。
从撮合引擎同步数据,可以按照这样的流程来实现:
- 向kafka发送请求同步消息;
- 撮合引擎响应消息,将orderbook的数据发给kafka;
- 待同步的撮合引擎读取orderbook数据,拿到orderbook和这个状态所对应的sequence id,初始化orderbook后,从kafka 的这个 sequence id开始读取数据,并处理。
如果这时,主撮合引擎失败了,如何处理?由于这时没有任何撮合引擎继续处理消息,因此,redis集群和数据库集群都是停留在最后一个kafka 消息id对应的状态上,当然,由于数据库的状态同步比redis慢,我们优先从redis中同步数据。因为状态不在发生变化,这时,所有redis集群对应的kafka消息id都是一致的,因此,只需要把redis cluster中的数据读出来,然后恢复出orderbook即可。
如何容灾
因为多台机器同时撮合, 因此,该方案本身天生就是容灾的! 只需要保证所有撮合系统的内存在同一消息点的数据一致性即可。
定期检查机制
通过 kafka 发送 检查指令
消息, 撮合引擎收到该消息后, 对本引擎的交易对的 orderbook 做 hash 运算, 然后将结果发出来对比,必须保证多台撮合引擎的 hash
值完全一致, 否则就需要定位排查为何 hash 值不一致。
优化
按照上面的方案,撮合引擎发送到kafka的消息会成倍的增长,由此带来后续的处理负荷也会成倍增加。如何优化这个架构?通过上面的分析,我们知道,
实现细节
我们后续会讨论一些实现的细节,例如撮合使用的数据结构,redis的存储格式,redis事务等。
方案优势
多台撮合引擎解耦
多台撮合引擎数据时刻一致
易于升级,易于测试
实时验证, 实时监测
缺点
消息量增加 1/N 倍 (架构1增加N倍,架构2增加1倍)
redis 写入次数增加 N 倍