06 Spark Streaming - Real-time Data Processing
Spark Streaming is Apache Spark’s scalable and fault-tolerant stream processing engine that enables processing of live data streams. It extends Spark’s core capabilities to handle real-time data ingestion and processing, making it possible to build end-to-end streaming applications.
Introduction to Stream Processing
Batch vs Stream Processing
Traditional batch processing works with finite datasets, while stream processing handles continuous, unbounded data streams in real time.
Batch Processing:
┌─────────────────────────────────────────────────────────────┐
│ Static Dataset │
├─────────────────────────────────────────────────────────────┤
│ Input → Process → Output │
│ │
│ • Fixed size data │
│ • High latency (minutes to hours) │
│ • High throughput │
│ • Complete results │
└─────────────────────────────────────────────────────────────┘
Stream Processing:
┌─────────────────────────────────────────────────────────────┐
│ Continuous Data Stream │
├─────────────────────────────────────────────────────────────┤
│ Input Stream → Real-time Process → Output Stream │
│ │
│ • Unbounded data │
│ • Low latency (milliseconds to seconds) │
│ • Variable throughput │
│ • Incremental results │
└─────────────────────────────────────────────────────────────┘
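The contrast above can be made concrete with a small PySpark sketch (assuming a hypothetical /data/events/ directory of JSON files that carry an event_type field): the batch job reads the directory once and produces a complete result, while the streaming job keeps picking up newly arriving files and emits incremental updates.
# Batch vs. streaming read of the same directory (sketch; paths are placeholders)
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
spark = SparkSession.builder.appName("BatchVsStream").getOrCreate()
# Batch: finite input, one complete result
batch_df = spark.read.json("/data/events/")
batch_df.groupBy("event_type").agg(count("*").alias("events")).show()
# Streaming: unbounded input, incremental results as new files arrive
stream_df = spark.readStream.schema(batch_df.schema).json("/data/events/")
query = (stream_df.groupBy("event_type").agg(count("*").alias("events"))
    .writeStream.outputMode("complete").format("console").start())
query.awaitTermination()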
Real-world Streaming Use Cases
// Common streaming applications
const streamingUseCases = {
// Financial Services
fraudDetection: {
input: "Credit card transactions",
processing: "Real-time anomaly detection",
output: "Fraud alerts within 100ms",
impact: "Prevent financial losses"
},
// E-commerce
recommendationEngine: {
input: "User clicks and purchases",
processing: "Real-time ML inference",
output: "Personalized recommendations",
impact: "Increase conversion rates"
},
// IoT and Manufacturing
predictiveMaintenance: {
input: "Sensor data from machinery",
processing: "Anomaly detection algorithms",
output: "Maintenance alerts",
impact: "Reduce downtime costs"
},
// Social Media
trendingTopics: {
input: "Social media posts and interactions",
processing: "Real-time aggregation and ranking",
output: "Trending topics and hashtags",
impact: "Engage users with relevant content"
},
// Gaming
realTimeAnalytics: {
input: "Player actions and game events",
processing: "Real-time metrics calculation",
output: "Live dashboards and alerts",
impact: "Optimize game experience"
}
};
Spark Streaming Architecture
Core Concepts
Spark Streaming works by discretizing the continuous input stream into small, time-based batches (micro-batches) and processing each batch with Spark's core batch engine.
Spark Streaming Architecture:
┌─────────────────────────────────────────────────────────────┐
│ Input Sources │
├─────────────────┬─────────────────┬─────────────────────────┤
│ Kafka │ Flume │ TCP Sockets │
│ (Messages) │ (Logs) │ (Network Data) │
└─────────────────┴─────────────────┴─────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Spark Streaming │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ DStream (Discretized Stream) │ │
│ │ │ │
│ │ [Batch 1] → [Batch 2] → [Batch 3] → [Batch 4] │ │
│ │ RDD RDD RDD RDD │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Spark Core Engine │ │
│ │ (Batch Processing) │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Output Sinks │
├─────────────────┬─────────────────┬─────────────────────────┤
│ Databases │ File Systems │ Message Queues │
│ (HDFS, S3) │ (Local, NFS) │ (Kafka, RabbitMQ) │
└─────────────────┴─────────────────┴─────────────────────────┘
DStreams (Discretized Streams)
DStreams are the fundamental abstraction in Spark Streaming, representing a continuous sequence of RDDs.
# DStream concept illustration
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
class DStreamExample:
def __init__(self):
self.sc = SparkContext("local[2]", "StreamingExample")
self.ssc = StreamingContext(self.sc, 1)  # 1-second batch interval
# Windowed operations that use an inverse reduce function require checkpointing
self.ssc.checkpoint("/tmp/streaming-checkpoint")
def create_dstream_from_socket(self):
"""Create DStream from TCP socket"""
# Connect to localhost:9999
lines = self.ssc.socketTextStream("localhost", 9999)
# Each 'lines' represents a batch of data received in 1 second
return lines
def demonstrate_dstream_operations(self):
"""Show various DStream operations"""
lines = self.create_dstream_from_socket()
# Transformation: Split lines into words
words = lines.flatMap(lambda line: line.split(" "))
# Transformation: Map each word to (word, 1)
pairs = words.map(lambda word: (word, 1))
# Transformation: Count words in each batch
word_counts = pairs.reduceByKey(lambda x, y: x + y)
# Action: Print results
word_counts.pprint()
return word_counts
def demonstrate_windowed_operations(self):
"""Show windowed operations"""
lines = self.create_dstream_from_socket()
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
# Window operation: Count words over last 30 seconds,
# updated every 10 seconds
windowed_word_counts = pairs.reduceByKeyAndWindow(
lambda x, y: x + y, # Reduce function
lambda x, y: x - y, # Inverse reduce function
30, # Window duration (30 seconds)
10 # Slide duration (10 seconds)
)
windowed_word_counts.pprint()
return windowed_word_counts
def start_streaming(self):
"""Start the streaming context"""
self.ssc.start() # Start the computation
self.ssc.awaitTermination() # Wait for termination
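A minimal way to drive the class above (an assumed local test setup): publish lines on port 9999, for example with nc -lk 9999 in a terminal, then run the word count.
# Hypothetical driver for the DStreamExample class above
example = DStreamExample()
example.demonstrate_dstream_operations()  # registers the per-batch word count
example.start_streaming()  # blocks; counts are printed after every 1-second batch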
Micro-batch Processing Model
Micro-batch Timeline:
Time: 0s 1s 2s 3s 4s 5s 6s
│ │ │ │ │ │ │
Input: ████ ████ ████ ████ ████ ████ ████
│ │ │ │ │ │ │
Batch: [B1] [B2] [B3] [B4] [B5] [B6] [B7]
│ │ │ │ │ │ │
Process: ▼ ▼ ▼ ▼ ▼ ▼ ▼
RDD1 RDD2 RDD3 RDD4 RDD5 RDD6 RDD7
Characteristics:
• Batch Interval: 1 second (configurable)
• Each batch becomes an RDD
• Processing happens in parallel
• Results available after each batch
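The per-batch behaviour described above can be observed directly: foreachRDD hands you the RDD behind each micro-batch along with its batch time. A minimal sketch, assuming a socket source on localhost:9999:
# Sketch: inspecting individual micro-batches
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "MicroBatchDemo")
ssc = StreamingContext(sc, 1)  # 1-second batch interval
lines = ssc.socketTextStream("localhost", 9999)
def show_batch(time, rdd):
    # Each invocation corresponds to one micro-batch, backed by one RDD
    print(f"Batch at {time}: {rdd.count()} records in {rdd.getNumPartitions()} partitions")
lines.foreachRDD(show_batch)
ssc.start()
ssc.awaitTermination()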
Input Sources and Data Ingestion
Built-in Input Sources
// Scala examples of various input sources
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.receiver.Receiver
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
object InputSources {
val sparkConf = new SparkConf().setAppName("InputSources").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// 1. File-based sources
def fileSource(): DStream[String] = {
// Monitor a directory for new files
ssc.textFileStream("hdfs://namenode:port/streaming/input/")
}
// 2. Socket-based sources
def socketSource(): DStream[String] = {
// TCP socket connection
ssc.socketTextStream("localhost", 9999)
}
// 3. Kafka integration
def kafkaSource(): DStream[ConsumerRecord[String, String]] = {
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "streaming-consumer-group",
"auto.offset.reset" -> "latest"
)
val topics = Array("user-events", "system-logs")
KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
}
// 4. Custom receiver
def customSource(): DStream[String] = {
ssc.receiverStream(new CustomReceiver("custom-source-url"))
}
}
// Custom receiver implementation
class CustomReceiver(url: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
def onStart(): Unit = {
// Start the thread that receives data
new Thread("Custom Receiver") {
override def run(): Unit = { receive() }
}.start()
}
def onStop(): Unit = {
// Cleanup resources
}
private def receive(): Unit = {
var userInput: String = null
try {
// Simulate receiving data
while (!isStopped() && { userInput = receiveData(); userInput != null }) {
store(userInput) // Store received data
}
} catch {
case e: Exception => restart("Error receiving data", e)
}
}
private def receiveData(): String = {
// Implement actual data receiving logic
Thread.sleep(1000)
s"Data received at ${System.currentTimeMillis()}"
}
}
Kafka Integration Deep Dive
# Python Kafka integration example
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
class KafkaStreamingProcessor:
def __init__(self):
self.spark = SparkSession.builder \
.appName("KafkaStreamingApp") \
.config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint") \
.getOrCreate()
self.spark.sparkContext.setLogLevel("WARN")
def create_kafka_stream(self):
"""Create streaming DataFrame from Kafka"""
kafka_df = self.spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "user-events,system-logs,transactions") \
.option("startingOffsets", "latest") \
.load()
return kafka_df
def process_user_events(self):
"""Process user event stream"""
kafka_df = self.create_kafka_stream()
# Define schema for user events
user_event_schema = StructType([
StructField("user_id", StringType(), True),
StructField("event_type", StringType(), True),
StructField("timestamp", LongType(), True),
StructField("properties", MapType(StringType(), StringType()), True)
])
# Parse JSON messages
parsed_df = kafka_df \
.filter(col("topic") == "user-events") \
.select(
from_json(col("value").cast("string"), user_event_schema).alias("data"),
col("timestamp").alias("kafka_timestamp"),
col("partition"),
col("offset")
) \
.select("data.*", "kafka_timestamp", "partition", "offset")
# Real-time aggregations
# (the epoch-seconds field must first be converted to a TimestampType column
# before it can be used for watermarks and windows)
user_activity = parsed_df \
.withColumn("event_time", to_timestamp(from_unixtime(col("timestamp")))) \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window(col("event_time"), "5 minutes", "1 minute"),
col("event_type")
) \
.agg(
count("*").alias("event_count"),
countDistinct("user_id").alias("unique_users")
)
return user_activity
def detect_anomalies(self):
"""Real-time anomaly detection"""
kafka_df = self.create_kafka_stream()
# Transaction schema
transaction_schema = StructType([
StructField("transaction_id", StringType(), True),
StructField("user_id", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("merchant", StringType(), True),
StructField("timestamp", LongType(), True)
])
transactions = kafka_df \
.filter(col("topic") == "transactions") \
.select(
from_json(col("value").cast("string"), transaction_schema).alias("data")
) \
.select("data.*")
# Detect large transactions (simple rule-based)
anomalies = transactions \
.filter(col("amount") > 10000) \
.withColumn("anomaly_type", lit("large_transaction")) \
.withColumn("detected_at", current_timestamp())
return anomalies
def start_streaming_queries(self):
"""Start all streaming queries"""
# User activity monitoring
user_activity = self.process_user_events()
activity_query = user_activity \
.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", False) \
.trigger(processingTime="30 seconds") \
.start()
# Anomaly detection
anomalies = self.detect_anomalies()
anomaly_query = anomalies \
.writeStream \
.outputMode("append") \
.format("console") \
.option("truncate", False) \
.start()
# Wait for termination
self.spark.streams.awaitAnyTermination()
# Usage
if __name__ == "__main__":
processor = KafkaStreamingProcessor()
processor.start_streaming_queries()
Transformations and Operations
Stateless Transformations
# Stateless transformations example
class StatelessTransformations:
def basic_transformations(self, dstream, another_dstream):
"""Basic stateless transformations"""
# Map: Transform each element
mapped = dstream.map(lambda x: x.upper())
# Filter: Select elements based on condition
filtered = dstream.filter(lambda x: len(x) > 5)
# FlatMap: Transform and flatten
words = dstream.flatMap(lambda line: line.split(" "))
# Union: Combine multiple DStreams
combined = dstream.union(another_dstream)
return mapped, filtered, words, combined
def aggregation_transformations(self, dstream):
"""Aggregation transformations"""
# ReduceByKey: Aggregate by key within each batch
word_pairs = dstream.map(lambda word: (word, 1))
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)
# CountByValue: Count occurrences of each value
value_counts = dstream.countByValue()
# Reduce: Aggregate all elements in each batch
total = dstream.reduce(lambda a, b: a + b)
return word_counts, value_counts, total
def join_operations(self, dstream1, dstream2):
"""Join operations between DStreams"""
# Join: Inner join on keys
joined = dstream1.join(dstream2)
# LeftOuterJoin: Left outer join
left_joined = dstream1.leftOuterJoin(dstream2)
# CoGroup: Group values from both streams by key
cogrouped = dstream1.cogroup(dstream2)
return joined, left_joined, cogrouped
Stateful Transformations
// Stateful transformations in Scala
import org.apache.spark.streaming.{Minutes, State, StateSpec}
import org.apache.spark.streaming.dstream.DStream
object StatefulTransformations {
// UpdateStateByKey: Maintain state across batches
def updateStateByKey(dstream: DStream[(String, Int)]): DStream[(String, Int)] = {
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = newValues.sum + runningCount.getOrElse(0)
Some(newCount)
}
dstream.updateStateByKey(updateFunction)
}
// MapWithState: More efficient state management
def mapWithState(dstream: DStream[(String, Int)]): DStream[(String, Int)] = {
def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[(String, Int)] = {
val currentCount = value.getOrElse(0)
val previousCount = state.getOption().getOrElse(0)
val newCount = currentCount + previousCount
state.update(newCount)
Some((key, newCount))
}
val stateSpec = StateSpec.function(mappingFunction)
.initialState(initialRDD) // Optional initial state
.numPartitions(10) // Number of partitions for state
.timeout(Minutes(30)) // Timeout inactive keys
dstream.mapWithState(stateSpec)
}
// Session-based analytics example
// (the case classes are declared at object level so they are visible in the method signature)
case class Event(timestamp: Long, action: String, page: String)
case class Session(startTime: Long, endTime: Long, pageViews: Int, actions: List[String])
def sessionAnalytics(userEvents: DStream[(String, Event)]): DStream[(String, Session)] = {
def updateSession(key: String, value: Option[Event], state: State[Session]): Option[(String, Session)] = {
// On key timeout, mapWithState invokes this function with value = None; emit nothing in that case
if (value.isEmpty) return None
val event = value.get
val currentSession = state.getOption()
currentSession match {
case Some(session) =>
// Update existing session
val updatedSession = session.copy(
endTime = event.timestamp,
pageViews = session.pageViews + 1,
actions = session.actions :+ event.action
)
state.update(updatedSession)
Some((key, updatedSession))
case None =>
// Create new session
val newSession = Session(
startTime = event.timestamp,
endTime = event.timestamp,
pageViews = 1,
actions = List(event.action)
)
state.update(newSession)
Some((key, newSession))
}
}
val stateSpec = StateSpec.function(updateSession)
.timeout(Minutes(30)) // Session timeout
userEvents.mapWithState(stateSpec)
}
}
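The same running-count pattern is available in PySpark; a minimal sketch (updateStateByKey requires a checkpoint directory, and the path and port here are placeholders):
# PySpark running word count with updateStateByKey
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "StatefulWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("/tmp/stateful-checkpoint")  # required for stateful operations
def update_count(new_values, running_count):
    # new_values: counts from the current batch; running_count: accumulated state
    return sum(new_values) + (running_count or 0)
lines = ssc.socketTextStream("localhost", 9999)
running_counts = (lines.flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .updateStateByKey(update_count))
running_counts.pprint()
ssc.start()
ssc.awaitTermination()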
Window Operations
Time-based Windows
# Window operations example
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
class WindowOperations:
def __init__(self, ssc):
self.ssc = ssc
def sliding_window_example(self, dstream):
"""Sliding window operations"""
# Count elements in sliding window
# Window: 30 seconds, Slide: 10 seconds
windowed_counts = dstream.countByWindow(30, 10)
# Reduce over sliding window
windowed_sum = dstream.reduceByWindow(
lambda x, y: x + y, # Reduce function
lambda x, y: x - y, # Inverse reduce function (optional)
30, # Window duration
10 # Slide duration
)
return windowed_counts, windowed_sum
def keyed_window_operations(self, pair_dstream):
"""Window operations on key-value pairs"""
# ReduceByKeyAndWindow: Aggregate by key over window
windowed_word_counts = pair_dstream.reduceByKeyAndWindow(
lambda x, y: x + y, # Reduce function
lambda x, y: x - y, # Inverse reduce function
60, # Window duration (60 seconds)
20 # Slide duration (20 seconds)
)
# CountByValueAndWindow: Count values over window
windowed_value_counts = pair_dstream.countByValueAndWindow(60, 20)
return windowed_word_counts, windowed_value_counts
def advanced_window_analytics(self, user_events):
"""Advanced window-based analytics"""
def calculate_metrics(time, rdd):
"""Calculate custom metrics for each window"""
if not rdd.isEmpty():
# Convert to DataFrame for complex operations
df = rdd.toDF(["user_id", "event_type", "timestamp"])
# Calculate various metrics
total_events = df.count()
unique_users = df.select("user_id").distinct().count()
event_distribution = df.groupBy("event_type").count().collect()
print(f"Window ending at {time}:")
print(f" Total events: {total_events}")
print(f" Unique users: {unique_users}")
print(f" Event distribution: {event_distribution}")
# Apply custom function to each window
user_events.window(60, 20).foreachRDD(calculate_metrics)
def real_time_dashboard_data(self, metrics_stream):
"""Generate data for real-time dashboard"""
# 5-minute window, updated every minute
dashboard_metrics = metrics_stream.window(300, 60).transform(
lambda time, rdd: self.calculate_dashboard_metrics(time, rdd)
)
return dashboard_metrics
def calculate_dashboard_metrics(self, time, rdd):
"""Calculate metrics for dashboard"""
if rdd.isEmpty():
return rdd
# Convert to DataFrame for SQL operations
df = rdd.toDF(["metric_name", "value", "timestamp"])
# Register as temporary table
df.createOrReplaceTempView("metrics")
# Calculate aggregated metrics using SQL
# (SQL queries go through a SparkSession, not the SparkContext)
spark = SparkSession.builder.getOrCreate()
result = spark.sql("""
SELECT
'avg_response_time' as metric,
AVG(CASE WHEN metric_name = 'response_time' THEN value END) as value,
MAX(timestamp) as timestamp
FROM metrics
UNION ALL
SELECT
'total_requests' as metric,
SUM(CASE WHEN metric_name = 'request_count' THEN value END) as value,
MAX(timestamp) as timestamp
FROM metrics
UNION ALL
SELECT
'error_rate' as metric,
SUM(CASE WHEN metric_name = 'error_count' THEN value END) /
SUM(CASE WHEN metric_name = 'request_count' THEN value END) * 100 as value,
MAX(timestamp) as timestamp
FROM metrics
""")
return result
Output Operations and Sinks
Built-in Output Operations
// Java output operations example
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
public class OutputOperations {
public void basicOutputs(JavaDStream<String> dstream) {
// Print: Output to console (for debugging)
dstream.print();
dstream.print(20); // Print first 20 elements
// SaveAsTextFiles: Save each RDD to text files
dstream.saveAsTextFiles("hdfs://output/prefix", "suffix");
// SaveAsObjectFiles: Save as serialized objects
dstream.saveAsObjectFiles("hdfs://output/objects", "obj");
// SaveAsHadoopFiles: Save using a Hadoop OutputFormat
// (defined on pair DStreams, so map to key/value pairs first)
JavaPairDStream<Text, IntWritable> hadoopPairs = dstream.mapToPair(
s -> new Tuple2<>(new Text(s), new IntWritable(1)));
hadoopPairs.saveAsHadoopFiles(
"hdfs://output/hadoop",
"part",
Text.class,
IntWritable.class,
TextOutputFormat.class
);
}
public void customOutputs(JavaDStream<String> dstream) {
// ForEach: Apply custom function to each RDD
dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) throws Exception {
// Custom processing for each RDD
if (!rdd.isEmpty()) {
// Save to database, send to external system, etc.
saveToDatabase(rdd.collect());
}
}
});
// Transform and save
JavaDStream<String> processed = dstream.map(new Function<String, String>() {
@Override
public String call(String s) throws Exception {
return processRecord(s);
}
});
processed.foreachRDD(rdd -> {
// Partition-wise processing for better performance
rdd.foreachPartition(partition -> {
// Initialize connection per partition
DatabaseConnection conn = new DatabaseConnection();
while (partition.hasNext()) {
String record = partition.next();
conn.insert(record);
}
conn.close();
});
});
}
private void saveToDatabase(List<String> records) {
// Database saving logic
System.out.println("Saving " + records.size() + " records to database");
}
private String processRecord(String record) {
// Record processing logic
return record.toUpperCase();
}
}
Advanced Output Patterns
# Advanced output patterns
import json
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
class AdvancedOutputs:
def __init__(self):
self.spark = SparkSession.builder.appName("AdvancedOutputs").getOrCreate()
def multi_sink_output(self, processed_stream):
"""Output to multiple sinks simultaneously"""
def multi_sink_foreach_batch(df, epoch_id):
"""Process each micro-batch"""
# Cache the DataFrame since we'll use it multiple times
df.cache()
try:
# Sink 1: Save to Parquet for analytics
df.write \
.mode("append") \
.partitionBy("date") \
.parquet("s3://data-lake/processed-events/")
# Sink 2: Save alerts to database
alerts = df.filter(col("severity") == "HIGH")
if alerts.count() > 0:
alerts.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://db:5432/alerts") \
.option("dbtable", "real_time_alerts") \
.option("user", "admin") \
.option("password", "password") \
.mode("append") \
.save()
# Sink 3: Send metrics to monitoring system
metrics = df.groupBy("event_type") \
.agg(count("*").alias("count")) \
.collect()
self.send_metrics_to_monitoring(metrics, epoch_id)
# Sink 4: Update real-time dashboard cache
dashboard_data = df.groupBy("region", "event_type") \
.agg(
count("*").alias("event_count"),
avg("processing_time").alias("avg_processing_time")
)
self.update_dashboard_cache(dashboard_data)
finally:
df.unpersist()
# Start the streaming query with custom foreach batch
query = processed_stream.writeStream \
.foreachBatch(multi_sink_foreach_batch) \
.outputMode("update") \
.trigger(processingTime="10 seconds") \
.start()
return query
def exactly_once_delivery(self, stream):
"""Implement exactly-once delivery semantics"""
def idempotent_write(df, epoch_id):
"""Idempotent write operation"""
# Add unique identifier for each batch
df_with_batch_id = df.withColumn("batch_id", lit(epoch_id))
# Use upsert operation (merge) instead of append
df_with_batch_id.createOrReplaceTempView("batch_data")
# Merge logic to handle duplicates (MERGE INTO requires a table format that
# supports it, such as Delta Lake or Apache Iceberg)
self.spark.sql("""
MERGE INTO target_table t
USING batch_data s
ON t.id = s.id AND t.batch_id = s.batch_id
WHEN NOT MATCHED THEN
INSERT (id, data, batch_id, processed_at)
VALUES (s.id, s.data, s.batch_id, current_timestamp())
""")
query = stream.writeStream \
.foreachBatch(idempotent_write) \
.option("checkpointLocation", "/tmp/checkpoint/exactly-once") \
.start()
return query
def send_metrics_to_monitoring(self, metrics, batch_id):
"""Send metrics to external monitoring system"""
monitoring_data = {
"timestamp": datetime.now().isoformat(),
"batch_id": batch_id,
"metrics": [{"event_type": row["event_type"], "count": row["count"]}
for row in metrics]
}
# Send to monitoring system (e.g., Prometheus, CloudWatch)
print(f"Sending metrics: {json.dumps(monitoring_data, indent=2)}")
def update_dashboard_cache(self, dashboard_data):
"""Update real-time dashboard cache"""
# Write to Redis or similar cache
dashboard_data.write \
.format("org.apache.spark.sql.redis") \
.option("table", "dashboard_metrics") \
.option("key.column", "region") \
.mode("overwrite") \
.save()
Performance Optimization
Tuning Parameters
// Performance tuning configuration
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
object PerformanceTuning {
def optimizedSparkConf(): SparkConf = {
new SparkConf()
.setAppName("OptimizedStreamingApp")
// Memory settings
.set("spark.executor.memory", "4g")
.set("spark.executor.memoryFraction", "0.8")
.set("spark.streaming.receiver.maxRate", "10000") // Max records per second per receiver
// Batch interval optimization
.set("spark.streaming.blockInterval", "200ms") // Block interval for receivers
// Backpressure (dynamic rate limiting)
.set("spark.streaming.backpressure.enabled", "true")
.set("spark.streaming.backpressure.initialRate", "1000")
// Kafka-specific optimizations
.set("spark.streaming.kafka.maxRatePerPartition", "2000")
// Checkpointing
.set("spark.streaming.stopGracefullyOnShutdown", "true")
// Serialization
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Garbage collection
.set("spark.executor.extraJavaOptions",
"-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+PrintGCDetails")
}
def createOptimizedStreamingContext(): StreamingContext = {
val conf = optimizedSparkConf()
val ssc = new StreamingContext(conf, Seconds(2)) // 2-second batch interval
// Set checkpoint directory for fault tolerance
ssc.checkpoint("hdfs://namenode:port/streaming/checkpoint")
ssc
}
// Optimization techniques
def optimizationTechniques(ssc: StreamingContext): Unit = {
// 1. Appropriate batch interval
// Rule of thumb: processing time should be < batch interval
// 2. Parallelism optimization
val inputDStream = ssc.socketTextStream("localhost", 9999)
// Repartition if needed to increase parallelism
val repartitioned = inputDStream.repartition(8)
// 3. Caching for iterative operations
val words = repartitioned.flatMap(_.split(" "))
words.cache() // Cache if used multiple times
// 4. Efficient state management
val wordCounts = words.map((_, 1))
.reduceByKeyAndWindow(
_ + _, // Reduce function
_ - _, // Inverse reduce function (more efficient)
Seconds(60), // Window duration
Seconds(20) // Slide duration
)
// 5. Broadcast variables for lookup tables
val broadcastLookup = ssc.sparkContext.broadcast(loadLookupTable())
val enriched = words.map { word =>
val lookup = broadcastLookup.value
(word, lookup.getOrElse(word, "unknown"))
}
}
def loadLookupTable(): Map[String, String] = {
// Load lookup data
Map("hello" -> "greeting", "world" -> "earth")
}
}
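For Structured Streaming queries, the "processing time should stay below the batch/trigger interval" rule of thumb can be checked by polling a query's progress. A small monitoring sketch (the query object and the 10-second trigger are assumptions carried over from earlier examples):
# Sketch: verifying micro-batches finish within the trigger interval
import time
def monitor_query(query, trigger_interval_ms=10000, checks=10):
    for _ in range(checks):
        progress = query.lastProgress  # dict describing the most recent micro-batch
        if progress:
            batch_ms = progress["durationMs"].get("triggerExecution", 0)
            rate = progress.get("processedRowsPerSecond", 0.0)
            print(f"batch {progress['batchId']}: {batch_ms} ms, {rate:.0f} rows/s")
            if batch_ms > trigger_interval_ms:
                print("WARNING: processing time exceeds the trigger interval; batches will queue up")
        time.sleep(trigger_interval_ms / 1000)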
Memory Management
# Memory management strategies
class MemoryManagement:
def configure_memory_settings(self):
"""Configure memory settings for streaming"""
spark_conf = {
# Executor memory settings
"spark.executor.memory": "8g",
"spark.executor.memoryFraction": "0.75",
"spark.executor.cores": "4",
# Storage memory settings
"spark.sql.streaming.stateStore.maintenanceInterval": "60s",
"spark.sql.streaming.stateStore.minDeltasForSnapshot": "10",
# Shuffle settings
"spark.sql.shuffle.partitions": "200",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
# Garbage collection
"spark.executor.extraJavaOptions":
"-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+PrintGCTimeStamps"
}
return spark_conf
def memory_efficient_processing(self, stream):
"""Memory-efficient stream processing patterns"""
# 1. Process data in smaller batches
def process_partition(partition):
"""Process partition efficiently"""
batch_size = 1000
batch = []
for record in partition:
batch.append(record)
if len(batch) >= batch_size:
# Process batch
yield self.process_batch(batch)
batch = []
# Process remaining records
if batch:
yield self.process_batch(batch)
# 2. Use mapPartitions for memory efficiency
efficient_stream = stream.mapPartitions(process_partition)
# 3. Avoid collecting large datasets
def safe_foreach_batch(df, epoch_id):
    """Safe processing without materializing the whole batch on the driver"""
    # Stream rows to the driver in fixed-size chunks instead of collect()
    chunk_size = 10000
    chunk = []
    for row in df.toLocalIterator():
        chunk.append(row)
        if len(chunk) >= chunk_size:
            self.process_chunk(chunk)
            chunk = []
    if chunk:
        self.process_chunk(chunk)
return efficient_stream
def process_batch(self, batch):
"""Process a batch of records"""
# Implement batch processing logic
return [record.upper() for record in batch]
def process_chunk(self, chunk):
"""Process a chunk of data"""
# Implement chunk processing logic
print(f"Processing chunk of {len(chunk)} records")
Fault Tolerance and Checkpointing
Checkpointing Mechanism
// Checkpointing and fault tolerance
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
public class FaultTolerance {
public static JavaStreamingContext createStreamingContext(String checkpointDir) {
// Function to create new StreamingContext
Function0<JavaStreamingContext> createContext = () -> {
SparkConf conf = new SparkConf()
.setAppName("FaultTolerantStreamingApp")
.setMaster("local[*]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));
// Set checkpoint directory
jssc.checkpoint(checkpointDir);
// Define streaming computation
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<String> words = lines.flatMap(line ->
Arrays.asList(line.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(word ->
new Tuple2<>(word, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((a, b) -> a + b);
wordCounts.print();
return jssc;
};
// Get or create StreamingContext from checkpoint
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, createContext);
return jssc;
}
public void gracefulShutdown(JavaStreamingContext jssc) {
// Add shutdown hook for graceful termination
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutting down streaming application gracefully...");
jssc.stop(true, true); // Stop gracefully, wait for completion
System.out.println("Application stopped.");
}));
jssc.start();
jssc.awaitTermination();
}
public void recoverFromFailure() {
String checkpointDir = "hdfs://namenode:port/streaming/checkpoint";
try {
JavaStreamingContext jssc = createStreamingContext(checkpointDir);
gracefulShutdown(jssc);
} catch (Exception e) {
System.err.println("Failed to start streaming context: " + e.getMessage());
// Implement retry logic
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
try {
Thread.sleep(5000); // Wait before retry
JavaStreamingContext jssc = createStreamingContext(checkpointDir);
gracefulShutdown(jssc);
break;
} catch (Exception retryException) {
retryCount++;
System.err.println("Retry " + retryCount + " failed: " +
retryException.getMessage());
}
}
}
}
}
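The same get-or-create recovery pattern exists in PySpark via StreamingContext.getOrCreate; a minimal sketch with placeholder paths:
# Driver recovery from a checkpoint directory in PySpark
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
CHECKPOINT_DIR = "/tmp/streaming-checkpoint"  # placeholder location
def create_context():
    sc = SparkContext("local[2]", "FaultTolerantApp")
    ssc = StreamingContext(sc, 2)
    ssc.checkpoint(CHECKPOINT_DIR)
    lines = ssc.socketTextStream("localhost", 9999)
    counts = (lines.flatMap(lambda l: l.split(" "))
        .map(lambda w: (w, 1))
        .reduceByKey(lambda a, b: a + b))
    counts.pprint()
    return ssc
# A fresh start builds a new context; after a failure the context is rebuilt from the checkpoint
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
ssc.start()
ssc.awaitTermination()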
Real-world Example: Real-time Analytics Pipeline
Complete End-to-End Example
# Complete real-time analytics pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
class RealTimeAnalyticsPipeline:
def __init__(self):
self.spark = SparkSession.builder \
.appName("RealTimeAnalyticsPipeline") \
.config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint") \
.config("spark.sql.streaming.stateStore.maintenanceInterval", "60s") \
.getOrCreate()
self.spark.sparkContext.setLogLevel("WARN")
def setup_schemas(self):
"""Define schemas for different event types"""
self.user_event_schema = StructType([
StructField("user_id", StringType(), True),
StructField("session_id", StringType(), True),
StructField("event_type", StringType(), True),
StructField("page_url", StringType(), True),
StructField("timestamp", LongType(), True),
StructField("user_agent", StringType(), True),
StructField("ip_address", StringType(), True)
])
self.transaction_schema = StructType([
StructField("transaction_id", StringType(), True),
StructField("user_id", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("currency", StringType(), True),
StructField("merchant_id", StringType(), True),
StructField("timestamp", LongType(), True),
StructField("payment_method", StringType(), True)
])
def create_input_streams(self):
"""Create input streams from Kafka"""
# User events stream
user_events_raw = self.spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "user-events") \
.option("startingOffsets", "latest") \
.load()
# Parse user events
self.user_events = user_events_raw \
.select(
from_json(col("value").cast("string"), self.user_event_schema).alias("data"),
col("timestamp").alias("kafka_timestamp")
) \
.select("data.*", "kafka_timestamp") \
.withColumn("event_time", from_unixtime(col("timestamp"))) \
.withWatermark("event_time", "10 minutes")
# Transaction events stream
transactions_raw = self.spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "transactions") \
.option("startingOffsets", "latest") \
.load()
# Parse transactions
self.transactions = transactions_raw \
.select(
from_json(col("value").cast("string"), self.transaction_schema).alias("data")
) \
.select("data.*") \
.withColumn("transaction_time", from_unixtime(col("timestamp"))) \
.withWatermark("transaction_time", "5 minutes")
def real_time_user_analytics(self):
"""Real-time user behavior analytics"""
# Page view analytics
page_views = self.user_events \
.filter(col("event_type") == "page_view") \
.groupBy(
window(col("event_time"), "5 minutes", "1 minute"),
col("page_url")
) \
.agg(
count("*").alias("view_count"),
countDistinct("user_id").alias("unique_visitors"),
countDistinct("session_id").alias("unique_sessions")
) \
.withColumn("avg_views_per_visitor",
col("view_count") / col("unique_visitors"))
# User session analytics
session_analytics = self.user_events \
.groupBy(
window(col("event_time"), "10 minutes", "2 minutes"),
col("session_id"),
col("user_id")
) \
.agg(
count("*").alias("events_in_session"),
collect_list("page_url").alias("page_sequence"),
min("event_time").alias("session_start"),
max("event_time").alias("session_end")
) \
.withColumn("session_duration_minutes",
(unix_timestamp("session_end") - unix_timestamp("session_start")) / 60)
return page_views, session_analytics
def fraud_detection(self):
"""Real-time fraud detection"""
# Detect suspicious transaction patterns
suspicious_transactions = self.transactions \
.groupBy(
window(col("transaction_time"), "1 minute"),
col("user_id")
) \
.agg(
count("*").alias("transaction_count"),
sum("amount").alias("total_amount"),
countDistinct("merchant_id").alias("unique_merchants"),
collect_list("payment_method").alias("payment_methods")
) \
.filter(
(col("transaction_count") > 10) | # Too many transactions
(col("total_amount") > 10000) | # High amount
(col("unique_merchants") > 5) # Too many different merchants
) \
.withColumn("fraud_score",
col("transaction_count") * 0.3 +
col("total_amount") / 1000 * 0.4 +
col("unique_merchants") * 0.3) \
.filter(col("fraud_score") > 5.0)
return suspicious_transactions
def real_time_recommendations(self):
"""Generate real-time recommendations"""
# User behavior patterns
user_patterns = self.user_events \
.filter(col("event_type").isin(["page_view", "click", "purchase"])) \
.groupBy(
window(col("event_time"), "30 minutes", "5 minutes"),
col("user_id")
) \
.agg(
collect_list("page_url").alias("visited_pages"),
count("*").alias("activity_level")
)
# Join with transaction data for purchase behavior
purchase_behavior = self.transactions \
.groupBy(
window(col("transaction_time"), "30 minutes", "5 minutes"),
col("user_id")
) \
.agg(
collect_list("merchant_id").alias("purchased_from"),
avg("amount").alias("avg_purchase_amount")
)
# Combine for recommendations
recommendation_data = user_patterns.join(
purchase_behavior,
["window", "user_id"],
"left_outer"
)
return recommendation_data
def setup_outputs(self):
"""Setup output sinks"""
page_views, session_analytics = self.real_time_user_analytics()
suspicious_transactions = self.fraud_detection()
recommendations = self.real_time_recommendations()
# Output 1: Page views to console (for monitoring)
page_views_query = page_views \
.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", False) \
.trigger(processingTime="30 seconds") \
.start()
# Output 2: Fraud alerts (written to the console here; swap in a JDBC sink for production)
fraud_query = suspicious_transactions \
.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", False) \
.trigger(processingTime="10 seconds") \
.start()
# Output 3: Session analytics to Parquet files
session_query = session_analytics \
.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "/tmp/session-analytics") \
.option("checkpointLocation", "/tmp/checkpoint/sessions") \
.trigger(processingTime="60 seconds") \
.start()
# Output 4: Recommendations to Kafka
recommendations_query = recommendations \
.select(to_json(struct("*")).alias("value")) \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "recommendations") \
.option("checkpointLocation", "/tmp/checkpoint/recommendations") \
.start()
return [page_views_query, fraud_query, session_query, recommendations_query]
def run_pipeline(self):
"""Run the complete pipeline"""
self.setup_schemas()
self.create_input_streams()
queries = self.setup_outputs()
# Wait for all queries to terminate
for query in queries:
query.awaitTermination()
# Usage
if __name__ == "__main__":
pipeline = RealTimeAnalyticsPipeline()
pipeline.run_pipeline()
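To exercise the pipeline locally, something has to publish test events to the user-events and transactions topics. A hedged sketch using the kafka-python package (an assumption; any Kafka producer that emits JSON matching the schemas above works):
# Hypothetical test producer for the pipeline (pip install kafka-python)
import json
import random
import time
import uuid
from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"))
while True:
    now = int(time.time())
    producer.send("user-events", {
        "user_id": f"user-{random.randint(1, 100)}",
        "session_id": str(uuid.uuid4()),
        "event_type": random.choice(["page_view", "click", "purchase"]),
        "page_url": "/products/42",
        "timestamp": now,
        "user_agent": "test-agent",
        "ip_address": "127.0.0.1"})
    producer.send("transactions", {
        "transaction_id": str(uuid.uuid4()),
        "user_id": f"user-{random.randint(1, 100)}",
        "amount": round(random.uniform(5, 15000), 2),
        "currency": "USD",
        "merchant_id": f"merchant-{random.randint(1, 10)}",
        "timestamp": now,
        "payment_method": random.choice(["card", "wallet"])})
    time.sleep(0.1)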
Conclusion
Spark Streaming provides a powerful platform for real-time data processing with the following key advantages:
Key Benefits
- Unified Platform: Same API for batch and stream processing
- Fault Tolerance: Automatic recovery from failures
- Scalability: Linear scalability with cluster size
- Integration: Rich ecosystem integration (Kafka, HDFS, databases)
- Exactly-once Processing: End-to-end exactly-once semantics are achievable with checkpointing, replayable sources, and idempotent or transactional sinks
Best Practices
- Choose appropriate batch intervals based on latency requirements
- Use Structured Streaming for complex event processing
- Implement proper checkpointing for fault tolerance
- Monitor and tune performance continuously
- Design for exactly-once semantics when needed
When to Use Spark Streaming
- Real-time Analytics: Dashboard updates, metrics calculation
- ETL Pipelines: Continuous data transformation and loading
- Fraud Detection: Real-time anomaly detection
- IoT Processing: Sensor data analysis
- Log Processing: Real-time log analysis and alerting
In the next chapter, we’ll explore Spark SQL for structured data processing and how it complements streaming workloads.