Flink basic operators: map, keyBy, sum
Core code:
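import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._

object TransformTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism 1: one subtask, so print() adds no "n>" subtask prefix
    env.setParallelism(1)
    val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")

    //------------map--------------
    // Turn each line "id, timestamp, temperature" into a SensorReading
    val dataStream1: DataStream[SensorReading] = streamFromFile.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    //------------keyBy--------------
    // Keyed by field position or field name: the key type is a Tuple
    // (these overloads are deprecated in newer Flink releases)
    val dataStream2: KeyedStream[SensorReading, Tuple] = dataStream1.keyBy(0)
    val dataStream3: KeyedStream[SensorReading, Tuple] = dataStream1.keyBy("id")
    // Keyed by a function: the key has the type of the field (String)
    val dataStream4: KeyedStream[SensorReading, String] = dataStream1.keyBy(_.id)

    //------------sum--------------
    // Rolling sum of temperature per key. Fields are numbered from 0
    // (id = 0, timestamp = 1, temperature = 2), so both calls sum the same field.
    val dataStream5 = dataStream4.sum(2)
    val dataStream6 = dataStream4.sum("temperature")
    dataStream6.print()
    env.execute("transform test")
  }
}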
case class SensorReading(id: String, timestamp: Long, temperature: Double)

Contents of sensor.txt:
sensor_1, 1547718199, 35.80018327300259
sensor_6, 1547718201, 15.402984393403084
sensor_7, 1547718202, 6.720945201171228
sensor_10, 1547718205, 38.101067604893444
sensor_1, 1547718200, 30.8
sensor_1, 1547718201, 40.8
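The parsing inside map() can be checked without starting Flink. The following is a minimal plain-Scala sketch that assumes one comma-separated record per line, as in sensor.txt. `ParseSketch` and `parse` are hypothetical names used only for illustration. The sketch shows why each field is trimmed: every comma is followed by a space, and `toLong` rejects leading whitespace.

```scala
// Plain-Scala sketch (no Flink dependency) of the parsing done inside map().
// Assumption: one record per line, fields separated by ", " as in sensor.txt.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ParseSketch {
  // Same logic as the map() lambda: split on commas, trim, then convert.
  // trim is required for toLong, because java.lang.Long.parseLong rejects
  // the space that follows each comma. toDouble would tolerate it, but
  // trimming every field keeps the code uniform.
  def parse(line: String): SensorReading = {
    val arr = line.split(",")
    SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "sensor_1, 1547718199, 35.80018327300259",
      "sensor_6, 1547718201, 15.402984393403084"
    )
    lines.map(parse).foreach(println)
    // Case class fields are numbered from 0, which is why sum(2) and
    // sum("temperature") address the same field.
    println(parse(lines.head).productElement(2)) // the temperature value
  }
}
```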
Output of TransformTest:
SensorReading(sensor_1,1547718199,35.80018327300259)
SensorReading(sensor_6,1547718201,15.402984393403084)
SensorReading(sensor_7,1547718202,6.720945201171228)
SensorReading(sensor_10,1547718205,38.101067604893444)
SensorReading(sensor_1,1547718199,66.60018327300259)
SensorReading(sensor_1,1547718199,107.40018327300258)
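In the output, the three sensor_1 records all carry timestamp 1547718199, although the temperature keeps growing. sum is a rolling aggregation: it emits an updated record for every input and changes only the summed field. The other fields come from the first record seen for that key. The following is a minimal plain-Scala simulation of that behavior, inferred from the output above rather than taken from Flink's source. The names `RollingSumSketch` and `rollingSum` are hypothetical.

```scala
// Plain-Scala simulation of keyBy(_.id).sum("temperature"); no Flink needed.
import scala.collection.mutable

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object RollingSumSketch {
  // Emits one record per input, like a rolling aggregation.
  // Assumption: only `temperature` is updated; id and timestamp stay those
  // of the first record seen for each key.
  def rollingSum(input: Seq[SensorReading]): List[SensorReading] = {
    // Per-key accumulator, playing the role of Flink's keyed state
    val state = mutable.Map.empty[String, SensorReading]
    input.toList.map { r =>
      val updated = state.get(r.id) match {
        case Some(acc) => acc.copy(temperature = acc.temperature + r.temperature)
        case None      => r // first record for this key passes through unchanged
      }
      state(r.id) = updated
      updated
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      SensorReading("sensor_1", 1547718199L, 35.80018327300259),
      SensorReading("sensor_6", 1547718201L, 15.402984393403084),
      SensorReading("sensor_7", 1547718202L, 6.720945201171228),
      SensorReading("sensor_10", 1547718205L, 38.101067604893444),
      SensorReading("sensor_1", 1547718200L, 30.8),
      SensorReading("sensor_1", 1547718201L, 40.8)
    )
    rollingSum(input).foreach(println)
  }
}
```

If the latest timestamp should be kept instead, one common option in Flink is a custom reduce on the keyed stream, for example `dataStream4.reduce((a, b) => SensorReading(a.id, b.timestamp, a.temperature + b.temperature))`.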