In the ad tech industry, managing large-scale data is a crucial task. Data scientists must process huge volumes of real-time data, often generated by millions of ad requests per second, to optimize campaigns and improve user engagement. This involves cross-correlating multiple features such as revenue, click-through rates (CTR), and user behavior to enable precise decision-making and performance enhancement. The data often exhibit highly skewed distributions with significant outliers, and traditional methods of thresholding and classifying such data, like quantiles and classic classification methods, can be somewhat arbitrary and dependent on predefined configurations. Here, we introduce a novel, two-step approach to thresholding and classifying large-scale data that enhances determinism and precision. Two use cases are presented: the first demonstrates skewed histogram thresholding on a large dataset, and the second showcases the classification of real-life eCPM data using a novel unsupervised classification method.
Our approach involves two main steps: first, the large-scale data is reduced to a compact one-dimensional representation (a normalized cumulative histogram in Use Case 1, a subsampled cumulative sum in Use Case 2); second, a piecewise linear model is fitted to that representation, and its breakpoints are used as thresholds or class boundaries.
Piecewise Linear Fit (PWLF) Library: The pwlf Python library is a powerful tool for fitting 1D continuous piecewise linear functions. It provides a robust method for identifying segments in a data series where linear approximations are optimal. For the optimization itself, we used the Differential Evolution (DE) algorithm to fit a specified number of line segments; DE is a robust and efficient method for global optimization over continuous spaces.
We deal with a PySpark DataFrame consisting of several tens of millions of rows. Our column of interest follows a log-normal distribution with a long tail and some random noise. The objective is to determine the threshold for filtering out the long tail.
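Before fitting, the column is reduced to a compact histogram representation. The snippet below expects a small Pandas DataFrame histogram_df holding the normalized cumulative sum of the histogram counts, together with the corresponding bin edges histogram_bins_sql. One possible way to build these from a PySpark DataFrame df with a numeric column column_name is sketched here (an assumption; the exact upstream implementation may differ):
import numpy as np
import pandas as pd
# Sketch only: df and column_name are assumed to be defined upstream
num_bins = 100
# Compute the histogram on the Spark side; returns (bin edges, per-bin counts)
histogram_bins_sql, histogram_counts = (
    df.select(column_name).rdd.flatMap(lambda row: row).histogram(num_bins)
)
# Normalized cumulative sum of the histogram counts, small enough to hold in Pandas
hist_cumsum = np.cumsum(histogram_counts)
histogram_df = pd.DataFrame({
    "row_num": np.arange(len(histogram_counts)),
    "hist_cumsum_norm": hist_cumsum / hist_cumsum[-1],
})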
import numpy as np
import pwlf
x = histogram_df['row_num'].values
y = histogram_df['hist_cumsum_norm'].values
# Initiate the PiecewiseLinFit object with the normalized cumulative sum of the histogram counts
my_pwlf = pwlf.PiecewiseLinFit(x, y)
# Fit N piecewise linear segments (pwlf uses differential evolution for the breakpoint search)
N = 5
res = my_pwlf.fit(N)
# The second-to-last breakpoint is used as the threshold
res_ind = -2
pwlf_threshold = histogram_bins_sql[int(np.round(res[res_ind]))]
print(f"r_squared = {my_pwlf.r_squared():.3f}")
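With the threshold in hand, filtering out the long tail is a single comparison. A minimal sketch, assuming the original PySpark DataFrame df and its column column_name from above:
from pyspark.sql import functions as F
# Keep only the rows at or below the PWLF-derived threshold (drops the long tail)
df_filtered = df.filter(F.col(column_name) <= pwlf_threshold)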
Results
The PWLF approach proves more deterministic and less arbitrary than quantile-based thresholds (0.95, 0.98, 0.99). Note the significant differences among the thresholds calculated at the 95%, 98%, and 99% quantiles.
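For reference, the quantile thresholds used in this comparison can be computed directly on the PySpark DataFrame, for example with approxQuantile (a sketch; column_name is the column of interest and the relative error of 0.001 is an assumption):
# Approximate 95%, 98% and 99% quantiles of the column, for comparison with pwlf_threshold
q95, q98, q99 = df.approxQuantile(column_name, [0.95, 0.98, 0.99], 0.001)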
The number of bins and the number of PWLF lines are the only parameters, providing flexibility and precision. For large-scale data, 100 bins is a good rule of thumb. One can use a more deterministic method to determine the required number of bins from the size of the population (e.g., the Rice Rule, where the number of bins is set to 2·n^(1/3), with n the number of observations).
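As a one-line sketch of the Rice Rule, assuming total_rows holds the population size (e.g., from df.count()):
import numpy as np
# Rice Rule: number of bins = 2 * n^(1/3), rounded up to an integer
num_bins = int(np.ceil(2 * total_rows ** (1 / 3)))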
An iterative approach can be used to determine the number of PWLF lines: start with a few (e.g., three) and increase until the R-squared exceeds a certain threshold (e.g., 0.98); stopping as soon as the threshold is reached helps avoid overfitting. This procedure is demonstrated in Use Case 2 below.
In this scenario, we utilize real-life eCPM (effective Cost Per Mille) data collected from several apps and ad sources over a couple of days. The dataset consists of several tens of millions of rows. The objective is to classify the eCPM values into N classes using an unsupervised classifier.
In this case, instead of calculating the histogram as in Use Case 1, we reduce the dimensionality of the data by subsampling it after it has been sorted in ascending order and its cumulative sum has been calculated.
1. Cumulative Sum. The data is ordered by the column of interest in ascending order, and a running cumulative sum is computed using a window function.
from pyspark.sql import functions as F, Window
# Define the window specification: order by the column of interest, ascending,
# accumulating from the first row up to the current row
order_spec = F.col(column_name).asc()
windowSpec = Window.orderBy(order_spec).rowsBetween(Window.unboundedPreceding, Window.currentRow)
cumsum_col_name = f"cumulative_sum_{column_name}"
# Cumulative sum over the ordered window
df = df.withColumn(cumsum_col_name, F.sum(F.col(column_name)).over(windowSpec))
2. Adding Row Number. A row number is added to each row using a window function to facilitate subsampling later.
# Row numbers follow the same ascending order as the cumulative sum
windowSpec = Window.orderBy(F.col(column_name).asc())
# Add row number
df = df.withColumn("row_num", F.row_number().over(windowSpec))
3. Subsampling. We subsample the PySpark DataFrame down to approximately num_records rows and convert the subsampled DataFrame to a Pandas DataFrame.
total_rows = df.count()
# Target approximately 1000 rows after subsampling
num_records = 1000
# Subsampling step: keep every n-th row
n = total_rows // num_records
# Apply a modulo operation on 'row_num' and keep rows where the result is 1
df_subsampled = df.filter((F.col("row_num") % int(n)) == 1)
df_subsampled_pd = df_subsampled.toPandas()
4. PWLF Fitting. We iterate over the number of fitted PWLF lines until the R-squared value exceeds a certain threshold (0.99 in this case). This process helps determine the optimal number of clusters without overfitting.
x = np.linspace(0, len(df_subsampled_pd) - 1, len(df_subsampled_pd))
y = df_subsampled_pd[cumsum_col_name].values
my_pwlf = pwlf.PiecewiseLinFit(x, y)
# Start with a single line and add segments until the fit is good enough
n_pwlf_lines = 1
r2_pwlf = 0
while r2_pwlf < 0.99:
    res = my_pwlf.fit(n_pwlf_lines)
    r2_pwlf = my_pwlf.r_squared()
    print(f"n_pwlf_lines = {n_pwlf_lines}, r_squared = {r2_pwlf:.3f}")
    n_pwlf_lines += 1
n_pwlf_lines = 1, r_squared = 0.711
n_pwlf_lines = 2, r_squared = 0.972
n_pwlf_lines = 3, r_squared = 0.993
5. Classification. The breakpoints returned by the final fit (res) define the class boundaries. The helper below assigns a class to each row of the Spark DataFrame based on these boundaries.
from pyspark.sql import DataFrame

def create_class_column(df: DataFrame, df_subsampled_pd: DataFrame, col_name: str, res: list) -> DataFrame:
    """
    Adds a new column 'class' to the DataFrame based on conditions specified by the list 'res' and values from a subsampled DataFrame.
    Parameters:
    -----------
    df : DataFrame
        The input Spark DataFrame.
    df_subsampled_pd : DataFrame
        The subsampled Pandas DataFrame used to determine threshold values.
    col_name : str
        The name of the column to apply the conditions to.
    res : list
        A list of breakpoints (positions in the subsampled DataFrame) that define the class intervals.
    Returns:
    --------
    DataFrame
        The DataFrame with the added 'class' column.
    """
    expr = None
    for i in range(1, len(res)):
        # Look up the class boundaries in the subsampled Pandas DataFrame
        threshold_row_num_upper = int(np.round(res[i]))
        threshold_value_upper = df_subsampled_pd.loc[df_subsampled_pd.index == threshold_row_num_upper, col_name].values[0]
        threshold_row_num_lower = int(np.round(res[i - 1]))
        threshold_value_lower = df_subsampled_pd.loc[df_subsampled_pd.index == threshold_row_num_lower, col_name].values[0]
        condition = (F.col(col_name) > threshold_value_lower) & (F.col(col_name) <= threshold_value_upper)
        if expr is None:
            expr = F.when(condition, i)
        else:
            expr = expr.when(condition, i)
    # Handle values outside the specified ranges if needed (optional)
    expr = expr.otherwise(0)  # You can specify a default value or class
    df = df.withColumn('class', expr)
    return df
df = create_class_column(df, df_subsampled_pd, cumsum_col_name, res)
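As a quick sanity check (a sketch, not part of the pipeline above; column_name is assumed to be the raw eCPM column), the per-class row counts and eCPM ranges can be inspected directly on the Spark DataFrame:
# Row count and eCPM range per class
class_summary = (
    df.groupBy('class')
      .agg(F.count('*').alias('rows'),
           F.min(column_name).alias('min_ecpm'),
           F.max(column_name).alias('max_ecpm'))
      .orderBy('class')
)
class_summary.show()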
The PWLF approach effectively classifies the eCPM values into optimal clusters based on the changes in the cumulative sum curve. This method avoids the arbitrariness of predefined configurations like quantiles and provides a robust way to handle highly skewed distributions in real-life data.
Even though the dataset is relatively large, the processing takes only a few seconds.
The above visualizations collectively illustrate the classification of eCPM values using the piecewise linear fit approach, highlighting the distinct characteristics of each class and the effectiveness of the classification method.
The presented two-step approach to thresholding and classifying large-scale data using PWLF provides a deterministic and flexible method for handling highly skewed distributions and outliers, improving the precision and reliability of data processing for various tasks.