Spark Interaction with JDBC and HBase
JDBC
Using MySQL as the example
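Both examples below assume that Spark core and the MySQL JDBC driver (Connector/J 5.x, matching the com.mysql.jdbc.Driver class used in the code) are on the classpath. A minimal build.sbt sketch; the version numbers are assumptions and should be matched to your environment:

// build.sbt (sketch; versions are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.5" % "provided",
  "mysql" % "mysql-connector-java" % "5.1.49"
)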
Read
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Author atguigu
* Date 2020/5/9 10:23
*/
object JdbcRead {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("JdbcRead").setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop102:3306/rdd"
val user = "root"
val pw = "aaaaaa"
val rdd = new JdbcRDD(
sc,
() => {
Class.forName(driver)
DriverManager.getConnection(url, user, pw)
// Do NOT close the connection here: JdbcRDD closes it itself once the partition has been read
},
"select id, name from user where id >= ? and id <= ?",//?是占位符
1,//对应第一个 ?
10,//对应第二个 ?
2,
row => {
(row.getInt("id"), row.getString("name"))
} // mapRow: maps each ResultSet row to a tuple; the RDD's element type is (Int, String), not a Set
)
rdd.collect.foreach(println)
sc.stop()
/*
Classic JDBC flow:
  1. Load the driver:      Class.forName(...)
  2. Get a connection:     DriverManager.getConnection(...)
  3. Prepare a statement:  conn.prepareStatement(...)
  4. Execute it:           ps.executeQuery() / ps.executeUpdate()
  5. Iterate the ResultSet, then close everything
*/
}
}
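The read example assumes the rdd database already contains a user table whose id column covers 1 through 10. The original never shows the DDL, so the following plain-JDBC seeding sketch is an assumption; it also spells out the Class.forName / getConnection / prepareStatement / executeBatch flow summarized in the comment above.

import java.sql.DriverManager

// Hypothetical helper: creates and seeds the user table assumed by JdbcRead
object SeedUserTable {
  def main(args: Array[String]): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd", "root", "aaaaaa")
    // column layout is an assumption inferred from "select id, name from user"
    conn.createStatement()
      .execute("create table if not exists user(id int primary key, name varchar(100))")
    val ps = conn.prepareStatement("insert into user values(?, ?)")
    for (i <- 1 to 10) {
      ps.setInt(1, i)
      ps.setString(2, s"name$i")
      ps.addBatch()
    }
    ps.executeBatch()
    ps.close()
    conn.close()
  }
}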
Write

import java.sql.DriverManager
import org.apache.spark.{SparkConf, SparkContext}
/**
* Author atguigu
* Date 2020/5/9 10:43
*/
object JdbcWrite {
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop102:3306/rdd"
val user = "root"
val pw = "aaaaaa"
def main(args: Array[String]): Unit = {
// Write the RDD's data to MySQL
val conf: SparkConf = new SparkConf().setAppName("JdbcWrite").setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
// Run a word count, then write its result to MySQL
val wordCount = sc
.textFile("c:/1128.txt")
.flatMap(_.split("\\W+"))
.map((_, 1))
.reduceByKey(_ + _, 3)
val sql = "insert into word_count1128 values(?, ?)"
// Naive version (commented out): one PreparedStatement and one execute per record, which is slow
/*wordCount.foreachPartition( it => {
// it is the iterator over this partition's records
// set up a connection to MySQL, one per partition
Class.forName(driver)
// get the connection
val conn = DriverManager.getConnection(url, user, pw)
it.foreach{
case (word, count) =>
val ps = conn.prepareStatement(sql)
ps.setString(1, word)
ps.setInt(2, count)
ps.execute()
ps.close()
}
conn.close()
} )*/
// Optimized version: one connection and one PreparedStatement per partition, inserts sent in batches
wordCount.foreachPartition(it => {
// it is the iterator over this partition's records
// set up a connection to MySQL, one per partition
Class.forName(driver)
// get the connection
val conn = DriverManager.getConnection(url, user, pw)
val ps = conn.prepareStatement(sql)
var max = 0 // number of statements accumulated in the current batch
it.foreach {
case (word, count) =>
ps.setString(1, word)
ps.setInt(2, count)
// add to the batch instead of executing each statement immediately
ps.addBatch()
max += 1
if (max >= 10) { // flush once 10 statements have accumulated
ps.executeBatch()
ps.clearBatch()
max = 0
}
}
// flush the final, possibly incomplete batch
ps.executeBatch()
ps.close()
conn.close()
})
sc.stop()
}
}
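The insert statement above assumes that a word_count1128 table with a string column followed by an int column already exists in the rdd database. The column names in the sketch below are assumptions, since the original never shows the DDL:

import java.sql.DriverManager

// Hypothetical one-off setup for the target table (column names are assumptions)
object CreateWordCountTable {
  def main(args: Array[String]): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd", "root", "aaaaaa")
    conn.createStatement()
      .execute("create table if not exists word_count1128(word varchar(255), cnt int)")
    conn.close()
  }
}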
HBase
Read
import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization
/**
* Author atguigu
* Date 2020/5/9 13:41
**/
object HbaseRead {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("HbaseRead").setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val hbaseConf: Configuration = HBaseConfiguration.create()
// connection parameters
hbaseConf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104") // ZooKeeper quorum
// the table to read
hbaseConf.set(TableInputFormat.INPUT_TABLE, "student")
// Generic read path for a NoSQL key-value store organized by column families
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
hbaseConf,
fClass = classOf[TableInputFormat], // the InputFormat class
// key and value classes; essentially fixed for HBase
kClass = classOf[ImmutableBytesWritable],
vClass = classOf[Result]
)
val rdd2: RDD[String] = hbaseRDD.map {
case (ibw, result) =>
// Bytes.toString(ibw.get())
// Read every column of the row into a map, then serialize the map as a JSON string
var map: Map[String, String] = Map[String, String]()
// put the row key in first
map += "rowKey" -> Bytes.toString(ibw.get())
// fetch all of the row's cells (columns)
val cells: util.List[Cell] = result.listCells()
// import implicit conversions that turn Java collections into Scala collections
import scala.collection.JavaConversions._
for (cell <- cells) { // Scala's for loop only works on Scala collections, hence the conversion
val key: String = Bytes.toString(CellUtil.cloneQualifier(cell))
val value: String = Bytes.toString(CellUtil.cloneValue(cell))
map += key -> value
}
// serialize the map into a JSON string
// json4s: a JSON library written specifically for Scala
implicit val d: DefaultFormats = org.json4s.DefaultFormats
Serialization.write(map)
}
// rdd2.collect.foreach(println)
rdd2.saveAsTextFile("./hbase")
sc.stop()
}
}
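A side note on the collection conversion in HbaseRead: scala.collection.JavaConversions has been deprecated since Scala 2.12. A drop-in replacement for the loop above using the explicit JavaConverters (same cells and map variables as in HbaseRead; in Scala 2.13+ use scala.jdk.CollectionConverters instead):

import scala.collection.JavaConverters._
for (cell <- cells.asScala) { // asScala converts the java.util.List explicitly
  val key: String = Bytes.toString(CellUtil.cloneQualifier(cell))
  val value: String = Bytes.toString(CellUtil.cloneValue(cell))
  map += key -> value
}

With the three rows written by HbaseWrite below, each line of the output is a JSON string roughly of the form {"rowKey":"2100","age":"10","gender":"male","name":"zs"} (key order may differ).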
Write

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Author atguigu
* Date 2020/5/9 13:41
*/
object HbaseWrite {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("HbaseWrite").setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val list: List[(String, String, String, String)] = List(
("2100", "zs", "male", "10"),
("2101", "li", "female", "11"),
("2102", "ww", "male", "12"))
val rdd1: RDD[(String, String, String, String)] = sc.parallelize(list)
// Write the data to HBase
// turn rdd1 into key-value form: (ImmutableBytesWritable, Put)
val resultRDD: RDD[(ImmutableBytesWritable, Put)] = rdd1.map {
case (rowKey, name, gender, age) =>
val rk: ImmutableBytesWritable = new ImmutableBytesWritable()
// set the row key
rk.set(toBytes(rowKey))
val put: Put = new Put(toBytes(rowKey))
// set each column
put.addColumn(toBytes("cf"), toBytes("name"), toBytes(name))
put.addColumn(toBytes("cf"), toBytes("gender"), toBytes(gender))
put.addColumn(toBytes("cf"), toBytes("age"), toBytes(age))
(rk, put)
}
val hbaseConf: Configuration = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104") // zookeeper配置
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "student") // 输出表
// 通过job来设置输出的格式的类
val job: Job = Job.getInstance(hbaseConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])
resultRDD.saveAsNewAPIHadoopDataset(conf = job.getConfiguration)
sc.stop()
}
}
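Both HBase examples assume that the student table with column family cf already exists; the original does not show how it was created. A minimal sketch using the HBase 2.x Admin API (with the 1.x client the equivalent would use HTableDescriptor/HColumnDescriptor):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}

// Hypothetical one-off setup: create 'student' with column family 'cf'
object CreateStudentTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    val table = TableName.valueOf("student")
    if (!admin.tableExists(table)) {
      admin.createTable(
        TableDescriptorBuilder.newBuilder(table)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
          .build())
    }
    admin.close()
    connection.close()
  }
}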