Commit 6b0267c

MapR [SPARK-514] Recovery from checkpoint is broken (apache#461)

ekrivokonmapr committed Sep 19, 2019
1 parent 9fe4072 commit 6b0267c
Showing 6 changed files with 32 additions and 22 deletions.
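The same fix recurs across the six files (apparently once each for the kafka09 and kafka10 packages, judging by the @deprecated annotation in the final hunk): an unconditional KafkaUtils.waitForConsumerAssignment call, which had earlier replaced consumer.poll(0), becomes conditional on whether the topics belong to MapR Streams. A condensed Scala sketch of the new dispatch, using the names from the hunks below (an outline, not a complete method body):

    // Condensed from the Subscribe/SubscribePattern hunks below.
    // `currentOffsets` is a java.util.Map[TopicPartition, java.lang.Long] in
    // these classes, hence the asScala/toLong conversion into the Scala
    // Map[TopicPartition, Long] that isStreams expects.
    if (KafkaUtils.isStreams(currentOffsets.asScala.toMap.map(a => (a._1, a._2.toLong)))) {
      // MapR Streams: wait until the consumer actually has its assignment
      KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
    } else {
      // Apache Kafka: keep the stock behaviour of a zero-timeout poll
      consumer.poll(0)
    }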
@@ -105,8 +105,10 @@ private case class Subscribe[K, V](
     val shouldSuppress =
       aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
     try {
-      // consumer.poll(0)
-      KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
+      if (KafkaUtils.isStreams(currentOffsets.asScala.toMap.map(a => (a._1, a._2.toLong)))) {
+        KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
+      } else {
+        consumer.poll(0) }
     } catch {
       case x: NoOffsetForPartitionException if shouldSuppress =>
         logWarning("Catching NoOffsetForPartitionException since " +
@@ -159,8 +161,11 @@ private case class SubscribePattern[K, V](
     val shouldSuppress =
       aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
     try {
-      // consumer.poll(0)
-      KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
+      if (KafkaUtils.isStreams(currentOffsets.asScala.toMap.map(a => (a._1, a._2.toLong)))) {
+        KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
+      } else {
+        consumer.poll(0)
+      }
     } catch {
       case x: NoOffsetForPartitionException if shouldSuppress =>
         logWarning("Catching NoOffsetForPartitionException since " +
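Both hunks above route MapR Streams consumers to KafkaUtils.waitForConsumerAssignment. Its body is not part of this diff apart from the `timeout += 500` loop tail visible in the KafkaUtils hunks further down, so the following is a hypothetical reconstruction, not MapR's actual implementation: a bounded wait that polls until the consumer reports the expected assignment.

    import java.util.{Set => JSet}
    import org.apache.kafka.clients.consumer.Consumer
    import org.apache.kafka.common.TopicPartition

    // HYPOTHETICAL sketch: only the `timeout += 500` increment is actually
    // visible in this diff; the loop, bound, and poll interval are guesses.
    def waitForConsumerAssignment[K, V](
        consumer: Consumer[K, V],
        partitions: JSet[TopicPartition]): Unit = {
      var timeout = 0
      while (!consumer.assignment().containsAll(partitions) && timeout < 30000) {
        consumer.poll(500) // nudges the client to complete group assignment
        timeout += 500
      }
    }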
@@ -122,9 +122,6 @@ private[spark] class DirectKafkaInputDStream[K, V](
   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData
 
-  // Indicates whether Apache Kafka is used instead of MapR Streams
-  private var isStreams : Boolean = false
-
   /**
    * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
    */
@@ -307,9 +304,6 @@ private[spark] class DirectKafkaInputDStream[K, V](
       }.toMap
     }
 
-    // Determine if Apache Kafka is used instead of MapR Streams
-    isStreams = currentOffsets.keys.map(_.topic()).exists(topic => topic.startsWith("/") && topic.contains(":"))
-
     // don't actually want to consume any messages, so pause all partitions
     c.pause(currentOffsets.keySet.asJava)
   }
@@ -354,7 +348,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
         osr = commitQueue.poll()
       }
       if (!m.isEmpty) {
-        if(isStreams) {
+        if (KafkaUtils.isStreams(currentOffsets)) {
           serviceConsumer.commitAsync(m, commitCallback.get)
         } else {
           consumer.commitAsync(m, commitCallback.get)
@@ -243,6 +243,12 @@ object KafkaUtils extends Logging {
       timeout += 500
     }
   }
+
+  // Determine whether MapR Streams (rather than Apache Kafka) is in use
+  def isStreams(currentOffsets: Map[TopicPartition, Long]): Boolean =
+    currentOffsets.keys.map(_.topic()).exists(topic => topic.startsWith("/") && topic.contains(":"))
+
+
 }
 
 object KafkaUtilsPythonHelper {
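The helper added above encodes the topic-naming convention: MapR Streams topics are addressed as a stream path plus topic name (leading "/", ":" separator), while Apache Kafka topic names are plain. A small usage sketch; the topic names here are invented for illustration:

    import org.apache.kafka.common.TopicPartition

    val streamsOffsets: Map[TopicPartition, Long] =
      Map(new TopicPartition("/apps/pipeline:events", 0) -> 42L) // "/stream-path:topic"
    val kafkaOffsets: Map[TopicPartition, Long] =
      Map(new TopicPartition("events", 0) -> 42L)                // plain Kafka topic name

    assert(KafkaUtils.isStreams(streamsOffsets))  // true: path-style name
    assert(!KafkaUtils.isStreams(kafkaOffsets))   // false: plain name

Only the topic names matter; the offset values are ignored by the check.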
@@ -106,8 +106,11 @@ private case class Subscribe[K, V](
     val shouldSuppress =
       aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
     try {
-      // consumer.poll(0)
-      KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
+      if (KafkaUtils.isStreams(currentOffsets.asScala.toMap.map(a => (a._1, a._2.toLong)))) {
+        KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
+      } else {
+        consumer.poll(0)
+      }
     } catch {
       case x: NoOffsetForPartitionException if shouldSuppress =>
         logWarning("Catching NoOffsetForPartitionException since " +
@@ -160,8 +163,11 @@ private case class SubscribePattern[K, V](
     val shouldSuppress =
       aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
     try {
-      // consumer.poll(0)
-      KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
+      if (KafkaUtils.isStreams(currentOffsets.asScala.toMap.map(a => (a._1, a._2.toLong)))) {
+        KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
+      } else {
+        consumer.poll(0)
+      }
     } catch {
       case x: NoOffsetForPartitionException if shouldSuppress =>
         logWarning("Catching NoOffsetForPartitionException since " +
@@ -123,9 +123,6 @@ private[spark] class DirectKafkaInputDStream[K, V](
   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData
 
-  // Indicates whether Apache Kafka is used instead of MapR Streams
-  private var isStreams : Boolean = false
-
   /**
    * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
    */
@@ -297,9 +294,6 @@ private[spark] class DirectKafkaInputDStream[K, V](
       }.toMap
     }
 
-    // Determine if Apache Kafka is used instead of MapR Streams
-    isStreams = currentOffsets.keys.map(_.topic()).exists(topic => topic.startsWith("/") && topic.contains(":"))
-
     // don't actually want to consume any messages, so pause all partitions
     c.pause(currentOffsets.keySet.asJava)
   }
@@ -344,7 +338,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
         osr = commitQueue.poll()
       }
       if (!m.isEmpty) {
-        if(isStreams) {
+        if (KafkaUtils.isStreams(currentOffsets)) {
           serviceConsumer.commitAsync(m, commitCallback.get)
         } else {
           consumer.commitAsync(m, commitCallback.get)
@@ -244,6 +244,11 @@ object KafkaUtils extends Logging {
       timeout += 500
     }
   }
+
+  // Determine whether MapR Streams (rather than Apache Kafka) is in use
+  def isStreams(currentOffsets: Map[TopicPartition, Long]): Boolean =
+    currentOffsets.keys.map(_.topic()).exists(topic => topic.startsWith("/") && topic.contains(":"))
+
 }
 
 @deprecated("Use kafka10 package instead of kafka09", "MapR Spark-2.3.2")
