Apache Spark: A Note When Using JDBC Partitioning

Overview

Partitioning JDBC reads can be a powerful tool for parallelizing I/O-bound work in Spark; however, there are a few things to consider before adding this option to your data pipelines.

How It Works

As with many of the data sources available in Spark, the JDBC data source is highly configurable. This article focuses on the following partitioning-related options, their interactions, and their use cases:

  • partitionColumn
    • This is the name of the column in the underlying table to be used for partitioning.
  • upperBound
    • This is the upper-bound value for use when partitioning the partition column.
  • lowerBound
    • This is the lower-bound value for use when partitioning the partition column.
  • numPartitions
    • This is the limit on the number of concurrent open JDBC connections. In conjunction with the upper and lower bounds, it is also used to determine the size of each partition (see the Spark source code for partition generation).
  • dbtable
    • This is used to specify the table (or subquery) and columns to fetch from the external database.

When loading data from a JDBC source, the partitionColumn option, together with numPartitions, lowerBound, and upperBound, allows for parallel reads over multiple connections. For example, imagine you have a table test_schema.test_table with 10 million rows and 4 cores available on the Spark cluster. For the sake of simplicity, assume the table has a unique, monotonically increasing id column named id. To reap the benefits of parallelization when reading this table, you specify the following configuration options:

partitionColumn: id
lowerBound: 0
upperBound: 10,000,000
numPartitions: 4
dbtable: test_schema.test_table
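
In code, this read might look like the following sketch (assuming a Postgres source, as in the examples later in this article; the URL and credentials are placeholders for your own connection details):

val df = spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://<host>:<port>/<database>")  // placeholder connection details
    .option("user", "<user>")                                     // placeholder
    .option("password", "<password>")                             // placeholder
    .option("dbtable", "test_schema.test_table")
    .option("partitionColumn", "id")
    .option("lowerBound", 0)
    .option("upperBound", 10000000)
    .option("numPartitions", 4)
    .load()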

In this case, the JDBC connector will open 4 connections, one for each core, and submit 4 separate queries against test_schema.test_table of the form:

select * from test_schema.test_table where id < 2500000 or id is null
select * from test_schema.test_table where id >= 2500000 and id < 5000000
select * from test_schema.test_table where id >= 5000000 and id < 7500000
select * from test_schema.test_table where id >= 7500000 

This can greatly increase throughput, especially for very large tables where I/O is the bottleneck. With no partitioning options specified, the read defaults to a single core with only one open connection, and the query is simply of the form:

select * from test_schema.test_table
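
Conceptually, the four partition predicates shown earlier come from a simple stride calculation over the configured bounds. The snippet below is a simplified sketch of that logic (it is not the actual Spark source); it reproduces the four WHERE clauses above:

// Simplified sketch of how JDBC partition predicates are derived from the bounds.
// This mirrors the stride-based approach conceptually; it is not the Spark implementation.
def partitionPredicates(
    column: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int): Seq[String] = {
  // The partition count is capped at (upperBound - lowerBound); see the edge cases discussed later.
  val partitions = math.min(numPartitions.toLong, upperBound - lowerBound).toInt
  if (partitions <= 1) {
    Seq("1 = 1")  // a single partition reads the whole table with no pruning predicate
  } else {
    val stride = (upperBound - lowerBound) / partitions
    (0 until partitions).map { i =>
      val start = lowerBound + i * stride
      val end = start + stride
      if (i == 0) s"$column < $end or $column is null"        // first chunk also picks up NULLs
      else if (i == partitions - 1) s"$column >= $start"      // last chunk is unbounded above
      else s"$column >= $start and $column < $end"
    }
  }
}

// partitionPredicates("id", 0L, 10000000L, 4) yields the four WHERE clauses above.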

Tradeoffs

Despite the allure of faster throughput, there are a few common questions and variables to consider before employing this built-in strategy. These include:

  • Is my table actually large enough that I/O is a substantial bottleneck?
    • Considering modern EC2 hardware can read at well over 10 Gbps (with some instances reaching the 50 or even 100 Gbps range), the returns on parallelizing I/O for small-to-medium-sized table reads diminish fairly quickly.
  • Does the table I am reading have a column suitable for partitioning?
    • Given how the partition queries are generated, the partition column must be of numeric, date, or timestamp type. The best partition column is also one whose values are evenly distributed across the generated buckets, so that each partition query returns roughly the same number of rows; for a numeric column, this could be a monotonically increasing id. While I/O benefits can still be seen when partitioning on a skewed column, it is worth first checking whether a suitable column exists and how much skew it carries (a rough skew check is sketched after this list).
    • Important Note: While the Spark documentation states you can specify your own partition column inside of the dbtable option, our experience warns against doing so naively, as it can cause unintended side effects such as duplicate rows and non-deterministic result sets (see the Warning section below).
  • Can the database I’m reading from comfortably handle a large number of concurrent reads/connections?
    • If you are connecting to a large database cluster, the answer is likely yes; however, it is worth considering the load this option can place on a small database instance: it could exceed the number of concurrent connections the database supports and/or degrade performance for other queries running at the same time. And, as noted in the first consideration, if the dataset is small, the job is unlikely to be I/O bound in the first place.
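
One way to gauge whether a candidate numeric column is evenly distributed, before committing to a parallel read, is to push a small aggregate down to the database over a single connection. The sketch below is one possible approach for a Postgres source; the connection details, table, column, and bounds are placeholders for your own values, and width_bucket is a Postgres-specific function:

// Rough sketch of a skew check for a candidate partition column (Postgres).
// Connection details, table, column, and bounds below are placeholders.
val distribution = spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://<host>:<port>/<database>")  // placeholder
    .option("user", "<user>")                                     // placeholder
    .option("password", "<password>")                             // placeholder
    .option("dbtable",
        """(select width_bucket(id, 0, 10000000, 4) as bucket, count(*) as row_count
          |  from test_schema.test_table
          |  group by 1
          |  order by 1) as dist""".stripMargin
    )
    .load()

// Roughly equal row counts per bucket suggest the column will partition evenly.
distribution.show()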

Warning

Expanding on a point from the considerations above, we suggest exercising great caution before implementing a custom partitionColumn inside of the dbtable option. To illustrate why, let’s consider two implementations: first, an intuitive but dangerous one, and second, a safer approach if you still wish to specify a custom partitionColumn.

The first implementation:

// For the sake of this example, 
// we will employ a Spark cluster with 8 single-core executors,
// allowing up to 8 tasks to be scheduled concurrently.
//
// The sample table has 10 million rows
// However, it does not have a suitable partition column
// (no numeric, timestamp, or date column without skew)
val df = spark.read
    .format("jdbc")
    .options(<postgresJDBCOptions>)
    .option("dbtable", 
	s"(select *, row_number() over () as rowid from sample_schema.sample_table) as tbl"
    )
    .option("lowerBound", 0)
    .option("upperBound", 10000000)
    .option("numPartitions", 8)
    .option("partitionColumn", "rowid")
    .load()

If your goal is to generate a partition column, this seems like a straightforward way to do so. Using the row_number() window function, one can generate a monotonic id column and use it as an unskewed partition column to read the table in parallel. So where is the problem?

Let’s consider this small fragment from the dbtable option above: row_number() over () as rowid. This is where things take an uneasy turn. Recall that this configuration will open up to 8 concurrent connections, each submitting a separate query that requests a separate chunk of 1.25 million rows delineated by rowid. The problem is that, depending on when the 8 separate queries are submitted and what data is available to read from shared buffers or the database cache, the underlying database may need to recalculate the rowid column independently for each query. Since there is no stable column to order by in the over () clause, there is no guarantee of avoiding duplicate (or dropped) rows if rowid is recalculated. This spells disaster for data pipelines that need to successfully process or replicate data from database to database while maintaining the data’s integrity. So, what can we do about it?

This brings us to the second implementation:

// For this example,
// column c is a text column we would like to partition on.
// We will use hashing to do this.
//
// As mentioned earlier,
// whatever column you are hashing should be
// evenly distributed to minimize skew for best results.
val df = spark.read
    .format("jdbc")
    .options(<postgresJDBCOptions>)
    .option("dbtable", 
	s"(select *, abs(mod(hashtext(c), 8)) as bucket from sample_schema.sample_table) as tbl"
    )
    .option("lowerBound", 0)
    .option("upperBound", 8)
    .option("numPartitions", 8)
    .option("partitionColumn", "bucket")
    .load()

In this example, a hashing function is used along with the mod operator to partition the 10 million rows into 8 buckets of roughly equal size (in practice, the size of these buckets will depend on the distribution of the data in the hashed column). By using a deterministic hash function, we can guarantee that no row will fall in more than one bucket, and since the partition assignments are stable, duplicates will not be generated regardless of when the queries are submitted and/or the state of the database being queried.
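
For completeness, with these bounds (0 through 8) and 8 partitions, the generated partition queries follow the same pattern as before, one bucket per connection:

SELECT ... WHERE "bucket" < 1 or "bucket" is null
SELECT ... WHERE "bucket" >= 1 and "bucket" < 2
...
SELECT ... WHERE "bucket" >= 7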

At this point, you may be wondering about edge cases, including but not limited to:

  • What if the lower bound is greater than the upper bound?
    • This will simply throw an error.
  • What if the number of partitions is greater than the difference between the bounds? Will this cause certain partitions to be read more than once?
    • No. In this case, the JDBC query partitioner sets the effective number of partitions to the minimum of the following:
      • numPartitions: the value specified for the number of partitions.
      • upperBound - lowerBound: the difference between the upper and lower bound values.
    • This way, it guarantees that no partition queries overlap and no duplicate rows are read in (see the sketch after this list for a quick way to confirm the partition count).
  • What if the bounded range is smaller than the range of bucketed values?
    • In this case, the generated queries will be skewed, but they will still fetch all the values.
    • For example, imagine you use the hash-and-mod approach to bucket the data in a table into 8 buckets but set the lowerBound to 2 and the upperBound to 6.
      • In this case the queries generated with numPartitions = 4 will be:
        SELECT ... WHERE "bucket" < 3 or "bucket" is null
        SELECT ... WHERE "bucket" >= 3 and "bucket" < 4
        SELECT ... WHERE "bucket" >= 4 and "bucket" < 5
        SELECT ... WHERE "bucket" >= 5
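
If you want to confirm the clamping behavior described above, the number of generated partition queries is visible as the partition count of the loaded DataFrame. Here is a quick sketch, reusing the <postgresJDBCOptions> placeholder from the earlier examples with hypothetical bounds of 0 and 3 (i.e., 3 hash buckets):

// Hypothetical check: numPartitions = 8 but upperBound - lowerBound = 3,
// so Spark clamps the read to 3 partition queries.
val clampedDf = spark.read
    .format("jdbc")
    .options(<postgresJDBCOptions>)
    .option("dbtable",
        "(select *, abs(mod(hashtext(c), 3)) as bucket from sample_schema.sample_table) as tbl"
    )
    .option("partitionColumn", "bucket")
    .option("lowerBound", 0)
    .option("upperBound", 3)
    .option("numPartitions", 8)
    .load()

println(clampedDf.rdd.getNumPartitions)  // expect 3, not 8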
        

While this is a stable way to implement a custom JDBC partition column, there is no free lunch. The I/O time saved may be a net positive, but the database-side work of computing hashed and bucketed values for every row is not free and could very well be more expensive than a single serial read of the table into Spark cluster memory.

To recap: be aware of the nuances when using a custom partition column, but also know that there are stable ways to implement custom partitioning when it is desirable.

Conclusion

Partitioning JDBC reads in Spark can be used to increase job throughput, especially for large tables and I/O-bound tasks; however, as with many engineering decisions, it is best to weigh the tradeoffs and nuances before implementing it.

For more information about Spark and JDBC in Spark, check out these related postings: