BUILDING-A-SCALABLE-SHARED-LOG

Building a Scalable Shared Log

Challenges

设计一个满足复合需求(全序、容错、低延迟、可扩展)的 shared log system 的难点。

  1. 灵活的数据放置与平滑的重配置。可扩展的需求引入了 sharding 技术,从而构建逻辑上的 shared log 与物理 sharding 两层结构。多引入 a level of indirection 带来了新的问题,如何灵活的放置数据(即映射关系的合理设计)?一些应用希望可以定制 shards 间的数据放置,考虑数据本地性的放置策略可以提升应用的吞吐和性能。而这在 shared log 系统中设计的困难处在于为了维护出现故障时的一致性,一般维护系统范围的从 sequence number 到记录存储位置的映射,对于这个映射的修改昂贵,不够灵活,也决定了当启用新的 mapping 的时候,系统会暂停服务,不够平滑。

  2. 高性能序列器设计。使用 shared log 的 client 发出的并发操作,依赖于序列器分配 sequence number,所以序列器性能成为了系统的瓶颈,在一些文献(FuzzyLog,Scalog,Boki)中都有提及,优秀的设计可以达到 200K ops/sec(CORFU),250K ops/sec(NOPaxos),600K(Tango)但仍不不够满足需求,机试使用 122M ops/sec(RDMA-based counter)在 100Gbps 网络环境,payload 为 512 字节情形下的也仅能支持四台服务器。相当一部分设计还需要定制化硬件设备,可编程交换机(e.g. NOPaxos)。

  3. 低延迟。来源于支持一个操作集的 linearizability,为了确定一个操作的 order,需要和所有的 shards 沟通。这种解决方案引入了大量服务期间的消息导致延迟增加。

实践系统

  • Scalog: 解决了前两个挑战:提供平滑的重配置,灵活的数据防止以及可扩展的吞吐量。
  • Ziplog: 解决第三个挑战,不牺牲其他性质的前提下实现低延迟。

Scalog

Persistence-first architecture。先由 clients 发送数据给 storage servers,然后再分配全序 shared log 中的位置。

对于维护 shard 的 storage servers,每个在 shard 内维护一个 primary log segment 以及针对其他 storage sever 的 backup log segment。

Scalog 第二个关键点是,利用每个存储服务器上记录的 FIFO 排序来超越传统排序器的吞吐量限制

阶段性的,每个 storage sever 和 ordering layer 进行通信,报告 log 段,回复哪些记录已经被完全备份。利用 ordering layer 的信息,storage server 可以交错、一致的使用全局序和原始的偏序。

Ziplog

传统的 shared log 设计:

  • 读日志:\(Read\)操作和\(Subscribe\)操作。
  • 写日志:\(Append\) 操作。

\(Append(R)\)操作用于将\(R\)记录添加在尾部。为了简化实现,一般要求操作满足 linearizability(见老论文 Linearizability: a correctness condition for concurrent objects)

  • 拓展只觉得串行顺序正确性以支持并发操作
  • 提供了组合性质:保证如果系统中的每个部件都是 separately linearizable,那么整体系统也是。

Ziplog 设计的初衷是发现,当前架构 append-to-then-end-of-the-log API 导致的高延迟。

鱼与熊掌不可兼得:使用独立的 ordering 服务必然引入额外一轮的信息通信。

But using a separate ordering service must involve an additional roundtrip message delay. In Corfu [26], which uses a sequencer, this additional roundtrip happens before a record is stored, while in Scalog [38], which relies on an ordering layer, it happens after a record is stored. Ordering and storing cannot be done concurrently because either the sequence number must be stored with the record (as in Corfu), or the record’s position in the storage server must be kept by the ordering layer (as Scalog does through its “cuts”).

由于 ordering service 自身也有故障风险,还需要额外代价(比如 Corfu 在这一层启用了 Paxos)。

弃用\(Append(R)\) 转而使用\(InsertAfter(R_2,rid_1)\),利用 separately linearizable 实现全局的 linearizability。不再使用\(Append(R)\)也意味着 log positions 不必按序填充,Ziplog 可以先填后面的位置再填前面的。

