Sqoop 导数据到HDFS, 用Spark SQL进行查询
1. 启动HDFS:
cd /usr/local/hadoop-2.7.7/sbin
./start-dfs.sh
2.启动Yarn:
cd /usr/local/hadoop-2.7.7/sbin
./start-yarn.sh
3.启动Spark:
cd /usr/local/spark-2.3.3-bin-hadoop2.7/sbin
./start-master.sh -h 192.168.96.128
./start-slave.sh spark://192.168.96.128:7077
4.创建Sqoop导入任务:
./sqoop-job \
--meta-connect jdbc:hsqldb:hsql://192.168.96.128:16000/sqoop \
--create t_order_increment_job \
-- import --connect jdbc:mysql://192.168.96.1:3306/demo_ds_0?serverTimezone=Asia/Shanghai \
--username root -P \
--append \
--table t_order_increment \
--incremental lastmodified \
--check-column my_time \
--last-value '2019-08-30 21:36:16' \
--target-dir /increment/t_order_increment
5.执行导入任务:
./sqoop-job \
--meta-connect jdbc:hsqldb:hsql://192.168.96.128:16000/sqoop \
--exec t_order_increment_job
6.Spark SQL进行查询的Java代码:
public class IncrementApplication {
/**
 * Reads the text files under the HDFS import directory, converts every row into an
 * {@code Order}, registers the result as the temporary view {@code order} and prints
 * the output of {@code SELECT * FROM order}.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
            .appName("SparkApplication")
            .config("spark.master", "spark://192.168.96.128:7077")
            .config("spark.jars", "/usr/local/workspace/spark-test-1.0-SNAPSHOT-shaded.jar")
            .getOrCreate();
    try {
        JavaRDD<Order> orderRdd = spark.read()
                .text("hdfs://192.168.96.128:9000/increment/t_order_increment/")
                .javaRDD()
                .map(row -> parseOrder(row));

        Dataset<Row> orderDataSet = spark.createDataFrame(orderRdd, Order.class);
        orderDataSet.createOrReplaceTempView("order");

        Dataset<Row> sqlDF = spark.sql("SELECT * FROM order");
        sqlDF.show();
    } finally {
        // Stop the session whether the job succeeded or failed.
        spark.stop();
    }
}

/**
 * Converts one comma-separated text row into an {@code Order}.
 * The first three fields are used as order id, user id and status, in that order;
 * any further fields are ignored.
 *
 * @param line a row whose column 0 holds the raw text line
 * @return the populated order
 * @throws IllegalArgumentException if the line has fewer than three fields
 * @throws NumberFormatException if the order id or user id is not an integer
 */
private static Order parseOrder(Row line) {
    String text = line.getString(0);
    // A limit of -1 keeps trailing empty fields, so an empty last column does not shorten the array.
    String[] items = text.split(",", -1);
    if (items.length < 3) {
        throw new IllegalArgumentException(
                "Expected at least 3 comma-separated fields but got " + items.length + ": " + text);
    }
    Order order = new Order();
    order.setOrderId(Integer.valueOf(items[0]));
    order.setUserId(Integer.valueOf(items[1]));
    order.setStatus(items[2]);
    return order;
}
}
附录:
删除HDFS文件的命令:
cd /usr/local/hadoop-2.7.7/bin
./hdfs dfs -rm -r /increment/*