kafka-storm 详细

kafka storm 安装:
15. 安装 kafka
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
tar xf kafka_2.10-0.10.0.0.tgz
ln -s /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
chown -R hdfs:hadoop /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
chown -R root:root /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
/usr/local/zookeeper/bin/zkCli.sh create /kafka ''
vim /usr/local/kafka/config/server.properties
broker.id=0
zookeeper.connect=dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka
(注:原文中 scp 的目标地址被页面的邮箱保护替换成了 "[email protected]",下面按集群中另外两台节点 dev05、dev06 还原,登录用户请按实际环境调整。)
scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev05.aoiplus.openpf:/usr/local/
scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev06.aoiplus.openpf:/usr/local/
scp -r /usr/local/kafka/config/server.properties root@dev05.aoiplus.openpf:/usr/local/kafka/config/server.properties
scp -r /usr/local/kafka/config/server.properties root@dev06.aoiplus.openpf:/usr/local/kafka/config/server.properties
(注:server.properties 拷贝到其它节点后,需要把 broker.id 改成各不相同的值,例如 1、2,否则 broker 会因 id 冲突无法同时注册。)
master slave 启动
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
创建topic
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --replication-factor 3 --partitions 5 --topic baoy-topic
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --topic baoy-topic
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list dev10.aoiplus.openpf:9092,dev05.aoiplus.openpf:9092,dev06.aoiplus.openpf:9092 --topic baoy-topic
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper dev10.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181/kafka --from-beginning --topic baoy-topic
16. storm 安装
cd /usr/local/
wget http://mirrors.cnnic.cn/apache/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz
tar xf apache-storm-0.10.0.tar.gz
ln -s /usr/local/apache-storm-0.10.0 /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.10.0 /usr/local/storm
chown -R root:root /usr/local/apache-storm-0.10.0 /usr/local/storm
mkdir -p /tmp/storm/data/
cd storm
vim conf/storm.yaml
storm.zookeeper.servers:
  - "dev10.aoiplus.openpf"
  - "dev05.aoiplus.openpf"
  - "dev06.aoiplus.openpf"
