17 - Flink: Consuming Kafka and Writing to MySQL
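- Getting started with Flink
- Flink DataSet & DataStream API
- Flink cluster deployment
- Flink restart strategies
- Flink distributed cache
- Flink broadcast variables
- Time in Flink
- Windows in Flink
- Timestamps and watermarks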
....
This article shows how to consume messages from Kafka and write them to MySQL in real time.
1. Add the Maven dependency:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.39</version>
</dependency>
2. Extend RichSinkFunction to implement a MySQL sink
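import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class MysqlSink extends RichSinkFunction<Tuple3<Integer, String, Integer>> {
    private transient Connection connection;
    private transient PreparedStatement preparedStatement;
    // Replace these with your own settings
    String username = "";
    String password = "";
    String drivername = ""; // e.g. com.mysql.jdbc.Driver for Connector/J 5.1.x
    String dburl = "";

    // Called once per parallel sink instance: open the connection here
    // instead of reconnecting for every record.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName(drivername);
        connection = DriverManager.getConnection(dburl, username, password);
        // Assumes the MySQL table has three columns: id, num, price.
        // "table" is a placeholder (and a reserved word in MySQL); use your own table name.
        String sql = "replace into table(id,num,price) values(?,?,?)";
        preparedStatement = connection.prepareStatement(sql);
    }

    // Called once per record: bind the three fields and write the row.
    @Override
    public void invoke(Tuple3<Integer, String, Integer> value) throws Exception {
        preparedStatement.setInt(1, value.f0);
        preparedStatement.setString(2, value.f1);
        preparedStatement.setInt(3, value.f2);
        preparedStatement.executeUpdate();
    }

    // Called when the sink shuts down: release the JDBC resources.
    @Override
    public void close() throws Exception {
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}
3. Flink main class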
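import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class MysqlSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // Messages look like "1,abc,100"; they could also be complex JSON that you parse yourself
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        env.getConfig().disableSysoutLogging(); // suppresses log output to stdout
        // Restart up to 5 times, waiting 5000 ms between attempts
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000));
        env.enableCheckpointing(2000); // checkpoint every 2000 ms
        DataStream<String> stream = env.addSource(consumer);
        DataStream<Tuple3<Integer, String, Integer>> sourceStream = stream
                .filter((FilterFunction<String>) value -> StringUtils.isNotBlank(value))
                .map((MapFunction<String, Tuple3<Integer, String, Integer>>) value -> {
                    String[] fields = value.split(",");
                    return new Tuple3<Integer, String, Integer>(
                            Integer.valueOf(fields[0]), fields[1], Integer.valueOf(fields[2]));
                })
                // Lambdas lose the Tuple3 type parameters to erasure, so declare them explicitly
                .returns(Types.TUPLE(Types.INT, Types.STRING, Types.INT));
        sourceStream.addSink(new MysqlSink());
        env.execute("data to mysql start");
    }
}
Swap in your own local configuration and give it a try!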
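The map step in the main class assumes every Kafka message has exactly three comma-separated fields with numeric first and third values; a malformed message would throw and trigger the restart strategy. The following is a minimal sketch of a more defensive parser in plain Java, with no Flink dependency. The names `RecordParser`, `Row`, and `parse` are hypothetical and not part of the original code.

```java
import java.util.Optional;

public class RecordParser {
    /** Hypothetical holder for one parsed message: id, num, price. */
    public static final class Row {
        public final int id;
        public final String num;
        public final int price;

        public Row(int id, String num, int price) {
            this.id = id;
            this.num = num;
            this.price = price;
        }
    }

    /** Returns the parsed row, or Optional.empty() if the line is malformed. */
    public static Optional<Row> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // limit -1 keeps trailing empty fields so "1,abc," still counts as 3 parts
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            return Optional.empty();
        }
        try {
            int id = Integer.parseInt(parts[0].trim());
            int price = Integer.parseInt(parts[2].trim());
            return Optional.of(new Row(id, parts[1].trim(), price));
        } catch (NumberFormatException e) {
            // Non-numeric id or price: treat the message as malformed
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1,abc,100").isPresent()); // true
        System.out.println(parse("1,abc").isPresent());     // false
        System.out.println(parse("x,abc,100").isPresent()); // false
    }
}
```

In the job itself, one possible use would be to call such a parser from a flatMap and emit a Tuple3 only when the result is present, so that bad messages are skipped instead of failing the job.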
The full code is available by replying "Flink" to the official account.
