Matrix Multiplication in Apache Spark
It all depends on the input data and dimensions, but generally speaking what you want is not an RDD but one of the distributed data structures from org.apache.spark.mllib.linalg.distributed. At this moment it provides four different implementations of DistributedMatrix:
IndexedRowMatrix
- can be created directly from an RDD[IndexedRow], where an IndexedRow consists of a row index and an org.apache.spark.mllib.linalg.Vector:
import org.apache.spark.mllib.linalg.{Vectors, Matrices}
import org.apache.spark.mllib.linalg.distributed.{IndexedRowMatrix,
IndexedRow}
val rows = sc.parallelize(Seq(
  (0L, Array(1.0, 0.0, 0.0)),
  (1L, Array(0.0, 1.0, 0.0)),
  (2L, Array(0.0, 0.0, 1.0)))
).map{case (i, xs) => IndexedRow(i, Vectors.dense(xs))}
val indexedRowMatrix = new IndexedRowMatrix(rows)

RowMatrix
- similar to IndexedRowMatrix, but without meaningful row indices. Can be created directly from RDD[org.apache.spark.mllib.linalg.Vector]:
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val rowMatrix = new RowMatrix(rows.map(_.vector))

BlockMatrix
- can be created from RDD[((Int, Int), Matrix)], where the first element of the tuple contains the coordinates of the block and the second one is a local org.apache.spark.mllib.linalg.Matrix:
val eye = Matrices.sparse(
3, 3, Array(0, 1, 2, 3), Array(0, 1, 2), Array(1, 1, 1))
val blocks = sc.parallelize(Seq(
((0, 0), eye), ((1, 1), eye), ((2, 2), eye)))
val blockMatrix = new BlockMatrix(blocks, 3, 3, 9, 9)

CoordinateMatrix
- can be created from RDD[MatrixEntry], where a MatrixEntry consists of a row index, a column index and a value:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix,
MatrixEntry}
val entries = sc.parallelize(Seq(
(0, 0, 3.0), (2, 0, -5.0), (3, 2, 1.0),
(4, 1, 6.0), (6, 2, 2.0), (8, 1, 4.0))
).map{case (i, j, v) => MatrixEntry(i, j, v)}
val coordinateMatrix = new CoordinateMatrix(entries, 9, 3)
The first two implementations support multiplication by a local Matrix:
val localMatrix = Matrices.dense(3, 2, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
indexedRowMatrix.multiply(localMatrix).rows.collect
// Array(IndexedRow(0,[1.0,4.0]), IndexedRow(1,[2.0,5.0]),
//   IndexedRow(2,[3.0,6.0]))
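For completeness, the same kind of call works on the rowMatrix defined above (a sketch; since a RowMatrix has no meaningful row indices, the order of the collected rows is not guaranteed):

```scala
// RowMatrix also supports multiplication by a local matrix;
// the result is another distributed RowMatrix
val rowProduct = rowMatrix.multiply(localMatrix)
rowProduct.rows.collect
```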
The third one can be multiplied by another BlockMatrix, as long as the number of columns per block in this matrix matches the number of rows per block of the other matrix. CoordinateMatrix doesn't support multiplication, but it is pretty easy to create and to transform to the other types of distributed matrices:
blockMatrix.multiply(coordinateMatrix.toBlockMatrix(3, 3))
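As a sketch of inspecting the result of that block multiplication (only sensible for matrices small enough to fit on the driver):

```scala
val product = blockMatrix.multiply(coordinateMatrix.toBlockMatrix(3, 3))
// validate() checks that block sizes and coordinates are consistent
product.validate()
// collect to a local Matrix for inspection -- avoid on large matrices
val local = product.toLocalMatrix()
```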
Each type has its own strengths and weaknesses, and there are some additional factors to consider when you use sparse or dense elements (Vectors or block Matrices). Multiplying by a local matrix is usually preferable, since it doesn't require expensive shuffling.
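For example, the coordinateMatrix defined above converts directly between representations (a sketch using the conversion methods of the distributed matrix API):

```scala
// CoordinateMatrix can be converted to each of the other types
val asBlock      = coordinateMatrix.toBlockMatrix(3, 3)
val asIndexedRow = coordinateMatrix.toIndexedRowMatrix()
val asRow        = coordinateMatrix.toRowMatrix()

// IndexedRowMatrix can likewise be converted back
val backToBlock      = indexedRowMatrix.toBlockMatrix()
val backToCoordinate = indexedRowMatrix.toCoordinateMatrix()
```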
You can find more details about each type in the MLlib Data Types guide.
Element by Element Matrix Multiplication in Scala
Below is one way of doing it. Here we iterate over both matrices column-wise and multiply them element by element. This solution assumes that both matrices have the same dimensions.
First let's create the test matrices as given in the question.
//creating example matrices as per the question
import org.apache.spark.mllib.linalg.{Matrix, DenseMatrix}

val m1: Matrix = new DenseMatrix(3, 4, Array(1.0, 0.0, 2.0, 0.0, 3.0, 1.0, 2.0, 1.0, 0.0, 1.0, 1.0, 0.0))
val m2: Matrix = new DenseMatrix(3, 4, Array(3.0, 1.0, 2.0, 0.0, 9.0, 5.0, 2.0, 5.0, 0.0, 1.0, 1.0, 0.0))
Now let's define a function which takes two Matrix arguments and returns their element-wise product.
//define a function to calculate element-wise multiplication
import scala.collection.mutable.ArrayBuffer

def elemWiseMultiply(m1: Matrix, m2: Matrix): Matrix = {
  val arr = new ArrayBuffer[Array[Double]]()
  val m1Itr = m1.colIter //operate on each column
  val m2Itr = m2.colIter
  while (m1Itr.hasNext)
    //zip both the columns and then multiply element by element
    arr += m1Itr.next.toArray.zip(m2Itr.next.toArray).map { case (a, b) => a * b }
  //return the resultant matrix
  new DenseMatrix(m1.numRows, m1.numCols, arr.flatten.toArray)
}
You can then call this function to get the element-wise product.
//call the function on m1 and m2
elemWiseMultiply(m1, m2)
//output
//3.0 0.0 4.0 1.0
//0.0 27.0 5.0 1.0
//4.0 5.0 0.0 0.0
Apache Spark java heap space error during matrix multiplication
It appeared that sparse matrix multiplication is not implemented the way I assumed: Spark multiplies block matrices as dense blocks, even when almost all of the cells are zero. So we implemented our own multiplication that only touches the non-zero entries. Here is the Scala code (also copied from somewhere):
def multiply(left: CoordinateMatrix, right: CoordinateMatrix): CoordinateMatrix = {
  val leftEntries = left.entries.map({ case MatrixEntry(i, j, v) => (j, (i, v)) })
  val rightEntries = right.entries.map({ case MatrixEntry(j, k, w) => (j, (k, w)) })

  val productEntries = leftEntries
    .join(rightEntries)
    .map({ case (_, ((i, v), (k, w))) => ((i, k), (v * w)) })
    .reduceByKey(_ + _)
    .map({ case ((i, k), sum) => MatrixEntry(i, k, sum) })

  new CoordinateMatrix(productEntries)
}
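A hypothetical usage sketch, reusing the entries RDD defined earlier. Note that this CoordinateMatrix constructor infers the dimensions from the largest row and column indices present, so trailing empty rows or columns are dropped; pass explicit dimensions to the constructor if the exact shape matters.

```scala
val left = new CoordinateMatrix(entries, 9, 3)            // 9 x 3
// build the transpose by swapping row and column indices
val right = new CoordinateMatrix(
  entries.map { case MatrixEntry(i, j, v) => MatrixEntry(j, i, v) }, 3, 9)

val product = multiply(left, right)                       // 9 x 9 result
product.entries.collect.foreach(println)
```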