Spark 与 JDBC、HBase 之间的交互

JDBC

以MySQL为例

读取

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Reads rows from a MySQL table into an RDD via `JdbcRDD` and prints them on the driver.
 *
 * Author atguigu
 * Date 2020/5/9 10:23
 */
object JdbcRead {
    /**
     * Builds a local SparkContext, queries the `user` table through JDBC and prints
     * every `(id, name)` pair that was read.
     *
     * For comparison, plain JDBC programming follows these steps:
     *   1. load the driver: Class.forName(..)
     *   2. DriverManager.getConnection(..)
     *   3. conn.prepareStatement(..)
     *   4. execute the statement
     *   5. iterate over the ResultSet
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setAppName("JdbcRead").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(sparkConf)

        // Connection settings for the MySQL instance.
        val driver = "com.mysql.jdbc.Driver"
        val url = "jdbc:mysql://hadoop102:3306/rdd"
        val user = "root"
        val pw = "aaaaaa"

        // Connection factory handed to JdbcRDD. Never close the connection in here:
        // it has to stay open for the RDD to run the query with it.
        val openConnection: () => java.sql.Connection = () => {
            Class.forName(driver)
            DriverManager.getConnection(url, user, pw)
        }

        // Converts the current row of the JDBC ResultSet into an (id, name) tuple.
        val toIdAndName: java.sql.ResultSet => (Int, String) =
            resultSet => (resultSet.getInt("id"), resultSet.getString("name"))

        // The two `?` placeholders receive the lower and upper id bound respectively.
        val query = "select id, name from user where id >= ? and id <= ?"
        val lowerBound = 1L
        val upperBound = 10L
        val numPartitions = 2

        val rows = new JdbcRDD(
            sc,
            openConnection,
            query,
            lowerBound,
            upperBound,
            numPartitions,
            toIdAndName
        )

        // Pull everything back to the driver and print it.
        for (row <- rows.collect()) {
            println(row)
        }

        sc.stop()
    }
}

写入

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Computes a word count with Spark and writes the result into a MySQL table
 * using batched JDBC inserts.
 *
 * Author atguigu
 * Date 2020/5/9 10:43
 */
object JdbcWrite {
    // Connection settings for the MySQL instance.
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://hadoop102:3306/rdd"
    val user = "root"
    val pw = "aaaaaa"

    /**
     * Runs the word count over the input file and inserts every `(word, count)`
     * pair into the `word_count1128` table, one JDBC connection per partition.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
        // Write the data of an RDD into MySQL.
        val conf: SparkConf = new SparkConf().setAppName("JdbcWrite").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        try {
            // Word count; its result is what gets written to MySQL.
            val wordCount = sc
                .textFile("c:/1128.txt")
                .flatMap(_.split("\\W+"))
                .map((_, 1))
                .reduceByKey(_ + _, 3)

            val sql = "insert into word_count1128 values(?, ?)"
            // Number of rows sent to the database per executeBatch() call.
            val batchSize = 10

            // Naive version: one statement and one round trip per record (slow).
            /*wordCount.foreachPartition( it => {
                // `it` holds the data of one partition
                Class.forName(driver)
                val conn = DriverManager.getConnection(url, user, pw)

                it.foreach{
                    case (word, count) =>
                        val ps = conn.prepareStatement(sql)
                        ps.setString(1, word)
                        ps.setInt(2, count)
                        ps.execute()
                        ps.close()
                }
                conn.close()
            } )*/

            // Optimized version: one connection and one prepared statement per
            // partition, rows flushed in batches.
            wordCount.foreachPartition(it => {
                // `it` holds the data of one partition.
                Class.forName(driver)
                val conn = DriverManager.getConnection(url, user, pw)
                // try/finally so the statement and the connection are released
                // even when an insert fails part-way through the partition.
                try {
                    val ps = conn.prepareStatement(sql)
                    try {
                        // grouped() yields chunks of at most batchSize rows; the
                        // last chunk may be smaller, which covers the tail flush.
                        it.grouped(batchSize).foreach { batch =>
                            batch.foreach {
                                case (word, count) =>
                                    ps.setString(1, word)
                                    ps.setInt(2, count)
                                    ps.addBatch()
                            }
                            ps.executeBatch()
                            ps.clearBatch()
                        }
                    } finally {
                        ps.close()
                    }
                } finally {
                    conn.close()
                }
            })
        } finally {
            // Stop the context even if the job above throws.
            sc.stop()
        }
    }
}

