Skip to content

Commit

Permalink
SPARK-1509: add zipWithIndex zipWithUniqueId methods to java api
Browse files Browse the repository at this point in the history
Author: witgo <witgo@qq.com>

Closes apache#423 from witgo/zipWithIndex and squashes the following commits:

039ec04 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
24d74c9 [witgo] review commit
763a5e4 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
59747d1 [witgo] review commit
7bf4d06 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
daa8f84 [witgo] review commit
4070613 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
18e6c97 [witgo] java api zipWithIndex test
11e2e7f [witgo] add zipWithIndex zipWithUniqueId methods to java api
  • Loading branch information
witgo authored and pdeyhim committed Jun 25, 2014
1 parent ad3106d commit ee8363c
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
22 changes: 21 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.api.java

import java.util.{Comparator, List => JList, Iterator => JIterator}
import java.lang.{Iterable => JIterable}
import java.lang.{Iterable => JIterable, Long => JLong}

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
Expand Down Expand Up @@ -264,6 +264,26 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
}

/**
* Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
* 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
* won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
*/
def zipWithUniqueId(): JavaPairRDD[T, JLong] = {
JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, JLong]]
}

/**
* Zips this RDD with its element indices. The ordering is first based on the partition index
* and then the ordering of items within each partition. So the first item in the first
* partition gets index 0, and the last item in the last partition receives the largest index.
* This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
* This method needs to trigger a spark job when this RDD contains more than one partitions.
*/
def zipWithIndex(): JavaPairRDD[T, JLong] = {
JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, JLong]]
}

// Actions (launch a job to return a value to the user program)

/**
Expand Down
31 changes: 24 additions & 7 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,30 @@ public void call(String s) {
Assert.assertEquals(2, foreachCalls);
}

@Test
public void toLocalIterator() {
List<Integer> correct = Arrays.asList(1, 2, 3, 4);
JavaRDD<Integer> rdd = sc.parallelize(correct);
List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
Assert.assertTrue(correct.equals(result));
}
@Test
public void toLocalIterator() {
List<Integer> correct = Arrays.asList(1, 2, 3, 4);
JavaRDD<Integer> rdd = sc.parallelize(correct);
List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
Assert.assertTrue(correct.equals(result));
}

@Test
public void zipWithUniqueId() {
List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithUniqueId();
JavaRDD<Long> indexes = zip.values();
Assert.assertTrue(new HashSet<Long>(indexes.collect()).size() == 4);
}

@Test
public void zipWithIndex() {
List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithIndex();
JavaRDD<Long> indexes = zip.values();
List<Long> correctIndexes = Arrays.asList(0L, 1L, 2L, 3L);
Assert.assertTrue(indexes.collect().equals(correctIndexes));
}

@SuppressWarnings("unchecked")
@Test
Expand Down

0 comments on commit ee8363c

Please sign in to comment.