The main problem with Mars at present is performance. We ran many experiments and tests to compare Mars with Dask on a single node and on multiple nodes. The results are shown in the tables below:
Tasks/Subtasks | Dask | Mars | Mars on Ray | Mars on Ray DAG |
---|---|---|---|---|
2_000 | 904.98 | 82.69 | 45.97 | 356.29 |
20_000 | 898.27 | 86.20 | 45.54 | 342.09 |
40_000 | 846.92 | 85.29 | 45.73 | 343.48 |
200_000 | 827.36 | 78.61 | 44.59 | 326.48 |
Table 1: TPS (tasks per second) of Dask and Mars running on one node
Tasks/Subtasks | Dask | Mars | Mars on Ray | Mars on Ray DAG |
---|---|---|---|---|
2_000 | 1456.31 | 149.51 | 94.73 | 468.02 |
20_000 | 1501.13 | 130.43 | 92.92 | 439.88 |
40_000 | 1366.09 | 128.68 | 86.69 | 438.97 |
200_000 | 1220.73 | 129.85 | 96.65 | 369.47 |
Table 2: TPS (tasks per second) of Dask and Mars running on three nodes
Tasks/Subtasks: the number of tasks in the Dask job, and the number of subtasks in the Mars job.
We can see that the TPS of Mars is much lower than that of Dask, on both a single node and multiple nodes. After detailed analysis, we drew three conclusions:
- In graph generation, Mars is much slower than Dask. Mars generates three graphs: `TileableGraph`, `ChunkGraph`, and `SubtaskGraph`, while Dask generates only one: `HighLevelGraph`. Specifically, the generation of `ChunkGraph` and `SubtaskGraph` takes more time, and `SubtaskGraph` is the worst.
- In serialization and deserialization, Mars takes longer than Dask. This can be explained by two issues: (1) the data structures; (2) the serialization and deserialization method.
- In RPC, Mars issues many more calls than Dask. This is because Dask batches all RPC sends, even when those RPCs are of different types.
The first two issues are related to data structures. Complex data structures cause several problems:
- The main data structure of the graph is `ChunkData`. Creating a new chunk data object takes a long time, and there are many of them, so graph generation is very time-consuming.
- The serialized data is larger, so serialization and deserialization take longer.
In this proposal, we focus on optimizing the first two issues by simplifying the data structures; serialization will be optimized later.
We mainly make the following optimizations to simplify the data structures.
In order to address the main problem of graph generation, we profiled the supervisor and got the following flame graph.
The flame graph above shows the tile process of `ChunkGraph` generation. We can see that chunk creation takes too long, because `ChunkData` has too many fields and a complicated inheritance hierarchy.
The current data structures of `Chunk` and `ChunkData` are:
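The full definitions are long; the sketch below is reconstructed from the experiment code later in this section, so treat it as a simplified illustration rather than the exact source:

```python
# Simplified sketch of the current layout (field lists abbreviated).
class EntityData(Serializable):
    _key = StringField("key", default=None)    # logical identity
    _id = StringField("id")                    # per-instance id
    _extra_params = DictField("extra_params", key_type=FieldTypes.string)

class ChunkData(EntityData):
    _index = TupleField("index", FieldTypes.uint32)
    # ... many more fields, including an operand instance in _op

class Chunk(Serializable):
    # a thin wrapper delegating attribute access to its ChunkData
    _data = ReferenceField("data", ChunkData)
```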
1. Remove `type_name` and use `self.__class__.__name__` instead when necessary. Currently, the main places of use are `DataFrameData` and `CategoricalData`, like the snippets below (a sketch of the replacement follows this list):

   ```python
   class DataFrameData(_BatchedFetcher, BaseDataFrameData):
       type_name = "DataFrame"

       def _to_str(self, representation=False):
           if is_build_mode() or len(self._executed_sessions) == 0:
               # in build mode, or not executed, just return representation
               if representation:
                   return f"{self.type_name} <op={type(self._op).__name__}, key={self.key}>"
               else:
                   return f"{self.type_name}(op={type(self._op).__name__})"
           else:
               ...
   ```

   ```python
   class CategoricalData(HasShapeTileableData, _ToPandasMixin):
       ...

       def _to_str(self, representation=False):
           if is_build_mode() or len(self._executed_sessions) == 0:
               # in build mode, or not executed, just return representation
               if representation:
                   return f"{self.type_name} <op={type(self.op).__name__}, key={self.key}>"
               else:
                   return f"{self.type_name}(op={type(self.op).__name__})"
           else:
               data = self.fetch(session=self._executed_sessions[-1])
               return repr(data) if repr(data) else str(data)
   ```
2. Remove `__allow_data_type__`, which is used to check the validity of chunk data. If we remove `ChunkData`, it no longer needs to exist.
3. Remove `_id` and keep only `_key`.
4. Express the operand with an op id and args, replacing the operand instance.
5. Remove `Entity` and `EntityData` to reduce the class inheritance hierarchy.
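For item 1, a possible shape of the replacement is a property (a sketch under the assumption that display names are the class name minus the `Data` suffix; the proposal itself only says to use `self.__class__.__name__` when necessary):

```python
class DataFrameData(_BatchedFetcher, BaseDataFrameData):
    @property
    def type_name(self) -> str:
        # derive the display name from the class name on demand;
        # strip the trailing "Data" so DataFrameData -> "DataFrame"
        name = self.__class__.__name__
        return name[:-4] if name.endswith("Data") else name
```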
Finally, we will get the following `Chunk`:
```python
class Chunk(Serializable):
    __slots__ = ()

    _key: str
    _op_id: str
    _op_args: Tuple
    _inputs: Tuple
    _outputs: Tuple
    _index: Tuple
    _is_broadcaster: bool
    _extra_params: Dict
```
Optimizations 1-3 reduce the number of fields, such as `_id`. We did a comparison test.
Firstly, we define a simple `TensorChunkData`:
```python
class TensorChunkData(Serializable):
    _shape = TupleField("shape", FieldTypes.int64)
    _order = ReferenceField("order", TensorOrder)
    _index = TupleField("index", FieldTypes.uint32)
    _extra_params = DictField("extra_params", key_type=FieldTypes.string)
    _key = StringField("key", default=None)
    _id = StringField("id")

    def __init__(self, op=None, index=None, shape=None, dtype=None, order=None, **kw):
        args = ()
        kwargs = {
            "_shape": shape,
            "_order": order,
            "_op": op,
            "_index": index,
            "_extra_params": {"_i": kw["_i"]},
            "_key": kw["_key"],
            "_dtype": kw.get("_dtype"),
            "_id": str(id(self)),
        }
        super().__init__(*args, **kwargs)
```
From the profiling graph we can see that the cost of constructing the `TensorChunkData` is about 2.81e-06 s.
Secondly, we remove the `_id` field and do the same construction. The cost becomes 2.26e-06 s, a reduction of 19.6%.
Optimization 4 replaces the operand instance with parameters. We also did a comparison:
```python
import pickle

import numpy as np

from mars.serialization.core import serialize

# serialize the full operand instance
# (TensorRandomSample is a Mars operand; its import is omitted as in the original)
op = TensorRandomSample(seed=1001, size=100, gpu=False, dtype=np.dtype("f8"))
bs1 = pickle.dumps(serialize(op))

# serialize only the op id plus its args
args = ("op_001", (1001, 100, False, np.dtype("f8")))
bs2 = pickle.dumps(serialize(args))
```
The data size is:
The serialization cost is:
We can see that the data size is reduced by 71.5%, and the cost by 63.8%.

Optimization 5 reduces the inheritance depth of the class; the test result is as follows.
Firstly, we define a `TensorChunkData` which extends `EntityData`:
```python
class EntityData(Serializable):
    _key = StringField("key", default=None)
    _id = StringField("id")
    _extra_params = DictField("extra_params", key_type=FieldTypes.string)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


class TensorChunkData(EntityData):
    _shape = TupleField("shape", FieldTypes.int64)
    _order = ReferenceField("order", TensorOrder)
    _index = TupleField("index", FieldTypes.uint32)

    def __init__(self, op=None, index=None, shape=None, dtype=None, order=None, **kw):
        args = ()
        kwargs = {
            "_shape": shape,
            "_order": order,
            "_op": op,
            "_index": index,
            "_extra_params": {"_i": kw["_i"]},
            "_key": kw["_key"],
            "_dtype": kw.get("_dtype"),
            "_id": str(id(self)),
        }
        super().__init__(*args, **kwargs)
```
We constructed a `TensorChunkData`; the cost is about 3.55e-06 s.
Secondly, we define a `TensorChunkData` which extends `Serializable` directly:
```python
class TensorChunkData(Serializable):
    _shape = TupleField("shape", FieldTypes.int64)
    _order = ReferenceField("order", TensorOrder)
    _index = TupleField("index", FieldTypes.uint32)
    _extra_params = DictField("extra_params", key_type=FieldTypes.string)
    _key = StringField("key", default=None)
    _id = StringField("id")

    def __init__(self, op=None, index=None, shape=None, dtype=None, order=None, **kw):
        args = ()
        kwargs = {
            "_shape": shape,
            "_order": order,
            "_op": op,
            "_index": index,
            "_extra_params": {"_i": kw["_i"]},
            "_key": kw["_key"],
            "_dtype": kw.get("_dtype"),
            "_id": str(id(self)),
        }
        super().__init__(*args, **kwargs)
```
We constructed a `TensorChunkData`; the cost is about 2.81e-06 s, a reduction of 20.8%.
There is one more time cost that optimization 5 can alleviate: constructing the `Chunk` wrapper itself.
Firstly, we define a `TensorChunk`:
```python
class TensorChunk(Serializable):
    type_name = "Tensor"

    _data = ReferenceField("data", EntityData)

    def __init__(self, data, **kwargs):
        super().__init__(_data=data, **kwargs)
```
We constructed a `TensorChunk`; the cost is about 8.28e-07 s, which is about 29.5% of the cost of constructing a `TensorChunkData`.
We can largely eliminate this wrapper cost by merging `Chunk` and `ChunkData` into a single `Chunk`.
We need to generate an `op_id` field for every operand, and maintain a mapping from `op_id` to operand class, as follows, so that we can find the corresponding operand class and construct the operand instance.
For convenience, we take the following steps to generate `op_id` and the mapping:
- `op_id = hash("operand's full path")`
- The key of the mapping is `op_id`, and its value is initialized to the operand's full path, like `mars.tensor.random.core.TensorRandomOperand`.
- When constructing an op, first check the type of the value in the mapping: if it is a `str`, load the corresponding operand class and update the value to that class; if it is already a class, use it directly.
- Construct the mapping when building the pymars wheel.
The hash function is not the builtin `hash()`: Python 3.4+ switched its hash function to SipHash for security (to prevent collision attacks), so the same string gets different hash values in different Python processes. We will use MurmurHash here.
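As a minimal sketch of the id generation, assuming the `mmh3` package as the MurmurHash binding (the proposal only requires a hash that is stable across processes, not this specific library):

```python
import mmh3

def gen_op_id(op_full_path: str) -> int:
    # MurmurHash3 returns the same value for the same string in every
    # Python process, unlike the builtin hash() under SipHash.
    return mmh3.hash(op_full_path, signed=False)

gen_op_id("mars.tensor.random.core.TensorRandomOperand")  # stable across runs
```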
And the mapping looks like:

```python
OPERAND_MAPPING = {
    574610788: 'mars.tensor.random.core.TensorRandomOperand',  # or the loaded TensorRandomOperand class
    1112862296: 'mars.core.operand.shuffle.MapReduceOperand',
    710700605: 'mars.core.operand.fetch.FetchShuffle',
    ...
}
```
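The lazy-loading lookup described above could be sketched like this (`get_operand_class` is an illustrative name, not an existing Mars API):

```python
import importlib

def get_operand_class(op_id: int):
    # resolve an op_id to its operand class, importing it on first use
    value = OPERAND_MAPPING[op_id]
    if isinstance(value, str):
        module_name, class_name = value.rsplit(".", 1)
        value = getattr(importlib.import_module(module_name), class_name)
        OPERAND_MAPPING[op_id] = value  # cache the class for later lookups
    return value
```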
- Remove `subtask_name`; only `TaskProcessorActor.get_tileable_subtasks` uses it currently.
- The `task_id` can be generated directly by a global sequencer, the `stage_id` by a task-level sequencer, and the `subtask_id` by a stage-level sequencer. Finally, we can compose the `subtask_id` like:

  ```python
  def gen_subtask_id(task_id, stage_id, subtask_id):
      # task_id (4 bytes) | stage_id (2 bytes) | subtask_id (6 bytes)
      return (
          task_id.to_bytes(4, 'little')
          + stage_id.to_bytes(2, 'little')
          + subtask_id.to_bytes(6, 'little')
      )

  subtask_id = gen_subtask_id(task_id, stage_id, subtask_id)
  ```
For example:

```python
# if
task_id = 1
stage_id = 1
subtask_id = 1
# then
# subtask_id = b'\x01\x00\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00'
# and subtask_id.hex() is '010000000100010000000000'
```
We can get `task_id` and `stage_id` back from the composed id as follows:

```python
task_id = subtask_id[0:4]    # first 4 bytes
stage_id = subtask_id[4:6]   # next 2 bytes
```
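For completeness, a small usage sketch (not part of the original proposal) that recovers the integer ids with `int.from_bytes`:

```python
task_id_int = int.from_bytes(subtask_id[0:4], 'little')
stage_id_int = int.from_bytes(subtask_id[4:6], 'little')
sub_seq_int = int.from_bytes(subtask_id[6:12], 'little')
# with the example above, task_id_int == stage_id_int == sub_seq_int == 1
```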
We compared the new subtask id generation method with the old one. The results are as follows: the time consumption is reduced to 2.9% of the original, and the id size is reduced from 24 bytes to 12 bytes.
Finally, the fields of `Subtask` are:
```python
class Subtask(Serializable):
    subtask_id: str
    chunk_graph: ChunkGraph
    expect_bands: List
    virtual: bool
    retryable: bool
    priority: Tuple
    rerun_time: int
    extra_config: Dict
    update_meta_chunks: List
    logic_key: str
    logic_index: int
    logic_parallelism: int
    bands_specified: bool
    required_resource: Resource
    execution_time_mills: int
    submitted: bool
```
Later, we will move some fields of `Subtask` to `SubtaskScheduleInfo`, so that `Subtask` keeps only the fields necessary at runtime, like:
```python
class Subtask(Serializable):
    subtask_id: str
    chunk_graph: ChunkGraph
```
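The proposal does not fix the fields of `SubtaskScheduleInfo`; as an assumption, the scheduling-related fields of the current `Subtask` could move there, roughly:

```python
# Hypothetical sketch: scheduling-related fields moved out of Subtask.
class SubtaskScheduleInfo(Serializable):
    subtask: Subtask
    priority: Tuple
    expect_bands: List
    required_resource: Resource
    execution_time_mills: int
    submitted: bool
```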
- Simplify the key generation of `Chunk` by using `op key + chunk index` (see the sketch after this list). We then only need to generate one key per op, instead of generating as many keys as there are chunks.
- Reuse the same `Chunk` objects of the `ChunkGraph` when generating a `SubtaskGraph`. We did a comparison in which one implementation creates new chunks and the other does not: the cost is reduced from 43.4 s to 14.6 s, a reduction of about 66.4%. Note that `FetchChunk` objects still need to be created.
- We currently generate a `logic_key` for every operand or subtask. In fact, it only needs to be generated once for operands and subtasks of the same type.
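A minimal sketch of the simplified key scheme from the first item above (`gen_chunk_key` and the join format are illustrative assumptions, not Mars's actual API):

```python
def gen_chunk_key(op_key: str, chunk_index: tuple) -> str:
    # the expensive op key is computed once per op; only the cheap
    # index suffix differs between chunks of the same op
    return op_key + "_" + "_".join(map(str, chunk_index))

op_key = "0123abcd"  # illustrative op key
keys = [gen_chunk_key(op_key, (i, j)) for i in range(2) for j in range(2)]
```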
- Optimize the chunk creation in generating `SubtaskGraph`.
- Remove `_id` and keep only the `_key` of `TileableData`, `ChunkData`, and `Operand`.
- Merge `Chunk` and `ChunkData` into `Chunk` and simplify the `_key` generation of `Chunk`.
- Change the operand instance to operand parameters in `ChunkData` and modify the operand.
- Simplify `Subtask`.
- Optimize the `logic_key` generation.