26 June 2022

Apache Wayang: More than a Big Data Abstraction

Recently, Paul King (V.P. and Chair of Groovy PMC) highlighted the big data abstraction [1] that Apache Wayang [2] provides. He mainly showed that users specify an application in a logical plan (a Wayang Plan) that is platform agnostic: Apache Wayang, in turn, transforms a logical plan into a set of execution (physical) operators to be executed by specific underlying processing platforms, such as Apache Flink and Apache Spark.

In this post, we elaborate on the cross-platform optimizer that comes with Apache Wayang, which decides how to generate execution plans. When a user specifies an application as a so-called Wayang plan, Apache Wayang runs an optimisation process that decides the right execution platform (e.g., Apache Flink) for each operator in the Wayang plan so that the overall execution time (or monetary cost) is reduced. All this happens without users noticing it!


Cross-Platform Data Processing


Today’s data analytics often need to perform tasks on more than one data processing platform; that is, they are cross-platform analytics. We have identified four situations in which an application requires support for cross-platform data processing:
  • Platform independence. Applications run an entire task on a single platform but may need to switch platforms for different input datasets or tasks, usually with the goal of achieving better performance. Paul King highlighted this case in his blog post [1].

  • Opportunistic cross-platform. Applications might benefit performance-wise from using multiple platforms to run one single task. We will highlight this case in this post.

  • Mandatory cross-platform. Applications may require multiple processing platforms because the platform where the input data resides, e.g., PostgreSQL, cannot perform an incoming task, e.g., a machine learning task. Thus, data should be moved from the platform in which it resides to another platform to be able to run the incoming task.

  • Polystore. Applications may require multiple processing platforms because the input data is spread across several data stores, e.g., in a data lake setting.


Current Practice


The current practice to cope with cross-platform requirements is either to build specialised systems that inherently combine two or more platforms, or to write ad-hoc programs that glue different platforms together. The first approach ties applications to specific platforms, which can become outdated or be outperformed by newer ones. Re-implementing such specialised systems to incorporate newer platforms is very often prohibitively time-consuming. Although the second approach is not coupled to specific platforms, it is expensive, error-prone, and requires expertise on different platforms to achieve high efficiency.


Apache Wayang: a Systematic Solution for Cross-Platform Data Processing


The research and industry communities have identified the need for a systematic solution that decouples applications from the underlying processing platforms and enables efficient cross-platform data processing, transparently from applications and users.

The ultimate goal would be to replicate the success of DBMSs for cross-platform applications: Users formulate platform-agnostic data analytic tasks, and an intermediate system decides on which platforms to execute each (sub)task with the goal of minimizing cost (e.g., runtime or monetary cost). 

The key component of Apache Wayang to realise this is its cross-platform optimiser. More concretely, Wayang’s optimiser tackles the problem of finding an execution plan able to run across multiple platforms that minimises the execution cost of a given task. Let us explain the cross-platform optimiser of Apache Wayang via a running example.


Figure 1. SGD Wayang Plan (with input data in a database)

Figure 1 shows a Wayang plan for the stochastic gradient descent (SGD) algorithm when the initial data is stored in a database. In more detail, the input data points are read via a TableSource and filtered via a Filter operator. Then, they are (i) stored into a file for visualisation using a CollectionSink and (ii) parsed using a Map, while the initial weights are read via a CollectionSource. The main operations of SGD (i.e., sampling, computing the gradients of the sampled data point(s), and updating the weights) are repeated until convergence (i.e., the termination condition of RepeatLoop). The resulting weights are output in a collection.
Given this input plan, the cross-platform optimizer passes the Wayang plan through several phases: plan inflation, operator cost estimation, data movement cost estimation, and plan enumeration.

Figure 2. The end-to-end cross-platform optimization pipeline

Figure 2 depicts the workflow of Wayang’s optimiser. First, the optimiser passes the Wayang plan through a plan inflation phase, where it inflates the input plan by applying a set of mappings from Wayang operators to actual execution operators. In other words, these mappings list how each of the platform-agnostic Wayang operators can be implemented on the different platforms with execution operators. The resulting inflated Wayang plan thus contains all its execution alternatives. The optimiser then annotates the inflated plan with estimates for both data cardinalities and the costs of executing each execution operator. Next, it takes a graph-based approach [3] to determine how data can be moved most efficiently among different platforms and annotates the inflated plan accordingly. It then uses all these annotations to determine the optimal execution plan via an enumeration algorithm. Finally, the resulting execution plan can be enacted by the executor of Apache Wayang on all the selected processing platforms.
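To make the phases above more concrete, here is a minimal, self-contained sketch in Python. It is not Wayang's implementation: the linear four-operator plan, the per-platform cost numbers, the flat `MOVE_COST`, and the function names are all assumptions made for illustration, and the enumeration is a plain exhaustive search rather than the enumeration algorithm Wayang actually uses.

```python
from itertools import product

# Hypothetical Wayang plan: a linear chain of platform-agnostic operators.
plan = ["TableSource", "Filter", "Map", "Reduce"]

# Plan inflation: every operator is mapped to its execution alternatives,
# each annotated with an estimated cost (all numbers are made up).
mappings = {
    "TableSource": {"Postgres": 2.0},
    "Filter": {"Postgres": 1.0, "Spark": 4.0, "JavaStreams": 6.0},
    "Map": {"Spark": 3.0, "JavaStreams": 9.0},
    "Reduce": {"Spark": 5.0, "JavaStreams": 1.0},
}

# Assumed flat cost of moving data between two different platforms.
MOVE_COST = 2.0


def plan_cost(assignment):
    """Sum the operator costs plus one movement cost per platform switch."""
    operator_cost = sum(mappings[op][platform]
                        for op, platform in zip(plan, assignment))
    movement_cost = sum(MOVE_COST
                        for a, b in zip(assignment, assignment[1:]) if a != b)
    return operator_cost + movement_cost


def enumerate_plans():
    """Yield every combination of execution alternatives (exhaustive)."""
    alternatives = [sorted(mappings[op]) for op in plan]
    return product(*alternatives)


def best_plan():
    """Pick the execution plan with the lowest estimated total cost."""
    return min(enumerate_plans(), key=plan_cost)


if __name__ == "__main__":
    chosen = best_plan()
    for op, platform in zip(plan, chosen):
        print(f"{op} -> {platform}")
    print("estimated cost:", plan_cost(chosen))
```

With these made-up numbers the cheapest plan mixes three platforms even though it pays for two data movements, which is the kind of trade-off the optimiser has to weigh.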

For example, Wayang’s optimiser outputs the execution plan illustrated in Figure 3 for our SGD example in Figure 1.
 

Figure 3. SGD Execution Plan in Apache Wayang

Figure 3 shows the execution plan for the SGD Wayang plan when Postgres, Spark, and JavaStreams are the only available platforms. This plan exploits Postgres to extract the desired data points and Spark’s high parallelism for the large input dataset, and at the same time benefits from the low latency of JavaStreams for the small collection of weights. Also note the three additional execution operators: two for data movement (Results2Stream, Broadcast) and one to make data reusable (Cache).
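The data-movement operators in this plan come from the graph-based step mentioned earlier. As a rough illustration only, the sketch below models data formats ("channels") as nodes and conversion operators as weighted edges, and finds the cheapest conversion chain with Dijkstra's algorithm. The channel names, the `Collect` and `Parallelize` operators, and all costs are hypothetical, and the approach in [3] is more general than a single shortest path.

```python
import heapq

# Hypothetical channel conversion graph. Each entry lists
# (target channel, conversion operator, assumed cost).
conversions = {
    "PostgresRelation": [("JavaStream", "Results2Stream", 3.0)],
    "JavaStream": [("JavaCollection", "Collect", 1.0),
                   ("SparkRDD", "Parallelize", 4.0)],
    "JavaCollection": [("SparkBroadcast", "Broadcast", 2.0),
                       ("SparkRDD", "Parallelize", 2.0)],
    "SparkRDD": [("SparkCachedRDD", "Cache", 1.0)],
}


def cheapest_conversion(source, target):
    """Return (cost, operators) of the cheapest conversion chain, or None."""
    queue = [(0.0, source, [])]
    settled = set()
    while queue:
        cost, channel, operators = heapq.heappop(queue)
        if channel == target:
            return cost, operators
        if channel in settled:
            continue
        settled.add(channel)
        for nxt, operator, op_cost in conversions.get(channel, []):
            if nxt not in settled:
                heapq.heappush(queue,
                               (cost + op_cost, nxt, operators + [operator]))
    return None  # no conversion chain exists


if __name__ == "__main__":
    print(cheapest_conversion("PostgresRelation", "SparkRDD"))
    print(cheapest_conversion("PostgresRelation", "SparkBroadcast"))
```

In this toy graph the direct `Parallelize` edge from the stream is more expensive than going through an intermediate collection, so the cheapest chain is not the shortest one in number of operators.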

Benefit?


The reader might wonder what the benefit of the hybrid plans that Apache Wayang outputs is. Figure 4 illustrates the benefits in terms of execution times.

Figure 4. SGD Execution Times in Apache Wayang

We observe that the cross-platform optimizer allows Apache Wayang to run the SGD task more than one order of magnitude faster than any single-platform execution (Apache Spark, Apache Flink, or stand-alone Java):
Apache Wayang can execute the SGD task in a few seconds, while all other processing platforms do so in the order of minutes!


What Do Apache Wayang’s Users Have to Do?


Actually, Wayang’s users have nothing to do besides declaring their available processing platforms. For example, taking the following code snippet from Paul’s blog post [1],
 

users simply have to leave all the platform plugins enabled, instead of selecting only one: that is, both the .withPlugin(Java.basicPlugin()) and .withPlugin(Spark.basicPlugin()) lines must be active in the above code snippet. Users can also add any other available processing platform they might have.
Users simply specify their tasks in Apache Wayang in a platform-agnostic manner and let Wayang do the rest for them to achieve the best performance!


Apache Wayang at the Core of Databloom Blossom


Databloom Blossom has Apache Wayang at its core and extends it with new features that Wayang does not have today, such as a powerful ML-based query optimiser, federated learning, data debugging, and a compliant SQL optimiser.

References

[2] Apache Wayang: https://wayang.apache.org/

[3] Sebastian Kruse, Zoi Kaoudi, Jorge-Arnulfo Quiané-Ruiz, Sanjay Chawla, Felix Naumann, Bertty Contreras-Rojas: Optimizing Cross-Platform Data Movement. ICDE 2019: 1642-1645

