Spark DataFrames and Datasets in Scala

DataFrames and Datasets are Spark's high-level structured APIs, preferred over RDDs for most production work. DataFrames offer SQL-like operations with automatic optimization. Datasets add compile-time type safety on top. This module covers both in depth.

DataFrames vs Datasets vs RDDs

| Feature       | RDD          | DataFrame         | Dataset          |
|---------------|--------------|-------------------|------------------|
| Type safety   | Compile-time | Runtime           | Compile-time     |
| Optimization  | None         | Catalyst          | Catalyst         |
| API style     | Functional   | SQL-like          | Functional + SQL |
| Serialization | Java/Kryo    | Tungsten (binary) | Encoders         |

Use DataFrame for ETL, SQL queries, and data exploration. Use Dataset[T] when you want type safety with Spark's optimization. Use RDD for unstructured data or low-level control.

Note: In Scala, DataFrame is just a type alias for Dataset[Row].
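A minimal sketch of what the alias means in practice (the function names here are illustrative, not part of Spark's API):

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row}

// Declared in the org.apache.spark.sql package object:
//   type DataFrame = Dataset[Row]
// So the two types are fully interchangeable: any function that accepts a
// DataFrame accepts a Dataset[Row], and vice versa.
def rowCount(df: DataFrame): Long = df.count()
def rowCount2(ds: Dataset[Row]): Long = ds.count()  // identical signature after aliasing
```

The practical consequence: a DataFrame is an *untyped* Dataset whose rows are generic Row objects, with fields accessed by name or position and checked only at runtime.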

Creating DataFrames

scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("DataFrames")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// From a local sequence (for testing)
val df = Seq(
  ("Alice", 30, "Engineering"),
  ("Bob",   25, "Marketing"),
  ("Carol", 35, "Engineering")
).toDF("name", "age", "department")

df.show()
// +-----+---+------------+
// | name|age|  department|
// +-----+---+------------+
// |Alice| 30| Engineering|
// |  Bob| 25|   Marketing|
// |Carol| 35| Engineering|
// +-----+---+------------+

df.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = false)
//  |-- department: string (nullable = true)

Reading Data

scala
// CSV with header and schema inference
val csv = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/path/to/employees.csv")

// Parquet (preserves schema automatically)
val parquet = spark.read.parquet("/path/to/data.parquet")

// JSON
val json = spark.read
  .option("multiLine", "true")
  .json("/path/to/data.json")

Defining an Explicit Schema

Schema inference is convenient but requires an extra pass over the data and can guess types wrong. Define the schema explicitly in production:

scala
import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id",         IntegerType, nullable = false),
  StructField("name",       StringType,  nullable = true),
  StructField("salary",     DoubleType,  nullable = true),
  StructField("department", StringType,  nullable = true)
))

val df = spark.read
  .schema(schema)
  .option("header", "true")
  .csv("/path/to/employees.csv")

DataFrame Transformations

scala
// select columns
val names = df.select("name", "age")
val computed = df.select($"name", $"age" + 1 as "age_next_year")

// filter
val seniors = df.filter($"age" > 30)
val engineers = df.where($"department" === "Engineering")

// withColumn — add or replace a column
val withSenior = df.withColumn("is_senior", $"age" > 30)

// drop columns
val noDept = df.drop("department")

// rename
val renamed = df.withColumnRenamed("name", "full_name")

// orderBy
val sorted = df.orderBy($"age".desc, $"name".asc)

// distinct
val unique = df.select("department").distinct()

// groupBy and aggregations
val deptStats = df.groupBy("department")
  .agg(
    count("*") as "headcount",
    avg("age") as "avg_age",
    max("age") as "max_age"
  )

deptStats.show()
// +------------+---------+-------+-------+
// |  department|headcount|avg_age|max_age|
// +------------+---------+-------+-------+
// | Engineering|        2|   32.5|     35|
// |   Marketing|        1|   25.0|     25|
// +------------+---------+-------+-------+

Spark SQL

Register a DataFrame as a temporary view and query it with SQL:

scala
df.createOrReplaceTempView("employees")

val result = spark.sql("""
  SELECT department,
         COUNT(*) as headcount,
         AVG(age) as avg_age
  FROM employees
  WHERE age > 20
  GROUP BY department
  ORDER BY headcount DESC
""")

result.show()

Typed Datasets

A Dataset[T] is a DataFrame whose rows are instances of a known Scala type T rather than generic Rows:

scala
import org.apache.spark.sql.Dataset

case class Employee(name: String, age: Int, department: String)

val employees: Dataset[Employee] = df.as[Employee]

// Now you get type-safe operations
val names: Dataset[String] = employees.map(_.name)
val engineers: Dataset[Employee] = employees.filter(_.department == "Engineering")

// Accessing fields is type-safe — no column strings
employees.map(e => e.copy(age = e.age + 1)).show()

Joins

scala
val depts = Seq(
  ("Engineering", "New York"),
  ("Marketing", "San Francisco")
).toDF("department", "location")

// Inner join
val joined = df.join(depts, "department")

// Left outer join
val leftJoined = df.join(depts, Seq("department"), "left_outer")

// Explicit join condition — note this form keeps both department columns
// in the result; the Seq("department") form above deduplicates the join key
val explicitJoin = df.join(depts, df("department") === depts("department"), "inner")

Writing Data

scala
// Write Parquet (preferred for big data)
df.write
  .mode("overwrite")  // or "append", "ignore", "error"
  .parquet("/output/parquet")

// Write CSV
df.write
  .option("header", "true")
  .mode("overwrite")
  .csv("/output/csv")

// Write partitioned
df.write
  .partitionBy("department")
  .mode("overwrite")
  .parquet("/output/partitioned")

Frequently Asked Questions

Q: What is the difference between $"column" and col("column") in Spark?

Both create a Column object referring to a column by name. $"column" is syntactic sugar enabled by import spark.implicits._. col("column") from import org.apache.spark.sql.functions._ works without the implicits import. In practice they're interchangeable; col() is slightly more explicit and works in more contexts (like when writing library code where you can't guarantee the implicits are in scope).
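A quick sketch of the equivalent forms, using the employees DataFrame from earlier (all three select the same column):

```scala
import org.apache.spark.sql.functions.col
import spark.implicits._       // enables the $"..." interpolator

df.select($"name")             // $-interpolator, requires spark.implicits._
df.select(col("name"))         // col() from sql.functions, no implicits needed
df.select(df("name"))          // apply() on the DataFrame itself;
                               // useful for disambiguating columns in joins
```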

Q: When should I use Dataset[T] over DataFrame?

Use Dataset[T] when type safety matters — when the structure of your data is well-defined and you want compile-time errors rather than runtime errors. It's especially valuable for business logic layers in ETL pipelines. Use DataFrame for exploratory work, ad-hoc SQL queries, and when working with schema-on-read data where the exact structure may vary. Both use Catalyst optimization, so the performance is similar.
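An illustrative sketch of the difference, using the Employee case class from the Typed Datasets section (the misspelled names are deliberate):

```scala
// Untyped API: a column-name typo compiles fine and only fails at runtime,
// with an AnalysisException when the query is analyzed:
// df.select("naem")

// Typed API: the same typo is rejected by the compiler:
// employees.map(_.naem)   // error: value naem is not a member of Employee

// Correct typed access, checked against the case class definition:
val nextYear = employees.map(e => e.copy(age = e.age + 1))
```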

Q: Why is Parquet the preferred file format for Spark?

Parquet is columnar — it stores data column by column rather than row by row. When you query only a few columns from a wide table, Parquet reads only those columns from disk (column pruning), which is dramatically faster than CSV or JSON, which must read every record in full. Parquet also stores the schema, supports efficient compression, and is natively supported by all major big data tools (Hive, Presto, Athena, Databricks).
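For example, column pruning means a narrow projection over a wide Parquet table touches only the requested columns on disk (the path here is illustrative):

```scala
// Spark pushes the projection down to the Parquet reader, so only the
// "name" and "salary" column chunks are read from disk — the rest of
// the (possibly very wide) table is never touched.
val slim = spark.read
  .parquet("/path/to/wide_table.parquet")  // illustrative path
  .select("name", "salary")
```

Calling slim.explain() shows a ReadSchema containing only name and salary, confirming the projection was pushed down before any I/O happened.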


Part of Scala Mastery Course — Module 17 of 22.