I'm trying to perform an aggregation using mapGroups that returns a SparseMatrix as one of the columns, and sum the columns.

I created a case class schema for the mapped rows in order to provide column names. The matrix column is typed org.apache.spark.mllib.linalg.Matrix. If I don't run toDF before performing the aggregation (select(sum("mycolumn"))), I get one type mismatch error (required: org.apache.spark.sql.TypedColumn[MySchema,?]). If I include toDF, I get another type mismatch error: cannot resolve 'sum(mycolumn)' due to data type mismatch: function sum requires numeric types, not org.apache.spark.mllib.linalg.MatrixUDT. What's the right way to do it?
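For illustration, here is a minimal sketch of the two failing calls. The case class and field names are placeholders I've assumed, not taken from the original post:

import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.sql.functions.sum

// Hypothetical schema for the mapped rows; field names are assumptions.
case class MySchema(id: String, mycolumn: Matrix)

// On a Dataset[MySchema], select expects a TypedColumn:
// ds.select(sum("mycolumn"))       // error: required org.apache.spark.sql.TypedColumn[MySchema, ?]

// After toDF, the untyped sum resolves but rejects the matrix type:
// ds.toDF.select(sum("mycolumn"))  // error: sum requires numeric types, not MatrixUDT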
It looks like you're struggling with at least two distinct problems here. Let's assume we have a Dataset like this:
val ds = Seq(
  ("foo", Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))),
  ("foo", Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)))
).toDS
Selecting a TypedColumn:

- using implicit conversions with $:

  ds.select($"_1".as[String])

- using o.a.s.sql.functions.col:

  ds.select(col("_1").as[String])
Adding matrices:

- MLlib Matrix and MatrixUDT don't implement addition. That means you won't be able to use the sum function or reduce with + (see the sketch after this list).
- You can use a third-party linear algebra library, but that is not supported in Spark SQL / Spark Dataset.
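To make the first point concrete, here's a small local sketch (my illustration, not from the original answer) showing that Matrix has no + operator, so elementwise work has to go through the underlying array:

import org.apache.spark.mllib.linalg.Matrices

val m = Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))
// m + m  // does not compile: Matrix defines no '+' method

// Elementwise workaround via toArray (column-major order is preserved):
val doubled = Matrices.dense(m.numRows, m.numCols, m.toArray.map(_ * 2.0))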
If you really want to do it with Datasets, you can try something like this:
ds.groupByKey(_._1).mapGroups(
  (key, values) => {
    val matrices = values.map(_._2.toArray)
    val first = matrices.next
    val sum = matrices.foldLeft(first)(
      (acc, m) => acc.zip(m).map { case (x, y) => x + y }
    )
    (key, sum)
  }
)
and then map the summed arrays back to matrices. Personally, though, I would just convert to an RDD and use Breeze.
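A rough sketch of that RDD-plus-Breeze route, assuming the same ds as above (the toBreeze helper and the reduceByKey pipeline are my illustration, not part of the original answer):

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.mllib.linalg.{Matrices, Matrix}

// Hypothetical helper: MLlib matrices are column-major, and so is Breeze's
// DenseMatrix constructor, so the raw array can be reused directly.
def toBreeze(m: Matrix): BDM[Double] = new BDM(m.numRows, m.numCols, m.toArray)

val summed = ds.rdd
  .map { case (key, m) => (key, toBreeze(m)) }
  .reduceByKey(_ + _)  // Breeze DenseMatrix supports elementwise +
  .mapValues(bm => Matrices.dense(bm.rows, bm.cols, bm.data))

This keeps the matrix shape around the whole way, so converting back with Matrices.dense at the end needs no extra bookkeeping.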