storm.zookeeper.port: 2181
nimbus.host: "dev10.aoiplus.openpf"
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
storm.local.dir: "/tmp/storm/data"
(注:原文中 scp 的目标地址被页面的邮箱保护替换成了 "[email protected]",下面按集群中另外两台节点 dev05、dev06 还原,登录用户请按实际环境调整。)
scp -r /usr/local/storm/conf/storm.yaml root@dev05.aoiplus.openpf:/usr/local/storm/conf/
scp -r /usr/local/storm/conf/storm.yaml root@dev06.aoiplus.openpf:/usr/local/storm/conf/
启动
master
/usr/local/storm/bin/storm nimbus >/dev/null 2>&1 &
/usr/local/storm/bin/storm ui >/dev/null 2>&1 &
slaves
/usr/local/storm/bin/storm supervisor >/dev/null 2>&1 &
查看
http://dev10.aoiplus.openpf/index.html
(注:Storm UI 默认监听 8080 端口;如果没有在 storm.yaml 中修改 ui.port,也没有做端口转发,请访问 http://dev10.aoiplus.openpf:8080/index.html。)
cp /usr/local/kafka/libs/kafka_2.10-0.10.0.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.10.6.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.1.2.4.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.8.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.17.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.21.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/
/usr/local/storm/bin/storm jar /home/baoy/soft/storm/KafkaStormJavaDemo_main_start.jar com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology "terminalInfosAnalysisTopology"
mkdir -p /home/baoy/soft/storm/logs
chmod -R 777 /home/baoy/soft/storm/logs
cd /usr/local/storm/log4j2/
vim cluster.xml
<property name="logpath">/home/baoy/soft/storm</property>
关闭 storm
/usr/local/storm/bin/storm kill terminalInfosAnalysisTopology



pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.curiousby.baoy.cn</groupId>
<artifactId>KafkaStormJavaDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>SpringKafkaStormDemo</name>
<url>http://maven.apache.org</url>
<!-- properties constant -->
<properties>
<spring.version>4.2.5.RELEASE</spring.version>
<java.version>1.7</java.version>
</properties>
<repositories>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>
<dependencies>
<!-- org.json JSON library. -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20160810</version>
</dependency>
<!-- The slf4j-api and slf4j-log4j12 entries below are commented out and therefore inactive. -->
<!-- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.4</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.4</version>
<type>jar</type>
</dependency> -->
<!-- Storm core API, scope "provided": available at compile time but not bundled into the packaged jar. -->
<!-- NOTE(review): the install notes in this document set up apache-storm-0.10.0, while 0.9.6 is pinned here; confirm the two are compatible. -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.6</version>
<scope>provided</scope>
</dependency>
<!-- Storm/Kafka integration (default compile scope, so it is bundled with the application). -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.6</version>
</dependency>
<!-- Kafka built for Scala 2.10. The zookeeper, log4j and slf4j-log4j12 transitive dependencies are excluded. -->
<!-- NOTE(review): the install notes in this document set up kafka_2.10-0.10.0.0, while 0.9.0.1 is pinned here; confirm client/broker compatibility. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<!-- Build section: compiler settings plus two alternative ways (shade, assembly) of producing a runnable jar. -->
<build>
<finalName>SpringKafkaStormDemo</finalName>
<plugins>
<!-- Compiles for Java 1.7 with UTF-8 sources, using plexus-compiler-javac 2.5. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<dependencies>
<dependency>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-compiler-javac</artifactId>
<version>2.5</version>
</dependency>
</dependencies>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>UTF-8</encoding>
<compilerArguments>
<verbose />
<!-- NOTE(review): the two jars are joined with ':', the Unix path separator; confirm this is acceptable for every build host. -->
<bootclasspath>${java.home}/lib/rt.jar:${java.home}/lib/jce.jar</bootclasspath>
</compilerArguments>
</configuration>
</plugin>
<!-- Shade plugin: the "shade" goal is bound to the package phase and writes the topology class as the manifest main class. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.4</version>
<configuration>
<!-- NOTE(review): appendAssemblyId looks like a maven-assembly-plugin parameter; confirm the shade plugin honours it. -->
<appendAssemblyId>false</appendAssemblyId>
<finalName>${project.artifactId}_TerminalInfosAnalysisTopology_main_start</finalName>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- Assembly plugin: jar-with-dependencies named <artifactId>_main_start, with the topology class as the manifest main class. -->
<!-- The "storm jar" command in the install notes of this document refers to KafkaStormJavaDemo_main_start.jar, which matches this finalName. -->
<!-- NOTE(review): no executions element is declared here, so this presumably has to be invoked explicitly (for example assembly:single); confirm. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<finalName>${project.artifactId}_main_start</finalName>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>package com.curiousby.baoyou.cn.storm;
import java.util.UUID;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
/**
* @see com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology
* @Type TerminalInfosAnalysisTopology.java
* @Desc
* @author cmcc-B100036
* @date 2016年12月15日 下午4:54:50
* @version
*/
public class TerminalInfosAnalysisTopology {
// Kafka topic the spout consumes.
private static String topicName = "baoy-topic";
// ZooKeeper root path handed to SpoutConfig; intentionally left empty here.
private static String zkRoot = "" ;

/**
 * Entry point: wires the Kafka spout to the two analysis bolts and launches the topology.
 *
 * <p>When at least one argument is given, the topology is submitted through
 * {@link StormSubmitter} under the name {@code args[0]} with two workers. With no
 * arguments it is started in an in-process {@link LocalCluster} under the fixed name
 * {@code terminalInfosAnalysisTopology}, with max spout pending set to 2.
 *
 * @param args optional; {@code args[0]} is the topology name used for cluster submission
 */
public static void main(String[] args) {
    TopologyBuilder wiring = assembleTopology();

    Config stormConfig = new Config();
    stormConfig.setDebug(true);

    boolean runLocally = (args == null || args.length == 0);
    if (runLocally) {
        stormConfig.setMaxSpoutPending(2);
        new LocalCluster().submitTopology("terminalInfosAnalysisTopology", stormConfig, wiring.createTopology());
        return;
    }

    stormConfig.setNumWorkers(2);
    try {
        StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig, wiring.createTopology());
    } catch (AlreadyAliveException | InvalidTopologyException submitFailure) {
        // Submission problems are only reported on stderr; the process still ends normally.
        submitFailure.printStackTrace();
    }
}

/**
 * Builds the spout/bolt graph: one Kafka spout feeding both bolts via shuffle grouping.
 *
 * @return a builder holding the spout and the two bolts (parallelism hint 2 each)
 */
private static TopologyBuilder assembleTopology() {
    BrokerHosts zkHosts = new ZkHosts(
            "172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka");

    // A fresh random id is generated on every launch and passed as the spout's id.
    String spoutId = UUID.randomUUID().toString();
    SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, spoutId);
    kafkaConfig.forceFromStart = false;
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    TopologyBuilder wiring = new TopologyBuilder();
    wiring.setSpout("kafkaSpout", new KafkaSpout(kafkaConfig));
    wiring.setBolt("terminalInfosAnalysisRedisBolt", new TerminalInfosAnalysisRedisBolt(), 2)
            .shuffleGrouping("kafkaSpout");
    wiring.setBolt("terminalInfosAnalysisElasticsearchBolt", new TerminalInfosAnalysisElasticsearchBolt(), 2)
            .shuffleGrouping("kafkaSpout");
    return wiring;
}
}public class TerminalInfosAnalysisRedisBolt extends BaseRichBolt {
// Shared SLF4J logger; static so it is not carried as per-instance state of the bolt.
private static final Logger logger = LoggerFactory.getLogger(TerminalInfosAnalysisRedisBolt.class);
// Collector assigned in prepare(); used here to acknowledge processed tuples.
private OutputCollector collector;

/**
 * Parses the first field of the incoming tuple into a {@code TerminalInfos} entity,
 * prints and logs it, then acknowledges the tuple.
 *
 * <p>The acknowledgement is required because this bolt extends {@code BaseRichBolt},
 * which does not ack automatically. Without it, tuples from a reliable spout are
 * treated as failed once the message timeout elapses and are replayed, and the
 * topology stalls when the max-spout-pending limit is reached.
 *
 * @param tuple input tuple whose field 0 is read as a String
 *              (presumably a JSON document; confirm against the producer)
 */
@Override
public void execute(Tuple tuple) {
    JSONObject parsed = TerminalInfos.formate(tuple.getString(0));
    TerminalInfos entity = new TerminalInfos();
    entity.formate(parsed);

    // The former "entity != null" guard was always true right after "new", so it is dropped.
    System.out.println(entity);
    logger.info("===========================================================");
    logger.info("========================="+entity+"=========================");
    logger.info("===========================================================");

    // Mark the tuple as fully processed so the spout does not replay it.
    collector.ack(tuple);
}
/**
 * Stores the supplied collector in the {@code collector} field for later use by this bolt.
 *
 * @param stormConf configuration map; not read by this method
 * @param context   topology context; not read by this method
 * @param collector collector saved into the {@code collector} field
 */
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
 * Declares no output fields: the body is intentionally empty.
 *
 * @param declarer declarer passed to this method; not used
 */
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
使用 kafka 客户端 定时发送

kafka-storm 及时处理

这里面 我使用的是本地模式
捐助开发者
在兴趣的驱动下,写一个免费的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。


谢谢您的赞助,我会做的更好!
相关推荐
Kafka 2020-09-18
wangying 2020-11-13
shenzhenzsw 2020-10-09
guicaizhou 2020-09-30
jiaomrswang 2020-09-23
jyj0 2020-09-21
guicaizhou 2020-09-15
hannuotayouxi 2020-08-20
amwayy 2020-08-03
yangyutong00 2020-08-01
weikaixxxxxx 2020-08-01
PoppyEvan 2020-08-01
guicaizhou 2020-08-01
PoppyEvan 2020-07-29