Operating HBase from Spark: A Detailed Guide

1. Operating HBase through the Java API

package testhbase;

import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class HbaseTest {

	/** * HBase configuration */
	static Configuration config = null;
	private Connection connection = null;
	private Table table = null;

	@Before
	public void init() throws Exception {
		config = HBaseConfiguration.create();// configuration is stored as <k,v> pairs
		config.set("hbase.zookeeper.quorum", "mini1,mini2,mini3,mini4");// ZooKeeper quorum addresses
		config.set("hbase.zookeeper.property.clientPort", "2181");// ZooKeeper client port
		connection = ConnectionFactory.createConnection(config);
		/* connection.getTable(TableName.valueOf("test")) hands out the table through a pooled connection.
		 * You could also create a standalone connection with new HTable(), but the pooled connection
		 * handles multiple threads talking to HBase concurrently and is preferable to a single HTable. */
		table = connection.getTable(TableName.valueOf("test"));

	}

	/** * Create a table. * Equivalent to the shell command: create 'test2','info1','info2' -- creates table test2 with column families info1 and info2 * @throws Exception */
	@Test
	public void createTable() throws Exception {
		// table administration handle
		HBaseAdmin admin = new HBaseAdmin(config); // HBase table admin
		// table descriptor
		TableName tableName = TableName.valueOf("test2"); // table name
		HTableDescriptor desc = new HTableDescriptor(tableName);
		// column family descriptor
		HColumnDescriptor family = new HColumnDescriptor("info1"); // column family info1
		// add the column family to the table descriptor
		desc.addFamily(family);
		HColumnDescriptor family2 = new HColumnDescriptor("info2"); // column family info2
		// add the column family to the table descriptor
		desc.addFamily(family2);
		// create the table
		admin.createTable(desc);
		admin.close();
	}

	/** * Delete table test2 */
	@Test
	@SuppressWarnings("deprecation")
	public void deleteTable() throws MasterNotRunningException,
			ZooKeeperConnectionException, Exception {
		HBaseAdmin admin = new HBaseAdmin(config);
		//a table must be disabled before it can be deleted
		admin.disableTable("test2");
		admin.deleteTable("test2");
		admin.close();
	}

	/** * Insert data into HBase * @throws Exception */
	@SuppressWarnings({ "deprecation", "resource" })
	@Test
	public void insertData() throws Exception {
		Put put = new Put(Bytes.toBytes("11"));
		put.add(Bytes.toBytes("info1"),Bytes.toBytes("name"),Bytes.toBytes("zhangsan"));
		//to insert several puts at once, build a List<Put> and call table.put(list)
		//rows are stored in lexicographic rowkey order; writing the same rowkey/column again overwrites the previous value
		table.put(put);
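		/* A minimal sketch of the List<Put> batch variant mentioned above (hypothetical row keys,
		 * requires java.util.List in the imports):
		 * List<Put> puts = new ArrayList<>();
		 * puts.add(new Put(Bytes.toBytes("12")).add(Bytes.toBytes("info1"), Bytes.toBytes("name"), Bytes.toBytes("lisi")));
		 * puts.add(new Put(Bytes.toBytes("13")).add(Bytes.toBytes("info1"), Bytes.toBytes("name"), Bytes.toBytes("wangwu")));
		 * table.put(puts);
		 */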
	}

	/** * Update data * @throws Exception */
	@Test
	public void updateData() throws Exception {
		Put put = new Put(Bytes.toBytes("10"));
		put.add(Bytes.toBytes("info1"), Bytes.toBytes("name"), Bytes.toBytes("lisi1234"));
		put.add(Bytes.toBytes("info2"), Bytes.toBytes("age"), Bytes.toBytes(1234));
		//插入数据
		table.put(put);
		//提交
		table.flushCommits();
	}

	/** * Delete data * @throws Exception */
	@Test
	public void deleteData() throws Exception {
		//delete an entire row by rowkey
		/*Delete delete = new Delete(Bytes.toBytes("1234")); table.delete(delete);*/

		//delete a whole column family, or a single column
		Delete delete = new Delete(Bytes.toBytes("1234"));
		//delete the column family info1
		delete.addFamily(Bytes.toBytes("info1"));
		//delete the single column info2:age
		delete.addColumn(Bytes.toBytes("info2"),Bytes.toBytes("age"));
		table.delete(delete); //applied immediately; the Table interface needs no explicit flushCommits()
	}

	/** * Get a single row * @throws Exception */
	@Test
	public void queryData() throws Exception {
		//get the single row whose rowkey is "10"
		Get get = new Get(Bytes.toBytes("10"));
		Result result = table.get(get);
		//result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name")) returns the value of column info1:name
		System.out.println("info1:name = "+Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
		System.out.println("info2:age = "+Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));

		System.out.println("-------------------------separator--------------------------");

		//fetch only a single column of the row
		Get get2 = new Get(Bytes.toBytes("10"));

		get2.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("name"));

		Result result2 = table.get(get2);
		//only info1:name was requested, so info2:age will print null here
		System.out.println("info1:name = "+Bytes.toString(result2.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
		System.out.println("info2:age = "+Bytes.toString(result2.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));

	}

	/** * Full table scan * @throws Exception */
	@Test
	public void scanData() throws Exception {
		Scan scan = new Scan();
		//the next two lines restrict the scan to rowkeys from "10" (inclusive) to "12" (exclusive), i.e. rows 10 and 11
		scan.setStartRow(Bytes.toBytes("10"));
		scan.setStopRow(Bytes.toBytes("12"));

		//scan only the info1:name column; every other column will come back empty
		scan.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("name"));

		ResultScanner scanner = table.getScanner(scan);
		for (Result result : scanner) {
			System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
			System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));
			System.out.println();
		}
	}

	/** * Scan with a column-value filter. * SingleColumnValueFilter defines a <column, value> rule: e.g. keep every row whose info1:name column equals "zhangsan". * @throws Exception */
	@Test
	public void scanDataByFilter1() throws Exception {

		// create a full-table scan
		Scan scan = new Scan();
		//single-column-value filter: arg1 = column family, arg2 = column qualifier, arg3 = compare operator (EQUAL, GREATER, LESS, ...), arg4 = the value to compare against
		//here CompareFilter.CompareOp.EQUAL means: keep rows whose info1:name value equals "zhangsan"
		SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info1"),
				Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL,
				Bytes.toBytes("zhangsan"));
		// set the filter on the scan
		scan.setFilter(filter);

		// print the result set
		ResultScanner scanner = table.getScanner(scan);
		for (Result result : scanner) {
			System.out.println("id=" + Bytes.toString(result.getRow()));
			System.out.println("info1:name=" + Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
			System.out.println("info2:age=" + Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));
			System.out.println();
		}

	}


	/** * RowFilter: filters rows by their rowkey, here with a regular-expression comparator * @throws Exception */
	@Test
	public void scanDataByFilter2() throws Exception {
		
		// create a full-table scan
		Scan scan = new Scan();
		//keep rows whose rowkey contains at least one character other than '1' (regex "[^1]")
		RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("[^1]"));
		// set the filter on the scan
		scan.setFilter(filter);
		// print the result set
		ResultScanner scanner = table.getScanner(scan);
		for (Result result : scanner) {
			System.out.println("id= " + Bytes.toString(result.getRow()));
			System.out.println("info1:name=" + Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
			System.out.println("info2:age=" + Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));
			System.out.println();
		}

		
	}
	
	/** * Match a column-name prefix * @throws Exception */
	@Test
	public void scanDataByFilter3() throws Exception {
		
		// create a full-table scan
		Scan scan = new Scan();
		//keep rows that contain a column whose qualifier starts with "na" (e.g. rows with info1:name)
		ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(Bytes.toBytes("na"));
		// set the filter on the scan
		scan.setFilter(columnPrefixFilter);
		// print the result set
		ResultScanner results = table.getScanner(scan);

		for (Result result:results){
			System.out.println("id=" + Bytes.toString(result.getRow()));
			System.out.println("info1:name=" + Bytes.toString(result.getValue(Bytes.toBytes("info1"),Bytes.toBytes("name"))));
			System.out.println("info2:age=" + Bytes.toString(result.getValue(Bytes.toBytes("info2"),Bytes.toBytes("age"))));
			System.out.println();
		}
	}

	/** * Match multiple column-name prefixes * @throws Exception */
	@Test
	public void scanDataByFilter4() throws Exception {

		// create a full-table scan
		Scan scan = new Scan();
		//keep rows that contain a column whose qualifier starts with "nam" or "ag" (e.g. info1:name and info2:age)
		byte[][] bytes = new byte[][]{Bytes.toBytes("nam"),Bytes.toBytes("ag")};
		MultipleColumnPrefixFilter mcpfilter = new MultipleColumnPrefixFilter(bytes);
		// set the filter on the scan
		scan.setFilter(mcpfilter);
		// print the result set
		ResultScanner results = table.getScanner(scan);

		for (Result result:results){
			System.out.println("id=" + Bytes.toString(result.getRow()));
			System.out.println("info1:name=" + Bytes.toString(result.getValue(Bytes.toBytes("info1"),Bytes.toBytes("name"))));
			System.out.println("info2:age=" + Bytes.toString(result.getValue(Bytes.toBytes("info2"),Bytes.toBytes("age"))));
			System.out.println();
		}
	}

	/** * FilterList: combine several filters with either MUST_PASS_ALL (AND) or MUST_PASS_ONE (OR) * @throws Exception */
	@Test
	public void scanDataByFilter5() throws Exception {
		
		// create a full-table scan
		Scan scan = new Scan();
		//filter list: MUST_PASS_ALL (AND) or MUST_PASS_ONE (OR)
		FilterList filterList = new FilterList(Operator.MUST_PASS_ONE);
		//match rowkeys starting with "10" (regex "^10")
		RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^10"));
		//match rows whose info1:name equals "zhangsan"
		SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("info1"),
				Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL,
				Bytes.toBytes("zhangsan"));
		filterList.addFilter(filter);
		filterList.addFilter(filter2);
		// set the filter list on the scan
		scan.setFilter(filterList);
		// print the result set
		ResultScanner scanner = table.getScanner(scan);

		for (Result result:scanner) {
			System.out.println("id=" + Bytes.toString(result.getRow()));
			System.out.println("info1:name=" + Bytes.toString(result.getValue(Bytes.toBytes("info1"),Bytes.toBytes("name"))));
			System.out.println("info2:age=" + Bytes.toString(result.getValue(Bytes.toBytes("info2"),Bytes.toBytes("age"))));
			System.out.println();
		}
	}

	@After
	public void close() throws Exception {
		table.close();
		connection.close();
	}

}

The HBase client can then be invoked from a DAO layer written in Scala, for example:

/** * Save data to HBase * @param list a collection of CourseClickCount records */
  def save(list: ListBuffer[CourseClickCount]): Unit = {

	// HBaseUtils is a singleton that wraps getTable() and the other CRUD helpers (plus the related filters)
    val table = HBaseUtils.getInstance().getTable(tableName)

    //save each CourseClickCount POJO into the table named tableName
    for(ele <- list) {
      table.incrementColumnValue(Bytes.toBytes(ele.day_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifer),
        ele.click_count)
      //incrementColumnValue atomically increments the stored counter by the passed-in ele.click_count
    }

  }
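The HBaseUtils helper referenced above is not shown here. Purely as an illustration, a minimal Scala sketch of such a singleton could look like the following (the object name, quorum address and method set are assumptions, not the original implementation):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Table}

// Hypothetical HBaseUtils: one shared, thread-safe Connection; Table handles created on demand.
object HBaseUtils {

  // the Connection is heavyweight, so it is created once and reused
  private lazy val connection: Connection = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "mini1,mini2,mini3,mini4") // assumed quorum, adjust to your cluster
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    ConnectionFactory.createConnection(conf)
  }

  // mirrors the HBaseUtils.getInstance() call style used in the save() method above
  def getInstance(): HBaseUtils.type = this

  // Table handles are lightweight; callers should close them when done
  def getTable(tableName: String): Table =
    connection.getTable(TableName.valueOf(tableName))
}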

2. Using HBase's TableInputFormat/TableOutputFormat with Spark's built-in Hadoop support

Spark provides two built-in methods for writing an RDD out to HBase:
(1) saveAsHadoopDataset (old MapReduce API)
(2) saveAsNewAPIHadoopDataset (new MapReduce API)

The example below uses saveAsHadoopDataset; a sketch of the saveAsNewAPIHadoopDataset variant follows it.

In pom.xml:

<!-- additional remote repository -->
    <repositories>
        <repository>
            <id>central-repos1</id>
            <name>Central Repository 2</name>
            <url>http://repo.hortonworks.com/content/groups/public/</url>
            <!--<url>http://repo1.maven.org/maven2</url>-->
        </repository>
    </repositories>

    <properties>
        <spark.version>2.4.0</spark.version>
        <hadoop.version>2.7.3</hadoop.version>
        <hbase.version>0.99.2</hbase.version>
    </properties>

    <dependencies>
        <!--spark hbase connector-->
        <dependency>
            <groupId>com.hortonworks</groupId>
            <artifactId>shc-core</artifactId>
            <version>1.1.1-2.1-s_2.11</version>
        </dependency>

        <!--spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <!--hbase-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>${hbase.version}</version>
        </dependency>

       <!--hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

    </dependencies>

Sample input data (tab-separated):

aaaa    1234
bbbb   4366
cccc    6577
dddd    1234
eeee   4366
ffff    7577

Test code:


import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

object SparkToHBase {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBase <input file>")
      System.exit(1)
    }
 
    val conf = new SparkConf().setAppName("SparkToHBase")
    val sc = new SparkContext(conf)
 
    val input = sc.textFile(args(0))
 
    //create the HBase configuration
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")
 
    //create the JobConf and set the output format and the target table
    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")
 
    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }
    //save to the HBase table
    data.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}
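For reference, here is a minimal sketch of the saveAsNewAPIHadoopDataset variant mentioned above. It swaps in the new-API TableOutputFormat from org.apache.hadoop.hbase.mapreduce and configures a Hadoop Job instead of a JobConf; the object name and the ZooKeeper quorum are placeholders, the rest mirrors the example above:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object SparkToHBaseNewAPI {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SparkToHBaseNewAPI"))

    val hConf = HBaseConfiguration.create()
    hConf.set("hbase.zookeeper.quorum", "localhost:2181")  // placeholder quorum
    hConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")   // target table, as in the example above

    // the new API takes its settings from a Hadoop Job rather than a JobConf
    val job = Job.getInstance(hConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    val data = sc.textFile(args(0)).map { line =>
      val Array(key, value) = line.split("\t")
      val put = new Put(Bytes.toBytes(key.reverse))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }
    data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}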

3. SHC: the Apache Spark - Apache HBase Connector

The TableInputFormat / TableOutputFormat approach above has a few drawbacks:

  • each task can run only a single Scan against HBase;
  • TableInputFormat does not support BulkGet;
  • it cannot benefit from the optimizations of Spark SQL's built-in Catalyst engine.

The SHC library lets us write the contents of a DataFrame directly into HBase with Spark SQL, and also query HBase through Spark SQL. Such queries take full advantage of the Catalyst engine's optimizations, including partition pruning, column pruning, predicate pushdown and data locality, which makes reading HBase through Spark considerably faster.


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

object spark_hbase2 {
  def main(args: Array[String]): Unit = {
    //HBase catalog:
    //1. table name: test
    //2. rowkey: id
    //3. column family: info
    val catalog = s"""{
                     |"table":{"namespace":"default", "name":"test"},
                     |"rowkey":"id",
                     |"columns":{
                     |"col0":{"cf":"rowkey", "col":"id", "type":"String"},
                     |"col1":{"cf":"info", "col":"value", "type":"String"},
                     |}
                     |}""".stripMargin


    val spark = SparkSession.builder()
      .appName("WriteHBase")
      .master("local")
      .config(new SparkConf().set("spark.testing.memory", "512000000"))
      .getOrCreate()

    val sc = spark.sparkContext

    val sqlContext = spark.sqlContext
    import sqlContext.implicits._

    // build a small batch of sample data
    val data = Array(("spark hbase connector1","1")).map(x => HBaseRecord(x._1,x._2))

    //write the data (re-writing an existing rowkey updates it)
    sc.parallelize(data).toDF.write.options(
      Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()

    // helper to read the data back as a DataFrame
    def withCatalog(cat: String): DataFrame = {
      sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
    }

    //query HBase through the DataFrame
    val df = withCatalog(catalog)
    //SQL example
    df.createOrReplaceTempView("test_view")
    sqlContext.sql("select count(col1) from test_view").show

    spark.stop()
  }
}

case class HBaseRecord(col0: String,  // sql: string
                       col1: String)
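The count query above does not really exercise the Catalyst optimizations described earlier. The following is a minimal, self-contained read sketch assuming the same "test" table and catalog as above; the object name and the filter value (simply the sample rowkey written earlier) are illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

object spark_hbase_read {
  def main(args: Array[String]): Unit = {
    // same catalog as in spark_hbase2: table "test", rowkey as col0, info:value as col1
    val catalog = s"""{
                     |"table":{"namespace":"default", "name":"test"},
                     |"rowkey":"id",
                     |"columns":{
                     |"col0":{"cf":"rowkey", "col":"id", "type":"String"},
                     |"col1":{"cf":"info", "col":"value", "type":"String"}
                     |}
                     |}""".stripMargin

    val spark = SparkSession.builder().appName("ReadHBase").master("local").getOrCreate()
    import spark.implicits._

    val df = spark.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()

    // column pruning: only col1 (info:value) is requested;
    // the equality predicate on the rowkey column col0 can be pushed down to HBase instead of a full scan
    df.filter($"col0" === "spark hbase connector1")
      .select("col1")
      .show()

    spark.stop()
  }
}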