Spark Structured Streaming in Scala
Spark Structured Streaming extends the DataFrame API to real-time data processing. You write the same DataFrame transformations you'd use for batch data, and Spark handles the streaming execution automatically. This module covers reading from sources, transformations, windowing, watermarks, and writing to sinks.
What Is Structured Streaming?
Structured Streaming treats a live data stream as an unbounded table. New data continuously appends rows to this table, and your queries run on the latest state. You write batch-style code; Spark handles micro-batches or continuous processing internally.
Key advantages over Spark's older DStream API:
- Same DataFrame/SQL API as batch processing
- Exactly-once fault tolerance via checkpointing
- Event-time processing with watermarks
- Kafka, files, sockets, and custom sources
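The batch/streaming parity is easiest to see side by side. The sketch below (names are illustrative) uses the built-in `rate` source, which continuously emits `timestamp`/`value` rows, so it runs without any external data:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("BatchVsStreaming")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Batch: a bounded DataFrame, read once
val batch = spark.range(100).toDF("value")
val batchEven = batch.filter($"value" % 2 === 0)

// Streaming: the built-in "rate" source emits rows continuously;
// the transformation code is identical to the batch version
val stream = spark.readStream.format("rate").load()
val streamEven = stream.filter($"value" % 2 === 0)

println(batch.isStreaming)  // false
println(stream.isStreaming) // true
```

The only differences are `read` vs `readStream` and how the result is eventually written out.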
Reading from a File Source (Getting Started)
The simplest way to start — Spark watches a directory for new files:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
val spark = SparkSession.builder()
.appName("StructuredStreaming")
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
val schema = StructType(Array(
StructField("event_time", TimestampType),
StructField("user_id", StringType),
StructField("action", StringType)
))
val stream = spark.readStream
.schema(schema)
.option("header", "true")
.csv("/path/to/watch/directory/")

Any new CSV files added to that directory will be processed automatically.
Reading from Kafka
Kafka is the most common source for production streaming:
// Add to build.sbt:
// "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
val kafkaStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-events")
.option("startingOffsets", "latest") // or "earliest"
.load()
// Kafka data arrives as (key, value, topic, partition, offset, timestamp)
// The value is binary — parse it:
import org.apache.spark.sql.functions._
import spark.implicits._
val events = kafkaStream
.select(
$"timestamp",
from_json($"value".cast("string"), schema) as "data"
)
.select("timestamp", "data.*")

Streaming Transformations
Most DataFrame operations work on streaming DataFrames:
// Filter
val clicks = events.filter($"action" === "click")
// Select and compute
val withHour = events.withColumn("hour", hour($"event_time"))
// GroupBy with aggregation (requires output mode)
val actionCounts = events
.groupBy($"action")
.count()

Note: Not all operations are supported in streaming mode. Operations that require seeing all the data at once — such as sorting a stream (outside of complete-mode aggregation results) or calling distinct() — are not allowed.
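The supported alternative to distinct() on a stream is dropDuplicates bounded by a watermark, so Spark can eventually discard old deduplication state. A minimal sketch, using the rate source as a stand-in for real events (the event_id column is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("StreamingDedup")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// The rate source stands in for real events
val events = spark.readStream.format("rate").load()
  .withColumnRenamed("value", "event_id")

// Streaming deduplication is supported when bounded by a watermark:
// state for keys older than 10 minutes can be dropped.
// Including the watermark column in the keys enables that cleanup.
val deduped = events
  .withWatermark("timestamp", "10 minutes")
  .dropDuplicates("event_id", "timestamp")
```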
Output Modes
Streaming queries have three output modes:
// append — only new rows (default for queries without aggregation)
// update — only changed rows (aggregations)
// complete — all rows every time (small aggregations only)
query.writeStream
.outputMode("update")
...

Writing to Console (Testing)
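As a concrete illustration of complete mode, the sketch below aggregates the rate source into three buckets and re-emits the full result table on every trigger (bucket names and the short timeout are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("OutputModes")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val rate = spark.readStream.format("rate").load()

// A small aggregation: counts per (value % 3) bucket -- a good fit
// for complete mode because the result table stays tiny
val counts = rate.groupBy(($"value" % 3).as("bucket")).count()

// complete: the whole result table is rewritten on every trigger
val q = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

q.awaitTermination(3000) // run briefly for the demo, then fall through
q.stop()
```

The same query in append mode would be rejected, since an unbounded aggregation without a watermark never finalizes rows.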
val query = actionCounts.writeStream
.outputMode("update")
.format("console")
.option("truncate", "false")
.start()
query.awaitTermination()

Writing to Kafka
val kafkaOutput = processedStream
.select(
$"user_id" as "key",
to_json(struct($"*")) as "value"
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "processed-events")
.option("checkpointLocation", "/path/to/checkpoint")
.start()

The checkpointLocation is required for fault tolerance: it stores the stream's progress.
Writing to Parquet Files
val parquetOutput = events.writeStream
.outputMode("append")
.format("parquet")
.option("checkpointLocation", "/checkpoint/events")
.option("path", "/output/events")
.partitionBy("action")
.start()

Event-Time Windowing
Process events based on when they occurred (event time) rather than when they arrived:
import org.apache.spark.sql.functions._
val windowedCounts = events
.groupBy(
window($"event_time", "10 minutes", "5 minutes"), // 10-min window, 5-min slide
$"action"
)
.count()
.select(
$"window.start",
$"window.end",
$"action",
$"count"
)

This counts actions in 10-minute windows, sliding every 5 minutes.
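Omitting the slide duration gives tumbling (non-overlapping) windows, where each event lands in exactly one window. A self-contained sketch, again using the rate source with a derived demo column:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("TumblingWindows")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// The rate source stands in for real events; "action" is a derived demo column
val events = spark.readStream.format("rate").load()
  .withColumn("action", when($"value" % 2 === 0, "click").otherwise("view"))
  .withColumnRenamed("timestamp", "event_time")

// With no slide duration, windows tumble: 10-minute buckets, no overlap
val tumbling = events
  .groupBy(window($"event_time", "10 minutes"), $"action")
  .count()
```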
Watermarks: Handling Late Data
Without a watermark, Spark keeps state for all windows forever. A watermark tells Spark it can discard state for windows older than a threshold:
val withWatermark = events
.withWatermark("event_time", "10 minutes") // allow 10 minutes of late data
.groupBy(
window($"event_time", "5 minutes"),
$"action"
)
.count()

This means any event arriving more than 10 minutes behind the latest observed event time may be dropped. Windows older than (latest event time - 10 minutes) are finalized and their state is freed.
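Watermarks also interact with output modes: in append mode, a window's count is emitted exactly once, only after the watermark passes the window's end, so results are final but delayed. A runnable sketch of that combination (rate source and short timeout are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("AppendWithWatermark")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// The rate source stands in for real events
val events = spark.readStream.format("rate").load()
  .withColumnRenamed("timestamp", "event_time")

val counts = events
  .withWatermark("event_time", "10 minutes")
  .groupBy(window($"event_time", "5 minutes"))
  .count()

// Append mode works here because the watermark lets Spark finalize windows;
// without the watermark, this writeStream would throw an AnalysisException
val q = counts.writeStream
  .outputMode("append")
  .format("console")
  .start()

q.awaitTermination(3000) // run briefly for the demo
q.stop()
```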
Checkpointing and Fault Tolerance
Checkpointing is mandatory for production streaming:
val query = stream.writeStream
.outputMode("append")
.format("parquet")
.option("checkpointLocation", "/hdfs/checkpoints/my-stream")
.option("path", "/hdfs/output/my-stream")
.start()
// Monitor the query
println(query.status)
println(query.lastProgress)
// Stop gracefully
query.stop()

If the application restarts, it reads the checkpoint and continues from where it left off.
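Beyond polling `status` and `lastProgress`, a StreamingQueryListener can push progress events for every query on the session. A minimal sketch (the printed fields are just examples of what the progress object exposes):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark = SparkSession.builder()
  .appName("QueryMonitoring")
  .master("local[*]")
  .getOrCreate()

// Receives callbacks for every streaming query on this SparkSession
val listener = new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Rows/sec: ${event.progress.processedRowsPerSecond}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query terminated: ${event.id}")
}

spark.streams.addListener(listener)
```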
Frequently Asked Questions
Q: What is the difference between event time and processing time in Spark Streaming? Event time is when the event actually occurred (e.g., a user clicked at 14:05:30). Processing time is when Spark processes the event (e.g., it arrived at 14:06:15 due to network lag). Structured Streaming lets you window and aggregate on event time, which is more meaningful for analytics. Without watermarks, late-arriving events can always update past windows; with watermarks, you trade correctness (some late events are dropped) for bounded state size.
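The distinction can be made visible in code. A hedged sketch, where the rate source's timestamp plays the role of event time and current_timestamp() records when Spark evaluates the row (column names are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("EventVsProcessingTime")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// event_time: when the event occurred (here, the rate source's timestamp)
// processing_time: when Spark evaluates the row
val events = spark.readStream.format("rate").load()
  .withColumnRenamed("timestamp", "event_time")
  .withColumn("processing_time", current_timestamp())
  .withColumn("lag_seconds",
    unix_timestamp($"processing_time") - unix_timestamp($"event_time"))
```

In real pipelines the lag comes from network delay, retries, and batching; windowing on event_time keeps the analytics correct regardless of that lag.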
Q: What does the checkpointLocation do and why is it required? The checkpoint directory stores the stream's read offsets (e.g., Kafka offsets), aggregation state, and metadata. This enables exactly-once processing: if the application crashes and restarts, Spark reads the checkpoint and continues from the last committed offset. Without a checkpoint, the stream starts over. For stateful operations (windowing, aggregations), checkpointing is required — Spark will throw an error if you omit it.
Q: When should I use Spark Structured Streaming vs Apache Flink? Both are mature streaming frameworks. Use Spark if your team already uses Spark for batch processing — you get a unified API and platform, and the operational overhead is lower. Use Flink when you need true streaming (not micro-batch), very low latency (milliseconds), or more advanced stateful processing primitives. Flink is generally better for complex event processing and stateful applications; Spark Structured Streaming is better when you want batch-streaming unification on an existing Spark cluster.
Part of Scala Mastery Course — Module 18 of 22.
