From d230d2f9c4dc265308064cb5fe66793ddab72be8 Mon Sep 17 00:00:00 2001 From: linweizhong Date: Tue, 12 May 2015 23:55:44 -0700 Subject: [PATCH] [SPARK-7526] [SPARKR] Specify ip of RBackend, MonitorServer and RRDD Socket server These R process only used to communicate with JVM process on local, so binding to localhost is more reasonable then wildcard ip. Author: linweizhong Closes #6053 from Sephiroth-Lin/spark-7526 and squashes the following commits: 5303af7 [linweizhong] bind to localhost rather than wildcard ip --- core/src/main/scala/org/apache/spark/api/r/RBackend.scala | 6 +++--- core/src/main/scala/org/apache/spark/api/r/RRDD.scala | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala index 3a2c94bd9d875..0a91977928cee 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala @@ -18,7 +18,7 @@ package org.apache.spark.api.r import java.io.{DataOutputStream, File, FileOutputStream, IOException} -import java.net.{InetSocketAddress, ServerSocket} +import java.net.{InetAddress, InetSocketAddress, ServerSocket} import java.util.concurrent.TimeUnit import io.netty.bootstrap.ServerBootstrap @@ -65,7 +65,7 @@ private[spark] class RBackend { } }) - channelFuture = bootstrap.bind(new InetSocketAddress(0)) + channelFuture = bootstrap.bind(new InetSocketAddress("localhost", 0)) channelFuture.syncUninterruptibly() channelFuture.channel().localAddress().asInstanceOf[InetSocketAddress].getPort() } @@ -101,7 +101,7 @@ private[spark] object RBackend extends Logging { try { // bind to random port val boundPort = sparkRBackend.init() - val serverSocket = new ServerSocket(0, 1) + val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost")) val listenPort = serverSocket.getLocalPort() // tell the R process via temporary file diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala index 6fea5e1144f2f..06247f7e8b78c 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala @@ -18,7 +18,7 @@ package org.apache.spark.api.r import java.io._ -import java.net.ServerSocket +import java.net.{InetAddress, ServerSocket} import java.util.{Map => JMap} import scala.collection.JavaConversions._ @@ -55,7 +55,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag]( val parentIterator = firstParent[T].iterator(partition, context) // we expect two connections - val serverSocket = new ServerSocket(0, 2) + val serverSocket = new ServerSocket(0, 2, InetAddress.getByName("localhost")) val listenPort = serverSocket.getLocalPort() // The stdout/stderr is shared by multiple tasks, because we use one daemon @@ -414,7 +414,7 @@ private[r] object RRDD { synchronized { if (daemonChannel == null) { // we expect one connections - val serverSocket = new ServerSocket(0, 1) + val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost")) val daemonPort = serverSocket.getLocalPort errThread = createRProcess(rLibDir, daemonPort, "daemon.R") // the socket used to send out the input of task