Hadoop 3.2.1 environment setup and WordCount
Pick a mirror and download the tarball
http://mirrors.hust.edu.cn/apache/hadoop/core/hadoop-3.2.1/
After extracting, edit the configuration files under etc/hadoop. Adjust the JDK path and the hostname to your own environment.
Edit the configuration files:
hadoop-env.sh:
JAVA_HOME=/home/tangsong.math/jdk1.8.0_251
HADOOP_HOME=/home/tangsong.math/repos/hadoop-3.2.1

core-site.xml:
<property><name>fs.defaultFS</name><value>hdfs://n227-026-077:9000</value></property>
<property><name>hadoop.tmp.dir</name><value>/home/tangsong.math/repos/hadoop-3.2.1/data</value></property>

(3.2.1 ships mapred-site.xml directly; 2.10.1 only has mapred-site.xml.template, so there you first run
cp mapred-site.xml.template mapred-site.xml)

mapred-site.xml:
<property><name>mapreduce.framework.name</name><value>yarn</value></property>
<property><name>mapreduce.jobhistory.address</name><value>n227-026-077:10020</value></property>
<property><name>mapreduce.jobhistory.webapp.address</name><value>n227-026-077:19888</value></property>
<property><name>yarn.app.mapreduce.am.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value></property>
<property><name>mapreduce.map.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value></property>
<property><name>mapreduce.reduce.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value></property>

yarn-site.xml:
<property><name>yarn.resourcemanager.hostname</name><value>n227-026-077</value></property>
<property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>

# Set up passwordless ssh to the local machine
cd ~/.ssh/
ssh-keygen -t rsa                       # just press ENTER at the prompts
cat ./id_rsa.pub >> ./authorized_keys   # authorize the key

# Start Hadoop
# In Hadoop's bin directory:
sudo ./hdfs namenode -format
# In the sbin directory:
./start-all.sh

Check with jps: the following processes should be present. If something did not start correctly, look at the logs under hadoop/logs; the cause is usually a permission problem or similar.
DataNode, ResourceManager, SecondaryNameNode, NameNode, NodeManager

YARN ResourceManager web UI (jobs), default port 8088:
http://10.227.26.77:8088/cluster
HDFS NameNode web UI (cluster overview; Hadoop 3 moved it to port 9870):
http://10.227.26.77:9870/dfshealth.html#tab-overview
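Once the daemons are up, it can help to sanity-check HDFS from a Java client (using the hadoop-common dependency set up in the next section) before writing any MapReduce code. This is only an illustrative sketch: the namenode address matches the core-site.xml above, while the class name, the user name root and the listed path are assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsSmokeTest {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Same fs.defaultFS as in core-site.xml above.
    conf.set("fs.defaultFS", "hdfs://n227-026-077:9000");
    // Assumption: run as the user that started the cluster (root, as later in this post).
    System.setProperty("HADOOP_USER_NAME", "root");
    try (FileSystem fs = FileSystem.get(conf)) {
      // List the root directory; a freshly formatted cluster simply prints nothing.
      for (FileStatus status : fs.listStatus(new Path("/"))) {
        System.out.println(status.getPath() + "\t" + status.getLen());
      }
    }
  }
}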
Maven project configuration
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>3.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs-client</artifactId>
  <version>3.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>3.2.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>3.2.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>3.2.1</version>
</dependency>

Create a log4j.properties under resources with the following content:

log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
WordCount
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class Foo {
  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://10.227.26.77:9000");
    conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
    System.getProperties().setProperty("HADOOP_USER_NAME", "root");

    FileSystem fs = FileSystem.get(conf);
    String inputPath = "/hdfsapi/input";
    String outputPath = "/hdfsapi/output";
    fs.mkdirs(new Path(inputPath));
    fs.copyFromLocalFile(new Path("/Users/bytedance/repos/test.py"), new Path(inputPath));
    fs.close();

    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(Foo.class);
    job.setMapperClass(WMapper.class);
    job.setReducerClass(WReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.waitForCompletion(true);
  }
}

class WMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
  private LongWritable one = new LongWritable(1);
  private Text word = new Text();

  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String[] tokens = value.toString().split("\\s+");
    for (String token : tokens) {
      word.set(token);
      context.write(word, one);
    }
  }
}

class WReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  @Override
  public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
    int count = 0;
    for (LongWritable val : values) count++;
    context.write(key, new LongWritable(count));
  }
}
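After the job finishes, TextOutputFormat leaves the counts in part-r-* files under /hdfsapi/output, one "word<TAB>count" line per key. A minimal way to inspect them from the same client is sketched below; the paths mirror the program above, the class name is made up, and reading whole files into the console is only sensible for small test data.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrintWordCountOutput {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://10.227.26.77:9000");
    System.setProperty("HADOOP_USER_NAME", "root");
    try (FileSystem fs = FileSystem.get(conf)) {
      // Only the part-* files contain reducer output; _SUCCESS is just a marker.
      for (FileStatus status : fs.listStatus(new Path("/hdfsapi/output"))) {
        if (!status.getPath().getName().startsWith("part-")) continue;
        try (BufferedReader reader =
            new BufferedReader(new InputStreamReader(fs.open(status.getPath()), StandardCharsets.UTF_8))) {
          String line;
          while ((line = reader.readLine()) != null) System.out.println(line);
        }
      }
    }
  }
}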
Matrix multiplication
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class Foo {
  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://10.227.26.77:9000");
    conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
    conf.set("ARowCount", "4");
    conf.set("AColCount", "3");
    conf.set("BRowCount", "3");
    conf.set("BColCount", "2");
    System.getProperties().setProperty("HADOOP_USER_NAME", "root");

    FileSystem fs = FileSystem.get(conf);
    String inputPath = "/hdfsapi/input";
    String outputPath = "/hdfsapi/output";
    fs.mkdirs(new Path(inputPath));
    fs.copyFromLocalFile(new Path("/Users/bytedance/repos/a.json"), new Path(inputPath));
    fs.close();

    Job job = Job.getInstance(conf, "matrix multiply");
    job.setJarByClass(Foo.class);
    job.setMapperClass(WMapper.class);
    job.setReducerClass(WReducer.class);
    job.setMapOutputKeyClass(MatrixKey.class);
    job.setMapOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.waitForCompletion(true);
  }
}

class MatrixKey implements WritableComparable<MatrixKey> {
  private int row;
  private int col;

  public MatrixKey() {}

  public MatrixKey(int row, int col) {
    this.row = row;
    this.col = col;
  }

  @Override
  public int compareTo(MatrixKey o) {
    if (this.row > o.row) return 1;
    else if (this.row < o.row) return -1;
    if (this.col > o.col) return 1;
    else if (this.col < o.col) return -1;
    return 0;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    MatrixKey matrixKey = (MatrixKey) o;
    return row == matrixKey.row && col == matrixKey.col;
  }

  @Override
  public int hashCode() {
    return Objects.hash(row, col);
  }

  public void setRow(int row) { this.row = row; }

  public void setCol(int col) { this.col = col; }

  public int getRow() { return row; }

  public int getCol() { return col; }

  @Override
  public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeInt(row);
    dataOutput.writeInt(col);
  }

  @Override
  public void readFields(DataInput dataInput) throws IOException {
    row = dataInput.readInt();
    col = dataInput.readInt();
  }
}

// A is 4*3, B is 3*2
class WMapper extends Mapper<LongWritable, Text, MatrixKey, Text> {
  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String[] tokens = value.toString().split("\\s+");
    if (tokens.length != 4) return;
    int row = Integer.parseInt(tokens[1]);
    int col = Integer.parseInt(tokens[2]);
    if (tokens[0].equals("A")) {
      // C is ARowCount x BColCount; a[i][j] contributes to every c[i][k], 1 <= k <= BColCount.
      int count = Integer.parseInt(context.getConfiguration().get("BColCount"));
      for (int k = 1; k <= count; k++) context.write(new MatrixKey(row, k), value);
    } else if (tokens[0].equals("B")) {
      // C is ARowCount x BColCount; b[i][j] contributes to every c[k][j], 1 <= k <= ARowCount.
      int count = Integer.parseInt(context.getConfiguration().get("ARowCount"));
      for (int k = 1; k <= count; k++) context.write(new MatrixKey(k, col), value);
    }
  }
}

class WReducer extends Reducer<MatrixKey, Text, Text, LongWritable> {
  @Override
  public void reduce(MatrixKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    int row = key.getRow();
    int col = key.getCol();
    // The contributing entries are a[row][k] and b[k][col], 1 <= k <= AColCount.
    int count = Integer.parseInt(context.getConfiguration().get("AColCount"));
    int[] buf1 = new int[count + 1];
    int[] buf2 = new int[count + 1];
    for (Text value : values) {
      String[] tokens = value.toString().split("\\s+");
      if (tokens.length != 4) return;
      if (tokens[0].equals("A")) {
        int k = Integer.parseInt(tokens[2]);
        int val = Integer.parseInt(tokens[3]);
        buf1[k] += val;
      } else if (tokens[0].equals("B")) {
        int k = Integer.parseInt(tokens[1]);
        int val = Integer.parseInt(tokens[3]);
        buf2[k] += val;
      }
    }
    int sum = 0;
    for (int i = 1; i <= count; i++) sum += buf1[i] * buf2[i];
    context.write(new Text("C " + row + " " + col), new LongWritable(sum));
  }
}
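The mapper and reducer assume that each input line has the form "A i j value" or "B i j value" with 1-based indices; the file name a.json above is simply whatever was lying around. As a purely illustrative sketch, the snippet below writes a 4x3 matrix A and a 3x2 matrix B in that format so the job has something well-formed to read (the output file name and the matrix values are assumptions).

import java.io.PrintWriter;

public class GenerateMatrixInput {
  public static void main(String[] args) throws Exception {
    int[][] a = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9}, {10, 11, 12} }; // 4 x 3
    int[][] b = { {1, 2}, {3, 4}, {5, 6} };                        // 3 x 2
    // One entry per line: "<matrix> <row> <col> <value>", rows and columns starting at 1.
    try (PrintWriter out = new PrintWriter("matrix-input.txt", "UTF-8")) {
      for (int i = 0; i < a.length; i++)
        for (int j = 0; j < a[i].length; j++)
          out.println("A " + (i + 1) + " " + (j + 1) + " " + a[i][j]);
      for (int i = 0; i < b.length; i++)
        for (int j = 0; j < b[i].length; j++)
          out.println("B " + (i + 1) + " " + (j + 1) + " " + b[i][j]);
    }
  }
}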
Submitting the job to the YARN queue
In this case you must first build the jar locally and call Job.setJar instead of setJarByClass.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Foo {
  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://10.227.26.77:9000");
    conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.hostname", "n227-026-077");
    conf.set("yarn.resourcemanager.resource-tracker.address", "n227-026-077:8031");
    conf.set("yarn.resourcemanager.address", "n227-026-077:8032");
    conf.set("yarn.resourcemanager.scheduler.address", "n227-026-077:8030");
    conf.set("yarn.resourcemanager.admin.address", "n227-026-077:8033");
    conf.set(
        "yarn.application.classpath",
        "/home/tangsong.math/repos/hadoop-3.2.1/etc/hadoop:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/common/lib/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/common/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/hdfs:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/hdfs/lib/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/hdfs/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/mapreduce/lib/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/mapreduce/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/yarn:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/yarn/lib/*:/home/tangsong.math/repos/hadoop-3.2.1/share/hadoop/yarn/*");
    conf.set("yarn.app.mapreduce.am.env", "HADOOP_MAPRED_HOME=${HADOOP_HOME}");
    conf.set("mapreduce.map.env", "HADOOP_MAPRED_HOME=${HADOOP_HOME}");
    conf.set("mapreduce.reduce.env", "HADOOP_MAPRED_HOME=${HADOOP_HOME}");
    conf.set("HADOOP_HOME", "/home/tangsong.math/repos/hadoop-3.2.1");
    conf.set("hadoop.home.dir", "/home/tangsong.math/repos/hadoop-3.2.1");
    conf.set(
        "mapreduce.application.classpath",
        "/home/tangsong.math/repos/hadoop-3.2.1/etc/hadoop:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*:$HADOOP_MAPRED_HOME/share/hadoop/common/*:$HADOOP_MAPRED_HOME/share/hadoop/common/lib/*:$HADOOP_MAPRED_HOME/share/hadoop/yarn/*:$HADOOP_MAPRED_HOME/share/hadoop/yarn/lib/*:$HADOOP_MAPRED_HOME/share/hadoop/hdfs/*:$HADOOP_MAPRED_HOME/share/hadoop/hdfs/lib/*");
    conf.set("ARowCount", "4");
    conf.set("AColCount", "3");
    conf.set("BRowCount", "3");
    conf.set("BColCount", "2");
    System.getProperties().setProperty("HADOOP_USER_NAME", "root");

    FileSystem fs = FileSystem.get(conf);
    String inputPath = "/hdfsapi/input";
    String outputPath = "/hdfsapi/output";
    fs.mkdirs(new Path(inputPath));
    fs.copyFromLocalFile(new Path("/Users/bytedance/repos/a.json"), new Path(inputPath));
    fs.close();

    Job job = Job.getInstance(conf, "matrix multiply");
    // job.setJarByClass(Foo.class);
    job.setJar("/Users/bytedance/repos/untitled3/target/untitled3-1.0-SNAPSHOT.jar");
    job.setMapperClass(WMapper.class);
    job.setReducerClass(WReducer.class);
    job.setMapOutputKeyClass(MatrixKey.class);
    job.setMapOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    int ret = job.waitForCompletion(true) ? 0 : 1;
    System.out.println("ret= " + ret);
    org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
    org.apache.hadoop.mapreduce.Counter counter = counters.findCounter(IncorrectFormatReason.NOT_A_OR_B);
    System.out.println("IncorrectFormatReason.NOT_A_OR_B:" + counter.getValue());
  }
}

class MatrixKey implements WritableComparable<MatrixKey> {
  private int row;
  private int col;

  public MatrixKey() {}

  public MatrixKey(int row, int col) {
    this.row = row;
    this.col = col;
  }

  @Override
  public int compareTo(MatrixKey o) {
    if (this.row > o.row) return 1;
    else if (this.row < o.row) return -1;
    if (this.col > o.col) return 1;
    else if (this.col < o.col) return -1;
    return 0;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    MatrixKey matrixKey = (MatrixKey) o;
    return row == matrixKey.row && col == matrixKey.col;
  }

  @Override
  public int hashCode() {
    return Objects.hash(row, col);
  }

  public void setRow(int row) { this.row = row; }

  public void setCol(int col) { this.col = col; }

  public int getRow() { return row; }

  public int getCol() { return col; }

  @Override
  public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeInt(row);
    dataOutput.writeInt(col);
  }

  @Override
  public void readFields(DataInput dataInput) throws IOException {
    row = dataInput.readInt();
    col = dataInput.readInt();
  }
}

enum IncorrectFormatReason {
  NOT_A_OR_B
}

// A is 4*3, B is 3*2
class WMapper extends Mapper<LongWritable, Text, MatrixKey, Text> {
  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String[] tokens = value.toString().split("\\s+");
    if (tokens.length != 4) {
      context.getCounter(IncorrectFormatReason.NOT_A_OR_B).increment(1);
      return;
    }
    int row = Integer.parseInt(tokens[1]);
    int col = Integer.parseInt(tokens[2]);
    if (tokens[0].equals("A")) {
      // C is ARowCount x BColCount; a[i][j] contributes to every c[i][k], 1 <= k <= BColCount.
      int count = Integer.parseInt(context.getConfiguration().get("BColCount"));
      for (int k = 1; k <= count; k++) context.write(new MatrixKey(row, k), value);
    } else if (tokens[0].equals("B")) {
      // C is ARowCount x BColCount; b[i][j] contributes to every c[k][j], 1 <= k <= ARowCount.
      int count = Integer.parseInt(context.getConfiguration().get("ARowCount"));
      for (int k = 1; k <= count; k++) context.write(new MatrixKey(k, col), value);
    }
  }
}

class WReducer extends Reducer<MatrixKey, Text, Text, LongWritable> {
  @Override
  public void reduce(MatrixKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    int row = key.getRow();
    int col = key.getCol();
    // The contributing entries are a[row][k] and b[k][col], 1 <= k <= AColCount.
    int count = Integer.parseInt(context.getConfiguration().get("AColCount"));
    int[] buf1 = new int[count + 1];
    int[] buf2 = new int[count + 1];
    for (Text value : values) {
      String[] tokens = value.toString().split("\\s+");
      if (tokens.length != 4) return;
      if (tokens[0].equals("A")) {
        int k = Integer.parseInt(tokens[2]);
        int val = Integer.parseInt(tokens[3]);
        buf1[k] += val;
      } else if (tokens[0].equals("B")) {
        int k = Integer.parseInt(tokens[1]);
        int val = Integer.parseInt(tokens[3]);
        buf2[k] += val;
      }
    }
    int sum = 0;
    for (int i = 1; i <= count; i++) sum += buf1[i] * buf2[i];
    context.write(new Text("C " + row + " " + col), new LongWritable(sum));
  }
}
Spark installation and WordCount
The Scala version must match the JDK version, otherwise nothing runs. In my setup, Scala 2.12 or later needed JDK 9 or higher; with JDK 8 it failed.
With JDK 8, Scala 2.11.7 works.
The pom is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>untitled4</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.7</scala.version>
  </properties>
  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>
  <dependencies>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.4.7</version></dependency>
    <dependency><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.11</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-eclipse-plugin -->
    <dependency><groupId>org.apache.maven.plugins</groupId><artifactId>maven-eclipse-plugin</artifactId><version>2.5.1</version></dependency>
    <dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency>
    <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.4</version><scope>test</scope></dependency>
    <dependency><groupId>org.specs</groupId><artifactId>specs</artifactId><version>1.2.5</version><scope>test</scope></dependency>
  </dependencies>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.8</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
Below are the Scala and Java versions of WordCount.
package org.example

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object App {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)
    val lines: RDD[String] = sc.textFile("/Users/bytedance/Downloads/a.cpp")
    val words = lines.flatMap(_.split(" "))
    val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(word => word)
    val array = wordGroup.map({
      case (word, list) => (word, list.size)
    }).collect()
    array.foreach(println)
    sc.stop()
  }
}
package com.zkdx;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

public class WordCountSpark {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("testWordCount").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> lines = sc.textFile("/Users/bytedance/Downloads/a.cpp");
    JavaRDD<String> words =
        lines.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator());
    JavaPairRDD<String, Integer> pairs =
        words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<String, Integer>(s, 1));
    JavaPairRDD<String, Integer> wordCounts =
        pairs.reduceByKey((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
    wordCounts.foreach(
        (VoidFunction<Tuple2<String, Integer>>) wordAndCount -> System.out.println(wordAndCount._1 + wordAndCount._2));
    sc.close();
  }
}
Installing Spark 2.4.7
https://mirrors.ustc.edu.cn/apache/spark/spark-2.4.7/
Download the 2.4.7 "on Hadoop 2.7" tgz. (A tgz can also be extracted with tar -zxvf.)
Extract it to /home/tangsong.math/repos/spark-2.4.7-bin-hadoop2.7/
cd conf
cp slaves.template slaves
By default this slaves file contains only localhost.
cp spark-env.sh.template spark-env.sh
Configure it as:
export JAVA_HOME=/home/tangsong.math/jdk1.8.0_251
SPARK_MASTER_HOST=localhost
SPARK_MASTER_PORT=7077
Create a file (words.txt, used below) in the data directory,
then run spark-shell from the bin directory.
You can see it uses Scala 2.11.12.
A SparkContext has already been prepared; the console shows:
Spark context available as 'sc' (master = local[*], app id = local-1616228222799).
Spark session available as 'spark'.
Note that the working directory at this point is the data directory.
Run:
sc.textFile("words.txt").flatMap(o=>o.split(" ")).map(o=>(o,1)).reduceByKey((a,b)=>a+b).collect
To exit the shell, type :quit (note the leading colon).
Spark naturally also has a web monitoring page (once you exit the shell it is no longer reachable):
http://10.227.26.77:4040/jobs/
Jobs can also be submitted from the command line. Below is the bundled example that computes pi:
bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ./examples/jars/spark-examples_2.11-2.4.7.jar 10
To keep job history visible after exiting the shell, some configuration is needed. First start Hadoop.
The URL for accessing HDFS is
hdfs://n227-026-077:9000
Create a /directory directory:
bin/hadoop fs -mkdir /directory
In Spark's conf directory, copy the template file:
cp spark-defaults.conf.template spark-defaults.conf
and add the following settings:
spark.eventLog.enabled true
spark.eventLog.dir hdfs://n227-026-077:9000/directory
Then add this to spark-env.sh:
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://n227-026-077:9000/directory
-Dspark.history.retainedApplications=30"
Start Spark:
sbin/start-all.sh
sbin/start-history-server.sh
Run a job:
bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://localhost:7077 ./examples/jars/spark-examples_2.11-2.4.7.jar 10
Open
http://10.227.26.77:18080/
and you can see the history.
To submit jobs to the YARN cluster, modify spark-env.sh:
export HADOOP_CONF_DIR=/home/tangsong.math/repos/hadoop-3.2.1/etc/hadoop
After starting Hadoop, run this from Spark's bin directory:
./spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.4.7.jar 100
To see the result, go into Hadoop's container log directory, e.g.
/home/tangsong.math/repos/hadoop-3.2.1/logs/userlogs/application_1618850623125_0002/container_1618850623125_0002_01_000001
and cat stdout to view it. You can of course also view this task's logs in the YARN web UI.
If we want to submit Spark jobs remotely, we need to change this line in spark-env.sh:
SPARK_MASTER_HOST=localhost
to
SPARK_MASTER_HOST=n227-026-077
Now we can use:
SparkConf conf = new SparkConf().setAppName("testWordCount").setMaster("spark://10.227.26.77:7077");
There is also a default web UI (not the job history server):
http://10.227.26.77:8080/
Here you can see the running applications and also kill them.
If the following WARN appears
WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
and the job stays in RUNNING forever, open the web UI and check the worker's log:
http://n227-026-077:8080/app/?appId=app-20210423224539-0000
The real cause is that the worker cannot connect back to the driver (the driver here is the process running on the Mac).
Spark on YARN has two modes, client and cluster. In client mode the driver runs on the machine that submits the job; in cluster mode the driver is part of the AM (ApplicationMaster).
In the IDEA console on the Mac you can see something like
21/04/23 23:23:20 INFO NettyBlockTransferService: Server created on 192.168.1.113:51414
21/04/23 23:23:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51414.
21/04/23 23:23:20 INFO NettyBlockTransferService: Server created on 192.168.1.113:51414
ifconfig | grep inet also shows the local IP.
Caused by: java.io.IOException: Failed to connect to /192.168.1.113:51412
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
at org.apache.spark.rpc.netty.Outbox$1.call(Outbox.scala:194)
at org.apache.spark.rpc.netty.Outbox$1.call(Outbox.scala:190)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The fix is to set driver.host and the jars manually:
String[] jars={"/Users/bytedance/repos/untitled4/target/untitled4-1.0-SNAPSHOT.jar"};
SparkConf conf = new SparkConf().setAppName("testWordCount")
.setMaster("spark://10.227.26.77:7077")
.set("spark.executor.memory", "2g")
.set("spark.driver.host","10.254.5.116")
.setJars(jars);
For the following error, see https://stackoverflow.com/questions/28186607/java-lang-classcastexception-using-lambda-expressions-in-spark-job-on-remote-ser/28367602#28367602
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 10.227.26.77, executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDDLike$fn
A FileNotFound error usually means the worker nodes cannot read the local file, so it is best to put the input on HDFS.
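For example, instead of pointing textFile at a path that only exists on the Mac, the input can be uploaded to HDFS first and then read through an hdfs:// URL. The sketch below just reuses the addresses from earlier in this post; the class name, local path and target directory are assumptions, and the setJars/driver.host settings discussed above still apply when it runs against the standalone cluster.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ReadInputFromHdfs {
  public static void main(String[] args) throws Exception {
    // Upload the local file to HDFS so every worker can see it.
    Configuration hadoopConf = new Configuration();
    hadoopConf.set("fs.defaultFS", "hdfs://10.227.26.77:9000");
    System.setProperty("HADOOP_USER_NAME", "root");
    try (FileSystem fs = FileSystem.get(hadoopConf)) {
      fs.mkdirs(new Path("/input"));
      fs.copyFromLocalFile(new Path("/Users/bytedance/Downloads/a.cpp"), new Path("/input/a.cpp"));
    }

    // Read it back through an hdfs:// URL instead of a local path.
    SparkConf conf = new SparkConf().setAppName("readFromHdfs").setMaster("spark://10.227.26.77:7077");
    JavaSparkContext sc = new JavaSparkContext(conf);
    System.out.println(sc.textFile("hdfs://10.227.26.77:9000/input/a.cpp").count());
    sc.close();
  }
}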
A java.lang.NumberFormatException: For input string: "30s" usually means the Spark and Hadoop versions do not match (there are many pitfalls here); newer Hadoop releases express some default timeouts with a time suffix such as 30s, which older HDFS client classes fail to parse as a plain number.
Running the code below in ./spark-shell shows that HDFS can be accessed directly:
sc.textFile("hdfs://10.227.26.77:9000/input/query_result.tsv").flatMap(o =>o.split(" ")).map(o=>(o,1)).reduceByKey((a,b)=>a+b).collect().map(a=>(a.2,a._1)).sortBy(._1).reverse.map(a=>(a._2,a._1)).collect({
case o:(String,Int) => o
}
).foreach(println)
I finally arrived at a set of dependencies that runs (Maven complains Cannot resolve org.apache.hadoop:hadoop-hdfs-client:2.7.7, but it does not seem to matter; you can also simply delete that entry).
The HDFS driver class org.apache.hadoop.hdfs.DistributedFileSystem lives in
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<dependency>
  <groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.4.7</version>
  <exclusions>
    <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId></exclusion>
    <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs-client</artifactId></exclusion>
    <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId></exclusion>
    <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId></exclusion>
  </exclusions>
</dependency>
<dependency><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.11</version></dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version></dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs-client</artifactId><version>${hadoop.version}</version></dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop.version}</version></dependency>
<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-eclipse-plugin -->
<dependency><groupId>org.apache.maven.plugins</groupId><artifactId>maven-eclipse-plugin</artifactId><version>2.5.1</version></dependency>
<dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency>
<dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.4</version><scope>test</scope></dependency>
<dependency><groupId>org.specs</groupId><artifactId>specs</artifactId><version>1.2.5</version><scope>test</scope></dependency>
</dependencies>
<build>
  <sourceDirectory>src/main/java</sourceDirectory>
  <testSourceDirectory>src/test/scala</testSourceDirectory>
  <plugins>
    <plugin>
      <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
      <configuration>
        <scalaVersion>${scala.version}</scalaVersion>
        <args>
          <arg>-target:jvm-1.8</arg>
        </args>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-eclipse-plugin</artifactId>
      <configuration>
        <downloadSources>true</downloadSources>
        <buildcommands>
          <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
        </buildcommands>
        <additionalProjectnatures>
          <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
        </additionalProjectnatures>
        <classpathContainers>
          <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
          <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
        </classpathContainers>
      </configuration>
    </plugin>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <mainClass>${mainClass}</mainClass>
          </manifest>
        </archive>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>assembly</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
<reporting>
  <plugins>
    <plugin>
      <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <configuration>
        <scalaVersion>${scala.version}</scalaVersion>
      </configuration>
    </plugin>
  </plugins>
</reporting>
</project>
Test results: you must call setJars (to upload the jar to the master) and you must use concrete implementation classes rather than lambdas, otherwise you hit the SerializedLambda problem (different Hadoop versions may even report different errors).
The console output appears in the stdout log of the corresponding Spark application. If you change the code, you must repackage it, otherwise the master keeps running the old version.
http://n227-026-077:8080/
package com.zkdx;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class WordCountSpark {
public static void main(String[]args) throws ClassNotFoundException {
Class.forName("org.apache.hadoop.hdfs.DistributedFileSystem");
String[] jars={"/Users/bytedance/repos/untitled4/target/untitled4-1.0-SNAPSHOT-jar-with-dependencies.jar"};
SparkConf conf = new SparkConf().setAppName("testWordCount")
.setMaster("spark://10.227.26.77:7077")
.set("spark.executor.memory", "2g")
.set("spark.driver.host","10.254.35.95")
.setJars(jars);
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("hdfs://10.227.26.77:9000/input/query_result.tsv");
JavaRDD<String> words = lines.flatMap(new Helper1());
JavaPairRDD<String, Integer> pairs = words.mapToPair(new Helper2());
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Helper3());
wordCounts.foreach(new Helper4());
sc.close();
}
}
class Helper1 implements FlatMapFunction<String, String>{
@Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); }
}
class Helper2 implements PairFunction<String, String, Integer>{
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String,Integer>(s,1);
}
}
class Helper3 implements Function2<Integer, Integer, Integer>{
@Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer+integer2; }
}
class Helper4 implements VoidFunction<Tuple2<String, Integer>>{
@Override public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { System.out.println(stringIntegerTuple2._1+stringIntegerTuple2._2); }
}
SparkSQL
Copy Hive's hive-site.xml into Spark's conf directory; running spark-sql from Spark's bin directory then drops you into the SQL shell.
A problematic case:
java.sql.SQLException: Cannot convert column 1 to integer
java.lang.NumberFormatException: For input string: "key_date1.id"
The returned result looks like a CSV structure whose first row holds the column names:
key_date1.id key_date1.name key_date1.start_date
The column types are int, string, string, so both jd.show() and jd.collectAsList() fail, because the first column name in that header row cannot be converted to an int.
public class WordCountSpark {
public static void main(String[] args) throws ClassNotFoundException {
SparkConf conf = new SparkConf().setAppName("testSparkSQL").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); Class.forName("org.apache.hadoop.hdfs.DistributedFileSystem"); SQLContext sqlContext = new SQLContext(sc); String url = "jdbc:hive2://n227-026-077:10000/analysis"; Properties connectionProperties = new Properties(); connectionProperties.put("user", "root"); connectionProperties.put("password", ""); connectionProperties.put("driver", "org.apache.hive.jdbc.HiveDriver"); Dataset<Row> jd = sqlContext.read().jdbc(url, "key_date1", connectionProperties); System.out.println(jd.count()); List<Row> rows = jd.collectAsList(); for (int i = 0; i < rows.size(); i++) if (i == 0) System.out.println( String.format( "%s %s %s", rows.get(0).getString(1), rows.get(0).getString(1), rows.get(0).getString(2))); else System.out.println( new KeyDate(rows.get(i).getInt(1), rows.get(i).getString(2), rows.get(i).getString(3)));
}
public static class KeyDate implements Serializable {
private Integer id;
private String name;
private String startDate;
    public KeyDate() {}

    public void setId(Integer id) { this.id = id; }

    public void setName(String name) { this.name = name; }

    public void setStartDate(String startDate) { this.startDate = startDate; }

    public KeyDate(Integer id, String name, String startDate) {
      this.id = id;
      this.name = name;
      this.startDate = startDate;
    }

    @Override
    public String toString() {
      return "KeyDate{" + "id=" + id + ", name='" + name + '\'' + ", startDate='" + startDate + '\'' + '}';
    }
}
}
Switching to SparkSession does not help either:
SparkSession sparkSession =
SparkSession.builder().appName("HiveMySQLApp").master("local").getOrCreate();
String url = "jdbc:hive2://n227-026-077:10000/analysis"; Properties connectionProperties = new Properties(); connectionProperties.put("user", "root"); connectionProperties.put("password", ""); connectionProperties.put("driver", "org.apache.hive.jdbc.HiveDriver"); Dataset<Row> jd = sparkSession.read().jdbc(url, "key_date1", connectionProperties); jd.collectAsList();
On the HiveServer2 web page we can see that the generated SQL is
SELECT "key_date1.id","key_date1.name","key_date1.start_date" FROM key_date1
Note the double quotes: they make Hive return the three column names as literal strings, repeated once for every row of the key_date table.
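As an aside, one workaround sometimes suggested for this quoting behaviour is to register a custom JdbcDialect whose quoteIdentifier uses backticks for jdbc:hive2 URLs before calling read().jdbc(...). This is only a sketch and not the route ultimately taken here (the column names may still come back prefixed with the table name, and the configuration fix described further below is what actually worked); the class and method names are made up.

import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

public class HiveDialectWorkaround {
  public static void register() {
    JdbcDialects.registerDialect(
        new JdbcDialect() {
          @Override
          public boolean canHandle(String url) {
            // Only apply this dialect to HiveServer2 JDBC URLs.
            return url != null && url.startsWith("jdbc:hive2");
          }

          @Override
          public String quoteIdentifier(String colName) {
            // Hive does not understand double-quoted identifiers; use backticks instead.
            return "`" + colName + "`";
          }
        });
  }
}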
Switching to a JavaRDD does not help either:
SparkSession sparkSession =
SparkSession.builder().appName("HiveMySQLApp").master("local").getOrCreate();
String url = "jdbc:hive2://n227-026-077:10000"; Properties connectionProperties = new Properties(); connectionProperties.put("user", "root"); connectionProperties.put("password", ""); connectionProperties.put("driver", "org.apache.hive.jdbc.HiveDriver"); sparkSession .read() .jdbc(url, "analysis.key_date1", connectionProperties) .createOrReplaceTempView("view"); JavaRDD<Row> javaRDD = sparkSession.table("view").javaRDD(); javaRDD.foreach( row -> System.out.println(new KeyDate(row.getInt(1), row.getString(2), row.getString(3))));
It looked like a conflict between jars (after decompiling Spark's spark-sql jar I found the method that throws the exception). But then why does running spark-sql from the command line not hit this problem?
I tried overwriting the jars in the local .m2 repository with the jars shipped in the spark-2.4.7 installation:
/Users/bytedance/.m2/repository/org/apache/spark/spark-core_2.11/2.4.7
scp tangsong.math@10.227.26.77:/repos/spark-2.4.7-bin-hadoop2.7/jars/spark-sql_2.11-2.4.7.jar ./
/Users/bytedance/.m2/repository/org/spark-project/hive/hive-jdbc/1.2.1.spark2
scp tangsong.math@10.227.26.77:/repos/spark-2.4.7-bin-hadoop2.7/jars/hive-jdbc-1.2.1.spark2.jar ./
It turned out afterwards that the real problem was the configuration file: the hive-site.xml under the resources directory was not written correctly.
I also switched to Hive 1.2.2 and Hadoop 2.7.7.
Note that the paths in the hive-site.xml given to Spark are local paths. Add the following configuration to the file. (If this runs as remote Spark deployed on the dev machine, the MySQL ip:port can be written as localhost:3306, and the hive.downloaded.resources.dir at the bottom must be adjusted accordingly.)
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://10.227.26.77:3306/hive?createDatabaseIfNotExist=true&amp;serverTimezone=Asia/Shanghai</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<property>
<name>hive.querylog.location</name>
<value>/data/hive_repo/querylog</value>
</property>
<property>
<name>hive.exec.local.scratchdir</name>
<value>/Users/bytedance/repos/scratchdir</value>
</property>
<property>
<name>hive.downloaded.resources.dir</name>
<value>/Users/bytedance/repos/resources</value>
</property>
public static void main(String[] args) {
String url = "jdbc:hive2://n227-026-077:10000";
SparkSession sparkSession =
    SparkSession.builder()
        .appName("HiveMySQLApp")
        .config("url", url)
        .enableHiveSupport()
        .master("local")
        .getOrCreate();
sparkSession.sql("select * from analysis.key_date").show();
}
The column indices that Spark SQL reads start from 0:
List<Row> rows = sparkSession.sql("select * from analysis.key_date").collectAsList();
for(Row row:rows){
System.out.println(new KeyDate(row.getInt(0),row.getString(1),row.getString(2)));
}
Final configuration:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>untitled4</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <mainClass>com.zkdx.WordCountSpark</mainClass>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <hadoop.version>2.7.3</hadoop.version>
    <spark.version>2.3.2</spark.version>
  </properties>
  <dependencies>
    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.16</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
    <dependency><groupId>org.apache.hive</groupId><artifactId>hive-jdbc</artifactId><version>1.2.1</version></dependency>
    <!-- Spark dependency -->
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark.version}</version></dependency>
    <dependency>
      <groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version>
      <exclusions>
        <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId></exclusion>
        <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs-client</artifactId></exclusion>
        <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId></exclusion>
        <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId></exclusion>
      </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop.version}</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-eclipse-plugin -->
    <dependency><groupId>org.apache.maven.plugins</groupId><artifactId>maven-eclipse-plugin</artifactId><version>2.5.1</version></dependency>
    <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.4</version><scope>test</scope></dependency>
    <dependency><groupId>org.specs</groupId><artifactId>specs</artifactId><version>1.2.5</version><scope>test</scope></dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.7.0</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <compilerArgument>-proc:none</compilerArgument>
        </configuration>
      </plugin>
      <plugin>
        <groupId>com.coveo</groupId>
        <artifactId>fmt-maven-plugin</artifactId>
        <version>2.5.1</version>
        <executions>
          <execution>
            <goals>
              <goal>check</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <!-- prevent the generated descriptor from overwriting our own files -->
            <addMavenDescriptor>false</addMavenDescriptor>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
              </transformers>
              <createDependencyReducedPom>false</createDependencyReducedPom>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
You can also create tables; in that case the user name must be specified, otherwise you run into permission problems (HDFS is accessed directly, so it is the Hadoop user name that gets set).
A CREATE TABLE ... AS SELECT statement cannot create an external table; creating an external table requires specifying a path (a sketch of that variant follows the snippet below).
String url = "jdbc:hive2://n227-026-077:10000";
System.getProperties().setProperty("HADOOP_USER_NAME", "root");
SparkSession sparkSession =
SparkSession.builder()
.appName("HiveMySQLApp")
.config("url", url)
.enableHiveSupport()
.master("local")
.getOrCreate();
sparkSession.sql( "create table analysis.key_date1 as select* from analysis.key_date where id>10");
Running pyspark
prints:
Using Python version 2.7.13 (default, Sep 26 2018 18:42:22)
SparkSession available as 'spark'.
Enter:
spark.sql('select * from analysis.key_date1').show()