In the realm of local-scale data pipelines, where data processing occurs on a single machine or a small cluster, choosing the right tool is paramount. Each tool brings unique strengths and weaknesses to the table, influencing factors such as ease of use, performance, flexibility, and scalability.
In this article we’ll explore three options for small-to-medium-scale aggregation: Spark, chDB, and DuckDB.
Our product, DT Exchange, is a programmatic advertising platform that facilitates the buying and selling of ad inventory between publishers and advertisers, optimizing ad targeting and delivery.
As part of training our internal models, we needed to produce an hourly report about bids that were excluded. To do that, we take an hourly anonymized aggregation of our bid events and extract all the bids that have an “Excluded” status.
Our criteria for delivery are:
Our main tool for this kind of job has always been Spark, but upon examining the size of the hourly anonymized aggregated data (~40 GiB), we’ve decided to explore other options: chDB and DuckDB.
While benchmarks can be a valuable tool for comparing the performance of different systems, it's important to keep in mind that they can also be very use-case specific. This is especially true for broad targets like SQL databases, which can be used for a wide variety of workloads.
The results of this benchmark are relevant to the use case described in the article. In other cases, we may have obtained different results.
It's also important to note that benchmarks are often run on specific hardware and software configurations. The results may not be the same on different hardware or with different software versions.
You may want to run your own benchmarks to see how the different databases perform on your data and queries.
The input data was downloaded from GCS in advance. All the solutions that we’re exploring have good integration with Amazon S3 and GCS, but for the purposes of this article, we’ll use pre-downloaded local data.
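For completeness, reading directly from object storage takes only a few extra statements. Below is a minimal DuckDB sketch for S3 via its httpfs extension; the bucket name, region, and credentials are placeholders, not our real setup:
import duckdb

# Load the httpfs extension so DuckDB can read parquet files from S3-compatible storage.
duckdb.sql("INSTALL httpfs")
duckdb.sql("LOAD httpfs")
duckdb.sql("SET s3_region = 'us-east-1'")  # placeholder region
# Credentials would normally come from the environment or a secrets manager.
duckdb.sql("SET s3_access_key_id = '...'")
duckdb.sql("SET s3_secret_access_key = '...'")
duckdb.sql("SELECT count(*) FROM 's3://my-bucket/data/*.parquet'").show()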
The input data is 40 GiB over 52 parquet files. Their schema contains 94 columns.
The tests were run on a c2d-standard-16 machine (16 cores, 64 GB of memory, x86).
Our required data can be extracted with the following query (the real field names changed for simplicity):
select fieldA, fieldB, fieldC, fieldD, fieldE
from '../data/*.parquet' where bidStatus ='Excluded'
group by 1, 2, 3, 4, 5
The result should be saved into a single parquet file, with gzip compression and a row_group_size of 15000.
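Once a job has run, it is easy to sanity-check that the output matches these requirements with pyarrow (a rough sketch; output.parquet stands in for whichever file the job produced):
import pyarrow.parquet as pq

# Inspect the parquet footer: number of row groups, rows per group, and compression codec.
meta = pq.ParquetFile("output.parquet").metadata
print("row groups:", meta.num_row_groups)
print("rows in first group:", meta.row_group(0).num_rows)
print("compression:", meta.row_group(0).column(0).compression)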
The DuckDB implementation is very simple and has very good integration with Python. Due to its nature as an embedded database, it supports many other languages as well, but Python is probably the easiest. The implementation requires pip install duckdb and the following script:
import duckdb
query = """copy(
select fieldA, fieldB, fieldC, fieldD, fieldE
from '../data/*.parquet' where bidStatus ='Excluded'
group by 1, 2, 3, 4, 5
)
to './duckdb_local.parquet' (FORMAT 'parquet', COMPRESSION 'gzip', ROW_GROUP_SIZE 15000);"""
duckdb.sql(query)
chDB has a very similar implementation:
import chdb
query = """
select fieldA, fieldB, fieldC, fieldD, fieldE
from '../data/*.parquet' where bidStatus ='Excluded'
group by 1, 2, 3, 4, 5
INTO OUTFILE './chdb_local.parquet'
FORMAT Parquet
SETTINGS
output_format_parquet_compression_method='gzip',
output_format_parquet_row_group_size=15000"""
chdb.query(query)
Now, Spark is a different story. Even if you have (as we do) all the templates and settings for creating a new Spark project quickly, the implementation is still quite verbose. First of all, instead of a single Python script, we must have a project:
spark
├── project
│ ├── build.properties
│ └── plugins.sbt
├── src
│ └── main
│ ├── resources
│ │ └── log4j.properties
│ └── scala
│ └── com
│ └── dt
│ └── spark
│ └── Main.scala
└── build.sbt
The build.sbt alone is roughly 100 lines of code, containing battle-tested exclusions, assembly settings, merge rules, and shading rules. Not all of them are needed for this use case, but figuring out which ones are safe to remove is usually not worth the effort.
Now for the code:
package com.dt.spark
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
object Main {
def localSparkSession(name: String, sparkConf: SparkConf): SparkSession = {
SparkSession
.builder()
.master("local[*]")
.appName(name)
.config(sparkConf)
.getOrCreate()
}
def main(args: Array[String]): Unit = {
val spark = localSparkSession("spark_agg", new SparkConf())
val df = spark.read.parquet("../data/*.parquet")
df.createOrReplaceTempView("df")
val result = spark.sql(
s"""select fieldA, fieldB, fieldC, fieldD, fieldE
|from df where bidStatus ='Excluded'
|group by 1, 2, 3, 4, 5""".stripMargin)
result.coalesce(1).write // we must have it as a single file
.option("compression", "gzip")
.option("parquet.row.group.size", 15000)
.mode(SaveMode.Overwrite)
.parquet("spark_local.parquet")
}
}
This is probably the most difficult to predict, but it could be argued that less code is always easier to maintain than more code. When new requirements arrive, we might find that we have to fall back to the Spark solution, but we should pay that price (and the price of debugging Spark dependency issues) only when we must.
As mentioned above, benchmarks can be misleading. In this case we’ve optimized for developer experience and used mostly default settings. Spark could be tuned further for this specific task, but that would take some effort. Even without tuning, we can still learn a few things.
Running perf stat and /usr/bin/time -v on the code above resulted in the following:
We see that DuckDB crushed the competition, finishing in about half the time that chDB took, while Spark lags far behind at roughly 2.5× DuckDB’s processing time. Let’s try to analyze those numbers:
CPU Utilization & Lower Task Clock Time
Context Switches & CPU Migrations
Page Faults & Memory
To get a rough estimate of the image size, we’ll look at the dependencies required to run each job, such as pyarrow, pandas, and numpy. It’s quite easy to see that DuckDB takes this category, as it is designed as an embedded, lightweight, standalone database.
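A quick way to compare the Python-side footprint is to list each package’s declared requirements (a sketch; the exact output depends on the versions you have installed, and it ignores Spark’s JVM-side dependencies and assembly jar):
from importlib.metadata import requires

# Assumes both packages are installed; requires() returns the dependencies
# declared in the package metadata, or None if none are declared.
for pkg in ("duckdb", "chdb"):
    print(pkg, "->", requires(pkg) or "no declared dependencies")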
Spark's rich ecosystem and support for diverse data sources and formats grant it unparalleled flexibility. Its ability to integrate with various storage systems, stream processing engines, and machine learning libraries makes it a versatile tool for building complex data pipelines. Its strength lies in its code: it is not restricted to SQL, and can be written natively in Scala or Python.
chDB offers moderate flexibility, with its SQL-based interface and support for user-defined functions enabling customization and extension. It’s obviously not as powerful as Spark, but it does support an immense range of capabilities.
DuckDB, being more specialized, exhibits lower flexibility. While it excels at in-memory processing and SQL queries, its support for external data sources and advanced analytics is less extensive than that of Spark and ClickHouse. It’s important to note that DuckDB is a newer product than the others, and its ecosystem of plugins is still growing.
Spark's distributed computing capabilities make it the undisputed champion of scalability. By distributing data and computations across a cluster of machines, Spark can handle massive datasets that exceed the capacity of a single node.
chDB also demonstrates impressive scalability, particularly for analytical workloads. Its ability to partition data, distribute queries, and leverage columnar storage enables efficient processing of large datasets on a single machine. For a clustered solution, ClickHouse is very powerful, though it can be tricky to get working.
DuckDB, being single-node, has inherent limitations in scalability. While it can handle substantial datasets that fit in memory, its performance may degrade as data volumes grow beyond the capacity of a single machine.
For small to medium-sized datasets and straightforward queries, DuckDB's ease of use and speed make it a compelling choice. Its minimal setup and fast performance can significantly accelerate time-to-market, allowing developers to quickly deploy fast solutions.
When dealing with more complex queries or larger datasets that still reside on a single machine, chDB (or clickhouse-local) emerges as a strong contender. chDB offers a mature and extensive set of capabilities, including support for complex data types, advanced indexing techniques, and efficient query processing. It has more capabilities than DuckDB, at a similar or better performance level. A good example of chDB’s capabilities is its array functions (e.g., arrayJoin), which perform significantly better than DuckDB’s equivalents.
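As a toy illustration of the syntax (not a benchmark), the same array expansion looks like this in both engines:
import chdb
import duckdb

# ClickHouse dialect: arrayJoin() expands an array into one row per element.
print(chdb.query("SELECT arrayJoin([1, 2, 3]) AS n", "CSV"))

# DuckDB's closest equivalent is unnest().
print(duckdb.sql("SELECT unnest([1, 2, 3]) AS n"))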
For truly massive datasets that surpass the capacity of a single machine, Spark's distributed computing prowess provides the most robust solution. Its ability to scale horizontally across a cluster enables processing of petabytes of data, making it suitable for large-scale data warehousing, machine learning, and real-time analytics.
For our stated problem we ended up using DuckDB. It was quick to implement, and due to its low overhead and startup time, could be scheduled to run every few minutes with a Kubernetes CronJob.
For other pipelines we ended up using clickhouse-local, due to its amazing array handling functions.
That said, for most of our pipelines we use Spark as it's the only solution that can easily handle the vast amount of data we process. Even the input for the job in this article is generated by a Spark job.
Generally speaking, in the realm of local-scale data pipelines, there is no one-size-fits-all solution. The optimal tool hinges on your specific requirements, data volumes, and query complexity. For many use cases, DuckDB can be a surprisingly powerful and efficient option, delivering rapid results with minimal setup. As your needs evolve and data volumes grow, ClickHouse and Spark offer avenues for scaling up without compromising performance. By carefully evaluating your requirements and understanding the strengths and weaknesses of each tool, you can confidently choose the right solution for your local-scale data pipeline, unlocking valuable insights and driving data-driven decision-making.