Spark压缩文件性能分析

Spark压缩文件性能分析

HDFS上分布式文件存储,成为大数据平台首选存储平台。而Spark往往以HDFS文件为输入,为保持兼容性,Spark支持多种格式文件读取,大数据场景下,性能瓶颈往往是IO,而不是CPU算力,所以对文件的压缩处理成为了很必要的手段。Spark为提供兼容性,同时支持多种压缩包直接读取,方便于用户使用,不用提前对压缩格式处理,但各种压缩格式各有优缺点,若不注意将导致Spark的能力无法发挥出来。故对Spark计算压缩文件做一个分析。

支持的压缩格式

首先来看一下Spark读取HDFS文件常用的压缩格式:
图片说明

执行对比分析

实验数据:同一个文件包,json格式文件数据
处理逻辑:增加列,然后发送到kafka中。
DAG逻辑划分:两个job(read动作一个job,foreach动作一个job),每个job下面各一个stage,每个stage下面task若干
程序执行参数:–master yarn --deploy-mode client --executor-cores 4 --executor-memory 4G --num-executors 4

非压缩文件

文件大小:33.7GB
运行时间:9min

read阶段:

图片说明
可以看到所有节点都在读取,分布式读取,速度很快。
图片说明

Stage里面共计分成了252个task,每个读取128MB数据。

foreach阶段

图片说明
依然并行全力计算
图片说明
每个执行节点上4个core都在并发运行。

GZIP

文件大小:10.6GB
运行时间:2.2h

read阶段

图片说明
只有单节点读取

同时该节点上也只有一个核心在运行
图片说明
foreach阶段

也是只有单节点、单core运行

BZIP2

文件大小:7.7GB
运行时间:12min

read阶段

图片说明
与非压缩一致,并行进行

foreach阶段

图片说明
同样并发执行

SNAPPY

文件大小:16.5GB
运行时间:2.1h
这里直接采用的整文件压缩,所以文件不可分割。

read阶段

图片说明

单节点单核心读取,非并行。

foreach阶段

同上类似,单节点单核心运行
结果
图片说明
gzip和snappy无法采用并行计算,也就是说在spark平台上,这两种格式只能采用串行单进程执行,于本文开头表格对应,无法分割(splittable)的压缩格式只能顺序一个进程读取,而读取后多文件又在一个executor上,其他executor无文件导致无法并行的foreach。
bz2和非压缩格式支持分割,也就是说可以并行读取以及计算。
不可分割的压缩格式文件不可并行读取,完全无法发挥spark的并行计算优势,并且若压缩包过大,对单节点的物理性能要求较高。
建议
snappy采用分块压缩方式使其可以并行读取计算。
gzip格式最好提前进行分割成小文件或者换格式,因多个文件可以并行读取。另一个办法是read文件后调用repartition操作强制将读取多数据重新均匀分配到不同的executor上,但这个操作会导致大量单节点性能占用,因此该格式建议不在spark上使用。
bz2表现相同于非压缩,但解压操作需要耗费时间。
非压缩性能表现最佳,但会占用过大HDFS存储。
spark输出压缩文件
实际生产环境需要spark输出文件到HDFS,并且为了节省空间会使用压缩格式,以下介绍几种常用的压缩格式
文本文件压缩

bzip2

压缩率最高,压缩解压速度较慢,支持split。

import org.apache.hadoop.io.compress.BZip2Codec
//
rdd.saveAsTextFile("codec/bzip2",classOf[BZip2Codec])

snappy

json文本压缩率 38.2%,压缩和解压缩时间短。

import org.apache.hadoop.io.compress.SnappyCodec
//
rdd.saveAsTextFile("codec/snappy",classOf[SnappyCodec])

gzip

压缩率高,压缩和解压速度较快,不支持split,如果不对文件大小进行控制,下次分析可能可能会造成效率低下的问题。
json文本压缩率23.5%,适合使用率低,长期存储的文件。

import org.apache.hadoop.io.compress.GzipCodec
//
rdd.saveAsTextFile("codec/gzip",classOf[GzipCodec])

parquet文件压缩

列式存储布局(比如 Parquet)可以加速查询,因为它只检查所有需要的列并对它们的值执行计算,因此只读取一个数据文件或表的小部分数据。Parquet 还支持灵活的压缩选项,因此可以显著减少磁盘上的存储。
如果您在 HDFS 上拥有基于文本的数据文件或表,而且正在使用 Spark SQL 对它们执行查询,那么强烈推荐将文本数据文件转换为 Parquet 数据文件,以实现性能和存储收益。当然,转换需要时间,但查询性能的提升在某些情况下可能达到 30 倍或更高,存储的节省可高达 75%!
转换 1 TB 数据将花费多长时间?
50 分钟,在一个 6 数据节点的 Spark v1.5.1 集群上可达到约 20 GB/分的吞吐量。使用的总内存约为 500GB。HDFS 上最终的 Parquet 文件的格式为:

...
/user/spark/data/parquet/catalog_page/part-r-00000-9ff58e65-0674-440a-883d-256370f33c66.gz.parquet
/user/spark/data/parquet/catalog_page/part-r-00001-9ff58e65-0674-440a-883d-256370f33c66.gz.parquet
...

存储节省

以下 Linux 输出显示了 TEXT 和 PARQUET 在 HDFS 上的大小比较:

% hadoop fs -du -h -s /user/spark/hadoopds1000g
897.9 G /user/spark/hadoopds1000g
% hadoop fs -du -h -s /user/spark/data/parquet
231.4 G /user/spark/data/parquet

1 TB 数据的存储节省了将近 75%!
parquet为文件提供了列式存储,查询时只会取出需要的字段和分区,对IO性能的提升非常大,同时占用空间较小,即使是parquet的uncompressed存储方式也比普通的文本要小的多。
spark中通过对parquet文件进行存储,spark2.0后默认使用snappy压缩,1.6.3及以前版本默认使用的gzip压缩方式。

dataset.write().parquet("path");

可以通过
spark.sql.parquet.compression.codec

参数或是在代码中进行修改配置压缩方式。
sparkConf.set("spark.sql.parquet.compression.codec","gzip")

parquet存储提供了
lzo gzip snappy uncompressed

参考文章
https://zturn.cc/?p=24
https://blog.csdn.net/bajinsheng/article/details/100031359
https://www.ibm.com/developerworks/cn/analytics/blog/ba-parquet-for-spark-sql/index.html

全部评论

相关推荐

09-29 17:44
已编辑
蔚来_测(准入职员工)
//鲨鱼辣椒:见不了了我实习了四个月上周再投筛选了一天就给我挂了
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务