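# Step 1: enumerate every candidate pair from each line, marking whether the
#         pair is a direct friendship or an indirect (shared-friend) one.
# Step 2: group the markers by pair and accumulate them.
# Step 3: filter out the pairs that are direct friends.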
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
def fun1(line):
    """Expand one input line into keyed friendship markers.

    The line is read as whitespace-separated names: the first name is the
    owner and the remaining names are the owner's friends.

    Args:
        line: One input record; it is converted with ``str()`` before parsing.

    Returns:
        A list of ``(key, marker)`` tuples. ``key`` joins two names with
        ``"_"``, placing the name that compares greater first. ``marker`` is
        0 for an owner/friend pair and 1 for a pair of two friends that
        appear on the same owner's line. A blank line yields an empty list.
    """

    def ordered_key(left, right):
        # Order by the names themselves rather than by hash(): string hashes
        # are salted per interpreter process unless PYTHONHASHSEED is fixed,
        # and two distinct names may share a hash, so hash ordering could
        # emit two different keys for the same pair.
        return left + "_" + right if left > right else right + "_" + left

    # split() without an argument ignores repeated/trailing whitespace, so no
    # empty-string "names" are produced.
    names = str(line).split()
    if not names:
        return []

    me = names[0]
    friends = names[1:]
    result = []
    for index, friend_a in enumerate(friends):
        result.append((ordered_key(me, friend_a), 0))
        # Every later friend on the same line shares `me` with friend_a.
        for friend_b in friends[index + 1:]:
            result.append((ordered_key(friend_a, friend_b), 1))
    return result
def fun2(x):
    """Reduce the grouped markers of one pair to a single result tuple.

    Args:
        x: A two-element sequence ``(name, markers)`` where ``markers`` is an
            iterable of marker values.

    Returns:
        ``("直接好友", 0)`` if any marker equals 0; otherwise
        ``(name, count)`` where ``count`` is the number of markers seen.
    """
    entry = tuple(x)
    name = entry[0]
    markers = entry[1]

    count = 0
    for marker in markers:
        if marker == 0:
            # A single 0 marker decides the result, so stop scanning here.
            return ("直接好友", 0)
        count += 1
    return (name, count)
if __name__ == "__main__":
    # Task: count the indirect friends in the data stored at the local
    # file-system path /root/friend.txt.
    """
    需求:对本地文件系统URI为:/root/friend.txt 的数据统计间接好友的数量
    """
    # ********** Begin **********#
    sc = SparkContext("local", "pySpark")
    try:
        result = (
            sc.textFile("/root/friend.txt")
            .flatMap(fun1)    # emit (pair_key, marker) for every pair on a line
            .groupByKey()     # gather all markers belonging to the same pair
            .map(fun2)        # collapse each group to (label, count)
            .filter(lambda x: tuple(x)[1] != 0)  # drop entries whose count is 0
            .collect()
        )
        print(result)
    finally:
        # Always release the context, even when the job raises.
        sc.stop()
    # ********** End **********#