scala - How to create a TypedColumn in a Spark Dataset and manipulate it?


I'm trying to perform an aggregation using mapGroups that returns a SparseMatrix as one of the columns, and then sum the columns.

I created a case class schema for the mapped rows in order to provide column names. The matrix column is typed as org.apache.spark.mllib.linalg.Matrix. If I don't run toDF before performing the aggregation (select(sum("mycolumn"))), I get one type mismatch error (required: org.apache.spark.sql.TypedColumn[MySchema,?]). If I include toDF, I get another type mismatch error: cannot resolve 'sum(mycolumn)' due to data type mismatch: function sum requires numeric types, not org.apache.spark.mllib.linalg.MatrixUDT. What's the right way to do it?
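For reference, a minimal sketch of the setup described above (the case class and column names here are hypothetical):

import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.sql.functions.sum

// hypothetical schema: one key column and one matrix column
case class MySchema(key: String, mycolumn: Matrix)

// val ds: Dataset[MySchema] = ...
// ds.select(sum("mycolumn"))       // fails: a TypedColumn[MySchema, _] is required
// ds.toDF.select(sum("mycolumn"))  // fails: sum requires numeric types, not MatrixUDT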

It looks like you're struggling with at least two distinct problems here. Let's assume we have a Dataset like this:

val ds = Seq(
  ("foo", Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))),
  ("foo", Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)))
).toDS

Selecting a TypedColumn (a complete snippet with the required imports follows the list):

  • using implicit conversions with $:

    ds.select($"_1".as[String])

  • using o.a.s.sql.functions.col:

    ds.select(col("_1").as[String])

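A minimal end-to-end version of the above, assuming a SparkSession named spark (the val names are just for illustration):

import org.apache.spark.sql.functions.col
import spark.implicits._  // brings in $ and the encoders needed by .as[String]

// either form produces a TypedColumn, so select returns a Dataset[String]
val viaDollar = ds.select($"_1".as[String])
val viaCol    = ds.select(col("_1").as[String])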
Adding matrices:

  • MLlib Matrix and MatrixUDT don't implement addition. That means you won't be able to use the sum function or reduce with +.
  • You can use a third party linear algebra library, but this is not supported in Spark SQL / Spark Dataset (see the sketch after this list).

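For example, one way to add two MLlib matrices element-wise is to go through Breeze (a sketch; toBreeze and add are hypothetical helper names):

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.mllib.linalg.{Matrices, Matrix}

// MLlib and Breeze dense matrices both store values in column-major order
def toBreeze(m: Matrix): BDM[Double] = new BDM(m.numRows, m.numCols, m.toArray)

// add two MLlib matrices by converting to Breeze, adding, and converting back
def add(a: Matrix, b: Matrix): Matrix = {
  val s = toBreeze(a) + toBreeze(b)
  Matrices.dense(s.rows, s.cols, s.data)
}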
If you want to stick with Datasets you can try something like this:

ds.groupByKey(_._1).mapGroups(
  (key, values) => {
    val matrices = values.map(_._2.toArray)
    val first = matrices.next
    val sum = matrices.foldLeft(first)(
      (acc, m) => acc.zip(m).map { case (x, y) => x + y }
    )
    (key, sum)
})

and map back to matrices, but personally I would just convert to an RDD and use Breeze.
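A rough sketch of that RDD + Breeze route (assuming all matrices in a group share the same dimensions; names are illustrative):

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.mllib.linalg.Matrices

val summed = ds.rdd
  .map { case (key, m) => (key, new BDM(m.numRows, m.numCols, m.toArray)) }
  .reduceByKey(_ + _)                                          // Breeze matrices support +
  .mapValues(bm => Matrices.dense(bm.rows, bm.cols, bm.data))  // back to an MLlib Matrix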