拓展 local pair identifier\(\langle sid, lsn\rangle\)至 global\(gsn\): \(gsn = N \times lsn + sid\)。如果\(rid_1\)中包含\(R_1\)的 global index\(gsn_1\),那么我们只需要满足\(gsn_2(N \times lsn_2 + sid_2)\)大于\(gsn_1\)就可以满足序要求。为了保证 shared logs 不存在空洞,shard 可以放置 no-op 记录。这种分配策略要求 shards 知道\(N\)(shards 的总数目),所以在 shards 数目增减的时候不再有效。在某些特定情况下这是必要的,Ziplog 使用 Paxos-based 成员管理服务。

主要贡献:

  • Ziplog 的 InsertAfter 接口和新的备份策略允许记录在 shard 内持久化。2 个消息延迟后 ordered without contention,3 个消息延迟后 with contention。可以无需特定硬件情况下提供近似于 NOPaxos 的延迟。
  • Ziplog 的成员管理服务(shards member)在关键写路径之外,仅处理重配置和故障恢复。这类似情况是轻负载(不常发生)使得 Ziplog 可以管理上千 shards 并达到和 Scalog 持平的吞吐量。
  • Ziplog 也提供 seamless reconfiguration

General Techniques

讨论几个重要技术及其实现手段

Fault Tolerance

分布式计算中重要一环,允许分布式系统在部分组件出现故障时仍可以提供正常服务,常通过 state machine replication 来实现,不同副本运行共识协议(或等价的原子广播)。经半个多世纪的研究,可选用的具体方案有:

  • The Primary/Backup approach 主从备份模式
  • Paxos and its variants Paxos 家族共识协议
  • Chain Replication 链复制
    • Clients 向链的头写,并从尾读,可实现高吞吐量

Scalability

  • Scaling horizontally(scaling out,水平扩展)
    • 通过添加更多的服务器来增加吞吐量和容量。存储系统如 Cassandra,Corfu,DynamoDB 和 Spanner 都可以扩展到相当大规模的服务器上。
  • Scaling vertically(scaling up,垂直扩展)
    • 通过优化软件实现和增加单一服务器中的硬件来增加吞吐量。手段:Batching 优化硬盘和网络通信。DPDK 和 SPDK 优化 I/O 路径来旁路操作系统内核。

Total Order

任意操作对都有序。

  • DistributedLog 使用单一写方案,所有请求通过该 server。
  • Corfu 和 LogDevice 使用中心化的 sequencer 来排序 records,使之脱离存储的核心组件(视 sequencer 为提供序的外部组件)。
  • Mencius,Calvin 和 Derecho 都使用 round-robin 的方式,让 clients 将请求直接送达独立的 shard。

Seamless Reconfiguration

Vertical Paxos 支持平滑的重配置,作为共识操作序列的部分,但是不支持 sharding,所以不具备可扩展性。

水平可扩展的系统希望随时增删 shards,纵向的可扩展系统希望替换部分 servers 但不停止服务。重配置很关键,但难以实现 seamless reconfiguration。

使用 ZooKeeper 来支持重配置会有典型暂停,比如 Corfu 会用 30ms 来完成一次重配置。

Flexible Data Placement

选择放置数据的 shard。应是是支持的,但是有些系统牺牲灵活性来保证其他性质。

Low Latency

Zyzzyva,EPaxos,Speculative Paxos ,以及 NOPaxos 规定系统行为的不同方面,包括领导者的正确性,消息之间的依赖性以及网络中的消息顺序。当猜测成功时,系统将快速进行;否则,他们必须修复推测结果,花费更多时间。

Mencius

RDMA,DPDK 和 SPDK 等技术旁路内核允许纵向的扩展同时优化吞吐量。Derecho,一个持久的组同学设施就是依赖 RDMA 设施来降低延迟。

Ziplog a step further

核心点:同时兼具 low latency 和 scalable throughput

一些研究 LogDevice、Oracle Message Queue、vCorfu 和 Scalog 关注在使用 shared log 抽象构建高性能带有容错性的系统。这些工作表名可以协调 total order 和诸如 high scalability(通过 multi-shards),应用友好的数据布局以及平滑的重配置。

