Spark DataFrames and Datasets in Scala
DataFrames and Datasets are Spark's high-level structured APIs, preferred over RDDs for most production work. DataFrames offer SQL-like operations with automatic optimization. Datasets add compile-time type safety on top. This module covers both in depth.
DataFrames vs Datasets vs RDDs
| Feature | RDD | DataFrame | Dataset |
|---|---|---|---|
| Type safety | Compile-time | Runtime | Compile-time |
| Optimization | None | Catalyst | Catalyst |
| API style | Functional | SQL-like | Functional + SQL |
| Serialization | Java/Kryo | Tungsten (binary) | Encoders |
Use DataFrame for ETL, SQL queries, and data exploration. Use Dataset[T] when you want type safety with Spark's optimization. Use RDD for unstructured data or low-level control.
Note: In Scala, DataFrame is just a type alias for Dataset[Row].
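The alias can be checked directly: a DataFrame value assigns to a Dataset[Row] without any conversion. A minimal local-mode sketch (the app name is illustrative):

```scala
import org.apache.spark.sql.{Dataset, Row, SparkSession}

val spark = SparkSession.builder()
  .appName("AliasDemo") // illustrative name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val df = Seq(("Alice", 30)).toDF("name", "age")

// Compiles with no conversion: DataFrame is exactly Dataset[Row]
val ds: Dataset[Row] = df

// Row fields are accessed dynamically, so there is no compile-time checking:
val firstName = ds.first().getAs[String]("name")
```

Because rows are untyped Row objects, a wrong field name here fails only at runtime; the Typed Datasets section below shows the type-safe alternative.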
Creating DataFrames
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("DataFrames")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// From a local sequence (for testing)
val df = Seq(
("Alice", 30, "Engineering"),
("Bob", 25, "Marketing"),
("Carol", 35, "Engineering")
).toDF("name", "age", "department")
df.show()
// +-----+---+-----------+
// | name|age| department|
// +-----+---+-----------+
// |Alice| 30|Engineering|
// |  Bob| 25|  Marketing|
// |Carol| 35|Engineering|
// +-----+---+-----------+
df.printSchema()
// root
// |-- name: string (nullable = true)
// |-- age: integer (nullable = false)
// |-- department: string (nullable = true)
Reading Data
// CSV with header and schema inference
val csv = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("/path/to/employees.csv")
// Parquet (preserves schema automatically)
val parquet = spark.read.parquet("/path/to/data.parquet")
// JSON
val json = spark.read
.option("multiLine", "true")
.json("/path/to/data.json")
Defining an Explicit Schema
Schema inference is convenient but slow for large files. Define the schema explicitly in production:
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("salary", DoubleType, nullable = true),
StructField("department", StringType, nullable = true)
))
val df = spark.read
.schema(schema)
.option("header", "true")
.csv("/path/to/employees.csv")
DataFrame Transformations
// select columns
val names = df.select("name", "age")
val computed = df.select($"name", $"age" + 1 as "age_next_year")
// filter
val seniors = df.filter($"age" > 30)
val engineers = df.where($"department" === "Engineering")
// withColumn — add or replace a column
val withSenior = df.withColumn("is_senior", $"age" > 30)
// drop columns
val noDept = df.drop("department")
// rename
val renamed = df.withColumnRenamed("name", "full_name")
// orderBy
val sorted = df.orderBy($"age".desc, $"name".asc)
// distinct
val unique = df.select("department").distinct()
// groupBy and aggregations
val deptStats = df.groupBy("department")
.agg(
count("*") as "headcount",
avg("age") as "avg_age",
max("age") as "max_age"
)
deptStats.show()
// +-----------+---------+-------+-------+
// | department|headcount|avg_age|max_age|
// +-----------+---------+-------+-------+
// |Engineering|        2|   32.5|     35|
// |  Marketing|        1|   25.0|     25|
// +-----------+---------+-------+-------+
Spark SQL
Register a DataFrame as a temporary view and query it with SQL:
df.createOrReplaceTempView("employees")
val result = spark.sql("""
SELECT department,
COUNT(*) as headcount,
AVG(age) as avg_age
FROM employees
WHERE age > 20
GROUP BY department
ORDER BY headcount DESC
""")
result.show()
Typed Datasets
A Dataset[T] is a DataFrame with a known Scala type at each row:
import org.apache.spark.sql.Dataset
case class Employee(name: String, age: Int, department: String)
val employees: Dataset[Employee] = df.as[Employee]
// Now you get type-safe operations
val names: Dataset[String] = employees.map(_.name)
val engineers: Dataset[Employee] = employees.filter(_.department == "Engineering")
// Accessing fields is type-safe — no column strings
employees.map(e => e.copy(age = e.age + 1)).show()
Joins
val depts = Seq(
("Engineering", "New York"),
("Marketing", "San Francisco")
).toDF("department", "location")
// Inner join
val joined = df.join(depts, "department")
// Left outer join
val leftJoined = df.join(depts, Seq("department"), "left_outer")
// Explicit join condition
val explicitJoin = df.join(depts, df("department") === depts("department"), "inner")
// Note: this form keeps both department columns; the Seq("department") form deduplicates the join key
Writing Data
// Write Parquet (preferred for big data)
df.write
.mode("overwrite") // or "append", "ignore", "error"
.parquet("/output/parquet")
// Write CSV
df.write
.option("header", "true")
.mode("overwrite")
.csv("/output/csv")
// Write partitioned
df.write
.partitionBy("department")
.mode("overwrite")
.parquet("/output/partitioned")
Frequently Asked Questions
Q: What is the difference between $"column" and col("column") in Spark?
Both create a Column object referring to a column by name. $"column" is syntactic sugar enabled by import spark.implicits._. col("column") from import org.apache.spark.sql.functions._ works without the implicits import. In practice they're interchangeable; col() is slightly more explicit and works in more contexts (like when writing library code where you can't guarantee the implicits are in scope).
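A quick sketch of the equivalence (local mode; names are illustrative) — both styles build the same Column expression, so the two filters below return the same result:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("ColumnSyntax") // illustrative name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._ // enables the $"..." syntax

val df = Seq(("Alice", 30), ("Bob", 25)).toDF("name", "age")

// Same predicate, written both ways:
val withDollar = df.filter($"age" > 28).count()
val withCol    = df.filter(col("age") > 28).count()
```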
Q: When should I use Dataset[T] over DataFrame?
Use Dataset[T] when type safety matters — when the structure of your data is well-defined and you want compile-time errors rather than runtime errors. It's especially valuable for business logic layers in ETL pipelines. Use DataFrame for exploratory work, ad-hoc SQL queries, and when working with schema-on-read data where the exact structure may vary. Both use Catalyst optimization, so the performance is similar.
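One way to see the difference is what happens when a column or field name is wrong. A sketch (local mode, illustrative names): the untyped query compiles fine and only fails when Spark analyzes the plan, whereas the equivalent typo in typed code (e.g. `ds.map(_.agee)`) would not compile at all:

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

val spark = SparkSession.builder()
  .appName("TypedVsUntyped") // illustrative name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

case class Emp(name: String, age: Int)
val ds = Seq(Emp("Alice", 30)).toDS()

// Untyped: the typo "agee" passes the compiler and surfaces as an
// AnalysisException when the plan is analyzed.
val caughtAtRuntime =
  try { ds.toDF().select("agee"); false }
  catch { case _: AnalysisException => true }
```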
Q: Why is Parquet the preferred file format for Spark?
Parquet is columnar: it stores data column by column rather than row by row. When you query only a few columns from a wide table, Parquet reads only those columns from disk (column pruning), which is dramatically faster than CSV or JSON, which must read every row in full. Parquet also stores the schema, supports efficient compression, and is natively supported by all major big data tools (Hive, Presto, Athena, Databricks).
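A sketch of column pruning in action (local mode; the /tmp path is a hypothetical scratch location). The physical plan printed by explain() should list only the selected column in its ReadSchema:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ParquetPruning") // illustrative name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val path = "/tmp/parquet_pruning_demo" // hypothetical scratch path

Seq(("Alice", 30, "Engineering"), ("Bob", 25, "Marketing"))
  .toDF("name", "age", "department")
  .write.mode("overwrite").parquet(path)

// Only the "name" column's data needs to be read from disk:
val names = spark.read.parquet(path).select("name")
names.explain() // ReadSchema in the plan shows just the name field
```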
Part of Scala Mastery Course — Module 17 of 22.
