0%

基本架构

85da297f86b3880a9192397b08ae953d.png

  1. 用户接口: shell/CLI, jdbc/odbc, webui Command Line Interface
  2. 跨语言服务 : thrift server 提供了一种能力,让用户可以使用多种不同的语言来操纵hive
  3. Driver
    Driver 组件完成 HQL 查询语句从词法分析,语法分析,编译,优化,以及生成逻辑执行 计划的生成。生成的逻辑执行计划存储在 HDFS 中,并随后由 MapReduce 调用执行
    Hive 的核心是驱动引擎, 驱动引擎由四部分组成:
    1. 解释器:解释器的作用是将 HiveSQL 语句转换为抽象语法树(AST)
    2. 编译器:编译器是将语法树编译为逻辑执行计划
    3. 优化器:优化器是对逻辑执行计划进行优化
    4. 执行器:执行器是调用底层的运行框架执行逻辑执行计划
  4. 元数据存储系统 : RDBMS MySQL
    元数据,通俗的讲,就是存储在 Hive 中的数据的描述信息。
    Hive 中的元数据通常包括:表的名字,表的列和分区及其属性,表的属性(内部表和 外部表),表的数据所在目录
    Metastore 默认存在自带的 Derby 数据库中。缺点就是不适合多用户操作,并且数据存 储目录不固定。数据库跟着 Hive 走,极度不方便管理
    解决方案:通常存我们自己创建的 MySQL 库(本地 或 远程)
    Hive 和 MySQL 之间通过 MetaStore 服务交互

执行流程:

HiveQL 通过命令行或者客户端提交,经过 Compiler 编译器,运用 MetaStore 中的元数 据进行类型检测和语法分析,生成一个逻辑方案(Logical Plan),然后通过的优化处理,产生 一个 MapReduce 任务。

数据组织

  1. Hive 的存储结构包括数据库、表、视图、分区和表数据等。数据库,表,分区等等都对 应 HDFS 上的一个目录。表数据对应 HDFS 对应目录下的文件。
  2. Hive 中所有的数据都存储在 HDFS 中,没有专门的数据存储格式,因为 Hive 是读模式 (Schema On Read),可支持 TextFile,SequenceFile,RCFile 或者自定义格式等
  3. 只需要在创建表的时候告诉 Hive 数据中的列分隔符和行分隔符,Hive 就可以解析数据
      Hive 的默认列分隔符:控制符 Ctrl + A,\x01 Hive 的
      Hive 的默认行分隔符:换行符 \n
  4. Hive 中包含以下数据模型:
    database:在 HDFS 中表现为${hive.metastore.warehouse.dir}目录下一个文件夹
    table:在 HDFS 中表现所属 database 目录下一个文件夹
    external table:与 table 类似,不过其数据存放位置可以指定任意 HDFS 目录路径
    partition:在 HDFS 中表现为 table 目录下的子目录
    bucket:在 HDFS 中表现为同一个表目录或者分区目录下根据某个字段的值进行 hash 散 列之后的多个文件
    view:与传统数据库类似,只读,基于基本表创建
  5. Hive 的元数据存储在 RDBMS 中,除元数据外的其它所有数据都基于 HDFS 存储。默认情 况下,Hive 元数据保存在内嵌的 Derby 数据库中,只能允许一个会话连接,只适合简单的 测试。实际生产环境中不适用,为了支持多用户会话,则需要一个独立的元数据库,使用 MySQL 作为元数据库,Hive 内部对 MySQL 提供了很好的支持。
  6. Hive 中的表分为内部表、外部表、分区表和 Bucket 表
    外部表示hive 对存储在 HDFS 上的数据提供了一种新的抽象。而不是管理存储在 HDFS 上的数据。所以不管创建内部 表还是外部表,都可以对 hive 表的数据存储目录中的数据进行增删操作。
    • 内部表和外部表的区别:
      • 删除内部表,删除表元数据和数据
      • 删除外部表,删除元数据,不删除数据
    • 内部表和外部表的使用选择:
      • 大多数情况,他们的区别不明显,如果数据的所有处理都在 Hive 中进行,那么倾向于 选择内部表,但是如果 Hive 和其他工具要针对相同的数据集进行处理,外部表更合适。
      • 使用外部表访问存储在 HDFS 上的初始数据,然后通过 Hive 转换数据并存到内部表中
      • 使用外部表的场景是针对一个数据集有多个不同的 Schema
    • 分区表和分桶表的区别:
      Hive 数据表可以根据某些字段进行分区操作,细化数据管理,可以让部分查询更快。同 时表和分区也可以进一步被划分为 Buckets,分桶表的原理和 MapReduce 编程中的 HashPartitioner 的原理类似。
      分区和分桶都是细化数据管理,但是分区表是手动添加区分,由于 Hive 是读模式,所 以对添加进分区的数据不做模式校验,分桶表中的数据是按照某些分桶字段进行 hash 散列 形成的多个文件,所以数据的准确性也高很多

元数据表

meta表

hive版本

VERSION表,记录hive版本

数据库相关meta表

表名 说明 字段
DBS 该表存储Hive中所有数据库的基本信息 数据库ID,数据库描述,数据库HDFS路径,数据库名,数据库所有者用户名,所有者角色
DATABASE_PARAMS 该表存储数据库的相关参数,在CREATE DATABASE时候用WITH DBPROPERTIES (property_name=property_value, …)指定的参数 数据库ID,参数名,参数值

DBS和DATABASE_PARAMS这两张表通过DB_ID字段关联。

表相关meta表

表名 说明 字段
TBLS 该表中存储Hive表、视图、索引表的基本信息 表ID,创建时间,数据库ID,上次访问时间,所有者,保留字段,序列化配置信息,表名,表类型,视图的详细HQL语句,视图的原始HQL语句
TABLE_PARAMS 该表存储表/视图的属性信息 表ID,属性名,属性值
TBL_PRIVS 该表存储表/视图的授权信息 授权ID,授权时间,授权执行用户,授权者类型,被授权用户,被授权用户类型,权限,表ID

文件存储信息相关的元数据表

由于HDFS支持的文件格式很多,而建Hive表时候也可以指定各种文件格式,Hive在将HQL解析成MapReduce时候,需要知道去哪里,使用哪种格式去读写HDFS文件,而这些信息就保存在这几张表中。