但低延迟要求依然困难:

  • Scalog append: 1.2ms
  • NOPaxos:
    • 111 us(特殊定制的硬件,比得上一个 non-replicated 系统)
    • 200 us(不使用)

作者认为更深的原因是:在 multi-shard log 中支持 linearizable append 操作使得低延迟几乎不可能。

而当今的系统是在 total order、scalable throughput 和 latency 中做取舍:

  • Scalog:total order+scalable throughput
  • NOPaxos:total order+low latency(for a single shard)
  • Kafka:scalable throughput + low latency

Ziplog 以牺牲?第四维(shared log's API)的代价来换取以上三者的优势。

\(InsertAfter(R, rid')\)允许 clients 指定一个入口\(rid'\),Ziplog 会在其后寻找位置插入\(R\)。查询可以根据指定的\(rid\)获取对应记录或者从某个起始开始获取\((R,rid)\)对的记录。

(有关 linearizability 的讨论):API 修改仍保证可以构建相同一致性保证的应用(easily why?)。这里是个问题,作者表示利用新的 API,clients 订阅操作得到的\(rid'\)之后的序列,从而根据 API 设定语义,让 casual dependent 的操作满足\(rid'' > rid'\)即可满足一致性要求。

Ziplog 的性能

  • without contention,2 message delays, 150 us
  • with contention, 3 message delays, 220 us

cross-shard 的同步仅发生于处理失败的情况和重配置中。

Goals and Non-goals

  • Goals
    • 同步实现匹敌 Scalog 的吞吐量和匹敌 NOPaxos 的 latency
    • 不使用定制的专门硬件
    • 从根源(API 设计)来达成目标
    • 设计用于数据中心(大量的 application servers 作为使用 Ziplog 的 clients)
    • 假设为 crash failure 模型
  • Non-goals
    • 绝对最优 latency(比如使用 RDMA 和 DPDK 等网络层面优化)
    • 不假设为拜占庭故障

每个 Ziplog 的 partition storage 由一组 shards 组成,\(f+1\),其中\(f\)为可容忍错误数量。

Ziplog API

  • \(InsertAfter\)
    • 定义了一个\(RID_GENESIS\)(初始位置,对于没有依赖/放置随意的记录)
  • \(SetPolicy\)
    • 允许 client 选择放置策略(类似的在 vCorfu 和 Scalog 中也有)
  • \(Subscribe\)
    • 允许应用从\(rid\)参数起开始顺序地读取日志
    • 特殊参数\(RID_GENESIS\),则从初始位置开始读取日志
    • 特殊参数\(RID_RECENT\),则从最近地插入(不一定是最新)位置开始读取
  • \(Trim\)
    • 进行垃圾回收,删除某个记录之前地记录标识

Design Overview

对于插入一条操作来说,关键路径

  1. 由 client 使用本地的 client library 调度 SetPolicy 确定放置策略选择 shard 发送记录
  2. shard 接收到记录后进行 replication(容错要求)并独立(无需跨 shard 通信)赋给记录 gsn 标识(global sequence number)
  3. shard 将 gsn 发送回 client 端,client library 接收到 gsn 后返回 rid 给上层 application
  4. client application 最终获取 rid(record identifier)
    • 对于 application 来说,不透明的是一个由 gsn 和 shard identifier 构成元组。

三个要点:

  • GSNA
  • Replication
  • Failure Recovery

GSNA

保证低延时的关键点是允许不同 shard 独立分发 global sequence number,同时保证:

  1. 一个 gsn 不会对应多个 shard
  2. 每个 gsn 都会对应到一个 shard

实践遇到的困难:

  1. shards 集合可能由于重配置或者 failure 产生变化。
  2. 不同 shards 会看到不同比例的变更:计算 gsn 的算法不考虑这种差异会造成延迟增大
    1. 高速分配的 log 更新到靠前地方,低速分配的 log 需要使用 no-op(无操作)来填补空白。
    2. 填补是一种空间上的浪费,也会消耗一些无意义的带宽

Ziplog 的 membership management service(MMS)解决了两个问题,这里的 membership 就是指 shards 集合。

  • 告知每个 shard 可用的添加新记录的 shard 集合
  • 广播消息划分任期/版本 epochs

通过调整一个 epoch 的长度来防止出界。32 位长度来表示一个 epoch 内的 gsn 号,周期最长为 10ms。

Transition 的不理解?

7.28 meeting

交流内容

  1. 简述 shared log systems 发展过程,阐述我们的构想(Serverless 场景下 partial order shared log runtime)
  2. 简述 Ziplog 设计,交流对 key point 的理解是否正确
  3. 针对 Ziplog Q&A,以及我们的构想是否可行

要点记录

Shared Log Systems Development

  • 丁博建议补充阅读 Virtual Consensus [OSDI'20](同为 Mahash 的工作)
    • 针对 LogDevice 比较详尽的
  • 针对我们提出的想法,在 Serverless 场景下做 Multi-level consistency,丁博表示他对 partial order 发面了解不是很多,之前的工作都是在 total order 这个要求下做的,idea 可行主要要看有没有需求场景(他们组 ziplog 后续投稿都因类似原因被拒了)
    • 以及如果要优化 latency,也要考虑平台影响,他们实验在 AWS 上的延迟约为 CloudLab(最终选用)10 倍

Ziplog

现有的 shared log 设计是在 total order、low latency、scalable throughput 三者间做取舍。

total order low latency scalable throughput
Kafka*
Scalog [NSDI ‘20]
NOPaxos [OSDI ‘16]
Ziplog

(*丁博解释 latency 这个维度叙述比较 trick,kafka 实际的 latency 并不好,现在很多工作也是在做这上面的优化)

Ziplog 的设计,为了同时达到这三条性质,引入新的维度:API design

  • 将原有语义为添加的日志 tail 的\(Append(R)/Insert(R)\)修改为\(InsertAfter(R,rid)\)\(rid\)表示某条已经加入 log 的记录的 position,如果操作间存在依赖关系则由使用的 client 自己关系。
  • 对于 latency 的优化,主要在于移除了中心化的 sequencer/ordering layer,而是可以在每个 shard 在本地跑序列号计算算法(配合新 API 实际上预先给不同 shard 分配了 slot,得到的仍然是全局的序列号)
  • 理解:实际上是将部分维护一致性的负担转移给程序员(丁博承认这点),但整体上仍可以保证 linearizability(这一点比较难以理解,审稿人也针对这一点进行过提问)

Ziplog 的后续工作:

  • 丁博所在组后面的工作主要是网络通信的优化,使用 RDMA 技术压低 latency,文章被拒原因还是:
    • 没有讲好 shared log 有什么用(应用场景)
    • 使用新 API 以后,仍能保证 linearizability 没有说服审稿人(一致性等级)

聊完 Ziplog 以后主要针对我们的 idea 可行性,在 shared log 这个话题上讲故事的主要问题,Kafka 相关可以发掘的 idea 进行了讨论。

  • 丁博建议对于 shared log 有什么用、优势和局限是什么要考虑清楚:

    • 场景:

      • 价于一个无限拓展的单机日志
      • 用于 message queue 服务(比如 AWS 和 Oracle)的
    • shared log 最主要的性质是 scalability(主要是持久存储的)

    • 基于 shared log 使用的窘境;如果要求 total order 现实中读的要求不需要达到这么高的 thoughput,如果不要求 total order 那就退化成 Kafaka,可以 cover 90%的情况。(他文章中所提淘宝对 throughput 的需求最后就是牺牲了 total order 用 Kafka 解决的)

  • Kafka 的 research 的点:
    • cost 问题(hybrid 存储迁移)
    • latency 问题(读、写、replication 三个操作)
      • page-cache 崩掉, tail latency 的问题
      • 读-写 pattern latency 希望都低

TODO

对于应用来说:

  • 使用单机 in memory 的场景的局限(message queue)
  • shared log 有什么作用?
    • 现阶段款
    • 存储的量很大,append 这么多,但是中心化的读用不到如此 throughput

total order 的价值在哪里

丁博回顾读博期间做 shared log 的模型,只关心一个 partition,写就写,读就读。