搜索
您的当前位置:首页正文

如何构建一个交易系统(十三)

来源:二三娱乐

如果你的系统不是采用一个大而全的设计( monolithic ), 不管是现在流行的微服务(microservices)还是其他, 就涉及到进程间通讯。机器之间的交流和人与人之间的交流一样,诸多的问题就会扑面而来。

简单的交流模型

从一个发送者(客户端)到接受者(服务器),之间会掺和进各种因素, 比如人与人之间交流的噪音,不同文化背景等等,在进程间进行通讯有同样的问题:

  1. 通讯的格式
  2. 服务的注册和发现
  3. 同步还是异步
  4. 平衡(Balance)模式
  5. 备选方案

如果将上面一般的信息交流过程,映射到rpc 可以得到下面的图形:

一个RPC 调用流程

基本的模块功能也非常的容易理解,但是在实际的操作中还有诸多的细节需要考虑, 现在市面上也有很多成熟的解决方案。 如阿里巴巴的dubbo(开源)、Facebook的thrift(开源)、Google grpc(开源)、Twitter的finagle(开源)等。

其实在我们的系统中,有部分使用grpc 作为gateway 转送外围请求到内部系统,但是在一些模块中还是使用了自己定制的rpc框架。

当然这个架构的设计有大量为本应用定制的元素,不一定适合所有用类。

需求

  1. 和DDD 紧密集合,面向Command 端, Event 自然走MQ走不在讨论范围内。
  2. 一套内部的序列化方式
  3. 基于Netty
  4. 提供服务有限精简,非CRUD 这样的操作(参考DDD设计思想)
  5. 无侵入性

参考前面基本介绍DDD 设计基本文章,我们整个交易系统采用了此设计架构,DDD 中的aggregate root 作为触发状态变化的核心元素, 将 domain 中触发状态改变的command 转换成事件分发出去;这一层对外提供操作服务,所以这部分是我们需要改造的部分。

为了维护高可用性和高并发性, 这层首先需要做partition, 但是不能replication, 因为一个aggregate root 在全局中只能有单个线程在运作, 所以其实是带状态的。也就是在同一个时间,一个aggregate root 只能在一台机器,一个线程中被操作。

这让服务器端和客户端协调都带来很棘手的问题, 理论提供一个带parition 的MQ 比如KAFKA 就可以解决这个问题。

这确实是我们第一个版本的实现方案(其实第二个,开始有个其他的rpc 解决方案),能够很好的满足需求, 比如我们对唯一组件的 account id 作为partition 的key将请求分发到kafka(需要保证kafka 的partition 和你节点上面parition算法一致), 服务端监听就可以了:

kafka 解决方案

有了这个中间层, 可以很好的解耦各系统之间的关联, 多一个中间的跳转有性能上的稍微损耗,而且kafka 没有很好的transaction 支持,但是这个方案可以是很好的 failover backup 方案。

下面的服务的注册和发现, 将提供一个解决方案如何在rpc 下保证服务端和客户端保持一致。

网络通讯层

        final ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("tagDecoder", new LengthFieldBasedFrameDecoder(4069, 0, 4, 0, 0)); //Size is 4k at most
        pipeline.addLast("rpcDecoder", new SBERpcDecoder());
        pipeline.addLast("rpcEncoder", new SBERpcEncoder());
        pipeline.addLast("pingPong", new PingPongHandler()); // process pingPong
        if (application != null) {
            pipeline.addLast("app", application); // process application events events
        }

唯一需要定制的就是你的 Decoder 和 Encoder 和你的Application 层用。

后面接的是你的Application 层应用,这一次是避免加入RPC 对原有价格的侵入控制点, 我们这里是分发到进程内Command Bus(基于disruptor)。所以这里RPC 必须都是异步调用, 需要根据一个request id(UUID, 或者为一个sequence) 回调---如果需要返回结果的话。 Command Bus 不介意一个Command 是从RPC 还是 MQ 还是 HTTP 请求来。

RPC异步 vs 同步:

同步 异步

业务层

也就是处理Command Business Logic 部分,对于发送过来的command 需要是一个DDD风格的Command:

  1. 包含一个Aggregate Root Identifier, 也就是唯一标示一个物体的, 比如人的account Id, 一个产品的booker Id 等。
  2. 一个全局统一的partition 算法。
  3. 一个标准的SBE 兼容消息

比如一个简单的创建账号的请求(command)

