golang 操作es 批量索引数据 Bulk

减少开销 提高效率 现有130万条数据 一条一条索引的话需要不停的跑需要两天左右 如果使用bulk 五分钟就完事儿了 

func IndexPrice() {
	es := tool.ES{
		Index: "financials.us.gama",
		Type:  "esstockprice",
	}
	var MaxId int
	MaxId = 0
	var price *model.PriceRecord
	// 每次取两万条数据索引两万条 记录最大id 下一次根据maxid查询
	for {
		bulkRequest := tool.Client.Bulk()
		// 每次只取2万条数据
		priceList := price.GetPrice2(MaxId)
		if len(priceList) == 0 {
			break
		}
		var upDateList []int
		for _, i := range priceList {
			upDateList = append(upDateList, i.ID)
			if i.ID > MaxId {
				MaxId = i.ID
			}
			id := i.Symbol + "_" + i.DateTime
			var esPrice *ESPrice
			esPrice = &ESPrice{
				ShortName:    i.Symbol,
				TradeDate:    i.DateTime,
				StockPriceId: id,
				IsTradeDate:  1,
				Open:         i.Open,
				Close:        i.Close,
				Volume:       i.Volume,
				High:         i.High,
				Low:          i.Low,
			}
			req := elastic.NewBulkIndexRequest().
				Index(es.Index).
				Type(es.Type).
				Id(id).
				Doc(esPrice)
			bulkRequest = bulkRequest.Add(req)
		}
		bulkResponse, err := bulkRequest.Do(context.Background())
		if err != nil {
			fmt.Println(err)
		}else{
			// 索引成功就修改数据库状态 is_index = 1
			if !price.UpdatePriceList(upDateList){
				fmt.Println("更新数据库状态错误")
			}
		}
		fmt.Println("耗时:",bulkResponse.Took, "索引了:",len(bulkResponse.Items))
	}
}

相关推荐