In the ad tech industry, managing large-scale data is a crucial task. Data scientists must process huge volumes of real-time data, often generated by millions of ad requests per second, to optimize campaigns and improve user engagement. This involves cross-correlating multiple features such as revenue, click-through rate (CTR) and user behavior to support precise decision-making and performance improvements. The task is complicated by the fact that such data often exhibit highly skewed distributions with significant outliers. Traditional methods of thresholding and classifying such data, such as quantiles and classic classification methods, can be somewhat arbitrary and depend on predefined configurations. Here, we introduce a novel two-step approach to thresholding and classifying large-scale data that improves determinism and precision. We present two use cases: the first demonstrates thresholding a skewed histogram on a large dataset, and the second shows the classification of real-life eCPM data using a novel unsupervised classification method.

Our approach involves two main steps:

**Dimensionality Reduction**. This can be achieved using one of the following approaches:
- Histograms.
- Cumulative sum of sorted data. Sorting the series and then computing its cumulative sum amplifies the differences in the magnitude of the population's values, which lets us classify them more accurately.

**Piecewise Linear Fit (PWLF)**. This method detects changes in the average gradient and sets the threshold at the breakpoints between the optimized piecewise linear segments.

**Piecewise Linear Fit (PWLF) Library**: The `pwlf` Python library is a powerful tool for fitting 1D continuous piecewise linear functions. It provides a robust method for identifying segments of a data series where linear approximations are optimal. To fit a specified number of line segments, we used the Differential Evolution (DE) algorithm, a robust and efficient method for global optimization over continuous spaces, to locate the breakpoints.
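For reference, a continuous piecewise linear function with $n$ segments can be written with hinge terms as below. The notation is ours; we believe it is equivalent to the formulation in the `pwlf` documentation, but check the library docs before relying on the details. The outer breakpoints $b_0 = \min x$ and $b_n = \max x$ are fixed, and only the interior breakpoints $b_1, \dots, b_{n-1}$ are searched.

$$
\hat{y}(x) = \beta_0 + \beta_1 (x - b_0) + \sum_{k=1}^{n-1} \beta_{k+1}\,(x - b_k)\,\mathbb{1}[x > b_k]
$$

$$
\min_{b_1 < \dots < b_{n-1}} \; \min_{\beta} \; \sum_{j=1}^{m} \bigl(y_j - \hat{y}(x_j)\bigr)^2,
\qquad
R^2 = 1 - \frac{\sum_{j} \bigl(y_j - \hat{y}(x_j)\bigr)^2}{\sum_{j} (y_j - \bar{y})^2}
$$

For fixed breakpoints the inner minimization is an ordinary linear least-squares problem; the outer search over breakpoint positions is non-convex, which is where a global optimizer such as DE is needed. The slope of the segment between $b_k$ and $b_{k+1}$ is $\sum_{i=1}^{k+1} \beta_i$, so each breakpoint marks a change in the average gradient.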

**Use Case 1: Skewed Histogram Thresholding**

We deal with a PySpark DataFrame consisting of several tens of millions of rows. The column of interest follows a log-normal distribution with a long tail and some random noise. The objective is to determine a threshold that filters out the long tail.

**Histogram Calculation**. A histogram with 100 bins is calculated. This reduces the dimensionality of the problem from a 70-million-row PySpark DataFrame to a simple 100-row Pandas DataFrame.

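**Histogram values cumulative sum calculation**. The cumulative sum of the bin counts is calculated and normalized by the total count, so that it rises to 1 across the bins.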
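A minimal local sketch of the histogram and cumulative-sum steps with NumPy, assuming a synthetic log-normal sample in place of the real column. On the cluster, one option for getting the counts without collecting the raw rows is `df.select(column_name).rdd.flatMap(lambda row: row).histogram(100)` (after dropping nulls), which returns the bucket boundaries and the counts; treat that as one possibility rather than the method used here. In this sketch we assume `edges` plays the role of `histogram_bins_sql` and `hist_cumsum_norm` matches the column of the same name in `histogram_df`.

```python
import numpy as np

# Synthetic stand-in for the real column (assumption): log-normal values with a
# long right tail plus a little Gaussian noise.
rng = np.random.default_rng(0)
n_rows = 100_000
values = rng.lognormal(mean=0.0, sigma=1.0, size=n_rows) + rng.normal(0.0, 0.01, size=n_rows)

num_bins = 100
# counts[i] is the number of values in bin i; edges has num_bins + 1 entries
counts, edges = np.histogram(values, bins=num_bins)

row_num = np.arange(num_bins)                     # x axis for the fit: bin index
hist_cumsum = np.cumsum(counts)                   # running count across bins
hist_cumsum_norm = hist_cumsum / hist_cumsum[-1]  # normalized so the last value is 1.0
```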

**PWLF Fitting**. We fit a PWLF with N lines to the cumulative sum of the histogram.

```
import pwlf
# histogram_df: the 100-row Pandas DataFrame produced by the histogram step
x = histogram_df['row_num'].values
y = histogram_df['hist_cumsum_norm'].values
# Initiate the PiecewiseLinFit object with the normalized cumulative sum of the histogram counts
my_pwlf = pwlf.PiecewiseLinFit(x, y)
# Fit N piecewise linear segments; fit() returns the N + 1 breakpoints, including both ends
N = 5
res = my_pwlf.fit(N)
```
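To make explicit what the fit optimizes once the breakpoints are known, here is an independent NumPy sketch of the least-squares step for fixed breakpoints. `pwlf` exposes this step as `fit_with_breaks`; the helper name `fit_with_fixed_breaks` is ours, and this is not the library's code. It builds the hinge-basis design matrix, solves it with `np.linalg.lstsq` and reports R².

```python
import numpy as np


def fit_with_fixed_breaks(x, y, breaks):
    """Continuous piecewise linear least-squares fit for given breakpoints.

    breaks must include both ends: [x.min(), b1, ..., x.max()].
    Returns (beta, y_hat, r_squared).
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    breaks = np.asarray(breaks, dtype=float)
    # Columns: intercept, (x - b0), and one hinge (x - b_k) * [x > b_k] per interior break
    columns = [np.ones_like(x), x - breaks[0]]
    columns += [np.where(x > b, x - b, 0.0) for b in breaks[1:-1]]
    A = np.column_stack(columns)
    beta, *_ = np.linalg.lstsq(A, y, rcond=None)
    y_hat = A @ beta
    ss_res = np.sum((y - y_hat) ** 2)
    ss_tot = np.sum((y - y.mean()) ** 2)
    return beta, y_hat, 1.0 - ss_res / ss_tot


# Toy curve with a single kink at x = 4: slope 2, then slope 0.5
x_demo = np.linspace(0.0, 10.0, 101)
y_demo = np.where(x_demo < 4.0, 2.0 * x_demo, 8.0 + 0.5 * (x_demo - 4.0))
beta, y_hat, r2 = fit_with_fixed_breaks(x_demo, y_demo, [0.0, 4.0, 10.0])
# beta is approximately [0.0, 2.0, -1.5]: the slope drops by 1.5 at the breakpoint
```

Because the toy curve is exactly piecewise linear with its kink at a supplied breakpoint, R² is 1 up to floating-point error; on real data the search over breakpoint positions is what DE adds on top of this step.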

**Threshold Determination**. The threshold is set at the last interior breakpoint (`res[-2]`), i.e. the left end of the right-most segment, which indicates the point where the change in the cumulative sum of the histogram is close to zero.

```
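import numpy as np

# res holds the N + 1 breakpoints (in row_num units), including both ends;
# res[-2] is the last interior breakpoint, where the right-most, near-flat segment starts
res_ind = -2
# histogram_bins_sql: the histogram bin edges, indexed by row_num
pwlf_threshold = histogram_bins_sql[int(np.round(res[res_ind]))]
print(f"threshold = {pwlf_threshold}")
print(f"r_squared = {my_pwlf.r_squared():.3f}")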
```
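A small helper that makes the breakpoint-to-threshold mapping explicit, assuming `breaks` is the array returned by `fit()` (both ends included) and `bin_edges` holds the histogram bin edges indexed by `row_num`. The name `threshold_from_breaks` and the sample numbers are hypothetical; the clipping only guards against rounding past the end of the edge array.

```python
import numpy as np


def threshold_from_breaks(breaks, bin_edges, index=-2):
    """Map a PWLF breakpoint (in bin-index units) to a value threshold.

    index=-2 selects the last interior breakpoint, i.e. the start of the
    right-most segment.
    """
    bin_idx = int(np.round(breaks[index]))
    # Guard against rounding outside the valid range of bin_edges
    bin_idx = min(max(bin_idx, 0), len(bin_edges) - 1)
    return bin_edges[bin_idx]


# Made-up breakpoints for a 5-segment fit over 100 bins (illustration only)
breaks_demo = np.array([0.0, 3.2, 7.9, 18.6, 41.3, 99.0])
edges_demo = np.linspace(0.0, 50.0, 101)  # 100 equal-width bins of width 0.5
threshold = threshold_from_breaks(breaks_demo, edges_demo)
# breaks_demo[-2] = 41.3 rounds to bin 41, whose left edge is 20.5
```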

**Results**

The PWLF approach proves more deterministic and less arbitrary than quantile thresholds (0.95, 0.98, 0.99). Note how much the thresholds obtained from the 95%, 98% and 99% quantiles differ from one another.

The number of bins and the number of PWLF lines are the only parameters, which keeps the method flexible and precise. For large-scale data, 100 bins is a good rule of thumb. A more deterministic alternative is to derive the number of bins from the population size $n$, e.g. with the Rice rule, where the number of bins is $k = 2n^{1/3}$ (rounded up to an integer).
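The Rice rule is simple enough to compute directly; a minimal sketch follows (the function name is ours). For in-memory arrays, NumPy also accepts `bins="rice"` in `np.histogram`.

```python
import math


def rice_num_bins(n: int) -> int:
    """Rice rule: k = ceil(2 * n ** (1/3)) bins for a sample of size n."""
    return math.ceil(2 * n ** (1 / 3))


print(rice_num_bins(70_000_000))  # 825
```

For a 70-million-row column this gives 825 bins, considerably more than the 100-bin rule of thumb, so the two choices can lead to noticeably different resolutions of the cumulative curve.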

An iterative approach can be used to determine the number of PWLF lines: start with a few (e.g., three) and increase the number until R² exceeds a chosen threshold (e.g., 0.98). Stopping at the first number of lines that meets the threshold avoids overfitting.

**Use Case 2: Unsupervised Classification of eCPM Data**

In this scenario, we use real-life eCPM (effective cost per mille) data collected from several apps and ad sources over a couple of days. The dataset consists of several tens of millions of rows. The objective is to classify the eCPM values into N classes using an unsupervised classifier.

In this case, instead of calculating a histogram as in Use Case 1, we reduce the dimensionality of the data by subsampling it after sorting it in ascending order and calculating the cumulative sum.

1. **Sorting and Cumulative Sum Calculation**. The data is sorted in ascending order, so that the many low values come first, and the cumulative sum is calculated using a window function. This step amplifies the changes in the nature of the population.

```
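from pyspark.sql import functions as F, Window

# column_name: the eCPM column of df
# Ascending order: the many small values come first, the long tail last
order_spec = F.col(column_name).asc()
# Running window from the first row up to the current row
# (no partitionBy, so Spark moves all rows to a single partition for this step)
windowSpec = Window.orderBy(order_spec).rowsBetween(Window.unboundedPreceding, Window.currentRow)
cumsum_col_name = f"cumulative_sum_{column_name}"
# Cumulative sum of the sorted values
df = df.withColumn(cumsum_col_name, F.sum(F.col(column_name)).over(windowSpec))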
```
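For checking the idea locally, the same sort-and-accumulate step on an in-memory sample takes two NumPy calls. The sample below is synthetic and only stands in for real eCPM values. Because each step of the cumulative sum adds the current value, the slope of the curve at a row equals that row's value: the curve stays flat across the many small values and steepens sharply in the tail, which is the amplification this step relies on.

```python
import numpy as np

# Hypothetical eCPM-like sample (assumption): mostly small values with a long tail
rng = np.random.default_rng(1)
ecpm = rng.lognormal(mean=0.0, sigma=1.5, size=10_000)

sorted_vals = np.sort(ecpm)      # ascending, like order_spec
cumsum = np.cumsum(sorted_vals)  # running sum, like the unboundedPreceding..currentRow window

# Each step of the curve adds the current value, so the local slope equals the value:
# flat over the many small values, steep in the tail.
slope = np.diff(cumsum)
```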

2. **Adding Row Number.** A row number is added to each row using a window function to facilitate subsampling later.

```
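# Order by the same column as the cumulative sum so that row_num follows the sorted order;
# a dummy constant order such as F.lit('A') does not guarantee which row gets which number
windowSpec = Window.orderBy(order_spec)
# Add row number (1-based)
df = df.withColumn("row_num", F.row_number().over(windowSpec))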
```

3. **Subsampling**. We subsample the PySpark DataFrame to roughly `num_records` rows and convert the subsampled DataFrame to a Pandas DataFrame.

```
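total_rows = df.count()
# Target roughly num_records rows in the subsample
num_records = 1000
# Step between kept rows; at least 1 so the modulo below is always well defined
n = max(1, total_rows // num_records)
# Keep rows 1, n + 1, 2n + 1, ... along the sorted order
df_subsampled = df.filter(((F.col("row_num") - 1) % n) == 0)
# Sort before collecting so the Pandas index follows row_num
df_subsampled_pd = df_subsampled.orderBy("row_num").toPandas()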
```
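The same stride subsampling on an in-memory, already sorted cumulative-sum array, as a minimal sketch with a hypothetical helper name. Position `i` here plays the role of `row_num - 1`, so for `n > 1` slicing with step `n` keeps the same rows as the modulo filter above.

```python
import numpy as np


def stride_subsample(sorted_cumsum: np.ndarray, num_records: int = 1000) -> np.ndarray:
    """Keep every n-th point of an already sorted cumulative-sum curve.

    With n = max(1, len // num_records), the result has at least num_records and
    fewer than 2 * num_records points whenever len >= num_records.
    """
    n = max(1, len(sorted_cumsum) // num_records)
    # Position i corresponds to row_num - 1, so [::n] keeps rows 1, n + 1, 2n + 1, ...
    return sorted_cumsum[::n]


curve = np.cumsum(np.arange(1, 25_001, dtype=float))
print(len(stride_subsample(curve)))  # 1000  (n = 25)
```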

4. **PWLF Fitting**. We increase the number of fitted PWLF lines until the R² value exceeds a chosen threshold (0.99 in this case). This yields the smallest number of clusters that meets the threshold, which avoids overfitting.

```
import numpy as np
import pwlf

x = np.arange(len(df_subsampled_pd), dtype=float)
y = df_subsampled_pd[cumsum_col_name].values
my_pwlf = pwlf.PiecewiseLinFit(x, y)
r2_threshold = 0.99
max_pwlf_lines = 10  # safety cap so the loop always terminates
n_pwlf_lines = 1
r2_pwlf = 0.0
while r2_pwlf < r2_threshold and n_pwlf_lines <= max_pwlf_lines:
    # res holds the n_pwlf_lines + 1 breakpoints of the latest fit
    res = my_pwlf.fit(n_pwlf_lines)
    r2_pwlf = my_pwlf.r_squared()
    print(f"n_pwlf_lines = {n_pwlf_lines}, r_squared = {r2_pwlf:.3f}")
    n_pwlf_lines += 1
# Output:
# n_pwlf_lines = 1, r_squared = 0.711
# n_pwlf_lines = 2, r_squared = 0.972
# n_pwlf_lines = 3, r_squared = 0.993
```
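If `pwlf` is not available, or to see the stopping rule in isolation, the loop can be sketched in plain NumPy. This is a deliberately simplified stand-in, not the DE-based search used above: it tries every combination of interior breakpoints on a coarse, evenly spaced grid of candidate positions, which is only practical for a few segments and a small subsample such as the roughly 1000 rows above. The helper names and defaults (`max_segments`, `n_candidates`) are ours.

```python
import itertools

import numpy as np


def pwl_r_squared(x, y, breaks):
    """R^2 of the continuous piecewise linear least-squares fit with these breaks."""
    columns = [np.ones_like(x), x - breaks[0]]
    columns += [np.where(x > b, x - b, 0.0) for b in breaks[1:-1]]
    A = np.column_stack(columns)
    beta, *_ = np.linalg.lstsq(A, y, rcond=None)
    ss_res = np.sum((y - A @ beta) ** 2)
    ss_tot = np.sum((y - y.mean()) ** 2)
    return 1.0 - ss_res / ss_tot


def choose_num_segments(x, y, r2_threshold=0.99, max_segments=4, n_candidates=20):
    """Add segments until R^2 >= r2_threshold, stopping at max_segments.

    Grid search over n_candidates evenly spaced interior positions (stand-in for DE).
    Returns (n_segments, breaks, r_squared) for the last segment count tried.
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    candidates = np.linspace(x.min(), x.max(), n_candidates + 2)[1:-1]
    for n_seg in range(1, max_segments + 1):
        best_r2, best_breaks = -np.inf, None
        for inner in itertools.combinations(candidates, n_seg - 1):
            breaks = np.array([x.min(), *inner, x.max()])
            r2 = pwl_r_squared(x, y, breaks)
            if r2 > best_r2:
                best_r2, best_breaks = r2, breaks
        print(f"n_segments = {n_seg}, r_squared = {best_r2:.3f}")
        if best_r2 >= r2_threshold:
            break
    return n_seg, best_breaks, best_r2


# Synthetic convex curve with kinks at x = 100 and x = 200 (slopes 0.1, 1 and 5);
# n_candidates=29 puts the candidates at 10, 20, ..., 290, so the true kinks are on the grid
x_demo = np.arange(0.0, 301.0)
y_demo = np.where(x_demo <= 100, 0.1 * x_demo,
                  np.where(x_demo <= 200, 10 + (x_demo - 100), 110 + 5 * (x_demo - 200)))
n_seg, breaks, r2 = choose_num_segments(x_demo, y_demo, n_candidates=29)
```

The grid makes the search exhaustive and deterministic but coarse: the breakpoints can only land on candidate positions, whereas DE optimizes them over a continuous range.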

5. **Assigning Clusters**. Each row is assigned to a cluster according to the segment it falls in, using the breakpoints from the PWLF results.

```
import numpy as np
import pandas as pd
from pyspark.sql import DataFrame, functions as F


def create_class_column(df: DataFrame, df_subsampled_pd: pd.DataFrame, col_name: str, res) -> DataFrame:
    """
    Adds a new column 'class' to the DataFrame based on the PWLF breakpoints in 'res' and values from a subsampled DataFrame.
    Parameters:
    -----------
    df : pyspark.sql.DataFrame
        The input Spark DataFrame.
    df_subsampled_pd : pandas.DataFrame
        The subsampled Pandas DataFrame (sorted by row_num, default RangeIndex) used to determine threshold values.
    col_name : str
        The name of the column to apply the conditions to; it must exist in both DataFrames.
    res : sequence of float
        The PWLF breakpoints (positions in df_subsampled_pd, both ends included) that define the class intervals.
    Returns:
    --------
    pyspark.sql.DataFrame
        The DataFrame with the added 'class' column, with classes 1 .. len(res) - 1.
    """
    # Translate each breakpoint position into a value of col_name
    thresholds = [float(df_subsampled_pd[col_name].iloc[int(np.round(b))]) for b in res]
    n_classes = len(thresholds) - 1
    expr = None
    for i in range(1, n_classes + 1):
        lower, upper = thresholds[i - 1], thresholds[i]
        # Leave the first class open below and the last class open above, so rows outside
        # the subsampled range (e.g. beyond the last subsampled row) are still classified
        lower_ok = F.lit(True) if i == 1 else F.col(col_name) > lower
        upper_ok = F.lit(True) if i == n_classes else F.col(col_name) <= upper
        condition = lower_ok & upper_ok
        expr = F.when(condition, i) if expr is None else expr.when(condition, i)
    # Rows matching no condition (e.g. nulls) get class 0
    expr = expr.otherwise(0)
    return df.withColumn('class', expr)


df = create_class_column(df, df_subsampled_pd, cumsum_col_name, res)
```
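To sanity-check the class boundaries on a small sample before running the Spark job, the same interval logic can be written with `np.searchsorted`. This is a hypothetical helper, assuming `thresholds` are the curve values at the PWLF breakpoints (both ends included): values up to the first interior threshold get class 1, values above the last interior threshold get the highest class, and a value equal to a threshold goes to the lower class, matching a `> lower` / `<= upper` condition.

```python
import numpy as np


def assign_classes(values, thresholds):
    """Hypothetical local check of the class logic.

    thresholds: curve values at the PWLF breakpoints, both ends included.
    Returns labels 1..k with k = len(thresholds) - 1.
    """
    interior = np.asarray(thresholds[1:-1], dtype=float)
    # side="left": a value equal to a threshold stays in the lower class ("<= upper")
    return np.searchsorted(interior, values, side="left") + 1


labels = assign_classes(np.array([-5.0, 0.0, 10.0, 10.5, 50.0, 51.0, 1000.0]), [0.0, 10.0, 50.0, 100.0])
print(labels.tolist())  # [1, 1, 1, 2, 2, 3, 3]
```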

The PWLF approach effectively classifies the eCPM values into optimal clusters based on the changes in the cumulative sum curve. This method avoids the arbitrariness of predefined configurations like quantiles and provides a robust way to handle highly skewed distributions in real-life data.

Even though the dataset is relatively large, processing takes only several seconds.

The above visualizations collectively illustrate the classification of eCPM values using the piecewise linear fit approach, highlighting the distinct characteristics of each class and the effectiveness of the classification method.

The two-step approach to thresholding large-scale data with PWLF presented here provides a deterministic and flexible method for handling highly skewed distributions and outliers, improving the precision and reliability of data processing for various tasks.