
Example: K Means


Tutorial: K-Means with TensorFrames

This example explains how to use TensorFrames to implement the K-Means clustering algorithm. We will compare three versions of the algorithm:

  • the MLlib implementation that comes with Spark
  • a first, simple implementation that relies on Spark for data movements
  • a more optimized version that minimizes data movements between Spark and TensorFlow

If you want to run this demo, the whole code is available here.

In MLlib, using K-Means works as follows:

from pyspark.ml.clustering import KMeans

mllib_df = ...  # DataFrame[features: vector]

kmeans = KMeans() \
    .setK(k).setSeed(1) \
    .setFeaturesCol(FEATURES_COL) \
    .setInitMode("random").setMaxIter(num_iters)
mod = kmeans.fit(mllib_df)
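The input dataframe mllib_df is left abstract above. Here is one hypothetical way to build it, with sizes matching the ones used later in this tutorial (sqlContext is assumed to be available; on Spark 2.0+, the vector class lives in pyspark.ml.linalg instead):

import numpy as np
from pyspark.mllib.linalg import Vectors

FEATURES_COL = "features"
num_features = 100   # dimension of each point
k = 10               # number of clusters
num_iters = 10       # illustrative value

# Random synthetic points, one MLlib vector per row.
data = [(Vectors.dense(np.random.rand(num_features)),) for _ in range(100000)]
mllib_df = sqlContext.createDataFrame(data, [FEATURES_COL])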

The initialization mode (random) is not optimal, but it is used here so that we have a fair comparison between MLlib and TensorFrames. On my machine, on a single CPU, this takes 6.2 seconds to run. This will be our baseline.

Simple Implementation

Let us start with a simple implementation: for each point, we compute the index of the closest centroid and the distance to it, and we then aggregate the points by that index. This is essentially what MLlib does. In order to be reasonably efficient, though, the data is processed in blocks.

The details of how to compute the distances are not very important here, and the reader can refer to the implementation:

def tf_compute_distances(points, start_centers):
    """
    Given a set of points and some centroids, computes the distance from each point to each
    centroid.

    :param points: a 2d TF tensor of shape num_points x dim
    :param start_centers: a numpy array of shape num_centroid x dim
    :return: a TF tensor of shape num_points x num_centroids
    """
    ...
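For reference, here is a minimal sketch of one way to implement this function (not necessarily the implementation used in the demo), based on the expansion |x - c|^2 = |x|^2 - 2<x, c> + |c|^2:

import tensorflow as tf

def tf_compute_distances(points, start_centers):
    # Squared norms of the points: num_points x 1.
    point_sqs = tf.reduce_sum(tf.square(points), 1, keep_dims=True)
    # The centroids, as a constant tensor: num_centroids x dim.
    centers = tf.constant(start_centers)
    # Squared norms of the centroids: num_centroids.
    center_sqs = tf.reduce_sum(tf.square(centers), 1)
    # Cross terms: num_points x num_centroids.
    cross = tf.matmul(points, centers, transpose_b=True)
    # Broadcasting assembles the num_points x num_centroids distance matrix.
    return point_sqs - 2 * cross + center_sqs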

Here is how one step of the computation is run, given an input dataframe and a set of centroids. First, a new dataframe is computed that contains, for each point, the index of the closest centroid and the distance to that centroid:

import tensorflow as tf
import tensorframes as tfs

with tf.Graph().as_default() as g:
    # The placeholder for the input: we use the block format.
    # The first dimension (left unknown) is the number of points in the block.
    points = tf.placeholder(tf.double, shape=[None, num_features], name='features')
    # The shape of the block is extracted as a TF tensor.
    num_points = tf.pack([tf.shape(points)[0]], name="num_points")
    distances = tf_compute_distances(points, start_centers)
    # The outputs of the program.
    # The closest centroids are extracted.
    indexes = tf.argmin(distances, 1, name='indexes')
    # This could be done based on the indexes as well.
    min_distances = tf.reduce_min(distances, 1, name='min_distances')
    counts = tf.tile(tf.constant([1]), num_points, name='count')
    df2 = tfs.map_blocks([indexes, counts, min_distances], dataframe)

All these extra columns are appended alongside the existing feature column, so there is no need to pass it around separately. In a second step, the rows are grouped by the index of the closest centroid:

gb = df2.groupBy("indexes")

The new centroids are then the means of the aggregated groups of points. Since the mean is not an associative operation, we instead compute the sum of the points and their count, quantities that can be partially aggregated within each partition:

with tf.Graph().as_default() as g:
    # Look at the documentation of tfs.aggregate for the naming conventions of the placeholders.
    x_input = tfs.block(df2, "features", tf_name="features_input")
    count_input = tfs.block(df2, "count", tf_name="count_input")
    md_input = tfs.block(df2, "min_distances", tf_name="min_distances_input")
    # Each operation is just the sum.
    x = tf.reduce_sum(x_input, [0], name='features')
    count = tf.reduce_sum(count_input, [0], name='count')
    min_distances = tf.reduce_sum(md_input, [0], name='min_distances')
    df3 = tfs.aggregate([x, count, min_distances], gb)

The output can finally be collected, and the new centroids computed on the driver:

import numpy as np

df3_c = df3.collect()
# The new centroids.
new_centers = np.array([np.array(row.features) / row['count'] for row in df3_c])
total_distances = np.sum([row['min_distances'] for row in df3_c])
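These two steps are wrapped in an outer loop that repeats them until convergence. As a rough sketch (run_one_step, initial_centers and tol are illustrative names, not part of the original code):

centers = initial_centers            # num_centroids x num_features
previous_distance = np.inf
while True:
    # One pass: assign points to centroids, aggregate, recompute the centers.
    centers, total_distance = run_one_step(dataframe, centers)
    if previous_distance - total_distance < tol:
        break
    previous_distance = total_distance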

Running this program under the same conditions as MLlib gives a run time of 129.9 seconds, which is much worse! What is happening? The culprit is the aggregation step. While the aggregation itself is very simple, the cost of transferring rows one by one and repeatedly calling into the TensorFlow runtime is very high. It would be faster to move as many operations as possible into TensorFlow, without multiple exchanges between the two systems. This is what the next section does.

Vectorized aggregation

Instead of relying on Spark for the aggregation, we can do it ourselves, exploiting the fact that the number of clusters is small compared to the number of points. For each block, we can compute a matrix of partial sums with one row per centroid, and sum these matrices at the end. We end up collapsing all the rows of a block into a single matrix tensor.
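The key ingredient is tf.unsorted_segment_sum, which sums the rows of a tensor grouped by an index. A toy illustration, separate from the tutorial code:

points = tf.constant([[1., 0.], [2., 0.], [0., 3.]])   # 3 points in dimension 2
indexes = tf.constant([0, 0, 1])                       # closest centroid of each point
sums = tf.unsorted_segment_sum(points, indexes, 2)     # 2 centroids
with tf.Session() as sess:
    print(sess.run(sums))   # [[ 3.  0.]
                            #  [ 0.  3.]]

Here is the per-block program: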

with tf.Graph().as_default() as g:
    # The placeholder for the input: we use the block format
    points = tf.placeholder(tf.double, shape=[None, num_features], name='features')
    # The distances
    distances = tf_compute_distances(points, start_centers)
    # The rest of this block performs a pre-aggregation step in TF, to limit the
    # communication between TF and Spark.
    # The closest centroids are extracted.
    indexes = tf.argmin(distances, 1, name='indexes')
    min_distances = tf.reduce_min(distances, 1, name='min_distances')
    num_points = tf.pack([tf.shape(points)[0]], name="num_points")
    counts = tf.tile(tf.constant([1]), num_points, name='count')
    # These compute the aggregate based on the indexes.
    block_points = tf.unsorted_segment_sum(points, indexes, num_centroids, name="block_points")
    block_counts = tf.unsorted_segment_sum(counts, indexes, num_centroids, name="block_counts")
    block_distances = tf.reduce_sum(min_distances, name="block_distances")
    # One leading dimension is added to express the fact that the previous elements are just
    # one row in the final dataframe.
    # The final dataframe has one row per block.
    agg_points = tf.expand_dims(block_points, 0, name="agg_points")
    agg_counts = tf.expand_dims(block_counts, 0, name="agg_counts")
    agg_distances = tf.expand_dims(block_distances, 0, name="agg_distances")
    # Using trimming to drop the original data (we are just returning one row of data per
    # block).
    df2 = tfs.map_blocks([agg_points, agg_counts, agg_distances],
                         dataframe, trim=True)

It is instructive to print the schema of df2:

tfs.print_schema(df2)
root
 |-- agg_counts: array (nullable = false) integer[1,10]
 |-- agg_distances: double (nullable = false) double[1]
 |-- agg_points: array (nullable = false) double[1,10,100]

Even if the data is split into multiple partitions across the machines, TensorFrames knows that each partition contains exactly one row, holding a matrix, a vector of counts and a single number. This significantly limits the amount of data to transfer between Spark and TensorFrames.

Now we can collapse all these partitions into one:

with tf.Graph().as_default() as g:
    # Look at the documentation of tfs.aggregate for the naming conventions of the placeholders.
    x_input = tf.placeholder(tf.double,
                             shape=[None, num_centroids, num_features],
                             name='agg_points_input')
    count_input = tf.placeholder(tf.int32,
                                 shape=[None, num_centroids],
                                 name='agg_counts_input')
    md_input = tf.placeholder(tf.double,
                              shape=[None],
                              name='agg_distances_input')
    # Each operation is just the sum.
    x = tf.reduce_sum(x_input, [0], name='agg_points')
    count = tf.reduce_sum(count_input, [0], name='agg_counts')
    min_distances = tf.reduce_sum(md_input, [0], name='agg_distances')
    (x_, count_, total_distances) = tfs.reduce_blocks([x, count, min_distances], df2)
# The new centers. The small constant guards against division by zero for
# empty clusters.
new_centers = (x_.T / (count_ + 1e-7)).T

Under the same conditions as before, the runtime of the algorithm is 6.5 seconds, about as fast as pure MLlib.

Conclusion

Here is the final tally:

Method                             Time (seconds)
MLlib (Java)                       6.2
TensorFrames + Spark aggregation   129.9
TensorFrames (vectorized)          6.5

In fact, that last number could be significantly improved in later versions of Spark and TensorFrames without any change to the user code: about 80% of this time is spent on slow data conversions between Spark and TensorFlow. Furthermore, this implementation only uses the CPU; with a few extra instructions, the computation could be offloaded to GPUs to boost performance further.
