Spark 出租车情况统计
Spark UDF 练习
# -*- coding: UTF-8 -*-
"""Spark exercise: taxi trip statistics with Spark SQL and Python UDFs.

Reads a tab-separated trip file with a header row and:

1. converts the Unix ``TIMESTAMP`` column to a ``yyyy-MM-dd`` string named ``TIME``;
2. adds ``TIMELEN`` (trip length in seconds), ``STARTLOCATION`` and
   ``ENDLOCATION``, all derived from the JSON-encoded ``POLYLINE`` column;
3. counts trips per ``CALL_TYPE`` and day, sorted by ``CALL_TYPE, TIME``.
"""
import json

# Time step, in seconds, assumed between consecutive POLYLINE points: a trip
# with k points is counted as (k - 1) * 15 seconds long.
POINT_INTERVAL_SECONDS = 15


def _parse_polyline(polyline):
    """Decode a POLYLINE JSON string into a list of points; None stays None."""
    if polyline is None:
        return None
    return json.loads(polyline)


def trip_duration_seconds(polyline):
    """Return the trip length in seconds, ``(number of points - 1) * 15``.

    An empty polyline yields 0. A NULL (None) polyline yields None, so the SQL
    column becomes NULL instead of the UDF raising and failing the job.
    """
    points = _parse_polyline(polyline)
    if points is None:
        return None
    return (len(points) - 1) * POINT_INTERVAL_SECONDS if points else 0


def start_location(polyline):
    """Return the first point of the polyline as a string.

    Returns "" for an empty polyline and None for a NULL polyline.
    """
    points = _parse_polyline(polyline)
    if points is None:
        return None
    return str(points[0]) if points else ""


def end_location(polyline):
    """Return the last point of the polyline as a string.

    Returns "" for an empty polyline and None for a NULL polyline.
    """
    points = _parse_polyline(polyline)
    if points is None:
        return None
    return str(points[-1]) if points else ""


def main(data_path="/root/data2.csv"):
    """Run the three statistics steps on the tab-separated file at *data_path*."""
    # Spark is imported here so the pure helpers above can be imported and
    # unit-tested without a Spark installation.
    from pyspark.sql import SparkSession
    from pyspark.sql.types import IntegerType, StringType

    spark = SparkSession.builder.master("local").appName("demo").getOrCreate()
    try:
        #**********begin**********#
        df = (
            spark.read.option("header", True)
            .option("delimiter", "\t")
            .csv(data_path)
        )
        # createOrReplaceTempView (not createTempView) so re-running in the
        # same session does not fail because the view already exists.
        df.createOrReplaceTempView("data")

        # 1. Convert the timestamp to a date string and name the column TIME.
        spark.sql(
            "select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,"
            "from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME ,POLYLINE from data"
        ).show()

        # 2.1 Total trip length in seconds as a new column TIMELEN.
        # 2.2 Start and end positions as new columns STARTLOCATION / ENDLOCATION.
        # The UDFs return plain values with explicit types, so TIMELEN is an
        # integer column and the two locations are string columns.
        spark.udf.register("timeLen", trip_duration_seconds, IntegerType())
        spark.udf.register("startLocation", start_location, StringType())
        spark.udf.register("endLocation", end_location, StringType())
        spark.sql(
            "select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,"
            "from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME ,POLYLINE,"
            "timeLen(POLYLINE) as TIMELEN,startLocation(POLYLINE) as STARTLOCATION ,"
            "endLocation(POLYLINE) as ENDLOCATION from data"
        ).createOrReplaceTempView("data2")
        spark.sql("select * from data2").show()

        # 3. Count each call type per day, sorted ascending by CALL_TYPE, TIME.
        spark.sql(
            "select CALL_TYPE,TIME,count(1) as NUM from data2 "
            "group by CALL_TYPE,TIME order by CALL_TYPE,TIME"
        ).show()
        #**********end**********#
    finally:
        spark.stop()


if __name__ == '__main__':
    main()