@SBEMessage(id = 800, desc = "Create Portfolio Account")
public class CreatePortfolioCommand {
    private static final long serialVersionUID = -8206695527451683906L;

    @TargetAggregateIdentifier
    @FieldType(index = 999, type = "AccountId", id = 999)
    protected AccountId accountId;

    @FieldType(index = 1, type = "int64", id = 1)
    protected long userId; //this may pass by frontend

    @FieldType(index = 2, type = "double", id = 2)
    protected double credit;

    @FieldType(index = 3, type = "VARCHAR8", id = 3, presence = Presence.OPTIONAL)
    protected String vendor;
}

一个Message 包含:

    short domain;
    long timestamp;
    long sequence;
    int part;
    Object payload;
    int id;
  1. domain 一个short 标注来自那个domain, 自己知道就可以,理论一个应用domain 个数不会超过百把个,否则太复杂了!
  2. timestamp 消息发送的时间戳
  3. sequence 一个客户端维护的sequence,服务器不检查重复,回调时候用
  4. part: Domain Command 属于那个partition
  5. payload 消息体
  6. id: 消息id 也就是上面SBEMessage 的id

为什么消息上面要带partition, 下面的服务注册和发现 还会再做说明,所以需要一个统一的客户端和服务器端的partition 算法。

为什么要带消息的id?几个作用:

  1. 路由
  2. 避免服务端和客户端的stub 代码, 无需IDL、XML语言去描述你的服务
  3. 无侵入性

只要在Application 层,套一薄薄层,路由到内部的Command Bus, 但是有个缺点就是内部必须有完善的文档, 知道每个服务器端能接受什么类型的服务,也就是那些类型message Id 支持,否则过来会在Application无情过滤掉。

对应Application 到业务层的Adapter:

@FunctionalInterface
public interface CommandHandler<T> {

    /**
     * Handler for specific command
     *
     * @param command, the payload
     * @param part,      which partition
     * @param sequence,  the sequence like version may the command need to be fail detected
     * @param timestamp, the time message newed
     * @throws Exception
     */
    void handle(final T command,
                final int part,
                final long sequence,
                final long timestamp) throws Exception;
}

服务注册和发现

上面我们反复强调, partition 一致性的重要性,这里可以揭晓为什么这样设计。

DDD设计方式, Domain 无疑是整个架构的核心, 其实一个Object 都可以称为一个Aggregate Root; 维护这个对象状态变更, 比如一个账号对象(Account), 有基本的属性, 比如account Id, cash, credit, position等等, 有对这个对象的不同的操作:充值、取款、加仓、平仓、调整信用度等等, 这些自然的作为一个命令(Command)发送给这个对象。 为了维护这个对象状态一致性, 必须保证单线程操作!这个在单机还容易实现,但是对于集群环境呢?

对 account id 进行partition 分区, 在集群机器上,一个partition只能位于一台机器上。 比如对 account ID 分成32个partition, 理论可以支持最多32台机并行处理对account 的所有请求, 如果现在有两台机器, A & B, 他们分别得到partition:

A: 1,4,8,10,11,13,17,19,20,22,24,27,29
B: 0,2,3,5,6,7,9,12,14,15,16,18,21,23,25,26,30,31

如果错误将一个partition 为0 的账号请求发送到 A 机器上面将导致处理被拒绝。 虽然机器A 可以从Event source 将这个账号replay 出来, 但是由于性能优化, replay 出来的Domain 是做本地缓存的, 同样snapshot 和 event source 也是缓存,这样可能导致 domain 恢复错误。

但是一旦发生re-balance? 比如0 partition 从 A 移动到B, 这个时候会把A 中, partition 为 0 domain 相关所有数据刷入DB, 在 re-balance 结束前, partition 0 上命令可能失败

这就导致一个问题,服务注册要带上自己的IP+Port+Partitioins.

服务发现端,同样需要根据不同partition 发送到对应的服务器端。

注册和发现服务使用redis, 服务器端在感知partition 发生变化后, 会刷到redis 一个map 上面(带TTL),然后双方一个消息到topic上, 同时定时刷行当前parition 到redis上面。

客户端直接监听和定时poll 这个map 就可以:

客户端服务发现

测试

不考虑落地数据, 单机测试可以满足据大部分的性能需求。当然这个离上线还是比较远, 需要考虑re-balance 情况下有parittion 不可用, 所以我们这里还是有failover 方案, 在服务不可得情况下是发送到 mq 中,还待长时间进一步检验。

参考

Top