表名 说明 字段
SDS 该表保存文件存储的基本信息,如INPUT_FORMAT、OUTPUT_FORMAT、是否压缩等。TBLS表中的SD_ID与该表关联,可以获取Hive表的存储信息
SD_PARAMS 该表存储Hive存储的属性信息,在创建表时候使用STORED BY ‘storage.handler.class.name’ [WITH SERDEPROPERTIES (…)指定
SERDES 该表存储序列化使用的类信息
SERDE_PARAMS 该表存储序列化的一些属性、格式信息,比如:行、列分隔符

表字段相关的元数据表

表名 说明 字段
COLUMNS_V2 该表存储表对应的字段信息

表分区相关的元数据表

表名 说明 字段
PARTITIONS 该表存储表分区的基本信息
PARTITION_KEYS 该表存储分区的字段信息
PARTITION_KEY_VALS 该表存储分区字段值
PARTITION_PARAMS 该表存储分区的属性信息

数据类型与存储格式

基本数据类型

类型 描述 示例
boolean true/false TRUE
tinyint 1字节的有符号整数 -128~127 1Y
smallint 2个字节的有符号整数,-32768~32767 1S
int 4个字节的带符号整数 1
bigint 8字节带符号整数 1L
float 4字节单精度浮点数 1.0
double 8字节双精度浮点数 1.0
deicimal 任意精度的带符号小数 1.0
String 字符串,变长 “a”,’b’
varchar 变长字符串 “a”,’b’
char 固定长度字符串 “a”,’b’
binary 字节数组 无法表示
timestamp 时间戳,纳秒精度 122327493795
date 日期 ‘2018-04-07’

复杂数据类型

类型 描述 示例
array 有序的的同类型的集合 array(1,2)
map key-value,key必须为原始类型,value可以任意类型 map(‘a’,1,’b’,2)
struct 字段集合,类型可以不同 struct(‘1’,1,1.0), named_stract(‘col1’,’1’,’col2’,1,’clo3’,1.0)

存储格式

Hive会为每个创建的数据库在HDFS上创建一个目录,该数据库的表会以子目录形式存储,表中的数据会以表目录下的文件形式存储。对于default数据库,默认的缺省数据库没有自己的目录,default数据库下的表默认存放在/user/hive/warehouse目录下。

  1. textfile
    textfile为默认格式,存储方式为行存储。数据不做压缩,磁盘开销大,数据解析开销大。
  2. SequenceFile
    SequenceFile是Hadoop API提供的一种二进制文件支持,其具有使用方便、可分割、可压缩的特点。
    SequenceFile支持三种压缩选择:NONE, RECORD, BLOCK。 Record压缩率低,一般建议使用BLOCK压缩。
  3. RCFile
    一种行列存储相结合的存储方式。
  4. ORCFile
    数据按照行分块,每个块按照列存储,其中每个块都存储有一个索引。hive给出的新格式,属于RCFILE的升级版,性能有大幅度提升,而且数据可以压缩存储,压缩快 快速列存取。
  5. Parquet
    Parquet也是一种行式存储,同时具有很好的压缩性能;同时可以减少大量的表扫描和反序列化的时间。

数据格式

当数据存储在文本文件中,必须按照一定格式区别行和列,并且在Hive中指明这些区分符。Hive默认使用了几个平时很少出现的字符,这些字符一般不会作为内容出现在记录中。

Hive默认的行和列分隔符如下表所示。

分隔符 描述
\n 对于文本文件来说,每行是一条记录,所以\n 来分割记录
^A (Ctrl+A) 分割字段,也可以用\001 来表示
^B (Ctrl+B) 用于分割 Arrary 或者 Struct 中的元素,或者用于 map 中键值之间的分割,也可以用\002 分割。
^C 用于 map 中键和值自己分割,也可以用\003 表示。

DDL

hive ddl

数据库DDL

  1. 创建库
  2. 查看库
  3. 删除库
  4. 切换库

表DDL

创建表:

1
2
3
4
5
6
7
8
9
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
  [(col_name data_type [COMMENT col_comment], ...)]
  [COMMENT table_comment]
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
  [CLUSTERED BY (col_name, col_name, ...)
    [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
  [ROW FORMAT row_format]
  [STORED AS file_format]
  [LOCATION hdfs_path]
  • CREATE TABLE 创建一个指定名字的表。如果相同名字的表已经存在,则抛出异常;用户可以用 IF NOT EXIST 选项来忽略这个异常
  • EXTERNAL 关键字可以让用户创建一个外部表,在建表的同时指定一个指向实际数据的路径(LOCATION)
  • LIKE 允许用户复制现有的表结构,但是不复制数据
  • COMMENT可以为表与字段增加描述
  • PARTITIONED BY 指定分区
  • ROW FORMAT
      DELIMITED [FIELDS TERMINATED BY char] [COLLECTION ITEMS TERMINATED BY char]
        MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
        | SERDE serde_name [WITH SERDEPROPERTIES
        (property_name=property_value, property_name=property_value, …)]
      用户在建表的时候可以自定义 SerDe 或者使用自带的 SerDe。如果没有指定 ROW FORMAT 或者 ROW FORMAT DELIMITED,将会使用自带的 SerDe。在建表的时候,
    用户还需要为表指定列,用户在指定表的列的同时也会指定自定义的 SerDe,Hive 通过 SerDe 确定表的具体的列的数据。
  • STORED AS
      SEQUENCEFILE //序列化文件
      | TEXTFILE //普通的文本文件格式
      | RCFILE  //行列存储相结合的文件
      | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname //自定义文件格式
      如果文件数据是纯文本,可以使用 STORED AS TEXTFILE。如果数据需要压缩,使用 STORED AS SEQUENCE 。
  • LOCATION指定表在HDFS的存储路径

查看表

修改表

删除表

清空表

函数&脚本

内置函数

内置函数

窗口函数

hive窗口函数_

窗口函数是用于分析用的一类函数,要理解窗口函数要先从聚合函数说起。 大家都知道聚合函数是将某列中多行的值合并为一行,比如sum、count等。 而窗口函数则可以在本行内做运算,得到多行的结果,即每一行对应一行的值。 通用的窗口函数可以用下面的语法来概括:

Function() Over (Partition By Column1,Column2,Order By Column3)

窗口函数又分为以下三类: 聚合型窗口函数 分析型窗口函数 * 取值型窗口函数

聚合型窗口函数

聚合型即SUM(), MIN(),MAX(),AVG(),COUNT()这些常见的聚合函数。 聚合函数配合窗口函数使用可以使计算更加灵活

分析型窗口函数

分析型即RANk(),ROW_NUMBER(),DENSE_RANK()等常见的排序用的窗口函数

row_number函数:生成连续的序号(相同元素序号相同)

rank函数:如两元素排序相同则序号相同,并且会跳过下一个序号

desrank函数:如两元素排序相同则序号相同,不会跳过下一个序号

取值型窗口函数

这几个函数可以通过字面意思记得,LAG是迟滞的意思,也就是对某一列进行往后错行;LEAD是LAG的反义词,也就是对某一列进行提前几行;FIRST_VALUE是对该列到目前为止的首个值,而LAST_VALUE是到目前行为止的最后一个值。

LAG()和LEAD() 可以带3个参数,第一个是返回的值,第二个是前置或者后置的行数,第三个是默认值。

窗口大小

关键是理解 ROWS BETWEEN 含义,也叫做window子句:

PRECEDING:往前

FOLLOWING:往后

CURRENT ROW:当前行

UNBOUNDED:无边界,UNBOUNDED PRECEDING 表示从最前面的起点开始, UNBOUNDED FOLLOWING:表示到最后面的终点

–其他AVG,MIN,MAX,和SUM用法一样

自定义函数

当 Hive 提供的内置函数无法满足业务处理需要时,此时就可以考虑使用用户自定义函数。

  • UDF(user-defined function)作用于单个数据行,产生一个数据行作为输出。(数学函数,字 符串函数)
  • UDAF(用户定义聚集函数 User- Defined Aggregation Funcation):接收多个输入数据行,并产 生一个输出数据行。(count,max)
  • UDTF(表格生成函数 User-Defined Table Functions):接收一行输入,输出多行(explode)
1
2
3
4
5
6
7
8
9
10
11
import org.apache.hadoop.hive.ql.exec.UDF;

public class ToLowerCase extends UDF{

// 必须是 public,并且 evaluate 方法可以重载
public String evaluate(String field) {
String result = field.toLowerCase();
return result;
}

}

Transform

Hive 的 TRANSFORM 关键字提供了在 SQL 中调用自写脚本的功能。适合实现 Hive 中没有的 功能又不想写 UDF 的情况

1
2
3
4
5
6
7
8
#!/bin/python
import sys
import datetime
for line in sys.stdin:
line = line.strip()
movie,rate,unixtime,userid = line.split('\t')
weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
print '\t'.join([movie, rate, str(weekday),userid])
1
2
3
hive>add file /home/hadoop/weekday_mapper.py;
hive> insert into table lastjsontable select transform(movie,rate,unixtime,userid)
using 'python weekday_mapper.py' as(movie,rate,weekday,userid) from rate;

数据倾斜

由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点

hive框架的特性:

  • 不怕数据大,怕数据倾斜
  • Jobs 数比较多的作业运行效率相对比较低,如子查询比较多
  • sum,count,max,min 等聚集函数,通常不会有数据倾斜问题

主要表现:

任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 reduce 处理的记录数和平均记录数相差太大,通常达到好几倍之多,最长时间远大 于平均时长。

容易出现数据倾斜的情况:

aeaecc7eb67ab1eac021091ee6874318.png

  1. group by 不和聚集函数搭配使用的时候
  2. count(distinct),在数据量大的情况下,容易数据倾斜,因为 count(distinct)是按 group by 字段分组,按 distinct 字段排序
  3. 小表关联超大表 join

产生数据倾斜的原因

  1. key 分布不均匀
  2. 业务数据本身的特性
  3. 建表考虑不周全
  4. 某些 HQL 语句本身就存在数据倾斜

解决办法

参数调节

hive.map.aggr=true

Map 端部分聚合,相当于Combiner

hive.groupby.skewindata=true

有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

语句调节

  • 如何Join:
    关于驱动表的选取,选用join key分布最均匀的表作为驱动表
    做好列裁剪和filter操作,以达到两表做join的时候,数据量相对变小的效果。
  • 大小表Join:
    使用mapjoin让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce.
    MAPJION会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,而普通的equality join则是类似于mapreduce模型中的file join,需要先分组,然后再reduce端进行连接,使用的时候需要结合着场景;由于mapjoin是在map是进行了join操作,省去了reduce的运行,效率也会高很多
  • 大表Join大表:
    把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。
  • count distinct大量相同特殊值
    count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。
  • group by维度过小:
    采用sum() group by的方式来替换count(distinct)完成计算。
  • 特殊情况特殊处理:
    在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。

例子

  1. 空值产生的数据倾斜
    在日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和用户表中的 user_id 相关联,就会碰到数据倾斜的问题。
    解决方案 1:user_id 为空的不参与关联
    解决方案 2:赋予空值新的 key 值
  2. 不同数据类型关联产生数据倾斜
    用户表中 user_id 字段为 int,log 表中 user_id 为既有 string 也有 int 的类型, 当按照两个表的 user_id 进行 join 操作的时候,默认的 hash 操作会按照 int 类型的 id 进 行分配,这样就会导致所有的 string 类型的 id 就被分到同一个 reducer 当中
    解决方案:把数字类型 id 转换成 string 类型的 id
  3. 大小表关联查询产生数据倾斜
    使用map join解决小表关联大表造成的数据倾斜问题。这个方法使用的频率很高。

hive执行过程

概述

  1. Hive 将 HQL 转换成一组操作符(Operator),比如 GroupByOperator, JoinOperator 等
  2. 操作符 Operator 是 Hive 的最小处理单元
  3. 每个操作符代表一个 HDFS 操作或者 MapReduce 作业
  4. Hive 通过 ExecMapper 和 ExecReducer 执行 MapReduce 程序,执行模式有本地模式和分 布式两种模式

操作符类型

f3c5848ccaddac8a43b1f286665feaca.png

编译器的工作职责

  1. Parser:将 HQL 语句转换成抽象语法树(AST:Abstract Syntax Tree)
  2. Semantic Analyzer:将抽象语法树转换成查询块
  3. Logic Plan Generator:将查询块转换成逻辑查询计划
  4. Logic Optimizer:重写逻辑查询计划,优化逻辑执行计划。优化过程可能包括谓词下推(Predicate Push Down),分区剪裁(Partition Prunner),关联排序(Join Reorder)
  5. Physical Plan Gernerator:将逻辑计划转化成物理计划(MapReduce Jobs)
  6. Physical Optimizer:选择最佳的 Join 策略,优化物理执行计划。比如基于输入选择执行路径,增加备份作业等

优化器类型

e76307484976aefd449c340576495570.png

上表中带①符号的,优化目的都是尽量将任务合并到一个 Job 中,以减少 Job 数量,带②的 优化目的是尽量减少 shuffle 数据量

Join

Map:

  1. 以 JOIN ON 条件中的列作为 Key,如果有多个列,则 Key 是这些列的组合
  2. 以 JOIN 之后所关心的列作为 Value,当有多个列时,Value 是这些列的组合。在 Value 中还会包含表的 Tag 信息,用于标明此 Value 对应于哪个表
  3. 按照 Key 进行排序

Shuffle:

根据 Key 的值进行 Hash,并将 Key/Value 对按照 Hash 值推至不同对 Reduce 中

Reduce:

Reducer 根据 Key 值进行 Join 操作,并且通过 Tag 来识别不同的表中的数据

例子:

SELECT pv.pageid, u.age FROM page_view pv JOIN user u ON pv.userid = u.userid;

%!(EXTRA markdown.ResourceType=, string=, string=)

Group By

实例:

SELECT pageid, age, count(1) FROM pv_users GROUP BY pageid, age;

1cdd2c2d832d98763803d325a69b4859.png

distinct

示例:

SELECT age, count(distinct pageid) FROM pv_users GROUP BY age;

按照 age 分组,然后统计每个分组里面的不重复的 pageid 有多少个

68292b6da4fce41f92317a8ab3794443.png

该 SQL 语句会按照 age 和 pageid 预先分组,进行 distinct 操作。然后会再按 照 age 进行分组,再进行一次 distinct 操作

优化

常用手段

  1. 好的模型设计事半功倍
  2. 解决数据倾斜问题
  3. 减少 job 数
  4. 设置合理的 MapReduce 的 task 数,能有效提升性能。(比如,10w+级别的计算,用 160个 reduce,那是相当的浪费,1 个足够)
  5. 了解数据分布,自己动手解决数据倾斜问题是个不错的选择。这是通用的算法优化,但 算法优化有时不能适应特定业务背景,开发人员了解业务,了解数据,可以通过业务逻辑精 确有效的解决数据倾斜问题
  6. 数据量较大的情况下,慎用 count(distinct),group by 容易产生倾斜问题
  7. 对小文件进行合并,是行之有效的提高调度效率的方法,假如所有的作业设置合理的文件数,对云梯的整体调度效率也会产生积极的正向影响
  8. 优化时把握整体,单个作业最优不如整体最优

排序选择

  • cluster by:对同一字段分桶并排序,不能和 sort by 连用。cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是倒叙排序,不能指定排序规则为ASC或者DESC。
  • distribute by + sort by:distribute by是控制在map端如何拆分数据给reduce端的。hive会根据distribute by后面列,对应reduce的个数进行分发,默认是采用hash算法。sort by为每个reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个reducer,这通常是为了进行后续的聚集操作。distribute by刚好可以做这件事。因此,distribute by经常和sort by配合使用
  • sort by:sort by不是全局排序,其在数据进入reducer前完成排序,因此,如果用sort by进行排序,并且设置mapred.reduce.tasks>1,则sort by只会保证每个reducer的输出有序,并不保证全局有序。sort by不同于order by,它不受hive.mapred.mode属性的影响,sort by的数据只能保证在同一个reduce中的数据可以按指定字段排序。使用sort by你可以指定执行的reduce个数(通过set mapred.reduce.tasks=n来指定),对输出的数据再执行归并排序,即可得到全部结果。
  • order by:全局排序,缺陷是只能使用一个 reduce

笛卡尔积

当 Hive 设定为严格模式(hive.mapred.mode=strict)时,不允许在 HQL 语句中出现笛卡尔积, 这实际说明了 Hive 对笛卡尔积支持较弱。因为找不到 Join key,Hive 只能使用 1 个 reducer 来完成笛卡尔积。

当然也可以使用 limit 的办法来减少某个表参与 join 的数据量,但对于需要笛卡尔积语义的 需求来说,经常是一个大表和一个小表的 Join 操作,结果仍然很大(以至于无法用单机处 理),这时 MapJoin才是最好的解决办法。MapJoin,顾名思义,会在 Map 端完成 Join 操作。 这需要将 Join 操作的一个或多个表完全读入内存。

PS:MapJoin 在子查询中可能出现未知 BUG。在大表和小表做笛卡尔积时,规避笛卡尔积的 方法是,给 Join 添加一个 Join key,原理很简单:将小表扩充一列 join key,并将小表的条 目复制数倍,join key 各不相同;将大表扩充一列 join key 为随机数。

精髓就在于复制几倍,最后就有几个 reduce 来做,而且大表的数据是前面小表扩张 key 值 范围里面随机出来的,所以复制了几倍 n,就相当于这个随机范围就有多大 n,那么相应的, 大表的数据就被随机的分为了 n 份。并且最后处理所用的 reduce 数量也是 n,而且也不会 出现数据倾斜。

设置合理的 maptask 数量

Map 数过大

  • Map 阶段输出文件太小,产生大量小文件
  • 初始化和创建 Map 的开销很大

Map 数太小

  • 文件处理或查询并发度小,Job 执行时间过长
  • 大量作业时,容易堵塞集群

在 MapReduce 的编程案例中,我们得知,一个MR Job的 MapTask 数量是由输入分片 InputSplit 决定的。而输入分片是由 FileInputFormat.getSplit()决定的。一个输入分片对应一个 MapTask, 而输入分片是由三个参数决定的:

e8e26931026e26c5a8c775b0be3273ea.png

输入分片大小的计算是这么计算出来的:

long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))

默认情况下,输入分片大小和 HDFS 集群默认数据块大小一致,也就是默认一个数据块,启 用一个 MapTask 进行处理,这样做的好处是避免了服务器节点之间的数据传输,提高 job 处 理效率

两种经典的控制 MapTask 的个数方案:减少 MapTask 数或者增加 MapTask 数

  1. 减少 MapTask 数是通过合并小文件来实现,这一点主要是针对数据源
  2. 增加 MapTask 数可以通过控制上一个 job 的 reduceTask 个数

因为 Hive 语句最终要转换为一系列的 MapReduce Job 的,而每一个 MapReduce Job 是由一 系列的 MapTask 和 ReduceTask 组成的,默认情况下, MapReduce 中一个 MapTask 或者一个 ReduceTask 就会启动一个 JVM 进程,一个 Task 执行完毕后, JVM 进程就退出。这样如果任 务花费时间很短,又要多次启动 JVM 的情况下,JVM 的启动时间会变成一个比较大的消耗, 这个时候,就可以通过重用 JVM 来解决:

set mapred.job.reuse.jvm.num.tasks=5

小文件合并

件数目过多,会给 HDFS 带来压力,并且会影响处理效率,可以通过合并 Map 和 Reduce 的 结果文件来消除这样的影响:

set hive.merge.mapfiles = true ##在 map only 的任务结束时合并小文件

set hive.merge.mapredfiles = false ## true 时在 MapReduce 的任务结束时合并小文件

set hive.merge.size.per.task = 25610001000 ##合并文件的大小

set mapred.max.split.size=256000000; ##每个 Map 最大分割大小

set mapred.min.split.size.per.node=1; ##一个节点上 split 的最少值

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ##执行 Map 前进行小文件合并

设置合理的 reduceTask 的数量

Hadoop MapReduce 程序中,reducer 个数的设定极大影响执行效率,这使得 Hive 怎样决定 reducer 个数成为一个关键问题。遗憾的是 Hive 的估计机制很弱,不指定 reducer 个数的情 况下,Hive 会猜测确定一个 reducer 个数,基于以下两个设定:

  1. hive.exec.reducers.bytes.per.reducer(默认为 256000000)
  2. hive.exec.reducers.max(默认为 1009)
  3. mapreduce.job.reduces=-1(设置一个常量 reducetask 数量)

计算 reducer 数的公式很简单: N=min(参数 2,总输入数据量/参数 1) 通常情况下,有必要手动指定 reducer 个数。考虑到 map 阶段的输出数据量通常会比输入有 大幅减少,因此即使不设定 reducer 个数,重设参数 2 还是必要的。

依据 Hadoop 的经验,可以将参数 2 设定为 0.95*(集群中 datanode 个数)。

合并 MapReduce 操作

Multi-group by 是 Hive 的一个非常好的特性,它使得 Hive 中利用中间结果变得非常方便。 例如:

1
2
3
4
5
6
FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b ON (a.userid =
b.userid and a.ds='2009-03-20' ) ) subq1
INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20')
SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20')
SELECT subq1.school, COUNT(1) GROUP BY subq1.school

