Spark Streaming(1) Spark Streaming Concept and Zookeeper/Kafka on Local

I have been using Spark for more than 1 year now, from 0.7 to 0.9 in production. Recently I came back to Spark and am considering upgrading the version to 1.3.1. There are a lot of new things and good ideas after 0.9.

1. Introduction

Standalone Cluster

The master machine is a single point of failure.

https://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper

We have option #1: use ZooKeeper to manage several masters.

In spark-env.sh, set the following (see the sketch after the list):

spark.deploy.recoveryMode, default value is NONE, should be changed to ZOOKEEPER

spark.deploy.zookeeper.url, e.g., 192.168.1.100:2181,192.168.1.102:2181

spark.deploy.zookeeper.dir, e.g., /spark
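For example, these three properties can be passed to the master through SPARK_DAEMON_JAVA_OPTS in spark-env.sh; a minimal sketch using the example addresses above:

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.100:2181,192.168.1.102:2181 -Dspark.deploy.zookeeper.dir=/spark"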

For a Spark job, the standalone cluster will keep all the jars and files in the working directory, so we need to set spark.worker.cleanup.appDataTtl to clean them up. A YARN cluster will do that automatically.
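A sketch of the cleanup settings, passed to the workers through SPARK_WORKER_OPTS in spark-env.sh; the TTL of 604800 seconds (7 days) here is just an example value:

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=604800"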

Cluster Job Scheduling

Standalone cluster - FIFO; spark.cores.max, spark.deploy.defaultCores and other settings control how much resource one application can use.

Mesos

YARN - --num-executors, --executor-memory, etc. (see the example below)
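For example, a YARN submission might look like the following; the class name and jar are placeholders:

> spark-submit --master yarn-cluster --num-executors 4 --executor-memory 2g --executor-cores 2 --class com.example.MyApp my-app.jar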

Spark Streaming

Sources can be Kafka, Flume, Twitter, ZeroMQ, Kinesis, and so on.

original DStream: time1, time2, time3, time4, time5, ...
windowed DStream: window at time1, window at time2, ... (each window covers several batch intervals of the original DStream)
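A minimal window sketch in Scala, assuming a DStream named words built from one of the sources above; the window length (30 seconds) and sliding interval (10 seconds) must both be multiples of the batch interval:

import org.apache.spark.streaming.Seconds

val pairs = words.map(word => (word, 1))
// count the words seen in the last 30 seconds, recomputed every 10 seconds
val windowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))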

Checkpoint

ssc.checkpoint(hdfsPath); usually the checkpoint interval will be 5-10 times the sliding interval.

dstream.checkpoint(checkpointInterval)
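A sketch of the checkpoint settings, assuming a 10-second batch/sliding interval; the HDFS path and the wordCounts DStream are placeholders:

ssc.checkpoint("hdfs://localhost:9000/spark/checkpoint")
// checkpoint the stateful DStream every 100 seconds, i.e. 10 times the 10-second slide
wordCounts.checkpoint(Seconds(100))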

Receive the streams in parallel:

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)

Recover the Task from Checkpoint

def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)
  val lines = ssc.socketTextStream(...)
  ...
  ssc.checkpoint(checkpointDirectory)
  ssc
}

// get the StreamingContext from the checkpoint data, or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
context. ...
context.start()
context.awaitTermination()

2. Zookeeper

Install Zookeeper

> wget http://apache.mesi.com.ar/zookeeper/stable/zookeeper-3.4.6.tar.gz

Unzip that, place it in the working directory, and add the bin directory to the PATH.

Set up the configuration

> cp conf/zoo_sample.cfg conf/zoo.cfg

Start the Server

> zkServer.sh start zoo.cfg

Check the status

> zkServer.sh status

Or

> jps

2194

2294 QuorumPeerMain

2330 Jps
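Another quick check, assuming nc (netcat) is installed, is ZooKeeper's ruok four-letter command; a healthy server answers imok:

> echo ruok | nc localhost 2181
imok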

Connect from the client

> zkCli.sh -server localhost:2181

zookeeper> help

zookeeper> quit
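Inside the client, ls / lists the root znodes; on a fresh install only the zookeeper node exists, and once Kafka is running it registers additional nodes such as /brokers:

zookeeper> ls /
[zookeeper]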

3. Kafka

Download the binary with version 0.8.2.1

> wget http://psg.mtu.edu/pub/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz

Place that in the working directory and add the bin directory to the PATH.

Command to start Kafka (ZooKeeper must already be running)

> kafka-server-start.sh config/server.properties

Create a topic

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Created topic "test".

List the topics

> bin/kafka-topics.sh --list --zookeeper localhost:2181

test

Start a producer and send some messages

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Start a Consumer

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
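To tie the pieces together, the sketch below consumes the test topic from Spark Streaming. It assumes Spark 1.3.1 with the spark-streaming-kafka_2.10 artifact on the classpath; the consumer group test-group, the application name and the checkpoint path are just example values:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaStreamingTest {
  def main(args: Array[String]): Unit = {
    // local[2] so that one core can run the receiver and one can process data
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaStreamingTest")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/tmp/spark-checkpoint")

    // receiver-based stream: ZooKeeper quorum, consumer group, topic -> number of receiver threads
    val messages = KafkaUtils.createStream(ssc, "localhost:2181", "test-group", Map("test" -> 1))

    // messages are (key, value) pairs; count the words in the values
    val words = messages.map(_._2).flatMap(_.split(" "))
    words.countByValue().print()

    ssc.start()
    ssc.awaitTermination()
  }
}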

References:

http://uohzoaix.github.io/studies/categories/#spark

spark streaming

http://dataunion.org/15193.html

http://dataunion.org/6308.html

http://colobu.com/2015/01/05/kafka-spark-streaming-integration-summary/
