Skip to content

Commit

Permalink
[SPARK-7526] [SPARKR] Specify ip of RBackend, MonitorServer and RRDD …
Browse files Browse the repository at this point in the history
…Socket server

These R process only used to communicate with JVM process on local, so binding to localhost is more reasonable then wildcard ip.

Author: linweizhong <linweizhong@huawei.com>

Closes apache#6053 from Sephiroth-Lin/spark-7526 and squashes the following commits:

5303af7 [linweizhong] bind to localhost rather than wildcard ip
  • Loading branch information
Sephiroth-Lin authored and jeanlyn committed May 28, 2015
1 parent 92572d8 commit d230d2f
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/api/r/RBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.api.r

import java.io.{DataOutputStream, File, FileOutputStream, IOException}
import java.net.{InetSocketAddress, ServerSocket}
import java.net.{InetAddress, InetSocketAddress, ServerSocket}
import java.util.concurrent.TimeUnit

import io.netty.bootstrap.ServerBootstrap
Expand Down Expand Up @@ -65,7 +65,7 @@ private[spark] class RBackend {
}
})

channelFuture = bootstrap.bind(new InetSocketAddress(0))
channelFuture = bootstrap.bind(new InetSocketAddress("localhost", 0))
channelFuture.syncUninterruptibly()
channelFuture.channel().localAddress().asInstanceOf[InetSocketAddress].getPort()
}
Expand Down Expand Up @@ -101,7 +101,7 @@ private[spark] object RBackend extends Logging {
try {
// bind to random port
val boundPort = sparkRBackend.init()
val serverSocket = new ServerSocket(0, 1)
val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
val listenPort = serverSocket.getLocalPort()

// tell the R process via temporary file
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/api/r/RRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.api.r

import java.io._
import java.net.ServerSocket
import java.net.{InetAddress, ServerSocket}
import java.util.{Map => JMap}

import scala.collection.JavaConversions._
Expand Down Expand Up @@ -55,7 +55,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
val parentIterator = firstParent[T].iterator(partition, context)

// we expect two connections
val serverSocket = new ServerSocket(0, 2)
val serverSocket = new ServerSocket(0, 2, InetAddress.getByName("localhost"))
val listenPort = serverSocket.getLocalPort()

// The stdout/stderr is shared by multiple tasks, because we use one daemon
Expand Down Expand Up @@ -414,7 +414,7 @@ private[r] object RRDD {
synchronized {
if (daemonChannel == null) {
// we expect one connections
val serverSocket = new ServerSocket(0, 1)
val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
val daemonPort = serverSocket.getLocalPort
errThread = createRProcess(rLibDir, daemonPort, "daemon.R")
// the socket used to send out the input of task
Expand Down

0 comments on commit d230d2f

Please sign in to comment.