How does this compare to Dask?
Yeah, the question about how Mars differs from Dask was probably inevitable.
First of all, I sincerely want to thank the Dask project: it showed that parallelizing and distributing the SciPy stack is workable, and some tiling methods in Mars are inspired by their counterparts in Dask.
Mars tensor and Dask array both mimic the NumPy API, so they look pretty similar. However, beyond the shared goal of parallelizing/distributing the SciPy stack, Mars has a totally different architecture from Dask.
Also, we intended Mars to be a production-level project from the beginning, so reliability and availability are extremely important and have to be designed and implemented carefully. We have put a lot of design and work into this, and I will talk about it below.
The comparison below is based on my knowledge of Dask; please feel free to point out anything I have misunderstood or gotten wrong. Thanks in advance.
Data structure and expressions
Mars has a clear expression system. For Mars tensor, the expressions are built from TensorData, ChunkData, and Operand. Each TensorData and ChunkData owns an Operand, and an Operand can have multiple inputs and outputs. We also have an explicit graph representation.
Briefly, Mars has two kinds of graphs: the tensor graph, which exists only on the client side and consists of TensorData, and the chunk graph, which is tiled on the scheduler side and consists of ChunkData. Graphs can be serialized and deserialized in two formats, JSON and protobuf. This means the Mars client can actually be language-agnostic, though for now only a Python client is integrated into the Mars project.
What is interesting is that the graph in Mars can support loops, and we may implement some loop semantics in the future. I think it could be helpful for iterative algorithms, especially those in machine learning.
As far as I know, the graph in Dask is expressed as a Python dict. To me, a dict can only represent a directed acyclic graph, and it makes graph algorithms like topological sort harder to implement.
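For reference, this is roughly what a Dask-style task graph looks like, following the dict-of-tasks format in Dask's graph specification (the functions and keys here are just made up for illustration):

```python
def inc(i):
    return i + 1

def add(a, b):
    return a + b

# A task graph is a plain dict: keys map either to data or to
# (callable, arg1, arg2, ...) tuples whose arguments may name other keys.
dsk = {
    'x': 1,
    'y': (inc, 'x'),        # y = inc(x)
    'z': (add, 'y', 10),    # z = add(y, 10)
}

# A scheduler walks this dict to compute a key, e.g.:
#   from dask.threaded import get
#   get(dsk, 'z')  # -> 12
```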
Client
The client is extremely lightweight in Mars. When a user writes Mars tensor code, only a tensor and operand expression tree is generated, and once the user calls the execute method, the tensor graph mentioned above is created, serialized, and submitted to the RESTful API provided by the Mars web UI.
No tiling happens on the client, which keeps the client neat and easy to implement in other languages.
Another bonus is that the client does not rely on specific hardware or third-party libraries. For example, when specifying a tensor created on GPU like mt.ones((10, 10), gpu=True), the client does not need any GPU or cupy installed.
As far as I know, Dask does the tiling on the client side into a graph consisting of many Python functions, which requires libraries like cupy to exist on the client, and cupy can only be installed if a GPU is attached.
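As a minimal sketch of that client-side experience (assuming a working Mars installation; the exact session/endpoint setup for a remote cluster follows the Mars docs and is omitted here):

```python
import mars.tensor as mt

# Building the expression only records tensor/operand nodes;
# no tiling and no computation happen at this point, and declaring
# the tensor on GPU does not require a GPU or cupy on the client.
a = mt.ones((10000, 10000), gpu=True)
b = (a + 1).sum()

# Only execute() builds the tensor graph, serializes it and submits it
# (locally, or to a cluster through the web API) for computation.
result = b.execute()
```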
Fusion and optimization
Because Mars represents the calculation with Operand expressions, (t + 1).sum(), for example, is recorded as a TensorAddConstant(lhs=t, rhs=1) followed by a TensorSum(input=tplus1). We can therefore fuse the operations into one, and on CPU use a library like numexpr to generate the fused expression sum(t + 1) from the Mars expressions to accelerate the execution.
The same goes for the GPU part: Mars takes advantage of cupy's ElementwiseKernel and ReductionKernel to fuse as many operands as possible. Unfortunately, Mars' optimization is currently naïve in that it only fuses operands that lie on a straight line (a linear chain) in the chunk graph.
In contrast, if the graph is represented as opaque Python functions, it is hard to tell what is going on inside each function, and hard to do further optimization.
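This is not the code Mars actually generates, just a sketch of the kind of fused kernels these libraries allow; the CPU part needs numexpr, and the GPU part needs cupy with a CUDA device:

```python
import numpy as np
import numexpr as ne

t = np.random.rand(1_000_000)

# CPU: instead of materializing (t + 1) and then summing it in a second
# pass, numexpr evaluates the fused expression in a single pass.
fused_cpu = ne.evaluate("sum(t + 1)")

# GPU: a cupy ReductionKernel that adds the constant in the map step and
# sums in the reduce step, again in one pass over the data.
import cupy as cp

fused_sum_plus_one = cp.ReductionKernel(
    'T x',                  # input params
    'T y',                  # output params
    'x + 1',                # map expression (element-wise add)
    'a + b',                # reduce expression (sum)
    'y = a',                # post-reduction map
    '0',                    # identity value
    'fused_sum_plus_one',   # kernel name
)
fused_gpu = fused_sum_plus_one(cp.asarray(t))
```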
Distributed computing
Master-workers is a classic architecture for distributed computing, but its weakness is obvious: the master is a single point of failure, and if it goes down, everything goes down. Besides, as the number of workers grows, the master may become too busy to handle every request from the workers. What's more, the master's code tends to become chaotic as more and more functionality is added, and hard to maintain.
To address these problems, we found the actor model quite suitable. But unlike akka, which is widely used in the Java/Scala stack, there is no de facto actor model library for Python, and for our requirements we actually need a lightweight one, so we developed a lightweight actor library called Mars actors. We split the scheduler (master) logic into actors and distribute them to different schedulers according to consistent hashing, so Mars schedulers can run on multiple machines, are no longer a single point of failure, and can obviously handle more requests. Following the discipline of the actor model, each actor is in charge of one duty, which makes the schedulers' logic clearer and simpler to maintain. The schedulers' code is here.
The Mars worker is also built on Mars actors, and thus runs more than one process. We use Arrow's plasma as the shared memory store to achieve zero-copy data sharing.
We are also working on fault tolerance. First, we will make sure that a failed worker can be recovered by rebuilding chunks from their lineage. After the worker part of fault tolerance, we will work on the scheduler part. Availability and robustness are truly important for Mars.
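To illustrate the consistent-hashing part, here is a generic sketch (not Mars' actual implementation; the scheduler endpoints and actor uid are made up) of how actor uids could be mapped onto a set of schedulers:

```python
import bisect
import hashlib

def _hash(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

class ConsistentHashRing:
    """Map actor uids to scheduler endpoints on a hash ring."""

    def __init__(self, schedulers, replicas=32):
        # Each scheduler is placed on the ring several times (virtual nodes)
        # so that load stays balanced when schedulers join or leave.
        self._ring = sorted(
            (_hash(f"{s}#{i}"), s) for s in schedulers for i in range(replicas)
        )
        self._keys = [h for h, _ in self._ring]

    def locate(self, actor_uid: str) -> str:
        # The first ring position clockwise from the uid's hash wins.
        idx = bisect.bisect(self._keys, _hash(actor_uid)) % len(self._ring)
        return self._ring[idx][1]

ring = ConsistentHashRing(["scheduler-0:7103", "scheduler-1:7103"])
print(ring.locate("GraphActor:session-1"))
```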
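Roughly, plasma lets multiple worker processes on one machine share immutable objects without copying. A minimal sketch with pyarrow (this assumes a plasma_store process is already running on the given socket, and a pyarrow version that still ships the plasma module, which newer releases have dropped):

```python
import numpy as np
import pyarrow.plasma as plasma

# Connect to a plasma store started beforehand, e.g. with:
#   plasma_store -m 1000000000 -s /tmp/plasma
client = plasma.connect("/tmp/plasma")

# Put an array into shared memory; any other process connected to the
# same store can fetch it by ObjectID without copying the buffer.
object_id = client.put(np.arange(1_000_000))
shared = client.get(object_id)
```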
Conclusion
Dask does have a more complete API, with DataFrame and delayed, which is obviously an advantage over Mars. However, Mars is designed with more consideration for performance, availability, reliability and robustness.
I think Dask and Mars can play different roles in different scenarios, and I sincerely look forward to seeing both Dask and Mars grow together to make the Python distributed data processing world better.
Everything above seems reasonable to me. In particular I think that Mars seems to have two main differences from Dask's approach to multi-dimensional arrays:
- A high level expression system, eventually allowing for high level optimization
  - From this, building a client library is easy and convenient
- A different distributed computing architecture where each worker can schedule work
First, let me say that to me this library seems to be really well put together. Parallelizing Numpy is a large effort. So too is building a distributed system. If I were to push in some direction, it would be modularity with the ecosystem. I would push on this in two ways:
- The high level expression system could be broken out so that it could be used with any numpy-like library. This might help out Mars, Dask Array, and Numpy itself. I suspect that a generic symbolic numpy library with expression optimization would be broadly popular beyond just this project.
- I would also break out the distributed computing system to be standalone. For example it would be nice to try running other Dask workloads on the distributed system here. We found that there was much more interest in using dask's schedulers for non-array workloads than we expected.
Finally, I encourage Mars developers to engage in community efforts to support backend-agnostic numpy protocols like NEP-18. I think that your experience and perspective would be very valuable to have in those conversations (doing a github issue search on numpy for __array_function__ is probably also a good idea).
I hope this feedback is ok. Again this looks like really nice work to me.
Really appreciate your comment. We will discuss the suggested directions further among the Mars developers and keep updating our thinking here. Also, thanks a lot for the suggestion to engage with the community about the protocols; it is really helpful, and we hope to join in soon.
And may I ask how Mars is different from Ray?
Looks like they both have actors and plasma. Same question as @dclong.
Actors mean something different in Ray and in Mars.
I hope we can give a detailed comparison between Mars and Ray at the right time, but not in this thread. If you are really interested, please open a new issue to discuss it.
@qinxuye no big deal, just out of curiosity. Anyway, I am not familiar with Ray or Dask either; I can dive deep into Mars first.
I notice that you say multiple schedulers are implemented in Mars, which is different from Dask. But I only saw the Supervisor in the docs, so what is the difference between the Supervisor and the Scheduler? How do multiple schedulers synchronize data when one of them goes down? Thank you in advance.
Hi, I am just curious about this: "What is interesting is that the graph in Mars can support loops, and we may implement some loop semantics in the future. I think it could be helpful for iterative algorithms, especially those in machine learning."
Is this feature available in the current release? I just can't find it in the documentation.