上述查询语句使用了 multi-group by 特性连续 group by 了 2 次数据,使用不同的 group by key。 这一特性可以减少一次 MapReduce 操作

合理利用分桶:Bucketing 和 Sampling

Bucket 是指将数据以指定列的值为 key 进行 hash,hash 到指定数目的桶中。这样就可以支 持高效采样了。如下例就是以 userid 这一列为 bucket 的依据,共设置 32 个 buckets

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE page_view(viewTime INT, userid BIGINT,
page_url STRING, referrer_url STRING,
ip STRING COMMENT 'IP Address of the User')
COMMENT 'This is the page view table'
PARTITIONED BY(dt STRING, country STRING)
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '1'
COLLECTION ITEMS TERMINATED BY '2'
MAP KEYS TERMINATED BY '3'
STORED AS SEQUENCEFILE;

通常情况下,Sampling 在全体数据上进行采样,这样效率自然就低,它要去访问所有数据。 而如果一个表已经对某一列制作了 bucket,就可以采样所有桶中指定序号的某个桶,这就 减少了访问量。

如下例所示就是采样了 page_view 中 32 个桶中的第三个桶的全部数据:

SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 32);

如下例所示就是采样了 page_view 中 32 个桶中的第三个桶的一半数据:

SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 64);

合理利用分区:Partition

