Two Sigma researcher Li Jin introduces the Vectorized UDFs feature in the upcoming Apache Spark 2.3 release.
This article—a version of which originally appeared on the Databricks blog—introduces the Vectorized UDFs feature in the upcoming Apache Spark 2.3 release, which substantially improves the performance and usability of user-defined functions (UDFs) in Python.
Over the past few years, Python has become the default language for data scientists. Packages such as pandas, numpy, statsmodels, and scikit-learn have seen wide adoption and become mainstream toolkits. At the same time, Apache Spark has become the de facto standard for processing big data. To enable data scientists to leverage the value of big data, Spark added a Python API in version 0.7, with support for user-defined functions. These user-defined functions operate one row at a time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java and Scala and then invoke them from Python.
Vectorized UDFs built on top of Apache Arrow bring you the best of both worlds—the ability to define low-overhead, high performance UDFs entirely in Python.
In Spark 2.3, there will be two kinds of vectorized UDFs: scalar and grouped. Next, we illustrate their usage with four example programs: Plus One, Cumulative Probability, Subtract Mean, and Ordinary Least Squares Linear Regression.
Scalar Vectorized UDFs
Scalar vectorized UDFs are used for vectorizing scalar operations. To define a scalar vectorized UDF, simply use @pandas_udf to annotate a Python function that takes in pandas.Series as arguments and returns another pandas.Series of the same size. Below we illustrate using two examples: Plus One and Cumulative Probability.
Plus One
Computing v + 1 is a simple example for demonstrating the differences between row-at-a-time UDFs and vectorized UDFs. Note that built-in column operators can perform much faster in this scenario.
Using row-at-a-time UDFs:
Using vectorized UDFs:
The examples above define a row-at-a-time UDF “plus_one” and a vectorized UDF “vectorized_plus_one” that performs the same “plus one” computation. The UDF definitions are the same except the function decorators: “udf” vs “pandas_udf”.
In the row-at-a-time version, the user-defined function takes a double “v” and returns the result of “v + 1” as a double. In the vectorized version, the user-defined function takes a pandas.Series “v” and returns the result of “v + 1” as a pandas.Series. Because “v + 1” is vectorized on pandas.Series, the vectorized version is much faster than the row-at-a-time version.
Note that there are two important requirements when using scalar vectorized UDFs:
- The input and output series must have the same size.
- How a column is split into multiple pandas.Series is internal to Spark, and therefore the result of the user-defined function must be independent of the splitting.
stats.norm.cdf works on both a scalar value and a pandas.Series, so this example could be written with row-at-a-time UDFs as well. Similar to the previous example, the vectorized version runs much faster, as shown later in the “Performance Comparison” section.
Grouped Vectorized UDFs
Python users are fairly familiar with the split-apply-combine pattern in data analysis. The grouped vectorized UDFs are designed for this scenario, and they operate on all the data for some group, e.g., “for each date, apply this operation”.
A grouped vectorized UDF first splits a Spark DataFrame into groups based on the conditions specified in the groupby operator, applies a user-defined function (pandas.DataFrame -> pandas.DataFrame) to each group, and then combines and returns the results as a new Spark DataFrame.
Grouped vectorized UDFs use the same function decorator pandas_udf as scalar vectorized UDFs, but they differ in a few ways:
- Input of the user-defined function:
- Scalar: pandas.Series
- Grouped: pandas.DataFrame
- Output of the user-defined function:
- Scalar: pandas.Series
- Grouped: pandas.DataFrame
- Grouping semantics:
- Scalar: no grouping semantics
- Grouped: defined by “groupby” clause
- Output size:
- Scalar: same as input size
- Grouped: any size
- Return types in the function decorator:
- Scalar: a DataType that specifies the type of the returned pandas.Series
- Grouped: a StructType that specifies each column name and type of the returned pandas.DataFrame
Next, let us walk through two examples to illustrate the use cases of grouped vectorized UDFs.
Subtract Mean
This example shows a simple use of grouped vectorized UDFs: subtracting the mean from each value in the group.
In this example, we subtract the mean of v from each value of v for each group. The grouping semantics are defined by the “groupby” function, i.e., all rows of each input pandas.DataFrame passed to the user-defined function share the same “id” value. The input and output schemas of this user-defined function are the same, so we pass “df.schema” to the decorator pandas_udf to specify the schema.
Grouped vectorized UDFs can also be called as standalone Python functions on the driver. This is very useful for debugging, for example:
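A sketch of this pattern; here the group logic is written as a plain Python function so it runs with no Spark session at all (in Spark, the same function body would be wrapped with pandas_udf), and the sample data is made up to stand in for one group pulled back to the driver:

```python
import pandas as pd

# The group logic of subtract_mean as a plain Python function; on the driver
# it can be run directly against a pandas.DataFrame for debugging
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

# A hand-built sample, standing in for one group fetched from Spark
# (e.g. via df.filter(...).toPandas())
sample = pd.DataFrame({'id': [1, 1, 1], 'v': [1.0, 2.0, 6.0]})
debugged = subtract_mean(sample)
```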
In the example above, we first convert a small subset of the Spark DataFrame to a pandas.DataFrame, and then run subtract_mean as a standalone Python function on it. After verifying the function logic, we can call the grouped vectorized UDF as a Spark function over the entire dataset.
Ordinary Least Squares Linear Regression
The last example shows how to run OLS linear regression for each group using statsmodels. For each group, we calculate beta b = (b1, b2) for X = (x1, x2) according to the statistical model Y = bX + c.
This example demonstrates that grouped vectorized UDFs can be used with an arbitrary Python function of the form pandas.DataFrame -> pandas.DataFrame. The returned pandas.DataFrame can have a different number of rows and columns than the input.
Performance Comparison
Lastly, we want to show a performance comparison between row-at-a-time UDFs and the new vectorized UDFs. We ran micro benchmarks for three of the above examples (Plus One, Cumulative Probability, and Subtract Mean).
Configuration and Methodology
We ran the benchmark on a single node Spark cluster on Databricks community edition.
Data: A 10M-row DataFrame with an Int column and a Double column
Cluster: 6.0 GB Memory, 0.88 Cores, 1 DBU
Databricks runtime version: Latest RC (3.4, Scala 2.11)
For the detailed implementation of the benchmark, check the vectorized UDF Notebook.
As shown in the charts, vectorized UDFs perform much better than row-at-a-time UDFs across the board, with speedups ranging from 3x to over 100x.
Conclusion and Future Work
The upcoming Spark 2.3 release lays down the foundation for substantially improving the capabilities and performance of user-defined functions in Python. In the future, we plan to introduce support for vectorized UDFs in aggregations and window functions. The related work can be tracked in SPARK-22216.
Vectorized UDFs is a collaborative effort by many people in the Apache Spark and Apache Arrow community, including Bryan Cutler, Hyukjin Kwon, Jeff Reback, Liang-Chi Hsieh, Leif Walsh, Li Jin, Reynold Xin, Takuya Ueshin, Wenchen Fan, Wes McKinney, Xiao Li, and many others.