Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unused RPCs and disable reconfiguration #2970

Merged
merged 4 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ class ControllerAsyncRPCHandlerInitializer(
with PortCompletedHandler
with ConsoleMessageHandler
with RetryWorkflowHandler
with ModifyLogicHandler
with EvaluatePythonExpressionHandler
with DebugCommandHandler
with TakeGlobalCheckpointHandler
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package edu.uci.ics.amber.engine.architecture.controller

import edu.uci.ics.amber.engine.common.model.tuple.Tuple
import edu.uci.ics.amber.engine.common.rpc.AsyncRPCServer.ControlCommand
import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity
import edu.uci.ics.amber.engine.common.workflowruntimestate.{
Expand All @@ -16,11 +15,6 @@ object ControllerEvent {
operatorMetrics: Map[String, OperatorMetrics]
) extends ControlCommand[Unit]

case class ReportCurrentProcessingTuple(
operatorID: String,
tuple: Array[(Tuple, ActorVirtualIdentity)]
) extends ControlCommand[Unit]

case class WorkerAssignmentUpdate(workerMapping: Map[String, Seq[String]])
extends ControlCommand[Unit]

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@ package edu.uci.ics.amber.engine.architecture.controller.promisehandlers
import com.twitter.util.Future
import edu.uci.ics.amber.engine.architecture.controller.ControllerEvent.{
ExecutionStateUpdate,
ExecutionStatsUpdate,
ReportCurrentProcessingTuple
ExecutionStatsUpdate
}
import edu.uci.ics.amber.engine.architecture.controller.promisehandlers.PauseHandler.PauseWorkflow
import edu.uci.ics.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.PauseHandler.PauseWorker
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.QueryCurrentInputTupleHandler.QueryCurrentInputTuple
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.QueryStatisticsHandler.QueryStatistics
import edu.uci.ics.amber.engine.common.model.tuple.Tuple
import edu.uci.ics.amber.engine.common.rpc.AsyncRPCServer.ControlCommand
import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity

import scala.collection.mutable

object PauseHandler {

Expand All @@ -38,9 +32,6 @@ trait PauseHandler {
.flatMap(_.getAllOperatorExecutions)
.map {
case (physicalOpId, opExecution) =>
// create a buffer for the current input tuple
// since we need to show them on the frontend
val buffer = mutable.ArrayBuffer[(Tuple, ActorVirtualIdentity)]()
Future
.collect(
opExecution.getWorkerIds
Expand All @@ -52,22 +43,13 @@ trait PauseHandler {
send(PauseWorker(), worker).flatMap { state =>
workerExecution.setState(state)
send(QueryStatistics(), worker)
.join(send(QueryCurrentInputTuple(), worker))
// get the stats and current input tuple from the worker
.map {
case (metrics, tuple) =>
workerExecution.setStats(metrics.workerStatistics)
buffer.append((tuple, worker))
.map { metrics =>
workerExecution.setStats(metrics.workerStatistics)
}
}
}.toSeq
)
.map { _ =>
// for each paused operator, send the input tuple
sendToClient(
ReportCurrentProcessingTuple(physicalOpId.logicalOpId.id, buffer.toArray)
)
}
}
.toSeq
)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,12 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor)
with OpenExecutorHandler
with PauseHandler
with AddPartitioningHandler
with QueryCurrentInputTupleHandler
with QueryStatisticsHandler
with ResumeHandler
with StartHandler
with AssignPortHandler
with AddInputChannelHandler
with ShutdownDPThreadHandler
with FlushNetworkBufferHandler
with UpdateExecutorHandler
with RetrieveStateHandler
with PrepareCheckpointHandler
with FinalizeCheckpointHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.AddPartition
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.AssignPortHandler.AssignPort
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.OpenExecutorHandler.OpenExecutor
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.PauseHandler.PauseWorker
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.QueryCurrentInputTupleHandler.QueryCurrentInputTuple
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.QueryStatisticsHandler.QueryStatistics
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.ResumeHandler.ResumeWorker
import edu.uci.ics.amber.engine.architecture.worker.promisehandlers.StartHandler.StartWorker
Expand Down Expand Up @@ -48,8 +47,6 @@ object ControlCommandConvertUtils {
AddInputChannelV2(channelId, portId)
case QueryStatistics() =>
QueryStatisticsV2()
case QueryCurrentInputTuple() =>
QueryCurrentInputTupleV2()
case InitializeExecutor(_, opExecInitInfo, isSource) =>
val (code, language) = opExecInitInfo.asInstanceOf[OpExecInitInfoWithCode].codeGen(0, 0)
InitializeExecutorV2(
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import edu.uci.ics.texera.web.model.websocket.response.{HeartBeatResponse, Modif
new Type(value = classOf[OperatorStatisticsUpdateEvent]),
new Type(value = classOf[WebResultUpdateEvent]),
new Type(value = classOf[ConsoleUpdateEvent]),
new Type(value = classOf[OperatorCurrentTuplesUpdateEvent]),
new Type(value = classOf[CacheStatusUpdateEvent]),
new Type(value = classOf[PaginatedResultEvent]),
new Type(value = classOf[PythonExpressionEvaluateResponse]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@ import edu.uci.ics.amber.engine.architecture.worker.controlcommands.ConsoleMessa
import edu.uci.ics.amber.engine.common.{AmberConfig, VirtualIdentityUtils}
import edu.uci.ics.amber.engine.common.client.AmberClient
import edu.uci.ics.amber.engine.common.virtualidentity.ActorVirtualIdentity
import edu.uci.ics.amber.engine.common.workflowruntimestate.WorkflowAggregatedState.{
RESUMING,
RUNNING
}
import edu.uci.ics.texera.web.model.websocket.event.TexeraWebSocketEvent
import edu.uci.ics.texera.web.model.websocket.event.python.ConsoleUpdateEvent
import edu.uci.ics.texera.web.model.websocket.request.RetryRequest
import edu.uci.ics.texera.web.model.websocket.request.python.{
DebugCommandRequest,
PythonExpressionEvaluateRequest
}
import edu.uci.ics.texera.web.model.websocket.response.python.PythonExpressionEvaluateResponse
import edu.uci.ics.texera.web.storage.ExecutionStateStore
import edu.uci.ics.texera.web.storage.ExecutionStateStore.updateWorkflowState
import edu.uci.ics.amber.engine.common.workflowruntimestate.WorkflowAggregatedState.{
RESUMING,
RUNNING
}
import edu.uci.ics.amber.engine.common.workflowruntimestate.{
EvaluatedValueList,
ExecutionConsoleStore,
OperatorConsole
}
import edu.uci.ics.texera.web.model.websocket.request.RetryRequest
import edu.uci.ics.texera.web.storage.ExecutionStateStore.updateWorkflowState
import edu.uci.ics.texera.web.{SubscriptionManager, WebsocketInput}

import java.time.Instant
Expand Down
Loading
Loading