Partition 就是分区。分区通过在创建表时启用 partitioned by 实现,用来 partition 的维度并不 是实际数据的某一列,具体分区的标志是由插入内容时给定的。当要查询某一分区的内容时 可以采用 where 语句,形似 where tablename.partition_column = a 来实现。

Join 优化

总体原则:

  1. 优先过滤后再进行 Join 操作,最大限度的减少参与 join 的数据量
  2. 小表 join 大表,最好启动 mapjoin
  3. Join on 的条件相同的话,最好放入同一个 job,并且 join 表的排列顺序从小到大

在使用写有 Join 操作的查询语句时有一条原则:应该将条目少的表/子查询放在 Join 操作 符的左边。原因是在 Join 操作的 Reduce 阶段,位于 Join 操作符左边的表的内容会被加 载进内存,将条目少的表放在左边,可以有效减少发生 OOM 错误的几率。对于一条语句 中有多个 Join 的情况,如果 Join 的条件相同,比如查询

1
2
3
4
INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x ON (u.userid = x.userid);

如果 Join 的 key 相同,不管有多少个表,都会则会合并为一个 Map-Reduce 任务,而不 是”n”个,在做 OUTER JOIN 的时候也是一样

在编写 Join 查询语句时,如果确定是由于 join 出现的数据倾斜,那么请做如下设置:

1
2
set hive.skewjoin.key=100000; // 这个是 join 的键对应的记录条数超过这个值则会进行 分拆,值根据具体数据量设置
set hive.optimize.skewjoin=true; // 如果是 join 过程出现倾斜应该设置为 true

Group By 优化

  1. Map 端部分聚合
    并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进 行部分聚合,最后在 Reduce 端得出最终结果。
    MapReduce 的 combiner 组件参数包括:
1
2
set hive.map.aggr = true 是否在 Map 端进行聚合,默认为 True
set hive.groupby.mapaggr.checkinterval = 100000 在 Map 端进行聚合操作的条目数目
  1. 使用 Group By 有数据倾斜的时候进行负载均衡
    当 sql 语句使用 groupby 时数据出现倾斜时,如果该变量设置为 true,那么 Hive 会自动进行 负载均衡。策略就是把 MR 任务拆分成两个:第一个先做预汇总,第二个再做最终汇总
    在 MR 的第一个阶段中,Map 的输出结果集合会缓存到 maptaks 中,每个 Reduce 做部分聚 合操作,并输出结果,这样处理的结果是相同 Group By Key 有可能被分发到不同的 Reduce 中, 从而达到负载均衡的目的;第二个阶段 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成 最终的聚合操作。
1
set hive.groupby.skewindata = true

合理利用文件存储格式

创建表时,尽量使用 orc、parquet 这些列式存储格式,因为列式存储的表,每一列的数据在 物理上是存储在一起的,Hive 查询时会只遍历需要列数据,大大减少处理的数据量

本地模式执行 MapReduce

Hive 在集群上查询时,默认是在集群上 N 台机器上运行, 需要多个机器进行协调运行,这 个方式很好地解决了大数据量的查询问题。但是当 Hive 查询处理的数据量比较小时,其实 没有必要启动分布式模式去执行,因为以分布式方式执行就涉及到跨网络传输、多节点协调 等,并且消耗资源。这个时间可以只使用本地模式来执行 mapreduce job,只在一台机器上 执行,速度会很快。

并行化处理

一个 hive sql 语句可能会转为多个 mapreduce Job,每一个 job 就是一个 stage,这些 job 顺序 执行,这个在 cli 的运行日志中也可以看到。但是有时候这些任务之间并不是是相互依赖的, 如果集群资源允许的话,可以让多个并不相互依赖 stage 并发执行,这样就节约了时间,提 高了执行速度,但是如果集群资源匮乏时,启用并行化反倒是会导致各个 job 相互抢占资源 而导致整体执行性能的下降。启用并行化:

