Apache Zeppelin (2) Zeppelin and Spark YARN Cluster

Recently I was trying to debug something on Zeppelin. When an error happens, we need to go to the log files to find more information.

Check the log files under the Zeppelin directory, /opt/zeppelin/logs:

zeppelin-carl-carl-mac.local.log

zeppelin-interpreter-spark-carl-carl-mac.local.log
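These files get long, so it can help to pull out only the ERROR entries. The following is a minimal sketch, assuming the log lines follow the `LEVEL [timestamp] (source) - message` layout seen in the excerpts in this post. The names `LogLine`, `LogEntry`, `parse`, and `errors` are made up for illustration and are not part of Zeppelin.

```scala
// Assumed line layout (taken from the log excerpts in this post):
//   ERROR [2015-06-30 17:04:43,588] ({Thread-43} JobProgressPoller.java[run]:57) - message
val LogLine = """^\s*(ERROR|WARN|INFO|DEBUG)\s*\[([^\]]+)\]\s*\((.*?)\)\s*-\s*(.*)$""".r

// Hypothetical holder for the four parts of one log line.
case class LogEntry(level: String, timestamp: String, source: String, message: String)

// Returns None for stack-trace lines and anything else that does not match.
def parse(line: String): Option[LogEntry] = line match {
  case LogLine(level, ts, src, msg) => Some(LogEntry(level, ts, src, msg))
  case _                            => None
}

// Keep only the entries logged at ERROR level.
def errors(lines: Iterator[String]): List[LogEntry] =
  lines.map(parse).collect { case Some(e) if e.level == "ERROR" => e }.toList

// Usage, assuming the log path mentioned above:
// val src = scala.io.Source.fromFile(
//   "/opt/zeppelin/logs/zeppelin-interpreter-spark-carl-carl-mac.local.log")
// try errors(src.getLines()).foreach(println) finally src.close()
```

Stack-trace lines are skipped on purpose here. Once the timestamp of an error is known, the full trace is easy to find in the file itself.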

Error Message:

java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.deploy.SparkHadoopUtil$
    at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1959)
    at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:104)
    at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:179)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:310)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:269)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:272)

Build Zeppelin again.

> mvn clean package -Pspark-1.4 -Dhadoop.version=2.6.0 -Phadoop-2.6 -Pyarn -DskipTests

Error Message:

ERROR [2015-06-30 17:04:43,588] ({Thread-43} JobProgressPoller.java[run]:57) - Can not get or update progress
org.apache.zeppelin.interpreter.InterpreterException: java.lang.IllegalStateException: Pool not open
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getProgress(RemoteInterpreter.java:286)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.getProgress(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.notebook.Paragraph.progress(Paragraph.java:179)
    at org.apache.zeppelin.scheduler.JobProgressPoller.run(JobProgressPoller.java:54)
Caused by: java.lang.IllegalStateException: Pool not open
    at org.apache.commons.pool2.impl.BaseGenericObjectPool.assertOpen(BaseGenericObjectPool.java:662)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:412)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess.getClient(RemoteInterpreterProcess.java:139)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getProgress(RemoteInterpreter.java:284)

Error Message:

ERROR [2015-06-30 17:18:05,297] ({sparkDriver-akka.actor.default-dispatcher-4} Logging.scala[logError]:75) - Lost executor 13 on ubuntu-dev1: remote Rpc client disassociated
INFO [2015-06-30 17:18:05,297] ({sparkDriver-akka.actor.default-dispatcher-4} Logging.scala[logInfo]:59) - Re-queueing tasks for 13 from TaskSet 3.0
WARN [2015-06-30 17:18:05,298] ({sparkDriver-akka.actor.default-dispatcher-4} Logging.scala[logWarning]:71) - Lost task 0.3 in stage 3.0 (TID 14, ubuntu-dev1): ExecutorLostFailure (executor 13 lost)
ERROR [2015-06-30 17:18:05,298] ({sparkDriver-akka.actor.default-dispatcher-4} Logging.scala[logError]:75) - Task 0 in stage 3.0 failed 4 times; aborting job

Solution:

After I fetched the latest Zeppelin source from GitHub and built it myself again, everything worked.

Some of the configuration is as follows:

> less conf/zeppelin-env.sh

export MASTER="yarn-client"
export HADOOP_CONF_DIR="/opt/hadoop/etc/hadoop/"
export SPARK_HOME="/opt/spark"
. ${SPARK_HOME}/conf/spark-env.sh
export ZEPPELIN_CLASSPATH="${SPARK_CLASSPATH}"

Start the YARN cluster, then start Zeppelin with this command:

> bin/zeppelin-daemon.sh start

Check the YARN cluster:

http://ubuntu-master:8088/cluster/apps

Visit the Zeppelin UI:

http://ubuntu-master:8080/

Check the Interpreter page to make sure we are using yarn-client mode, and review the other settings.
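The same check can also be done from a notebook paragraph. This is a rough sketch: `keysOfInterest` and `summarize` are hypothetical helpers, and the commented usage line assumes that `sc` is the SparkContext Zeppelin provides to Spark paragraphs, as in the examples in this post. `spark.master` and `spark.executor.memory` are standard Spark property names.

```scala
// Standard Spark property names worth checking before running jobs.
val keysOfInterest = Seq("spark.master", "spark.executor.memory")

// Hypothetical helper: format the chosen settings, marking any that are absent.
def summarize(settings: Map[String, String]): Seq[String] =
  keysOfInterest.map(k => s"$k = ${settings.getOrElse(k, "(not set)")}")

// In a Zeppelin paragraph, assuming `sc` is the injected SparkContext:
// summarize(sc.getConf.getAll.toMap).foreach(println)
```

If `spark.master` does not show `yarn-client` here, the MASTER value in conf/zeppelin-env.sh is probably not being picked up.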

Place this simple list in a notebook paragraph:

val threshold = "book1"
val products = Seq("book1", "book2", "book3", "book4")
val rdd = sc.makeRDD(products, 2)
val result = rdd.filter { p =>
  p.equals(threshold)
}.count()
println("!!!!!!!!!!!!!!================result=" + result)

Run that simple list. Zeppelin will start something like a spark-shell context on the YARN cluster, and that application will keep running. After that, we can visit the Spark UI at this URL:

http://ubuntu-master:4040/

We can see all the Spark jobs and executors there.

If you plan to try a more complex example like this one, you need to open the interpreter settings and increase the memory.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val data: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "file:///opt/spark/data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).
val splits: Array[RDD[LabeledPoint]] = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training: RDD[LabeledPoint] = splits(0).cache()
val test: RDD[LabeledPoint] = splits(1)

// Run the training algorithm to build the model.
val numIterations = 100
val model = SVMWithSGD.train(training, numIterations)

// Clear the default threshold so predict() returns raw scores.
model.clearThreshold()

// Compute raw scores on the test set.
val scoreAndLabels: RDD[(Double, Double)] = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

scoreAndLabels.take(10).foreach { case (score, label) =>
  println("Score = " + score + " Label = " + label)
}

// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()
println("Area under ROC = " + auROC)
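To get a feel for what the area-under-ROC number means, here is a small plain-Scala sketch that needs no Spark. It uses the pairwise reading of the metric: the fraction of (positive, negative) pairs in which the positive example gets the higher score, with ties counted as one half. The function name `aucByPairs` is made up for illustration, and this is not how Spark computes the value internally. It is only meant for sanity checks on a handful of pairs, for example the output of `scoreAndLabels.take(10)`.

```scala
// Pairwise AUC: share of (positive, negative) pairs ranked correctly.
// Labels are expected to be 1.0 (positive) or 0.0 (negative), as in LabeledPoint.
def aucByPairs(scoreAndLabels: Seq[(Double, Double)]): Double = {
  val positives = scoreAndLabels.collect { case (s, l) if l == 1.0 => s }
  val negatives = scoreAndLabels.collect { case (s, l) if l == 0.0 => s }
  require(positives.nonEmpty && negatives.nonEmpty, "need at least one example of each class")

  // 1.0 for a correctly ordered pair, 0.5 for a tie, 0.0 otherwise.
  val wins = for {
    p <- positives
    n <- negatives
  } yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0

  wins.sum / (positives.size.toLong * negatives.size)
}

// Three of the four positive/negative pairs are ordered correctly here.
val example = Seq((0.9, 1.0), (0.8, 0.0), (0.7, 1.0), (0.1, 0.0))
println("Pairwise AUC = " + aucByPairs(example))
```

The pairwise loop is quadratic in the number of examples, so it only suits small samples collected to the driver.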
