In this post I'd like to share how to use Spark with HBase; I hope you get something out of it, so let's dig in together. The first example reads rows from an HBase table into an RDD through TableInputFormat, and the second writes an RDD back into HBase through TableOutputFormat.
package hgs.spark.hbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

// Reads rows from an HBase table into an RDD via the mapreduce TableInputFormat.
object HbaseTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setMaster("local").setAppName("local")
    val context = new SparkContext(conf)

    // HBaseConfiguration.create() is the recommended factory (the bare constructor is deprecated).
    val hadoopconf = HBaseConfiguration.create()
    hadoopconf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181")
    hadoopconf.set("hbase.zookeeper.property.clientPort", "2181")

    // Tell TableInputFormat which table to scan and how to restrict the scan.
    val tableName = "test1"
    hadoopconf.set(TableInputFormat.INPUT_TABLE, tableName)
    hadoopconf.set(TableInputFormat.SCAN_ROW_START, "h")       // inclusive start row
    hadoopconf.set(TableInputFormat.SCAN_ROW_STOP, "x")        // exclusive stop row
    hadoopconf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf1")
    hadoopconf.set(TableInputFormat.SCAN_COLUMNS, "cf1:col1,cf1:col2")

    /* Alternative: build a Scan object and serialize it into TableInputFormat.SCAN
       (a complete sketch follows this listing):
    val startrow = "h"
    val stoprow = "w"
    val scan = new Scan
    scan.setStartRow(startrow.getBytes)
    scan.setStopRow(stoprow.getBytes)
    val proto = ProtobufUtil.toScan(scan)
    val scanToString = Base64.encodeBytes(proto.toByteArray())
    println(scanToString)
    hadoopconf.set(TableInputFormat.SCAN, scanToString)
    */

    // Each element is (row key as ImmutableBytesWritable, full row as Result).
    val hbaseRdd = context.newAPIHadoopRDD(hadoopconf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hbaseRdd.foreach(x => {
      val val1 = x._2.getValue("cf1".getBytes, "col1".getBytes)
      val val2 = x._2.getValue("cf1".getBytes, "col2".getBytes)
      println(new String(val1), new String(val2))
    })

    context.stop()
  }
}
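The commented-out section above hints at a second way to restrict the read: build a Scan object and serialize it into the configuration under TableInputFormat.SCAN. Below is a minimal sketch of that approach, assuming an HBase 1.x client (ProtobufUtil and HBase's bundled Base64 helper were relocated or removed in later versions); hadoopconf and TableInputFormat refer to the listing above.

import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}

// Build the scan range and column family restriction on a Scan object.
val scan = new Scan
scan.setStartRow(Bytes.toBytes("h"))  // inclusive
scan.setStopRow(Bytes.toBytes("x"))   // exclusive
scan.addFamily(Bytes.toBytes("cf1"))

// Serialize the Scan to a Base64 string and hand it to TableInputFormat.
val scanToString = Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)
hadoopconf.set(TableInputFormat.SCAN, scanToString)

This is equivalent to setting SCAN_ROW_START, SCAN_ROW_STOP and SCAN_COLUMN_FAMILY individually, but it exposes the full Scan API (filters, caching, versions) when the simple string properties are not enough.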
package hgs.spark.hbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

// Writes an RDD to HBase via the old mapred TableOutputFormat and saveAsHadoopDataset.
object SparkToHbase {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setMaster("local").setAppName("local")
    val context = new SparkContext(conf)

    // Each pair becomes one row: _1 is the row key, _2 the value stored in cf1:col1.
    val rdd = context.parallelize(List(("aaaaaaa", "aaaaaaa"), ("bbbbb", "bbbbb")), 2)

    val hadoopconf = HBaseConfiguration.create()
    hadoopconf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181")
    hadoopconf.set("hbase.zookeeper.property.clientPort", "2181")

    // Wrap the HBase configuration in a JobConf and point it at the target table.
    val jobconf = new JobConf(hadoopconf, this.getClass)
    jobconf.set(TableOutputFormat.OUTPUT_TABLE, "test1")
    jobconf.setOutputFormat(classOf[TableOutputFormat])

    // TableOutputFormat ignores the key, so an empty ImmutableBytesWritable is enough.
    val exterrdd = rdd.map(x => {
      val put = new Put(x._1.getBytes)
      put.add("cf1".getBytes, "col1".getBytes, x._2.getBytes) // addColumn(...) on HBase 1.0+
      (new ImmutableBytesWritable, put)
    })

    exterrdd.saveAsHadoopDataset(jobconf)
    context.stop()
  }
}
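The listing above uses the old org.apache.hadoop.hbase.mapred API together with saveAsHadoopDataset. The same write can go through the newer mapreduce API and saveAsNewAPIHadoopDataset instead. The following is a minimal sketch under the assumption of an HBase 1.x client (put.addColumn) and the same table and column family as above; the object name SparkToHbaseNewApi is only for illustration.

package hgs.spark.hbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SparkToHbaseNewApi {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setMaster("local").setAppName("local")
    val context = new SparkContext(conf)

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "test1")

    // A Job instance is only used to carry the output format configuration.
    val job = Job.getInstance(hbaseConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val rdd = context.parallelize(List(("aaaaaaa", "aaaaaaa"), ("bbbbb", "bbbbb")), 2)
    val puts = rdd.map { case (rowKey, value) =>
      val put = new Put(Bytes.toBytes(rowKey))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col1"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }

    puts.saveAsNewAPIHadoopDataset(job.getConfiguration)
    context.stop()
  }
}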
Having read this article, you should now have a reasonable grasp of how to use Spark with HBase. If you would like to learn more, follow the 天达云 industry news channel. Thank you for reading!