Spark-RDD
RDD的特性
RDD的3种基本运算
RDD运算类型 | 说明 |
---|---|
“转换”运算 | 会产生另外一个RDD的运算是“转换”运算,具有lazy特性,等到执行“动作”运算时才会执行 |
“动作”运算 | 不产生另外一个RDD,而是产生数值、数组或其他动作的运算 |
“持久化” | 对于会重复使用的RDD,可以将RDD存入内存中作为后续使用,提高执行性能 |
RDD的容错性
- Lineage机制:会记录每个RDD与其父代RDD之间的关联,还会记录用什么操作才从父代得到子代的信息
- immutable(不可变)特性
RDD函数
parallelize()
将LIST对象创建为RDD对象
“转换”运算,不会实际执行
不但可以创建Int类型的RDD,也可以创建String类型的RDD
intRDD = parallelize(List(3,1,2,5,5)
stringRDD = parallelize(List('A','SD','D'))
RDD.collect()
将RDD转换为List对象
“动作”运算,立刻执行
intRDD.collect()
map()运算
通过参数将每一个元素经过函数运算产生另外一个RDD,参数可以是两种语句:具名函数和匿名函数
具名函数
def addone(x)
return (x+1)
intRDD.map(addone)
匿名函数
intRDD.map(x => x+1)
++简单的逻辑功能用匿名函数,复杂的逻辑用具名函数++
++需要重复使用的功能用具名函数++
filter()数字运算
filter可以哦那个与对RDD内的元素进行筛选,并产生另外的RDD
intRDD.filter(lambda x:x<3)
distinct()运算
RDD的去重运算
intRDD.distinct()
randomSplit()运算
可以将整个集合元素以随机数的方式按照比例分为多个RDD
sRDD = intRDD.randomSplit([0.5,0.5])# 按照1:1的比例进行分配为sRDD[0],sRDD[1]
groupBy()运算
按照传入的匿名参数规则将数据分为多个list
gRDD = intRDD.groupBy(
lambda x:"even" if(x%2==0) else "odd"
).collect()
将整个集合分为偶数和奇数
多个RDD的“转换”运算
- union()并集运算
- intersection交集运算
- subtract差集运算
- cartesian笛卡尔乘积运算
基本“动作”运算
- first()取出第一项数据
- take(n)取出前n项数据
- takeOrdered(n)从小到大排序取出前n项
- stats()统计
- min()取出最小值
- max()取出最大值
- stdev()标准差
- count()计数
- sum()总和
- mean()平均
共享变量
Broadcast广播变量
广播变量使用规则
- 可以使用SparkContext.broadcast([初始值])创建
- 使用.value来读取值
- 被创建后不能修改
broadcast = sc.broadcast(Map)
创建了名字为broadcast的广播变量
在并行处理中,广播变量会传送到WorkerNode机器,并且存储在内存中,后续在Worker Node中都可以使用这个广播变量执行转换,节省很多的内存和时间
accumulator累加器
accumulator累加器的使用规则
- 可以使用SparkContext.accumulator([初始值])创建
- 使用.add()进行累加
- 在工作中,例如for循环,不能读取累加器的值
- 只有驱动程序也就是循环外才可以使用.value来读取累加器的值
IntRDD = sc.parallelize([3,1,2,5,5])
# 创建total、num累加器
total = sc.accumulator(0.0)
num = sc.accumulator(0)
# 使用foreach传入参数i,针对每一项数据执行累加
IntRDD.foreach(lambda i:[total.add(i) , nam.add(1)])
avg = total.value / num.value
RDD Persistence持久化
可以用于将需要重复运算的RDD存储在内存中,以便大幅提升运算速率
使用方法
- RDD.persist(存储等级)——可以指定存储等级,默认存储在内存中
- RDD.unpersist()——取消持久化