HBase

读取

import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

/**
 * Reads every row of an HBase table into an RDD, renders each row as a JSON
 * string and saves the result as text files.
 *
 * Author atguigu
 * Date 2020/5/9 13:41
 **/
object HbaseRead {
    /**
     * Scans the `student` table through `TableInputFormat`, turns each row into a
     * JSON object of the form `{"rowKey": ..., "<qualifier>": "<value>", ...}` and
     * writes the strings to `./hbase`.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("HbaseRead").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        try {
            val hbaseConf: Configuration = HBaseConfiguration.create()
            // ZooKeeper quorum used to locate the HBase cluster.
            hbaseConf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
            // Name of the table to read.
            hbaseConf.set(TableInputFormat.INPUT_TABLE, "student")

            // Generic Hadoop-input read: the key is the row key, the value is the row.
            val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
                hbaseConf,
                fClass = classOf[TableInputFormat], // the InputFormat type
                // key / value types, essentially fixed for HBase reads
                kClass = classOf[ImmutableBytesWritable],
                vClass = classOf[Result]
            )

            val rdd2: RDD[String] = hbaseRDD.map {
                case (ibw, result) =>
                    // Explicit Java -> Scala conversion (`asScala`); the implicit
                    // `JavaConversions` variant is deprecated and gone in Scala 2.13.
                    import scala.collection.JavaConverters._

                    // listCells() may return null when the row has no cells.
                    val cells: List[Cell] =
                        Option(result.listCells()).map(_.asScala.toList).getOrElse(Nil)

                    // One entry per cell, keyed by the column qualifier only: equal
                    // qualifiers in different column families overwrite each other.
                    val columns: List[(String, String)] = cells.map { cell =>
                        Bytes.toString(CellUtil.cloneQualifier(cell)) ->
                            Bytes.toString(CellUtil.cloneValue(cell))
                    }

                    // Row key first, then all columns (a qualifier literally named
                    // "rowKey" would replace the row key entry).
                    val row: Map[String, String] =
                        Map("rowKey" -> Bytes.toString(ibw.get())) ++ columns

                    // Serialize the map to JSON with json4s.
                    implicit val d: DefaultFormats = org.json4s.DefaultFormats
                    Serialization.write(row)
            }
            //        rdd2.collect.foreach(println)
            rdd2.saveAsTextFile("./hbase")
        } finally {
            // Stop the context even if the job above throws.
            sc.stop()
        }
    }
}

写入

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Writes a small in-memory data set into an HBase table through
 * `TableOutputFormat`.
 *
 * Author atguigu
 * Date 2020/5/9 13:41
 */
object HbaseWrite {
    /**
     * Turns three sample student records into HBase `Put`s (column family `cf`,
     * columns `name`, `gender`, `age`) and saves them to the `student` table.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setAppName("HbaseWrite").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(sparkConf)

        // Sample records: (row key, name, gender, age).
        val students: List[(String, String, String, String)] = List(
            ("2100", "zs", "male", "10"),
            ("2101", "li", "female", "11"),
            ("2102", "ww", "male", "12"))
        val studentRDD: RDD[(String, String, String, String)] = sc.parallelize(students)

        // Convert every record into the (key, Put) pair expected by the output format.
        val puts: RDD[(ImmutableBytesWritable, Put)] = studentRDD.map { student =>
            val (rowKey, name, gender, age) = student

            // The output key carries the row key bytes.
            val key: ImmutableBytesWritable = new ImmutableBytesWritable()
            key.set(toBytes(rowKey))

            // One Put per row; every field becomes a column in family "cf".
            val put: Put = new Put(toBytes(rowKey))
            val family = toBytes("cf")
            Seq("name" -> name, "gender" -> gender, "age" -> age).foreach {
                case (qualifier, value) =>
                    put.addColumn(family, toBytes(qualifier), toBytes(value))
            }

            (key, put)
        }

        val outputConf: Configuration = HBaseConfiguration.create()
        // ZooKeeper quorum used to locate the HBase cluster.
        outputConf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
        // Target table.
        outputConf.set(TableOutputFormat.OUTPUT_TABLE, "student")

        // The output format and key/value classes are configured through a Job.
        val job: Job = Job.getInstance(outputConf)
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Put])

        puts.saveAsNewAPIHadoopDataset(job.getConfiguration)

        sc.stop()
    }
}

相关推荐