Spark整合Elasticsearch-从ES读取数据
Spark整合Elasticsearch-从ES读取数据
由于ES集群在拉取数据时可以提供过滤功能,因此在采用ES集群作为spark运算时的数据来源时,
根据过滤条件在拉取的源头就可以过滤了(ES提供过滤),就不必像从hdfs那样必须全部加载进spark的内存根据filter算子过滤,费时费力。
代码:
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
object Spark2Elasticsearch {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("Spark2ES").setMaster("local[2]")
conf.set("es.nodes","hadoop1,hadoop2,hadoop3")
conf.set("es.port","9200")
conf.set("es.index.auto.create","true")
val sc =new SparkContext(conf)
val query:String =s"""{
"query" : {
"match_all" : {}
},
"filter" : {
"term" : {
"price" : 50.55
}
}
}"""
val rdd = sc.esRDD("store", query)
println(rdd.collect().toBuffer)
}
}
运行结果:

采坑点:
那个sc.esRDD方法其实是ES提供的jar包里的一个隐试转换,在import org.elasticsearch.spark._这个包下,
配置mavin依赖时注意spark的配套版本,本文1.6的spark依赖如下:

相关推荐
newbornzhao 2020-09-14
做对一件事很重要 2020-09-07
renjinlong 2020-09-03
明瞳 2020-08-19
李玉志 2020-08-19
mengyue 2020-08-07
molong0 2020-08-06
AFei00 2020-08-03
molong0 2020-08-03
wenwentana 2020-08-03
YYDU 2020-08-03
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。
sifeimeng 2020-08-03
心丨悦 2020-08-03
liangwenrong 2020-07-31
sifeimeng 2020-08-01
mengyue 2020-07-30
tigercn 2020-07-29
IceStreamLab 2020-07-29