1
2
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8; //同一个 sql 允许并行任务的最大线程数

设置压缩存储

Hive 最终是转为 MapReduce 程序来执行的,而 MapReduce 的性能瓶颈在于网络 IO 和 磁盘 IO,要解决性能瓶颈,最主要的是减少数据量,对数据进行压缩是个好的方式。压缩 虽然是减少了数据量,但是压缩过程要消耗 CPU 的,但是在 Hadoop 中, 往往性能瓶颈不 在于 CPU,CPU 压力并不大,所以压缩充分利用了比较空闲的 CPU

RPC

RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务

背景

  • 单一应用架构
    当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本。此时,用于简化增删改查工作量的数据访问框架(ORM)是关键。
  • 垂直应用架构
    当访问量逐渐增大,单一应用增加机器带来的加速度越来越小,提升效率的方法之一是将应用拆成互不相干的几个应用,以提升效率。此时,用于加速前端页面开发的Web框架(MVC)是关键。
  • 分布式服务架构
    当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心,使前端应用能更快速的响应多变的市场需求。此时,用于提高业务复用及整合的分布式服务框架(RPC)是关键。
  • 流动计算架构
    当服务越来越多,容量的评估,小服务资源的浪费等问题逐渐显现,此时需增加一个调度中心基于访问压力实时管理集群容量,提高集群利用率。此时,用于提高机器利用率的资源调度和治理中心(SOA)是关键。

基本原理

客户端(Client):服务的调用方。
服务端(Server):真正的服务提供者。
客户端存根:存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方。
服务端存根:接收客户端发送过来的消息,将消息解包,并调用本地的方法。

过程:

  1. 要解决寻址的问题,也就是说,A服务器上的应用怎么告诉底层的RPC框架,如何连接到B服务器(如主机或IP地址)以及特定的端口,方法的名称名称是什么,这样才能完成调用。比如基于Web服务协议栈的RPC,就要提供一个endpoint URI,或者是从UDDI服务上查找。如果是RMI调用的话,还需要一个RMI Registry来注册服务的地址。
  2. 要解决通讯的问题,主要是通过在客户端和服务器之间建立TCP连接,远程过程调用的所有交换的数据都在这个连接里传输。连接可以是按需连接,调用结束后就断掉,也可以是长连接,多个远程过程调用共享同一个连接。
  3. 当A服务器上的应用发起远程过程调用时,方法的参数需要通过底层的网络协议如TCP传递到B服务器,由于网络协议是基于二进制的,内存中的参数的值要序列化成二进制的形式,也就是序列化(Serialize)或编组(marshal),通过寻址和传输将序列化的二进制发送给B服务器。
  4. B服务器收到请求后,需要对参数进行反序列化(序列化的逆操作),恢复为内存中的表达方式,然后找到对应的方法(寻址的一部分)进行本地调用,然后得到返回值。
  5. 返回值还要发送回服务器A上的应用,也要经过序列化的方式发送,服务器A接到后,再反序列化,恢复为内存中的表达方式,交给A服务器上的应用

如何分析一个RPC框架

  1. 服务治理
  2. 编码协议(IDL)
  3. 传输(通信)协议
  4. 线程模型

RPC框架

Dubbo

架构

Dubbo 架构具有连通性、健壮性、伸缩性、以及向未来架构的升级性几个特点。

  • Provider为服务提供方,提供 Java 服务接口的实现,并将其元信息注册到 Dubbo 注册中心(过程 1.register 所示)
  • Consumer为服务消费端,从 Dubbo 注册中心检索订阅的 Java 服务接口的元信息(过程 2.subscribe 所示),通过框架处理后,生成代理程序执行远程方法调用(过程 4.invoke 所示)
  • Registry为注册中心,属于注册元信息中心化基础设施(如 Apache Zookeeper 或 Alibaba Nacos),为 Provider 提供注册通道,为 Cosumer 提供订阅渠道。同时,注册中心支持注册元信息变更通知,通知 Consumer 上游 Provider 节点的变化(如扩容或缩容)。而注册元信息均以 Dubbo URL 的形式存储
  • Monitor为服务治理平台,提供开发和运维人员服务查询、路由规则、服务 Mock 和测试等治理能力

传输协议:TCP+基于长链接的NIO框架
编码协议:定制的Hessian2框架/PB/HTTP等
线程模型:Reactor

Dubbo解决的问题

  • 当服务越来越多时,服务 URL 配置管理变得非常困难,F5 硬件负载均衡器的单点压力也越来越大。此时需要一个服务注册中心,动态地注册和发现服务,使服务的位置透明。并通过在消费方获取服务提供方地址列表,实现软负载均衡和 Failover,降低对 F5 硬件负载均衡器的依赖,也能减少部分成本。
  • 服务间依赖关系变得错踪复杂,甚至分不清哪个应用要在哪个应用之前启动,架构师都不能完整的描述应用的架构关系。这时,需要自动画出应用间的依赖关系图,以帮助架构师理清关系。
  • 服务的调用量越来越大,服务的容量问题就暴露出来,这个服务需要多少机器支撑?什么时候该加机器?为了解决这些问题,第一步,要将服务现在每天的调用量,响应时间,都统计出来,作为容量规划的参考指标。其次,要可以动态调整权重,在线上,将某台机器的权重一直加大,并在加大的过程中记录响应时间的变化,直到响应时间到达阈值,记录此时的访问量,再以此访问量乘以机器数反推总容量。

Spring Cloud

Spring Cloud为开发人员提供了快速构建分布式系统中的一些通用模式(例如配置管理,服务发现,断路器,智能路由,微代理,控制总线,一次性令牌,全局锁,领导选举,分布式 会话,群集状态)。 分布式系统的协调引出样板模式(boiler plate patterns),并且使用Spring Cloud开发人员可以快速地实现这些模式来启动服务和应用程序。

交互流程

Spring Cloud 微服务架构是由多个组件一起组成的,各个组件的交互流程如下。

  • 请求统一通过 API 网关 Zuul 来访问内部服务,先经过 Token 进行安全认证。
  • 通过安全认证后,网关 Zuul 从注册中心 Eureka 获取可用服务节点列表。
  • 从可用服务节点中选取一个可用节点,然后把请求分发到这个节点。
  • 整个请求过程中,Hystrix 组件负责处理服务超时熔断,Turbine 组件负责监控服务间的调用和熔断相关指标,Sleuth 组件负责调用链监控,ELK 负责日志分析。

优点

  • 社区活跃
  • 标准化的将微服务的成熟产品和框架结合一起,Spring Cloud 提供整套的微服务解决方案,开发成本较低,且风险较小。
  • 基于 Spring Boot,具有简单配置、快速开发、轻松部署、方便测试的特点。
  • 支持 REST 服务调用,相比于 RPC,更加轻量化和灵活(服务之间只依赖一纸契约,不存在代码级别的强依赖),有利于跨语言服务的实现,以及服务的发布部署。另外,结合 Swagger,也使得服务的文档一体化。

对比Dubbo

dubbo&sc

gRPC

它的原理是通过 IDL(Interface Definition Language)文件定义服务接口的参数和返回值类型,然后通过代码生成程序生成服务端和客户端的具体实现代码,这样在 gRPC 里,客户端应用可以像调用本地对象一样调用另一台服务器上对应的方法。

它的主要特性包括三个方面。

  • 通信协议采用了 HTTP/2,因为 HTTP/2 提供了连接复用、双向流、服务器推送、请求优先级、首部压缩等机制。Netty 4.1 提供了 HTTP/2 底层协议栈,通过 Http2ConnectionHandler 及其依赖的其它类库,实现了 HTTP/2 消息的统一接入和处理。
  • IDL 使用了ProtoBuf,ProtoBuf 是由 Google 开发的一种数据序列化协议,它的压缩和传输效率极高,语法也简单
  • 多语言支持,能够基于多种语言自动生成对应语言的客户端和服务端的代码。

Thrift

