scala - Issues with the DataStax spark-cassandra-connector
Before I go ahead and explain my question, can someone please tell me the difference between SparkSQL and CassandraSQLContext?
I am trying to run Scala code (I don't want to create a jar, this is just for testing purposes) on a Spark-Cassandra cluster. I have the following code for a basic query on Cassandra, and every time I run it I get the following error:
java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition
This happens even though I have declared the same dependency in build.sbt. Moreover, I have tried giving the explicit path to the connector assembly jar that I built separately (in the Scala code, using sc.addJar or SparkConf.set()), but it still doesn't work.
FYI, I am using Spark 1.6, Cassandra 2.1.12 and Scala 2.10.
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.cassandra.CassandraSQLContext

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "172.16.4.196")
      .set("spark.executor.extraClassPath", "/home/naresh/desktop/spark-cassandrawork/spark-cassandra-connector_1.6/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.6.0-m1.jar")
    //sc.addJar("/home/naresh/desktop/spark-cassandrawork/spark-cassandra-connector_1.6/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.6.0-m1.jar")
    val sc = new SparkContext("spark://naresh-pc:7077", "test", conf)
    val csc = new CassandraSQLContext(sc)
    csc.setKeyspace("cw")
    val rdd = csc.sql("some_query")
    rdd.collect().foreach(a => println(a))
  }
}
build.sbt:
name := "sparkcassandrademo" version := "1.0" scalaversion := "2.11.8" val sparkdependencies = seq( "org.apache.spark" %% "spark-core" % "1.6.1", "org.apache.spark".%%("spark-sql") % "1.6.1", "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0-m2" ) lazy val sparkdebugger = (project in file("spark-debugger")) .settings( librarydependencies ++= sparkdependencies.map(_ % "compile") ) librarydependencies ++= sparkdependencies.map(_ % "provided")
Error:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning(cnt#0L ASC,200), None
+- ConvertToSafe
   +- TungstenAggregate(key=[useragent#10], functions=[(count(if ((gid#12 = 1)) cookie#13 else null),mode=Final,isDistinct=false)], output=[cnt#0L,useragent#10])
      +- TungstenExchange hashpartitioning(useragent#10,200), None
         +- TungstenAggregate(key=[useragent#10], functions=[(count(if ((gid#12 = 1)) cookie#13 else null),mode=Partial,isDistinct=false)], output=[useragent#10,count#16L])
            +- TungstenAggregate(key=[useragent#10,cookie#13,gid#12], functions=[], output=[useragent#10,cookie#13,gid#12])
               +- TungstenExchange hashpartitioning(useragent#10,cookie#13,gid#12,200), None
                  +- TungstenAggregate(key=[useragent#10,cookie#13,gid#12], functions=[], output=[useragent#10,cookie#13,gid#12])
                     +- Expand [List(useragent#10, cookie#3, 1)], [useragent#10,cookie#13,gid#12]
                        +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@18f60dc[useragent#10,cookie#3]
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
  at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
  at org.apache.spark.sql.execution.ConvertToUnsafe.doExecute(rowFormatConverters.scala:38)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
  at org.apache.spark.sql.execution.Sort.doExecute(Sort.scala:64)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
  at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
  at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
  at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
  at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
  at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
  at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$collect$1.apply(DataFrame.scala:1503)
  at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$collect$1.apply(DataFrame.scala:1503)
  at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
  at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1503)
  at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1480)
  at SimpleApp$.main(SimpleApp.scala:20)
  at SimpleApp.main(SimpleApp.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 4 times, most recent failure: Lost task 2.3 in stage 0.0 (TID 9, pratik-VirtualBox): java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
  at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
  at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:264)
  at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:126)
  at org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:179)
  at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
  at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
  ... 34 more
Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
  at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
The error you are receiving is telling you that the execution context of your job lacks the Spark Cassandra Connector on its runtime classpath.
The common way of setting this up is with the --packages option of spark-submit:

spark-submit --packages datastax:spark-cassandra-connector:1.6.0-M2-s_2.10 yourjar
http://spark-packages.org/package/datastax/spark-cassandra-connector
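If you want to keep launching the application straight from the IDE instead of through spark-submit, one alternative is to ship the connector jar to the executors via SparkConf.setJars (equivalent to setting spark.jars). Note that spark.executor.extraClassPath by itself does not copy anything; it only works if the jar already sits at that path on every worker node. Below is a minimal sketch, reusing the assembly jar path and cluster addresses from your question purely as placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.cassandra.CassandraSQLContext

object SimpleAppWithJars {
  def main(args: Array[String]): Unit = {
    // Path to the connector assembly jar on the driver machine (taken from the question; adjust as needed).
    val connectorJar =
      "/home/naresh/desktop/spark-cassandrawork/spark-cassandra-connector_1.6/" +
        "spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.6.0-m1.jar"

    val conf = new SparkConf(true)
      .setMaster("spark://naresh-pc:7077")
      .setAppName("test")
      .set("spark.cassandra.connection.host", "172.16.4.196")
      // setJars makes the driver serve this jar to every executor, so classes such as
      // CassandraPartition can be resolved when tasks are deserialized.
      .setJars(Seq(connectorJar))

    val sc = new SparkContext(conf)
    val csc = new CassandraSQLContext(sc)
    csc.setKeyspace("cw")
    csc.sql("some_query").collect().foreach(println)
  }
}

The same effect can be achieved after the context exists with sc.addJar(connectorJar), which is essentially what you attempted in the commented-out line of your code, as long as it is called before any job that needs the connector classes runs.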
Also note that you are mixing Scala versions: your build file states scalaVersion := "2.11.8", while you say you are running Scala 2.10 and the connector artifact you reference targets Scala 2.10.
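For reference, here is a sketch of a build.sbt that keeps everything on Scala 2.10 to match the Spark 1.6 and connector artifacts mentioned above (the 2.10.6 patch version is an assumption; any 2.10.x release should do):

// Hypothetical, version-consistent build.sbt for Spark 1.6.x on Scala 2.10
name := "SparkCassandraDemo"

version := "1.0"

scalaVersion := "2.10.6" // keep in sync with the _2.10 connector artifact

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % "1.6.1" % "provided", // provided by the cluster
  "org.apache.spark"   %% "spark-sql"                 % "1.6.1" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0-M2"            // must reach the executors
)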