
A Novel Approach to Thresholding and Classifying Large-Scale Data

Introduction

In the ad tech industry, managing large-scale data is a crucial task. Data scientists must process huge volumes of real-time data, often generated by millions of ad requests per second, to optimize campaigns and improve user engagement. This involves cross-correlating multiple features such as revenue, click-through rate (CTR), and user behavior to enable precise decision-making and performance enhancement, yet the data often exhibit highly skewed distributions with significant outliers. Traditional methods of thresholding and classifying such data, such as quantiles and classic classification methods, can be somewhat arbitrary and dependent on predefined configurations. Here, we introduce a novel, two-step approach to thresholding and classifying large-scale data that enhances determinism and precision. Two use cases are presented: the first demonstrates thresholding of a skewed histogram computed over a large dataset, and the second showcases the classification of real-life eCPM data using a novel unsupervised classification method.

The Two-Step Approach

Our approach involves two main steps:

  1. Dimensionality Reduction. This can be achieved using one of the following approaches:
    • Histograms.
    • Cumulative sum of sorted data. Sorting the series and then taking its cumulative sum amplifies the differences in the scale of the population values, allowing them to be classified with higher accuracy (see the short sketch after this list).
  2. Piecewise Linear Fit (PWLF). This method detects changes in the average gradient and sets the threshold according to the breakpoints between the optimized piecewise linear segments.
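
As a minimal illustration of the dimensionality-reduction step (toy data and variable names, not the production pipeline):

import numpy as np

# Toy series with a heavy tail, standing in for the real large-scale column
rng = np.random.default_rng(0)
values = rng.lognormal(mean=0.0, sigma=1.5, size=1_000_000)

# Option 1: histogram - one million raw values reduced to 100 bin counts
counts, bin_edges = np.histogram(values, bins=100)

# Option 2: cumulative sum of the sorted values - changes in the scale of the
# underlying populations show up as changes in the slope of this curve
cumsum_sorted = np.cumsum(np.sort(values))
cumsum_norm = cumsum_sorted / cumsum_sorted[-1]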

Theoretical Background

Piecewise Linear Fit (PWLF) Library: The pwlf Python library is a powerful tool for fitting 1D continuous piecewise linear functions. It provides a robust method for identifying segments of a data series where linear approximations are optimal. For our optimization needs, we fit a specified number of line segments using the Differential Evolution (DE) algorithm, a robust and efficient method for global optimization over continuous spaces.
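
For orientation, here is a minimal pwlf example on toy data (the data and segment count here are arbitrary):

import numpy as np
import pwlf

# Toy data with a change of slope at x = 5
x = np.linspace(0, 10, 200)
y = np.where(x < 5, x, 5 + 3 * (x - 5))

my_pwlf = pwlf.PiecewiseLinFit(x, y)
breakpoints = my_pwlf.fit(2)   # fit two segments (uses differential evolution)
y_hat = my_pwlf.predict(x)     # evaluate the fitted piecewise linear model
print(breakpoints, my_pwlf.r_squared())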

Use Case 1: Skewed Histogram Thresholding

Experimental Setup

We deal with a PySpark DataFrame consisting of several tens of millions of rows. Our column of interest follows a log-normal distribution with a long tail and some random noise. The objective is to determine a threshold that filters out the long tail.

Procedure

  1. Histogram Calculation. A histogram with 100 bins is calculated. This reduces the dimensionality of the problem from a 70-million-row PySpark DataFrame to a simple 100-row Pandas DataFrame, as sketched below.
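One possible way to compute such a histogram directly on the PySpark DataFrame is via the RDD histogram helper. In this sketch, value_column is an illustrative column name, and histogram_bins_sql is assumed to be the bin-edge list used for the threshold lookup further below:

import pandas as pd

# Sketch (not the production job): 100-bin histogram of the column of interest
value_column = 'value'
histogram_bins_sql, bin_counts = (
    df.select(value_column)
      .rdd
      .flatMap(lambda row: row)
      .histogram(100)
)

# The result fits comfortably in a small Pandas DataFrame
histogram_df = pd.DataFrame({
    'row_num': range(len(bin_counts)),
    'hist_count': bin_counts,
})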
Figure 1. This figure illustrates the histogram of a dataset with highly skewed values. The x-axis represents the value bins, while the y-axis shows the count of occurrences in each bin. The vertical dashed lines indicate the thresholds calculated using quantiles (red line: quantile 95%, green line: quantile 98%, blue line: quantile 99%) and the PWLF thresholding method (yellow line).
  2. Histogram Cumulative Sum Calculation. Taking the cumulative sum of the histogram counts produces a curve that reflects how the data accumulate; once this curve converges toward its maximum asymptote, the data can be thresholded. A possible derivation is sketched below.
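Continuing the illustrative histogram_df from the previous sketch, the normalized cumulative-sum column used in the fitting step could be derived as follows:

# Cumulative sum of the bin counts, normalized to the [0, 1] range
histogram_df['hist_cumsum'] = histogram_df['hist_count'].cumsum()
histogram_df['hist_cumsum_norm'] = histogram_df['hist_cumsum'] / histogram_df['hist_cumsum'].iloc[-1]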
Figure 2. This figure presents the cumulative sum of the histogram values, normalized to range between 0 and 1.
  3. PWLF Fitting. We fit a PWLF with N lines to the cumulative sum of the histogram.
import pwlf


x = histogram_df['row_num'].values
y = histogram_df['hist_cumsum_norm'].values
# Initiating the PiecewiseLinFit object with the normalized cumulative sum of the histogram count 
my_pwlf = pwlf.PiecewiseLinFit(x, y)
# Fitting N piecewise linear lines
N = 5
res = my_pwlf.fit(N)
  4. Threshold Determination. The threshold is identified at the breakpoint where the change in the cumulative sum of the histogram becomes close to zero, i.e., the left edge of the final, flattest segment (the second breakpoint from the end).
import numpy as np

# Index of the breakpoint to use as the threshold (second from the end,
# i.e., where the fitted curve becomes essentially flat)
res_ind = -2
# histogram_bins_sql holds the histogram bin edges, so the breakpoint's bin
# index is mapped back to an actual value of the original column
pwlf_threshold = histogram_bins_sql[int(np.round(res[res_ind]))]

print(f"r_squared = {my_pwlf.r_squared():.3f}")

Figure 3. This figure displays the cumulative sum curve fitted with five piecewise linear segments. The green dots represent the breakpoints of the piecewise linear fit, and the red lines indicate the fitted segments. The blue dashed line shows the threshold determined by the selected PWLF breakpoint, indicating where the change in the cumulative sum becomes minimal.

Results

The PWLF approach proves more deterministic and less arbitrary than quantiles (0.95, 0.98, 0.99). Note the significant differences among the thresholds calculated by the 95%, 98%, and 99% quantiles.
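
For reference, the quantile thresholds used in this comparison can be computed directly on the PySpark DataFrame (the column name and error tolerance below are illustrative):

# Approximate 95th/98th/99th percentiles of the raw column
q95, q98, q99 = df.approxQuantile('value', [0.95, 0.98, 0.99], 0.001)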

The number of bins and the number of PWLF lines are the only parameters, providing flexibility and precision. For large-scale data, 100 bins is a good rule of thumb. One can use a more deterministic method to determine the required number of bins based on the size of the population (e.g., the Rice Rule, where the number of bins is set to 2·n^(1/3)).
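
For example, applying the Rice Rule to a dataset of roughly 70 million rows (a quick back-of-the-envelope calculation):

import numpy as np

n_rows = 70_000_000
num_bins = int(np.ceil(2 * n_rows ** (1 / 3)))  # Rice Rule: 2 * n^(1/3) ≈ 825 bins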

An iterative approach can be used to determine the number of PWLF lines, starting with a few (e.g., three) and increasing until the R-squared value exceeds a certain threshold (e.g., 0.98), to avoid overfitting.

Use Case 2: Unsupervised Data Classification

Experimental Setup

In this scenario, we utilize real-life eCPM (effective Cost Per Mille) data collected from several apps and ad sources over a couple of days. The dataset consists of several tens of millions of rows. The objective is to classify the eCPM values into N classes using an unsupervised classifier. 

In this case, instead of calculating a histogram as in Use Case 1, we reduce the dimensionality of the data by sorting it in ascending order, calculating the cumulative sum, and then subsampling.

Procedure

  1. Sorting and Cumulative Sum Calculation. The data is sorted in ascending order and the cumulative sum is calculated using a window function. This step amplifies changes in the nature of the population.
from pyspark.sql import functions as F, Window


# Define the window specification for ordering
order_spec = F.col(f'{column_name}').asc()
windowSpec = Window.orderBy(order_spec).rowsBetween(Window.unboundedPreceding, Window.currentRow)
cumsum_col_name = f"cumulative_sum_{column_name}"
# Cumulative sum
df = df.withColumn(cumsum_col_name, F.sum(F.col(f'{column_name}')).over(windowSpec))


2. Adding Row Number. A row number is added to each row using a window function to facilitate subsampling later.

# 'A' is a dummy order since row_number requires an ordering
windowSpec = Window.orderBy(F.lit('A'))  
# Add row number
df = df.withColumn("row_num", F.row_number().over(windowSpec))

3. Subsampling. We subsample the PySpark DataFrame to num_records rows and convert the subsampled DataFrame to a Pandas DataFrame.

total_rows = df.count()
# Calculate the subsampling step n so that approximately num_records rows remain
num_records = 1000
n = total_rows // num_records

# Apply modulo operation on 'row_num' and filter rows where the result is 1
df_subsampled = df.filter((F.col("row_num") % int(n)) == 1)
df_subsampled_pd = df_subsampled.toPandas()

4. PWLF Fitting. We iterate over the number of fitted PWLF lines until the R-squared value exceeds a certain threshold (0.99 in this case). This process helps determine the optimal number of clusters without overfitting.

x = np.linspace(0, len(df_subsampled_pd) - 1, len(df_subsampled_pd))
y = df_subsampled_pd[f'{cumsum_col_name}'].values
my_pwlf = pwlf.PiecewiseLinFit(x, y)
n_pwlf_lines = 1
r2_pwlf = 0
while r2_pwlf < 0.99:
    res = my_pwlf.fit(n_pwlf_lines)
    r2_pwlf = my_pwlf.r_squared()
    print(f"n_pwlf_lines = {n_pwlf_lines}, r_squared = {r2_pwlf:.3f}")
    n_pwlf_lines += 1

n_pwlf_lines = 1, r_squared = 0.711
n_pwlf_lines = 2, r_squared = 0.972
n_pwlf_lines = 3, r_squared = 0.993

Figure 4. Piecewise Linear Fit. This figure shows the piecewise linear fit applied to the cumulative sum of the eCPM values. The x-axis represents the row number, and the y-axis shows the cumulative sum of the eCPM values. The green dots indicate the breakpoints of the piecewise linear fit, while the red lines represent the linear segments between these breakpoints. This fit helps identify significant changes in the distribution of eCPM values. The leftmost segment, with the lowest slope, represents cluster #1, while the rightmost segment, with the highest slope, corresponds to cluster #3.

  5. Assigning Clusters. Each row is assigned to a cluster according to the segment it belongs to, using the breakpoints from the PWLF results.
import numpy as np
import pandas as pd
from pyspark.sql import DataFrame, functions as F


def create_class_column(df: DataFrame, df_subsampled_pd: pd.DataFrame, col_name: str, res: list) -> DataFrame:
    """
    Adds a new column 'class' to the DataFrame based on the intervals defined by the
    PWLF breakpoints in 'res' and the values from the subsampled DataFrame.

    Parameters
    ----------
    df : DataFrame
        The input Spark DataFrame.
    df_subsampled_pd : pd.DataFrame
        The subsampled Pandas DataFrame used to determine the threshold values.
    col_name : str
        The name of the column to apply the conditions to.
    res : list
        The PWLF breakpoints (row numbers) that define the class intervals.

    Returns
    -------
    DataFrame
        The DataFrame with the added 'class' column.
    """
    expr = None
    for i in range(1, len(res)):
        # Map each pair of consecutive breakpoints (row numbers) to the
        # corresponding cumulative-sum values in the subsampled DataFrame
        threshold_row_num_upper = int(np.round(res[i]))
        threshold_value_upper = df_subsampled_pd.loc[
            df_subsampled_pd.index == threshold_row_num_upper, col_name
        ].values[0]
        threshold_row_num_lower = int(np.round(res[i - 1]))
        threshold_value_lower = df_subsampled_pd.loc[
            df_subsampled_pd.index == threshold_row_num_lower, col_name
        ].values[0]
        # Rows whose value falls inside this interval belong to class i
        condition = (F.col(col_name) > threshold_value_lower) & (F.col(col_name) <= threshold_value_upper)
        if expr is None:
            expr = F.when(condition, i)
        else:
            expr = expr.when(condition, i)

    # Values outside the specified ranges fall back to class 0
    expr = expr.otherwise(0)

    df = df.withColumn('class', expr)
    return df


df = create_class_column(df, df_subsampled_pd, f'{cumsum_col_name}', res)
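
To sanity-check the assignment, one can inspect the size and eCPM range of each class (a sketch; column_name is the eCPM column used above):

# Number of rows and eCPM range per class
df.groupBy('class').agg(
    F.count(F.lit(1)).alias('rows'),
    F.min(column_name).alias('min_ecpm'),
    F.max(column_name).alias('max_ecpm'),
).orderBy('class').show()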

Figure 5. Histogram of eCPM Values. This histogram visualizes the distribution of eCPM values across different classes. The x-axis represents the eCPM values, and the y-axis shows the count of occurrences in each bin. The bars are color-coded to distinguish between the different calculated classes.

Figure 6. Box Plot of eCPM Values by Class. This box plot displays the distribution of eCPM values for each class. The x-axis represents the different classes, and the y-axis shows the eCPM values on a logarithmic scale. 
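
A plot similar to Figure 6 can be reproduced from a small sample of the classified data (a matplotlib-based sketch; the sampling fraction is arbitrary):

import matplotlib.pyplot as plt

# Sample the classified DataFrame and plot the eCPM distribution per class
sample_pd = df.select('class', column_name).sample(fraction=0.001).toPandas()
classes = sorted(sample_pd['class'].unique())
groups = [sample_pd.loc[sample_pd['class'] == c, column_name].values for c in classes]

fig, ax = plt.subplots()
ax.boxplot(groups, labels=classes)
ax.set_yscale('log')      # eCPM values span several orders of magnitude
ax.set_xlabel('class')
ax.set_ylabel('eCPM')
plt.show()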

Results

The PWLF approach effectively classifies the eCPM values into optimal clusters based on the changes in the cumulative sum curve. This method avoids the arbitrariness of predefined configurations like quantiles and provides a robust way to handle highly skewed distributions in real-life data.

Even though the dataset is relatively large, the processing takes only a few seconds.

The above visualizations collectively illustrate the classification of eCPM values using the piecewise linear fit approach, highlighting the distinct characteristics of each class and the effectiveness of the classification method.

Conclusion

The presented two-step approach to thresholding large-scale data using PWLF provides a deterministic and flexible method for handling highly skewed distributions and outliers, improving the precision and reliability of data processing for various tasks.
