What is Apache Spark Streaming?
Apache Spark Streaming is used to process live streaming data. It provides high-throughput, scalable, and fault-tolerant stream processing. Input data can come from sources such as Kafka, Flume, or Kinesis, and is processed by applying functions such as map, reduce, and join. Once the processing is complete, the output is pushed to databases, file systems, or live dashboards.
Spark's graph processing and machine learning algorithms can also be applied to data streams.
The following figure shows the processing of Apache Spark Streaming. It receives live data from multiple sources and divides the data into small batches. The Spark engine then processes those batches and produces the result, also in batches.
The Spark StreamingContext is created using the SparkConf object.
import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
- Here the appName parameter is the name of the application, as shown in the cluster UI.
- The master parameter is a Kubernetes, Mesos, or YARN cluster URL. For local mode, it is a special string such as “local[*]”.
Additionally, we can create a StreamingContext object from an existing SparkContext as well.
import org.apache.spark.streaming._

val sparkc = ... // existing SparkContext
val ssc = new StreamingContext(sparkc, Seconds(1))
The following points should be considered once the Spark context is defined.
- Define the input sources by creating input DStreams.
- Define the streaming computations by applying transformations and output operations to the DStreams.
- Use streamingContext.start() to receive and start processing data.
- Use streamingContext.awaitTermination() and wait till the processing is stopped.
- By using streamingContext.stop(), the processing can be stopped.
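The steps above can be sketched as one small program. This is a minimal sketch, not a definitive implementation: the object name, the normalize helper, the local[2] master, the localhost:9999 source, and the 30-second timeout are all assumptions made for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingLifecycleSketch {
  // Pure per-record logic, kept separate so it can be checked without a cluster.
  def normalize(line: String): String = line.trim.toLowerCase

  def main(args: Array[String]): Unit = {
    // Assumptions: local mode with two threads, text source on localhost:9999.
    val conf = new SparkConf().setAppName("StreamingLifecycleSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Step 1: create an input DStream.
    val lines = ssc.socketTextStream("localhost", 9999)

    // Step 2: apply a transformation and register an output operation.
    lines.map(normalize).print()

    // Step 3: start receiving and processing data.
    ssc.start()

    // Step 4: wait up to 30 seconds; the result is true if the context stopped in that time.
    val stopped = ssc.awaitTerminationOrTimeout(30000L)

    // Step 5: stop the processing manually if it is still running.
    if (!stopped) ssc.stop()
  }
}
```

awaitTerminationOrTimeout is used here instead of awaitTermination only so that the sketch can also show an explicit stop() call; a long-running job would normally block on awaitTermination().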
The following points should be remembered.
- After a context is started, no new streaming computations can be set up or added to it.
- Once a context has been stopped, it cannot be restarted.
- Only one StreamingContext can be active in a JVM at a time.
- Calling stop() on a StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional stopSparkContext parameter of stop() to false.
- A SparkContext can be reused to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next one is created.
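The last two points can be illustrated with a short sketch that runs two StreamingContexts, one after the other, on the same SparkContext. The object name, the runOnce helper, and the use of queueStream as a self-contained test source are assumptions chosen for illustration only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

object ReuseSparkContextSketch {
  // Runs one short-lived StreamingContext on top of an existing SparkContext.
  def runOnce(sc: SparkContext, batchSeconds: Long): Unit = {
    val ssc = new StreamingContext(sc, Seconds(batchSeconds))
    // queueStream is used only to keep the sketch free of external sources.
    val queue = mutable.Queue[RDD[Int]](sc.parallelize(1 to 5))
    ssc.queueStream(queue).print()
    ssc.start()
    ssc.awaitTerminationOrTimeout(3000L)
    // Stop only the StreamingContext; the SparkContext stays usable.
    ssc.stop(stopSparkContext = false)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReuseSparkContextSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    runOnce(sc, 1) // first StreamingContext
    runOnce(sc, 2) // second StreamingContext on the same SparkContext
    println(s"SparkContext stopped: ${sc.isStopped}")
    sc.stop()
  }
}
```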
Apache Spark Streaming Program Example
Let us understand Apache Spark streaming using a word count program.
- In this section, we will use the word count Scala script (NetworkWordCount.scala) that ships with the Spark examples. Please check the examples directory under your SPARK_HOME.
- Next, we will open two terminals. In the first terminal, we will send input using the Netcat UNIX utility, and in the second terminal, we will run the NetworkWordCount.scala script.
- In the first terminal, we will run the Netcat server using the below command.
- $ nc -lk 9999
- In the second terminal, we will run NetworkWordCount.scala script from SPARK_HOME using the below command.
- $ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999
- Now in the first terminal, we will type “this is word count program” (refer screenshot).
- Once you type “this is word count program”, the count of each word appears in the second terminal, which indicates that Apache Spark Streaming is processing the input in real time.
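For reference, the core of such a word count program can be sketched as follows. This is a simplified sketch in the spirit of the bundled example, not a copy of it: the object name, the splitWords helper, and the default host and port are assumptions.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCountSketch {
  // Splits a line on spaces and drops empty tokens.
  def splitWords(line: String): Seq[String] = line.split(" ").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    // Assumption: host and port default to the Netcat server started above.
    val host = if (args.length > 0) args(0) else "localhost"
    val port = if (args.length > 1) args(1).toInt else 9999

    val conf = new SparkConf().setAppName("NetworkWordCountSketch")
    // Fall back to local mode only when no master was supplied at launch time.
    if (!conf.contains("spark.master")) conf.setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Each record of this DStream is one line of text received from the socket.
    val lines = ssc.socketTextStream(host, port)

    // Count each word within every one-second batch.
    val wordCounts = lines.flatMap(splitWords).map(word => (word, 1)).reduceByKey(_ + _)

    // Print the first elements of every batch to the console.
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With the input line used above, every word occurs once, so each batch that contains the line prints one (word, 1) pair per word; the order of the pairs is not guaranteed.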
A DStream (discretized stream) is the basic abstraction of Apache Spark Streaming and represents a continuous stream of data. It is either the input data stream received from a source or a processed data stream produced by applying a transformation to the input stream. Internally, a DStream is a continuous series of RDDs, each of which contains the data for a certain time interval, as shown in the below figure.
Any operation that a user applies to a DStream is translated into operations on the underlying RDDs, and the Spark engine performs the computation on those RDDs.
Input DStreams and Receivers
The DStreams that represent the input data received from streaming sources are called input DStreams. In the NetworkWordCount Scala program, the stream of data was received from the Netcat server. Every input DStream (except a file stream) is associated with a Receiver object, which receives the data from a source and stores it in Spark’s memory for processing.
Apache Spark Streaming provides the following two types of built-in streaming sources.
- Basic Sources
- Advanced Sources
Please note the following points.
- When running an Apache Spark Streaming program locally, do not use “local” or “local[1]” as the master URL, because either of these means that only one thread is used to run tasks locally. A receiver-based input DStream would occupy that thread and leave none for processing. Instead, use “local[n]” as the master URL, where n is greater than the number of receivers to run.
- Likewise, on a cluster, the number of cores allocated to the Apache Spark Streaming application must be greater than the number of receivers. Otherwise, the system will receive data but will not be able to process it.
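The first point can be expressed as a small rule of thumb in code. The helper below is hypothetical (it is not part of the Spark API) and simply reserves one local thread per receiver plus one for processing, which is the minimum that satisfies the rule.

```scala
import org.apache.spark.SparkConf

object LocalMasterSketch {
  // Hypothetical helper: one thread per receiver plus one thread for processing.
  def localMaster(numReceivers: Int): String = {
    require(numReceivers >= 0, "numReceivers must not be negative")
    s"local[${numReceivers + 1}]"
  }

  def main(args: Array[String]): Unit = {
    // One socket receiver, so at least two local threads are needed.
    val conf = new SparkConf()
      .setAppName("LocalMasterSketch")
      .setMaster(localMaster(1))
    println(conf.get("spark.master"))
  }
}
```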
Let us understand each built-in Spark Streaming source.
Basic sources, such as socket connections and file systems, are available directly in the StreamingContext API. StreamingContext.fileStream creates a DStream that reads data from any file system compatible with the HDFS API, such as HDFS, S3, and NFS.
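A minimal sketch of the two basic sources is shown below. It uses textFileStream, the text-file convenience variant of fileStream. The object name, the buildLines helper, the host and port, and the /tmp/streaming-input directory are assumptions for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object BasicSourcesSketch {
  // Builds one DStream from a TCP socket and one from a monitored directory,
  // then merges them into a single stream of text lines.
  def buildLines(ssc: StreamingContext, host: String, port: Int, dir: String): DStream[String] = {
    val socketLines = ssc.socketTextStream(host, port) // backed by a receiver
    val fileLines = ssc.textFileStream(dir)            // file streams need no receiver
    socketLines.union(fileLines)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BasicSourcesSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Hypothetical directory: files that appear in it are read as text lines.
    buildLines(ssc, "localhost", 9999, "/tmp/streaming-input").print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```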
Advanced sources require interfacing with external non-Spark libraries, some of which have complex dependencies (for example, Kafka and Flume). To avoid version conflicts, the functionality to create DStreams from these sources has been moved to separate libraries that can be linked explicitly when necessary.
These advanced sources are not available in the Spark shell. To use them in the Spark shell, download the corresponding Maven artifact’s JAR along with its dependencies and add them to the classpath.
The following is a list of a few advanced sources.
- Apache Kafka: Apache Spark Streaming 2.4.5 is compatible with Kafka broker versions 0.8.2.1 or higher.
- Apache Flume: Apache Spark Streaming 2.4.5 is compatible with Flume 1.6.0.
- Kinesis: Apache Spark Streaming 2.4.5 is compatible with Kinesis Client Library 1.2.1.
Transformations on DStreams
We can also apply transformations to DStreams, which modify the data of the input DStream.
The following are some of the important transformation functions.
- map(func): It returns a new DStream by passing each element of the source DStream through the function func.
- flatMap(func): It is similar to map, but each input item can be mapped to 0 or more output items.
- filter(func): It returns a new DStream that contains only the records of the source DStream for which func returns true.
- repartition(numPartitions): It changes the level of parallelism in this DStream by creating more or fewer partitions.
- union(otherStream): It returns a new DStream that contains the union of the elements in the source DStream and otherStream.
- count(): It counts the number of elements in every RDD of the source DStream and returns a new DStream of single-element RDDs.
- reduce(func): It aggregates the elements in every RDD of the source DStream using the function func and returns a new DStream of single-element RDDs.
- countByValue(): When called on a DStream of elements of type A, it returns a new DStream of (A, Long) pairs, where the value of each key is its frequency in each RDD of the source DStream.
- transform(func): It returns a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream.
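The transformations listed above can be tried together in one sketch. The input is assumed to be log-like text lines on localhost:9999; the object name, the isError helper, and the partition count of 4 are illustrative assumptions.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TransformationsSketch {
  // Pure predicate used with filter; checkable without a cluster.
  def isError(line: String): Boolean = line.contains("ERROR")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TransformationsSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)

    val lengths = lines.map(_.length)                 // map: one output per input
    val words = lines.flatMap(_.split(" "))           // flatMap: 0 or more outputs per input
    val errors = lines.filter(isError)                // filter: keep matching records only
    val warnings = lines.filter(_.contains("WARN"))
    val problems = errors.union(warnings)             // union: merge two DStreams
    val spread = words.repartition(4)                 // repartition: change parallelism
    val perBatch = lines.count()                      // count: DStream[Long], one value per batch
    val longest = lengths.reduce((a, b) => math.max(a, b)) // reduce: aggregate each batch
    val frequencies = spread.countByValue()           // countByValue: DStream[(String, Long)]

    // transform: apply an arbitrary RDD-to-RDD function, here a sort by frequency.
    val ranked = frequencies.transform(rdd => rdd.sortBy(_._2, ascending = false))

    problems.print()
    perBatch.print()
    longest.print()
    ranked.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```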
Output Operations on DStreams
Output operations work on the processed data and push the results to external systems such as file systems, databases, and other storage systems. They allow external systems to consume the transformed data. Examples of output operations are print, saveAsObjectFiles, saveAsTextFiles, and so on.
The following are some of the important output operations.
- print(): It prints the first 10 elements of every batch of data in a DStream. It is helpful in testing and development.
- saveAsObjectFiles(prefix, [suffix]): It saves the DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated from the prefix and suffix: prefix-TIME_IN_MS[.suffix].
- saveAsTextFiles(prefix, [suffix]): It saves the DStream's contents as text files. The file name at each batch interval is generated from the prefix and suffix: prefix-TIME_IN_MS[.suffix].
- saveAsHadoopFiles(prefix, [suffix]): It saves the DStream's contents as Hadoop files. The file name at each batch interval is generated from the prefix and suffix: prefix-TIME_IN_MS[.suffix].
- foreachRDD(func): It is the most generic output operator and applies a function func to every RDD generated from the stream. The function is executed in the driver program of the streaming application and usually contains RDD actions that push the data of each RDD to an external system, for example saving it to files or writing it to a database.
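The following sketch shows print, saveAsTextFiles, and foreachRDD on a word count stream. The ConsoleSink class is a hypothetical stand-in for a real database or message-queue client, and the /tmp/wordcounts prefix is an assumption. The sink is created inside foreachPartition so that it is built on the executor once per partition rather than once per record.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OutputOperationsSketch {
  // Hypothetical stand-in for a real external client (database, queue, ...).
  class ConsoleSink {
    def send(record: String): Unit = println(s"sink <- $record")
    def close(): Unit = ()
  }

  // Formats one (word, count) pair as a tab-separated line.
  def format(pair: (String, Int)): String = s"${pair._1}\t${pair._2}"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("OutputOperationsSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val counts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // Print the first elements of every batch on the driver.
    counts.print()

    // Write one output directory per batch, named prefix-TIME_IN_MS.suffix.
    counts.saveAsTextFiles("/tmp/wordcounts", "txt")

    // Push every partition of every RDD to the (hypothetical) external sink.
    counts.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        val sink = new ConsoleSink() // created on the executor, once per partition
        records.foreach(record => sink.send(format(record)))
        sink.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```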
DStreams Caching / Persistence
Caching data in memory provides faster execution when the data is going to be used multiple times. Calling persist() on a DStream keeps every RDD of that DStream in memory so that repeated computations on the same data are fast. For input streams that receive data over the network (such as Kafka, Flume, or sockets), the default persistence level replicates the data to two nodes for fault tolerance.
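A minimal sketch of persisting a DStream that feeds two computations follows. The storage levels are written out explicitly for illustration; the object name and the socket source are assumptions.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PersistSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PersistSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Received data is replicated to two nodes; the level is spelled out here
    // only to make the replication visible.
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

    // words is used by two output operations below, so it is worth persisting.
    val words = lines.flatMap(_.split(" "))
    words.persist(StorageLevel.MEMORY_ONLY_SER)

    words.count().print()        // first use of the persisted data
    words.countByValue().print() // second use, served from memory

    ssc.start()
    ssc.awaitTermination()
  }
}
```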
Checkpointing in Spark Streaming
Apache Spark Streaming applications typically run 24/7 and keep processing data that is crucial for the organization. A streaming application must therefore be resilient to failures and able to recover quickly. By using checkpointing, Apache Spark Streaming applications can be recovered from failures.
The following are the two types of checkpoints.
Metadata checkpointing saves the information that defines the streaming computation to fault-tolerant storage such as HDFS. It is used to recover from a failure of the node that runs the driver of the streaming application.
Metadata checkpointing includes the following.
- Configuration: The configuration that was used to create the streaming application.
- DStream operations: The set of DStream operations that define the streaming application.
- Incomplete batches: The batches whose jobs are queued but have not yet completed.
Data checkpointing saves the generated RDDs to reliable storage. It is needed for transformations in which a newly generated RDD depends on the RDDs of previous batches, because the dependency chain then keeps growing with every batch. To keep recovery time bounded, the intermediate RDDs are periodically checkpointed to reliable storage, which cuts off the dependency chain.
The major difference between metadata and data checkpointing is that metadata checkpointing is mainly needed for recovery from driver failures, whereas data checkpointing is required whenever stateful transformations are used.
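Both kinds of checkpointing can be sketched with StreamingContext.getOrCreate and a stateful updateStateByKey transformation. This is a minimal sketch: the object name, the updateCount helper, the checkpoint directory, and the socket source are assumptions, and the directory should point to fault-tolerant storage in a real deployment.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointSketch {
  // Hypothetical location; use a fault-tolerant file system such as HDFS in practice.
  val checkpointDir = "/tmp/spark-streaming-checkpoint"

  // Adds the counts of the current batch to the running total of a key.
  def updateCount(newValues: Seq[Int], running: Option[Int]): Option[Int] =
    Some(running.getOrElse(0) + newValues.sum)

  // Called only when no checkpoint exists yet; defines the whole computation.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointSketch")
    if (!conf.contains("spark.master")) conf.setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir) // enables metadata and data checkpointing

    val totals = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .updateStateByKey[Int](updateCount _) // stateful: depends on previous batches
    totals.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint if one exists, otherwise build a new context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

On the first run createContext is invoked; after a driver restart, the context and its DStream graph are rebuilt from the checkpoint data instead.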
We can monitor an Apache Spark Streaming application using the Spark web UI. The Spark UI shows statistics of the running receivers, such as how many receivers are active, how many records have been received, and any errors reported by a receiver. It also shows statistics of the completed batches, such as processing times and queueing delays.
In the Apache Spark web UI, the following two metrics are particularly important.
- Processing Time: It shows the time to process each batch of data.
- Scheduling Delay: It shows the time a batch waits in a queue for the processing of previous batches to finish.
If the batch processing time is consistently greater than the batch interval, or the scheduling delay keeps increasing, the system is not processing batches as fast as they are generated. In that case, try to reduce the batch processing time.
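The same two metrics can also be read programmatically through the StreamingListener interface. The sketch below logs a warning whenever a batch takes longer than the batch interval. The class name, the isFallingBehind helper, and the log format are assumptions; a listener like this would be registered with ssc.addStreamingListener(new BacklogListener(1000L)) before the context is started.

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

object BacklogListener {
  // True when a batch took longer to process than the batch interval allows.
  def isFallingBehind(processingMs: Long, batchIntervalMs: Long): Boolean =
    processingMs > batchIntervalMs
}

class BacklogListener(batchIntervalMs: Long) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // Both delays are optional because they are unknown until the batch has run.
    val processingMs = info.processingDelay.getOrElse(0L)
    val schedulingMs = info.schedulingDelay.getOrElse(0L)
    if (BacklogListener.isFallingBehind(processingMs, batchIntervalMs)) {
      println(s"Batch ${info.batchTime}: processing took $processingMs ms " +
        s"(interval $batchIntervalMs ms), scheduling delay $schedulingMs ms")
    }
  }
}
```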
A number of parameters and configurations can be tuned to improve the performance of an application.
At a high level, we can consider the following points.
Reducing the Batch Processing Times
We can reduce the processing time of each batch by utilizing cluster resources efficiently. Apache Spark supports various types of optimization to reduce the processing time of individual batches.
The main areas to look at are the following.
- Level of Parallelism in Data Receiving
- Level of Parallelism in Data Processing
- Data Serialization
- Task Launching Overheads
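The first two items can be sketched as follows: several receivers are created and unioned to parallelize data receiving, and the unified stream is repartitioned to parallelize processing. The number of receivers, the consecutive ports, and the partition count of 8 are assumptions for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ParallelismSketch {
  // Hypothetical helper: consecutive ports, one per receiver.
  def ports(basePort: Int, numReceivers: Int): Seq[Int] =
    (0 until numReceivers).map(basePort + _)

  def main(args: Array[String]): Unit = {
    val numReceivers = 3
    // One thread per receiver plus one for processing.
    val conf = new SparkConf()
      .setAppName("ParallelismSketch")
      .setMaster(s"local[${numReceivers + 1}]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Parallelism in data receiving: one input DStream (and receiver) per port.
    val streams = ports(9999, numReceivers).map(port => ssc.socketTextStream("localhost", port))
    val unified = ssc.union(streams)

    // Parallelism in data processing: redistribute the received data, and
    // state the number of reduce partitions explicitly.
    val counts = unified
      .repartition(8)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _, 8)

    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```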
Setting the Right Batch Interval
The batch interval should be set such that the batches of data can be processed as fast as they are received (that is, data processing keeps up with the data ingestion).
The following are some of the parameters that can be used to tune memory usage and garbage collection overhead.
Persistence Level of DStreams
By default, both the input data and the persisted RDDs are stored as serialized bytes, which reduces memory usage and the garbage collection burden compared to deserialized persistence. Enabling Kryo serialization reduces the serialized sizes and memory usage further. Another option to reduce memory usage is compression (using the spark.rdd.compress parameter), at the cost of additional CPU time.
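Both settings can be applied on the SparkConf before the StreamingContext is created. The object and method names below are assumptions; the two configuration keys are standard Spark properties.

```scala
import org.apache.spark.SparkConf

object MemoryTuningSketch {
  // Returns the given configuration with Kryo serialization and RDD compression enabled.
  def tuned(conf: SparkConf): SparkConf =
    conf
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.rdd.compress", "true")

  def main(args: Array[String]): Unit = {
    val conf = tuned(new SparkConf().setAppName("MemoryTuningSketch").setMaster("local[2]"))
    println(conf.get("spark.serializer"))
    println(conf.get("spark.rdd.compress"))
  }
}
```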
Clearing Old Data
Data cleaning is an automated process in Apache Spark Streaming: input data and persisted RDDs that are no longer needed are cleared automatically, and the transformations in use determine when data counts as old. Data can be retained for a longer duration by calling streamingContext.remember.
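A minimal sketch of extending the retention period is shown below. The five-minute duration, the object name, and the socket source are assumptions; remember is called before the context is started.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object RememberSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RememberSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Keep the RDDs of every DStream for at least five minutes before cleanup,
    // for example to query recent batches interactively.
    ssc.remember(Minutes(5))

    ssc.socketTextStream("localhost", 9999).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```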
CMS Garbage Collector
Garbage collection can be tuned by using the concurrent mark-and-sweep (CMS) GC, which keeps GC-related pauses consistently low. We need to make sure that the CMS GC is set on both sides: on the driver, using --driver-java-options in spark-submit, and on the executors, using the spark.executor.extraJavaOptions configuration.
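The executor side of this setting can be sketched in code, with the driver side shown as a command-line comment. The sketch assumes a JVM that still ships the CMS collector (for example Java 8, since CMS was removed in later JDK releases); the object and method names are assumptions.

```scala
import org.apache.spark.SparkConf

object CmsGcSketch {
  // JVM flag that selects the concurrent mark-and-sweep collector.
  val cmsFlag = "-XX:+UseConcMarkSweepGC"

  // Executor side: set through the Spark configuration.
  def withCmsOnExecutors(conf: SparkConf): SparkConf =
    conf.set("spark.executor.extraJavaOptions", cmsFlag)

  // Driver side: passed on the command line when the application is submitted, e.g.
  //   spark-submit --driver-java-options "-XX:+UseConcMarkSweepGC" ...

  def main(args: Array[String]): Unit = {
    val conf = withCmsOnExecutors(new SparkConf().setAppName("CmsGcSketch"))
    println(conf.get("spark.executor.extraJavaOptions"))
  }
}
```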