Chendi Xue

I am a Linux software engineer, currently working on Spark, Arrow, Kubernetes, Ceph, C/C++, etc.

Prepare TPC-DS data for Spark

12 Jul 2019 » Spark

I am currently working on a Spark SQL vectorization implementation. There are many options for preparing TPC-DS data on HDFS; I chose the method below because of its simplicity and hackability.

Install DataBricks SparkSqlPerf kit

git clone https://github.com/databricks/spark-sql-perf.git
cd spark-sql-perf
sbt +package

Install TPCDS kit

sudo yum install gcc make flex bison byacc git
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make OS=LINUX
cd ..

TPCDS Preparation script and submit

vim TPCDSPreparation.scala
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
import org.apache.spark.sql._

// Set:
val rootDir: String = "hdfs://sr602:9000/tpcds_1T" // root directory of location to create data in.
val databaseName: String = "tpcds_1T" // name of database to create.
val scaleFactor: String = "1000" // scaleFactor defines the size of the dataset to generate (in GB).
val format: String = "parquet" // a valid Spark data source format, e.g. "parquet".
val sqlContext = new SQLContext(sc)
// Run:
val tables = new TPCDSTables(sqlContext,
    dsdgenDir = "/mnt/nvme2/chendi/spark-sql-perf/tpcds-kit/tools", // location of dsdgen
    scaleFactor = scaleFactor,
    useDoubleForDecimal = false, // true to replace DecimalType with DoubleType
    useStringForDate = false) // true to replace DateType with StringType

tables.genData(
    location = rootDir,
    format = format,
    overwrite = true, // overwrite the data that is already there
    partitionTables = true, // create the partitioned fact tables
    clusterByPartitionColumns = true, // shuffle to get partitions coalesced into single files.
    filterOutNullPartitionValues = false, // true to filter out the partition with NULL key value
    tableFilter = "", // "" means generate all tables
    numPartitions = 400) // how many dsdgen partitions to run - number of input tasks.

// Create the specified database
sql(s"create database $databaseName")
// Create metastore tables in a specified database for your data.
// Once tables are created, the current database will be switched to the specified database.
tables.createExternalTables(rootDir, "parquet", databaseName, overwrite = true, discoverPartitions = true)
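The numPartitions value controls how many dsdgen input tasks Spark launches. As a rough rule of thumb (my own assumption, not a spark-sql-perf recommendation), sizing each task at a few GB of generated data keeps individual tasks short without flooding the scheduler:

```scala
// Hypothetical helper, not part of spark-sql-perf: pick a dsdgen task count
// so each input task generates a similar-sized slice of the raw data.
// Assumption: ~2.5 GB of generated data per task is a reasonable target.
def suggestNumPartitions(scaleFactorGB: Int, gbPerTask: Double = 2.5): Int =
  math.max(1, math.ceil(scaleFactorGB / gbPerTask).toInt)

println(suggestNumPartitions(1000)) // 400, the value used above for scale factor 1000
```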

# configure spark-defaults.conf
spark.driver.extraClassPath /mnt/nvme2/chendi/spark-sql-perf/target/scala-2.11/spark-sql-perf_2.11-0.5.1-SNAPSHOT.jar
spark.executor.extraClassPath /mnt/nvme2/chendi/spark-sql-perf/target/scala-2.11/spark-sql-perf_2.11-0.5.1-SNAPSHOT.jar

# use spark-shell to run the TPCDS preparation script above;
# spark-shell provides sc (SparkContext), so the script can use it directly.
spark-shell --master yarn --deploy-mode client -i TPCDSPreparation.scala

# expected output:
[root@sr602 spark-gpu]# spark-shell --master yarn --deploy-mode client -i TPCDSPreparation.scala
19/08/12 10:35:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/08/12 10:35:06 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Spark context Web UI available at http://vsr602:4040
Spark context available as 'sc' (master = yarn, app id = application_1565311463008_0088).
Spark session available as 'spark'.
Loading TPCDSPreparation.scala...
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
import org.apache.spark.sql._
rootDir: String = hdfs://sr602:9000/tpcds_1T
databaseName: String = tpcds_1T
scaleFactor: String = 1000
format: String = parquet
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@541897c6
tables: com.databricks.spark.sql.perf.tpcds.TPCDSTables = com.databricks.spark.sql.perf.tpcds.TPCDSTables@1d1fd2aa
19/08/12 10:35:35 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
Pre-clustering with partitioning columns with query

Generating table catalog_sales in database to hdfs://sr602:9000/tpcds_1T/catalog_sales with save mode Overwrite.
[Stage 0:========>                                              (62 + 80) / 400]
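
Once generation and createExternalTables finish, the tables can be sanity-checked from the same spark-shell session. A minimal sketch, assuming the standard 24 TPC-DS table names and the sql(...) helper available in spark-shell:

```scala
// The 24 standard TPC-DS tables: 7 fact tables followed by 17 dimension tables.
val tpcdsTables = Seq(
  "catalog_sales", "catalog_returns", "store_sales", "store_returns",
  "web_sales", "web_returns", "inventory",
  "call_center", "catalog_page", "customer", "customer_address",
  "customer_demographics", "date_dim", "household_demographics",
  "income_band", "item", "promotion", "reason", "ship_mode",
  "store", "time_dim", "warehouse", "web_page", "web_site")

// Build one row-count query per table.
val checks = tpcdsTables.map(t => s"select count(*) from $t")

// In spark-shell, after switching to the tpcds_1T database:
// checks.foreach(q => sql(q).show())
```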