Thrift 是一种轻量级的跨语言 RPC 通信方案,支持多达 25 种编程语言。为了支持多种语言,跟 gRPC 一样,Thrift 也有一套自己的接口定义语言 IDL,可以通过代码生成器,生成各种编程语言的 Client 端和 Server 端的 SDK 代码,这样就保证了不同语言之间可以相互通信。它的架构图可以用下图来描述。

网络栈结构

TProtocol层
支持多种序列化格式:如 Binary、Compact、JSON、Thrift 等。

TTransport层
支持多种通信方式:如 Socket、Framed、File、Memory、zlib 等。

Service模型
服务端支持多种处理方式:如 Simple 、Thread Pool、Non-Blocking 等。

  • TSimpleServer: 简单的单线程服务模型,常用于测试;
  • TThreadPoolServer: 多线程服务模型,使用标准的阻塞式IO;
  • TNonBlockingServer: 多线程服务模型,使用非阻塞式IO(需要使用TFramedTransport数据传输方式);
  • THsHaServer: THsHa引入了线程池去处理,其模型读写任务放到线程池去处理,Half-sync/Half-async处理模式,Half-async是在处理IO事件上(accept/read/write io),Half-sync用于handler对rpc的同步处理;

对比

框架 语言 服务治理 多种序列化 注册中心 管理中心 跨语言
Dubbo Java 支持 支持 支持 支持 不支持
Spring Cloud Java 支持 支持 支持 支持 不支持
gRPC 跨语言 不支持 only pb 不支持 不支持 支持
Thrift 跨语言 不支持 only thrift 不支持 不支持 支持

RPC中的网络传输与线程模型

基础网络模型

Linux IO模式及 select、poll、epoll详解

几种IO模型

  • BIO,同步阻塞IO,阻塞整个步骤,如果连接少,他的延迟是最低的,因为一个线程只处理一个连接,适用于少连接且延迟低的场景,比如说数据库连接。
  • NIO,同步非阻塞IO,阻塞业务处理但不阻塞数据接收,适用于高并发且处理简单的场景,比如聊天软件。
  • 多路复用IO,他的两个步骤处理是分开的,也就是说,一个连接可能他的数据接收是线程a完成的,数据处理是线程b完成的,他比BIO能处理更多请求。
  • 信号驱动IO,这种IO模型主要用在嵌入式开发,不参与讨论。
  • 异步IO,他的数据请求和数据处理都是异步的,数据请求一次返回一次,适用于长连接的业务场景。

NIO

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。

多路复用

IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

Netty

概述

Netty采用典型的三层网络架构进行开发和设计,主要涵盖Reactor通信调度层,责任链ChannelPipeline和业务逻辑编排层(Service ChannelHandler)。

Reactor通信调度层:该层主要包含NioSocketChannel(客户端异步非阻塞通道)/NioServerSocketChannel(服务端异步非阻塞通道),Eventloop,ByteBuffer和Task。该层的主要职责是监听网络的读写和连接操作,负责将网络层的数据读取到内存缓冲区中,然后出发各种网络事件,例如连接,读/写等事件,将这些事件出发到pipeline中,由pipeline管理的职责链来进行后续处理。

职责链ChannelPipeline:它负责事件在职责链中的有序传播,同时负责动态地编排职责链。不同应用的Handler 节点的功能也不同,通常情况下,往往会开发编解码Hanlder 用于消息的编解码,它可以将外部的协议消息转换成内部的POJO 对象,这样上层业务则只需要关心处理业务逻辑即可,不需要感知底层的协议差异和线程模型差异,实现了架构层面的分层隔离。

业务逻辑编排层:业务逻辑编排层通常有两类:一类是纯粹的业务逻辑编排,还有一类是其他的应用层协议插件,用于特定协议相关的会话和链路管理。例如CMPP 协议,用于管理和中国移动短信系统的对接。

Reactor通信调度层

该层主要包含NioSocketChannel(客户端异步非阻塞通道)/NioServerSocketChannel(服务端异步非阻塞通道),Eventloop,ByteBuffer和Task。该层的主要职责是监听网络的读写和连接操作,负责将网络层的数据读取到内存缓冲区中,然后出发各种网络事件,例如连接,读/写等事件,将这些事件出发到pipeline中,由pipeline管理的职责链来进行后续处理。

  1. Reactor主线程 MainReactor 对象通过select 监听连接事件, 收到事件后,通过Acceptor 处理连接事件
  2. 当 Acceptor 处理连接事件后,MainReactor 将连接分配给SubReactor
  3. subReactor 将连接加入到连接队列进行监听,并创建handler进行各种事件处理
  4. 当有新事件发生时, subreactor 就会调用对应的handler处理
  5. handler 通过read 读取数据,分发给后面的worker 线程处理
  6. worker 线程池分配独立的worker 线程进行业务处理,并返回结果
  7. handler 收到响应的结果后,再通过send 将结果返回给client
  8. Reactor 主线程可以对应多个Reactor 子线程, 即MainRecator 可以关联多个SubReactor

Netty Reactor

Channel

Netty 网络通信的组件,能够用于执行网络 I/O 操作。Channel 为用户提供:

  • 当前网络连接的通道的状态(例如是否打开?是否已连接?)
  • 网络连接的配置参数 (例如接收缓冲区大小)
  • 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。
  • 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方。
  • 支持关联 I/O 操作与对应的处理程序。

不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应。下面是一些常用的 Channel 类型:

  • NioSocketChannel,异步的客户端 TCP Socket 连接。
  • NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
  • NioDatagramChannel,异步的 UDP 连接。
  • NioSctpChannel,异步的客户端 Sctp 连接。
  • NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

ChannelPipeline

Netty将Channel的数据管道抽象为ChannelPipeline,消息在ChannelPipline中流动和传递。ChannelPipeline持有I/O事件拦截器ChannelHandler的双向链表,由ChannelHandler对I/O事件进行拦截和处理,可以方便的新增和删除ChannelHandler来实现不同的业务逻辑定制,不需要对已有的ChannelHandler进行修改,能够实现对修改封闭和对扩展的支持

ChannelHandler

它是一个接口,用于处理I/O事件或拦截I/O事件,并将其转发给对应的channelPipeline中的下一个处理程序。

ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:

ChannelInboundHandler 用于处理入站 I/O 事件。

ChannelOutboundHandler 用于处理出站 I/O 操作。

NioEventLoop

NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:

  • I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。
  • 非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。

两种任务的执行时间比由变量 ioRatio 控制,默认为 50,则表示允许非 IO 任务执行的时间与 IO 任务的执行时间相等。

特点

  • 零拷贝
  • 数据从内存发到网络中,存在两次拷贝,先是从用户空间拷贝到内核空间,再从内核空间拷贝到网络IO
  • NIO提供的ByteBuffer可以使用Direct Buffer模式
  • 直接开辟一个非堆物理内存,不需要进行字节缓冲区的二次拷贝,可以直接将数据写入到内核空间
  • 可扩展性
    基于Netty的基础NIO框架,可以方便地进行应用层协议定制,例如HTTP协议栈、Thrift协议栈、FTP协议栈等。这些扩展不需要修改Netty的源码,直接基于Netty的二进制类库即可实现协议的扩展和定制。
  • 高可靠
    路有效性检测,内存保护机制,优雅停机

RPC性能优化

  1. 客户端 获取到 目标接口的 client stub
  2. 客户端 调用目标方法
  3. 客户端 获取到 请求方法 和 请求数据
  4. 客户端 把 请求方法 和 请求数据 序列化为 传输数据
  5. 进行网络传输
  6. 服务端 获取到 传输数据
  7. 服务端 反序列化获取到 请求方法 和 请求数据
  8. 服务端 获取到 Invoker
  9. 服务端 调用 具体实现 获取到 响应结果
  10. 服务端 把 响应结果 序列化为 传输数据
  11. 进行网络传输
  12. 客户端 接收到 传输数据
  13. 客户端 反序列化获取到 响应结果
  14. 客户端 返回 响应结果

整个流程中对性能影响比较大的环节有:序列化[4, 7, 10, 13],方法调用[2, 3, 8, 9, 14],网络传输[5, 6, 11, 12]

RPC性能取决于

  1. IO模型。优化点:结合多路复用器(select,epoll)实现非阻塞IO。
  2. 线程模型。优化点:使用Reactor线程模型
  3. 序列化方式。优化点:pb,thrift或者自定义协议
  4. 池化技术,减少对象创建和销毁
  5. 使用Direct Buffer
  6. 定制报文。优化点:不使用HTTP,使应用层的头尽可能小
  7. 连接优化。优化点:在合适的地方使用长链接

kRPC

https://wiki.corp.kuaishou.com/pages/viewpage.action?pageId=351636938

Kafka

拓扑结构

ef35529683d3fe948250b8a4985450cb.png

  1. producer:
      消息生产者,发布消息到 kafka 集群的终端或服务。
  2. broker:
      kafka 集群中包含的服务器。
  3. topic:
      每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
  4. partition:
      partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
  5. consumer:
      从 kafka 集群中消费消息的终端或服务。
  6. Consumer group:
      high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
  7. replica:
      partition 的副本,保障 partition 的高可用。
  8. leader:
      replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
  9. follower:
      replica 中的一个角色,从 leader 中复制数据。
  10. controller:
      kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。
  11. zookeeper:
      kafka 通过 zookeeper 来存储集群的 meta 信息。

zookeeper节点

eaec86120c4f10c45efdb6ca8d5b760b.png

producer

写入方式:

producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。

分区路由:

producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:

  1. 指定了 patition,则直接使用;
  2. 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
  3. patition 和 key 都未指定,使用轮询选出一个 patition。

写入流程:

  1. producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
  2. producer 将消息发送给该 leader
  3. leader 将消息写入本地 log
  4. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
  5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

producer delivery guarantee

一般情况下存在三种情况:

  1. At most once 消息可能会丢,但绝不会重复传输
  2. At least one 消息绝不会丢,但可能会重复传输
  3. Exactly once 每条消息肯定会被传输一次且仅传输一次

broker

存储方式:

物理上把 topic 分成一个或多个 patition(对应 server.properties 中的 num.partitions=3 配置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件)

存储策略

无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:

  1. 基于时间:log.retention.hours=168
  2. 基于大小:log.retention.bytes=1073741824

HA

replication

同一个 partition 可能会有多个 replica(对应 server.properties 配置中的 default.replication.factor=N)。没有 replica 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。引入replication 之后,同一个 partition 可能会有多个 replica,而这时需要在这些 replica 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replica 作为 follower 从 leader 中复制数据。

Kafka 分配 Replica 的算法如下:

  1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序
  2. 将第 i 个 partition 分配到第(i mod n)个 broker 上
  3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上

leader failover

当 partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader。在选举新leader时,一个基本的原则是,新的 leader 必须拥有旧 leader commit 过的所有消息。

kafka 在 zookeeper 中(/brokers/…/state)动态维护了一个 ISR(in-sync replicas),由3.3节的写入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成员才能选为 leader。对于 f+1 个 replica,一个 partition 可以在容忍 f 个 replica 失效的情况下保证消息不丢失。

当所有 replica 都不工作时,有两种可行的方案:

  1. 等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
  2. 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。

broker failover

  1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
  2. controller 从 /brokers/ids 节点读取可用broker
  3. controller决定set_p,该集合包含宕机 broker 上的所有 partition
  4. 对 set_p 中的每一个 partition
    4.1 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR
    4.2 决定新 leader(如4.3节所描述)
    4.3 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点
  5. 通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令

controller failover

当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 “/controller” 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。

当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成如下操作

  1. 读取并增加 Controller Epoch。
  2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
  3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
  4. 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
  5. 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
  6. 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
  7. 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
  8. 启动 replicaStateMachine 和 partitionStateMachine。
  9. 将 brokerState 状态设置为 RunningAsController。
  10. 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。
  11. 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
  12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

consumer

The high-level consumer API

high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset,最后一个 offset 由 zookeeper 保存。

使用 high-level consumer API 可以是多线程的应用,应当注意:

  1. 如果消费线程大于 patition 数量,则有些线程将收不到消息
  2. 如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
  3. 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的

The SimpleConsumer API

如果你想要对 patition 有更多的控制权,那就应该使用 SimpleConsumer API,比如:

  1. 多次读取一个消息
  2. 只消费一个 patition 中的部分消息
  3. 使用事务来保证一个消息仅被消费一次

但是使用此 API 时,partition、offset、broker、leader 等对你不再透明,需要自己去管理。你需要做大量的额外工作:

  1. 必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息
  2. 应用程序需要通过程序获知每个 Partition 的 leader 是谁
  3. 需要处理 leader 的变更

consumer group

kafka 的分配单位是 patition。每个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),但是多个 group 可以同时消费这个 partition。

kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用 spark/Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的 consumer group。

消费方式

consumer 采用 pull 模式从 broker 中读取数据。

push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

consumer delivery guarantee

如果将 consumer 设置为 autocommit,consumer 一旦读到数据立即自动 commit。如果只讨论这一读取消息的过程,那 Kafka 确保了 Exactly once。

但实际使用中应用程序并非在 consumer 读取完数据就结束了,而是要进行进一步处理,而数据处理与 commit 的顺序在很大程度上决定了consumer delivery guarantee:

  1. 读完消息先 commit 再处理消息。
    这种模式下,如果 consumer 在 commit 后还没来得及处理消息就 crash 了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于 At most once
  2. 读完消息先处理再 commit。
    这种模式下,如果在处理完消息之后 commit 之前 consumer crash 了,下次重新开始工作时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于 At least once。
  3. 如果一定要做到 Exactly once,就需要协调 offset 和实际操作的输出。
    精典的做法是引入两阶段提交。如果能让 offset 和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer 拿到数据后可能把数据放到 HDFS,如果把最新的 offset 和数据本身一起写到 HDFS,那就可以保证数据的输出和 offset 的更新要么都完成,要么都不完成,间接实现 Exactly once。(目前就 high-level API而言,offset 是存于Zookeeper 中的,无法存于HDFS,而SimpleConsuemr API的 offset 是由自己去维护的,可以将之存于 HDFS 中)

总之,Kafka 默认保证 At least once,并且允许通过设置 producer 异步提交来实现 At most once

consumer rebalance

当有 consumer 加入或退出、以及 partition 的改变(如 broker 加入或退出)时会触发 rebalance。consumer rebalance算法如下:

  1. 将目标 topic 下的所有 partirtion 排序,存于PT
  2. 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci
  3. N=size(PT)/size(CG),向上取整
  4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始)
  5. 将第iN到(i+1)N-1个 partition 分配给 Ci

在 0.8.*版本,每个 consumer 都只负责调整自己所消费的 partition,为了保证整个consumer group 的一致性,当一个 consumer 触发了 rebalance 时,该 consumer group 内的其它所有其它 consumer 也应该同时触发 rebalance。这会导致以下几个问题:

  1. Herd effect
      任何 broker 或者 consumer 的增减都会触发所有的 consumer 的 rebalance
  2. Split Brain
      每个 consumer 分别单独通过 zookeeper 判断哪些 broker 和 consumer 宕机了,那么不同 consumer 在同一时刻从 zookeeper 看到的 view 就可能不一样,这是由 zookeeper 的特性决定的,这就会造成不正确的 reblance 尝试。
  3. 调整结果不可控
      所有的 consumer 都并不知道其它 consumer 的 rebalance 是否成功,这可能会导致 kafka 工作在一个不正确的状态。

基于以上问题,kafka 设计者考虑在0.9.*版本开始使用中心 coordinator 来控制 consumer rebalance,然后又从简便性和验证要求两方面考虑,计划在 consumer 客户端实现分配方案。

nsq区别

核心区别:

  1. kafka是pull,nsq是push
  2. nsq不固化数据

丢消息

刨根问底,kafka 到底会不会丢消息

生产者丢失消息

Kafka 通过配置 request.required.acks 属性来确认消息的生产:

  • 0 表示不进行消息接收是否成功的确认;不能保证消息是否发送成功,生成环境基本不会用。
  • 1 表示当 Leader 接收成功时确认;只要 Leader 存活就可以保证不丢失,保证了吞吐量。
  • -1 或者 all 表示 Leader 和 Follower 都接收成功时确认;可以最大限度保证消息不丢失,但是吞吐量低。

