apache spark - How to declare an Iterator and a DenseMatrix variable inside mapPartitions of an RDD?
I have a double array inside mapPartitions of an RDD, and when I declare an Iterator or a DenseMatrix from that double array, I find the Iterator and the matrix are empty. My test code is below:
    val randxts = data.map { line =>
      val fields = patterns.findAllIn(line).toArray.map(_.toDouble)
      fields(fields.length - 1)
    }.repartition(2)

    val res = randxts.mapPartitions(x => {
      val matrix = new DenseMatrix(x.length, 1, x.toArray)
      val arr = matrix.toArray
      println(arr.length)
      (matrix :: List[DenseMatrix[Double]]()).iterator
    })
As shown above, the line that declares matrix causes an error. The error message is below:
    java.lang.IndexOutOfBoundsException: Storage array has size 0 but indices can grow as large as 124
How can I fix this problem?
Your code calls x.length and then passes x again to the DenseMatrix constructor. Since x is an Iterator, calling length "consumes" it, so passing it again results in an empty collection.
To fix this, scan the iterator only once, by converting it to an array first:
    import breeze.linalg.DenseMatrix
    import org.apache.spark.rdd.RDD

    val res: RDD[DenseMatrix[Double]] = randxts.mapPartitions(x => {
      val arr = x.toArray                              // scan the iterator only once
      val matrix = new DenseMatrix(arr.length, 1, arr)
      Seq(matrix).iterator                             // return value
    })
For example, given the input val randxts = sc.parallelize(Seq(0.5, 0.6, 0.8)), printing the result shows it contains a single matrix with these values:
    res.collect().foreach(println)
    // prints:
    // 0.5
    // 0.6
    // 0.8
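If you prefer not to materialize the partition into an array yourself, one alternative (a sketch, not from the original answer) is Iterator.duplicate from the Scala standard library, which yields two independent iterators over the same elements:

    val res2 = randxts.mapPartitions { x =>
      val (forCount, forData) = x.duplicate
      val n = forCount.length                          // consumes only this copy
      val matrix = new DenseMatrix(n, 1, forData.toArray)
      Iterator.single(matrix)
    }

Note that duplicate buffers elements internally while one copy runs ahead of the other, so it is no cheaper than converting to an array up front; it mainly keeps the two traversals explicit.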