A detailed walkthrough of operating HBase from Spark
1. Operating HBase through the Java API
package testhbase;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class HbaseTest {
/** * Configuration */
static Configuration config = null;
private Connection connection = null;
private Table table = null;
@Before
public void init() throws Exception {
config = HBaseConfiguration.create();// configuration entries are stored as <k,v> pairs
config.set("hbase.zookeeper.quorum", "mini1,mini2,mini3,mini4");// ZooKeeper quorum
config.set("hbase.zookeeper.property.clientPort", "2181");// ZooKeeper client port
connection = ConnectionFactory.createConnection(config);
/* connection.getTable(TableName.valueOf("test")) obtains the table from a pooled connection; a standalone connection could also be created with new HTable(), but the pooled connection handles several threads talking to HBase concurrently, so it is preferable to new HTable() */
table = connection.getTable(TableName.valueOf("test"));
}
/** * Create a table. * Equivalent to the shell command create 'test2','info','info2' -- creates table test2 with column families info and info2. * @throws Exception */
@Test
public void createTable() throws Exception {
// table administration handle (HBaseAdmin is deprecated in newer clients; the next test method shows the connection.getAdmin() alternative)
HBaseAdmin admin = new HBaseAdmin(config);
// table descriptor
TableName tableName = TableName.valueOf("test2"); // table name
HTableDescriptor desc = new HTableDescriptor(tableName);
// column family descriptor
HColumnDescriptor family = new HColumnDescriptor("info"); // column family
// add the column family to the table descriptor
desc.addFamily(family);
HColumnDescriptor family2 = new HColumnDescriptor("info2"); // second column family
desc.addFamily(family2);
// create the table
admin.createTable(desc);
admin.close();
}
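/** * The same table creation through the non-deprecated Admin API (a hedged sketch added for illustration; assumes an HBase 1.0+ client) * @throws Exception */
@Test
public void createTableWithAdmin() throws Exception {
// obtain the Admin from the pooled Connection instead of constructing an HBaseAdmin
Admin admin = connection.getAdmin();
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("test2"));
desc.addFamily(new HColumnDescriptor("info"));
desc.addFamily(new HColumnDescriptor("info2"));
admin.createTable(desc);
admin.close();
}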
/** * Delete table test2 */
@Test
@SuppressWarnings("deprecation")
public void deleteTable() throws MasterNotRunningException,
ZooKeeperConnectionException, Exception {
HBaseAdmin admin = new HBaseAdmin(config);
//a table must be disabled before it can be deleted
admin.disableTable("test2");
admin.deleteTable("test2");
admin.close();
}
/** * Insert data into HBase * @throws Exception */
@SuppressWarnings({ "deprecation", "resource" })
@Test
public void insertData() throws Exception {
Put put = new Put(Bytes.toBytes("11"));
put.add(Bytes.toBytes("info1"),Bytes.toBytes("name"),Bytes.toBytes("zhangsan"));
//to insert multiple Puts at once, collect them in a List<Put> and call table.put(list), as shown in the next test method
//rows are stored in lexicographic rowkey order; writing to the same rowkey/column again overwrites (masks) the earlier value
table.put(put);
}
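/** * Batch insert (a hedged sketch added for illustration, not in the original): the List<Put> approach mentioned above; the rowkeys and values are made up * @throws Exception */
@Test
public void insertBatchData() throws Exception {
ArrayList<Put> puts = new ArrayList<Put>();
Put put1 = new Put(Bytes.toBytes("20"));
put1.add(Bytes.toBytes("info1"), Bytes.toBytes("name"), Bytes.toBytes("wangwu"));
Put put2 = new Put(Bytes.toBytes("21"));
put2.add(Bytes.toBytes("info1"), Bytes.toBytes("name"), Bytes.toBytes("zhaoliu"));
puts.add(put1);
puts.add(put2);
// Table.put(List<Put>) sends all mutations to the server in a single batch
table.put(puts);
}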
/** * Update data * @throws Exception */
@Test
public void updateData() throws Exception {
Put put = new Put(Bytes.toBytes("10"));
put.add(Bytes.toBytes("info1"), Bytes.toBytes("name"), Bytes.toBytes("lisi1234"));
put.add(Bytes.toBytes("info2"), Bytes.toBytes("age"), Bytes.toBytes(1234));
//插入数据
table.put(put);
//提交
table.flushCommits();
}
/** * Delete data * @throws Exception */
@Test
public void deleteData() throws Exception {
//delete an entire row by rowkey
/*Delete delete = new Delete(Bytes.toBytes("1234")); table.delete(delete);*/
//or delete a single column family / column within a row
Delete delete = new Delete(Bytes.toBytes("1234"));
//delete the whole info1 family of this row
delete.addFamily(Bytes.toBytes("info1"));
//delete the info2:age column of this row
delete.addColumn(Bytes.toBytes("info2"),Bytes.toBytes("age"));
table.delete(delete);
//deletes issued through the Table API are sent immediately; no explicit flush is needed
}
/** * Single-row get * @throws Exception */
@Test
public void queryData() throws Exception {
//get the whole row whose rowkey is 10
Get get = new Get(Bytes.toBytes("10"));
Result result = table.get(get);
//result.getValue(family, qualifier) returns the bytes stored in the info1:name cell
System.out.println("info1:name=" + Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
System.out.println("info2:age=" + Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));
System.out.println("-------------------------separator--------------------------");
//get only the info1:name column of row 10
Get get2 = new Get(Bytes.toBytes("10"));
get2.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("name"));
Result result2 = table.get(get2);
System.out.println("info1:name=" + Bytes.toString(result2.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
//info2:age was not requested in get2, so this prints null
System.out.println("info2:age=" + Bytes.toString(result2.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));
}
/** * Full-table scan * @throws Exception */
@Test
public void scanData() throws Exception {
Scan scan = new Scan();
//the next two lines limit the scan to rowkeys from "10" (inclusive) to "12" (exclusive), i.e. rows 10 and 11
scan.setStartRow(Bytes.toBytes("10"));
scan.setStopRow(Bytes.toBytes("12"));
//restrict the scan to the info1:name column: all other columns come back empty
scan.addColumn(Bytes.toBytes("info1"), Bytes.toBytes("name"));
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));
System.out.println();
}
}
/** * Scan filters. * Single-column value filter: defines a <column, value> rule and keeps the rows whose info1:name cell matches it, e.g. info1:name = zhangsan * (note: rows that lack the column entirely are still returned unless setFilterIfMissing(true) is set) * @throws Exception */
@Test
public void scanDataByFilter1() throws Exception {
// create a full-table scan
Scan scan = new Scan();
//single-column value filter: arg1 = column family, arg2 = qualifier, arg3 = comparison operator (EQUAL, GREATER, LESS, ...), arg4 = the value to compare against
//CompareOp.EQUAL here means the cell value of info1:name must equal "zhangsan"
SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info1"),
Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL,
Bytes.toBytes("zhangsan"));
// attach the filter to the scan
scan.setFilter(filter);
// print the result set
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
System.out.println("id" + Bytes.toString(result.getRow()));
System.out.println("info1:name:" + Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
System.out.println("info2:age" + Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));
System.out.println();
}
}
/** * Rowkey filter: filters rows by matching the rowkey against a regular expression * @throws Exception */
@Test
public void scanDataByFilter2() throws Exception {
// create a full-table scan
Scan scan = new Scan();
//keep rows whose rowkey matches the regex "[^1]", i.e. contains at least one character other than '1'
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("[^1]"));
// attach the filter to the scan
scan.setFilter(filter);
// print the result set
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
System.out.println("id= " + Bytes.toString(result.getRow()));
System.out.println("info1:name=" + Bytes.toString(result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"))));
System.out.println("info2:age=" + Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"))));
System.out.println();
}
}
/** * Match a column-name prefix * @throws Exception */
@Test
public void scanDataByFilter3() throws Exception {
// create a full-table scan
Scan scan = new Scan();
//keep only the cells whose qualifier starts with "na" (e.g. info1:name); rows without such a column return nothing
ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(Bytes.toBytes("na"));
// attach the filter to the scan
scan.setFilter(columnPrefixFilter);
// print the result set
ResultScanner results = table.getScanner(scan);
for (Result result:results){
System.out.println("id=" + Bytes.toString(result.getRow()));
System.out.println("info1:name=" + Bytes.toString(result.getValue(Bytes.toBytes("info1"),Bytes.toBytes("name"))));
System.out.println("info2:age=" + Bytes.toString(result.getValue(Bytes.toBytes("info2"),Bytes.toBytes("age"))));
System.out.println();
}
}
/** * Match multiple column-name prefixes * @throws Exception */
@Test
public void scanDataByFilter4() throws Exception {
// create a full-table scan
Scan scan = new Scan();
//keep only the cells whose qualifier starts with "nam" or "ag" (e.g. info1:name and info2:age)
byte[][] bytes = new byte[][]{Bytes.toBytes("nam"),Bytes.toBytes("ag")};
MultipleColumnPrefixFilter mcpfilter = new MultipleColumnPrefixFilter(bytes);
// attach the filter to the scan
scan.setFilter(mcpfilter);
// print the result set
ResultScanner results = table.getScanner(scan);
for (Result result:results){
System.out.println("id=" + Bytes.toString(result.getRow()));
System.out.println("info1:name=" + Bytes.toString(result.getValue(Bytes.toBytes("info1"),Bytes.toBytes("name"))));
System.out.println("info2:age=" + Bytes.toString(result.getValue(Bytes.toBytes("info2"),Bytes.toBytes("age"))));
System.out.println();
}
}
/** * Filter list: combine several filters, either as MUST_PASS_ALL (AND) or MUST_PASS_ONE (OR) * @throws Exception */
@Test
public void scanDataByFilter5() throws Exception {
// create a full-table scan
Scan scan = new Scan();
//filter list: filters can be combined with MUST_PASS_ALL (AND) or MUST_PASS_ONE (OR)
FilterList filterList = new FilterList(Operator.MUST_PASS_ONE);
//keep rows whose rowkey starts with "10" (regex ^10)
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^10"));
//keep rows where info1:name equals "zhangsan"
SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("info1"),
Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL,
Bytes.toBytes("zhangsan"));
filterList.addFilter(filter);
filterList.addFilter(filter2);
// attach the filter list to the scan
scan.setFilter(filterList);
// print the result set
ResultScanner scanner = table.getScanner(scan);
for (Result result:scanner) {
System.out.println("id=" + Bytes.toString(result.getRow()));
System.out.println("info1:name=" + Bytes.toString(result.getValue(Bytes.toBytes("info1"),Bytes.toBytes("name"))));
System.out.println("info2:age=" + Bytes.toString(result.getValue(Bytes.toBytes("info2"),Bytes.toBytes("age"))));
System.out.println();
}
}
@After
public void close() throws Exception {
table.close();
connection.close();
}
}
A DAO layer written in Scala can then call HBase as follows:
/** * Save data to HBase * @param list a collection of CourseClickCount records */
def save(list: ListBuffer[CourseClickCount]): Unit = {
// HBaseUtils is a singleton that wraps getTable() and the other CRUD helpers and related filters; a sketch of it follows this method
val table = HBaseUtils.getInstance().getTable(tableName)
//persist each CourseClickCount POJO into tableName
for(ele <- list) {
table.incrementColumnValue(Bytes.toBytes(ele.day_course),
Bytes.toBytes(cf),
Bytes.toBytes(qualifer),
ele.click_count)
//incrementColumnValue atomically adds ele.click_count to the current value of the cf:qualifer cell, so no separate read-modify-write is needed
}
}
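The HBaseUtils helper referenced above is not shown in the original article. Below is a minimal sketch of its assumed shape; only getInstance() and getTable() are actually required by the DAO code, and the ZooKeeper settings are placeholders reused from the Java example.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Table}
// Minimal sketch of the HBaseUtils singleton (assumed implementation)
class HBaseUtils private () {
  private val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "mini1,mini2,mini3,mini4") // placeholder quorum
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  private val connection: Connection = ConnectionFactory.createConnection(conf)
  // return a Table handle for the given table name
  def getTable(tableName: String): Table = connection.getTable(TableName.valueOf(tableName))
}
object HBaseUtils {
  private lazy val instance = new HBaseUtils()
  def getInstance(): HBaseUtils = instance
}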
2. Using the built-in TableInputFormat / TableOutputFormat with Spark
Spark provides two built-in methods for writing an RDD of (key, Put) pairs to HBase:
(1) saveAsHadoopDataset (old mapred API, used in the full example below)
(2) saveAsNewAPIHadoopDataset (new mapreduce API; a hedged sketch follows this list)
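The sketch below shows the saveAsNewAPIHadoopDataset variant. It is an illustration, not code from the original article: the table name iteblog, the f1:info column and the ZooKeeper host are assumptions carried over from the surrounding examples.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
object SparkToHBaseNewAPI {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SparkToHBaseNewAPI"))
    val hConf = HBaseConfiguration.create()
    hConf.set("hbase.zookeeper.quorum", "www.iteblog.com") // assumed quorum, as in the example below
    hConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")   // target table
    // the new mapreduce API carries its output settings in a Job
    val job = Job.getInstance(hConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    val data = sc.parallelize(Seq(("aaaa", "1234"), ("bbbb", "4366"))).map { case (k, v) =>
      val put = new Put(Bytes.toBytes(k))
      // addColumn is the HBase 1.0+ name; older clients use put.add
      put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(v))
      (new ImmutableBytesWritable, put)
    }
    data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}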
In pom.xml:
<!-- extra remote repository (the Hortonworks repository hosts the SHC artifact) -->
<repositories>
<repository>
<id>central-repos1</id>
<name>Central Repository 2</name>
<url>http://repo.hortonworks.com/content/groups/public/</url>
<!--<url>http://repo1.maven.org/maven2</url>-->
</repository>
</repositories>
<properties>
<spark.version>2.4.0</spark.version>
<hadoop.version>2.7.3</hadoop.version>
<hbase.version>0.99.2</hbase.version>
</properties>
<dependencies>
<!--spark hbase connnector-->
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>shc-core</artifactId>
<version>1.1.1-2.1-s_2.11</version>
</dependency>
<!--spark-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!--hbase-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>${hbase.version}</version>
</dependency>
<!--hadoop-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
Sample data (one tab-separated key/value pair per line):
aaaa 1234
bbbb 4366
cccc 6577
dddd 1234
eeee 4366
ffff 7577
Test code:
import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
object SparkToHBase {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: SparkToHBase <input file>")
System.exit(1)
}
val conf = new SparkConf().setAppName("SparkToHBase")
val sc = new SparkContext(conf)
val input = sc.textFile(args(0))
//create the HBase configuration
val hConf = HBaseConfiguration.create()
hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")
//create a JobConf with the old mapred API, and set the output format and target table
val jobConf = new JobConf(hConf, this.getClass)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")
val data = input.map { item =>
val Array(key, value) = item.split("\t")
val rowKey = key.reverse
val put = new Put(Bytes.toBytes(rowKey))
put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
(new ImmutableBytesWritable, put)
}
//save the RDD to the HBase table via the old-API saveAsHadoopDataset
data.saveAsHadoopDataset(jobConf)
sc.stop()
}
}
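Section 3 below criticizes TableInputFormat for reading. For reference, here is a hedged sketch of what such a read looks like through sc.newAPIHadoopRDD; the table name and column layout reuse the write example above, and the ZooKeeper host is an assumption.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
object SparkFromHBase {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SparkFromHBase"))
    val hConf = HBaseConfiguration.create()
    hConf.set("hbase.zookeeper.quorum", "www.iteblog.com") // assumed quorum, as in the write example
    hConf.set(TableInputFormat.INPUT_TABLE, "iteblog")     // table written by SparkToHBase above
    // each record is a (rowkey, Result) pair; only one Scan runs per task,
    // which is one of the drawbacks listed in section 3
    val hbaseRDD = sc.newAPIHadoopRDD(hConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    hbaseRDD.map { case (key, result) =>
      val rowKey = Bytes.toString(key.copyBytes())
      val value  = Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("info")))
      (rowKey, value)
    }.collect().foreach(println)
    sc.stop()
  }
}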
3. SHC: the Apache Spark - Apache HBase Connector
The TableInputFormat approach above has a few drawbacks:
- a task can only run a single Scan to read data from HBase;
- TableInputFormat does not support BulkGet;
- it gets none of the optimizations of Spark SQL's built-in Catalyst engine.
With the SHC library we can write DataFrame data straight into HBase through Spark SQL, and we can also query HBase with Spark SQL. Those queries make full use of the Catalyst engine's optimizations, such as partition pruning, column pruning, predicate pushdown and data locality, which speeds up reading HBase through Spark considerably.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
object spark_hbase2 {
def main(args: Array[String]): Unit = {
//HBase catalog:
//1. table name: test
//2. rowkey column: id
//3. column family: info
val catalog = s"""{
|"table":{"namespace":"default", "name":"test"},
|"rowkey":"id",
|"columns":{
|"col0":{"cf":"rowkey", "col":"id", "type":"String"},
|"col1":{"cf":"info", "col":"value", "type":"String"}
|}
|}""".stripMargin
val spark = SparkSession.builder()
.appName("WriteHBase")
.master("local")
.config(new SparkConf().set("spark.testing.memory", "512000000"))
.getOrCreate()
val sc = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
// simulate a small batch of data
val data = Array(("spark hbase connector1","1")).map(x => HBaseRecord(x._1,x._2))
//write (upsert) the data to HBase
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalog))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
// read data: build a DataFrame on top of the HBase table via the catalog
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}
//operate on the HBase table through the DataFrame
val df = withCatalog(catalog)
//SQL example
df.createOrReplaceTempView("test_table")
sqlContext.sql("select count(col1) from test_table").show
spark.stop()
}
}
case class HBaseRecord(col0: String, // sql: string
col1: String)
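To make the Catalyst optimizations above concrete, the snippet below (an illustrative assumption, not part of the original) could be placed inside main before spark.stop(), reusing the catalog and the withCatalog helper defined there. Selecting one column and filtering on the rowkey column lets SHC apply column pruning and predicate pushdown, so only the matching row and column need to be read from HBase.
// column pruning + predicate pushdown example (hypothetical addition inside main)
val pruned = withCatalog(catalog)
  .select("col1")                                   // column pruning: only info:value is fetched
  .filter($"col0" === "spark hbase connector1")     // predicate pushdown on the rowkey column
pruned.show()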