kafka producer 的参数 acks 的默认值为 1,所以默认的 producer 级别是 at least once,并不能 exactly once。

  • 如果 acks 配置为 0,发生网络抖动消息丢了,生产者不校验 ACK 自然就不知道丢了。
  • 如果 acks 配置为 1 保证 leader 不丢,但是如果 leader 挂了,恰好选了一个没有 ACK 的 follower,那也丢了。
  • all:保证 leader 和 follower 不丢,但是如果网络拥塞,没有收到 ACK,会有重复发的问题。

Kafka Broker 丢失消息

操作系统本身有一层缓存,叫做 Page Cache,当往磁盘文件写入的时候,系统会先将数据流写入缓存中,至于什么时候将缓存的数据写入文件中是由操作系统自行决定。

Kafka 提供了一个参数 producer.type 来控制是不是主动 flush,如果 Kafka 写入到 mmap 之后就立即 flush 然后再返回 Producer 叫同步 (sync);写入 mmap 之后立即返回 Producer 不调用 flush 叫异步 (async)。

Kafka 通过多分区多副本机制中已经能最大限度保证数据不会丢失,如果数据已经写入系统 cache 中但是还没来得及刷入磁盘,此时突然机器宕机或者掉电那就丢了,当然这种情况很极端。

消费者丢失消息

  • 先 commit 再处理消息。如果在处理消息的时候异常了,但是 offset 已经提交了,这条消息对于该消费者来说就是丢失了,再也不会消费到了。
  • 先处理消息再 commit。如果在 commit 之前发生异常,下次还会消费到该消息,重复消费的问题可以通过业务保证消息幂等性来解决。

RocketMQ

持久化 & 多副本

RocketMQ默认配置为 3副本、异步复制、异步刷盘,类比kafka的的replica_factor=3, acks=leader;在这种配置之下,可靠性99.99%,可用性99.95%

同时可以针对特殊场景,提供同步刷盘、同步复制的集群,提供更高的可靠性,在6副本、同步复制、同步刷盘的配置下,消息可靠性可达9个9。相应的,写入延时会升高,qps、availability会降低

RocketMQ的消息默认保留48小时,由于存储模型的关系,这个配置是集群级别的,不像kafka是topic级别的。所以如果有特殊的需求,请联系@沈辉 视情况单独搭建集群

消息重试(Requeue)

kafka设计之初主要作为 log (特指类似binlog这种WAL)的传输总线,同一个partition内的有序是默认需求,这意味着消费失败的消息是不能重入的,否则消息的顺序性就被破坏了;用户需要自己打日志或者转储来处理消费失败的消息。

而在MQ领域,大多数场景下其实并不需要严格的顺序,对于消费失败的消息(可能是消费者下游短时间不可用、load高等),这种情况下用户希望消息能够重新投递到MQ,可以不用额外写代码处理错误的消息。RocketMQ 在非有序消费模式下,支持消息重试,类比NSQ的requeue机制,并且重试的消息具有和普通消息一样的持久化支持。

死信队列 (Dead Letter Queue)

当消息多次重试仍然消费失败,这种情况多半是消费者逻辑有问题,比如对于消息的某些字段没有兼容等;需要有地方能够转储这一批消息。RocketMQ提供了死信队列,消费者可以指定消息最大重试次数,当消息重试超过该次数,消息将会发往死信队列。待消费者的问题解决之后,可以从死信队列拉取这些消息统一处理。

延时消息

一些场景下,生产者发送消息成功后,希望delay一段时间消息再投递给消费者,比如创建一个订单之后30分钟内未支付则取消订单的场景。

NSQ支持ms级别精度的延时消息,其实现为内存里的一个priority queue,没有持久化;kafka不支持延时消息;当前版本的RocketMQ支持分级的时延,比如支持10s、30s、1min、5min、10min、15min、20min、30min等多个级别时延,足够覆盖大部分场景,并且有持久化。

需要任意时延的延迟消息用户,见: 延时消息用户文档

写入延迟保障

RocketMQ和kafka都是顺序写盘,充分利用page cache以获得低时延和吞吐。kafka的存储模型为每个partition都由若干segment(物理文件)组成的逻辑文件,当kafka集群上的topic/partition数量多了之后,kafka的顺序写可能会逐渐退化到随机写,写入延迟上涨;只能拆分集群,减少单集群上topic/partition数量。

而RocketMQ的存储模型与kafka不同,所有消息都写到一个CommitLog中即返回,再异步将offset、length信息dispatch到各Consume Queue中,所以是严格的顺序写,在topic数量很多的时候,page fault仍然维持在低水平,写入时延较为稳定。

在异步复制、异步刷盘下,写入延时的avg为10ms,pct99为20ms

基于时间回溯

比如30分钟前消费者下游故障,期间消费者全部异常,用户希望将消费进度回拨到30min前,达到”补”消息的效果。kafka需要集群版本在0.10以上才能支持,RocketMQ默认支持。

顺序/乱序消费

RocketMQ同时提供了顺序和乱序消费。 顺序的保障是,消息写到broker上的一个Consume Queue的顺序和消费者从该Consume Queue读取的顺序是一致的,多个producer并发写到一个Consume Queue的状况顺序没有保障,实际的顺序以broker收到的顺序为准。

一般有序消息都是同步发送的,也即msg n发送成功并且client收到写入成功的结果之后才会发送msg n+1,异步/并发的有序消息是个伪需求。

对于消息没有顺序要求的场景,可以使用乱序消费,并发度为消费者实例数 * 线程/协程数。

长轮询

长轮询通过客户端和服务端的配合,达到主动权在客户端,同时也能保证数据的实时性;长轮询本质上也是轮询,只不过对普通的轮询做了优化处理,服务端在没有数据的时候并不是马上返回数据,会hold住请求,等待服务端有数据,或者一直没有数据超时处理,然后一直循环下去;下面看一下如何简单实现一个长轮询;

暂不提供什么

  1. 广播消费(TBD)
    即同一个consumer group内的所有消费者每一个实例都能收到全量消息;可以通过每个消费者使用独立的Consumer group达到效果。
  2. 事务消息 (TBD)
    事务消息指:Producer发消息和Producer的其他操作(如写DB)形成事务,如果其他操作Commit,则消息Consumer可见;如果其他操作Rollback,则消息也不会投递给Consumer。
    原生的RocketMQ的java SDK支持事务消息,现阶段其他语言暂不支持,后续会支持。

vs Kafka

技术选型:RocketMQ or Kafka

(1) 适用场景

Kafka适合日志处理;

RocketMQ适合业务处理。

(2) 性能

Kafka单机写入 TPS 号称在百万条/秒;

RocketMQ 大约在10万条/秒。

结论:追求性能的话,Kafka单机性能更高。

(3) 可靠性

RocketMQ支持异步/同步刷盘;异步/同步Replication;

Kafka使用异步刷盘方式,异步Replication。

结论:RocketMQ所支持的同步方式提升了数据的可靠性。

(4) 实时性

均支持pull长轮询,RocketMQ消息实时性更好

结论:RocketMQ 胜出。

(5) 支持的队列数

Kafka单机超过64个队列/分区,消息发送性能降低严重;

RocketMQ 单机支持最高5万个队列,性能稳定

结论:长远来看,RocketMQ 胜出,这也是适合业务处理的原因之一

(6) 消息顺序性

Kafka 某些配置下,支持消息顺序,但是一台Broker宕机后,就会产生消息乱序;

RocketMQ支持严格的消息顺序,在顺序消息场景下,一台Broker宕机后,

发送消息会失败,但是不会乱序;

结论:RocketMQ 胜出

(7)消费失败重试机制

Kafka消费失败不支持重试

RocketMQ消费失败支持定时重试,每次重试间隔时间顺延。

(8)定时/延时消息

Kafka不支持定时消息;

RocketMQ支持定时消息

(9)分布式事务消息

Kafka不支持分布式事务消息;

阿里云ONS支持分布式定时消息,未来开源版本的RocketMQ也有计划支持分布式事务消息

(10)消息查询机制

Kafka不支持消息查询

RocketMQ支持根据Message Id查询消息,也支持根据消息内容查询消息

(11)消息回溯

Kafka理论上可以按照Offset来回溯消息

RocketMQ支持按照时间来回溯消息,精度毫秒,例如从一天之前的某时某分某秒开始重新消费消息