Apache Flink DataSet and DataStream APIs

Apache Flink provides a rich set of APIs for transforming both batch and streaming data. Its transformation functions include joining, mapping, filtering, aggregating, sorting, and so on, and these transformations are applied to data distributed across the Apache Flink cluster.
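As a quick illustration, the following minimal sketch chains a filter and a map transformation on a small in-memory dataset (this assumes a Flink 1.x Java project with the flink-java dependency on the classpath; the class name TransformationExample is illustrative):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class TransformationExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // A small in-memory dataset for illustration.
        DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6);

        // filter keeps elements matching a predicate; map transforms each element.
        DataSet<Integer> doubledEvens = numbers
            .filter(n -> n % 2 == 0)
            .map(n -> n * 2);

        // Prints 4, 8, 12 (ordering may vary with parallelism).
        doubledEvens.print();
    }
}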

Let us see the different types of APIs offered by Apache Flink.


Apache Flink DataSet API

The Apache Flink DataSet API performs batch operations on a dataset. The dataset can be created by reading a local file or from other sources. Flink transforms the dataset using transformation functions such as grouping, filtering, and joining, and then writes the result to a distributed file system or to standard output such as the command-line interface. The DataSet API can be used with Java, Python, and Scala.
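For example, in addition to in-memory elements, a dataset of lines can be created from a local text file (the path below is illustrative):

// Create a DataSet<String> from a local text file (illustrative path).
DataSet<String> lines = env.readTextFile("file:///path/to/input.txt");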

The following is an example of a word count program that counts how many times each word appears. Please refer to the "Run a Flink Program" section to run this program.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountProgram {
    public static void main(String[] args) throws Exception {
        // Obtain the batch execution environment.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Build a small in-memory dataset of sentences.
        DataSet<String> text = env.fromElements("Hello There!", "Welcome to the Cloudduggu Apache Flink Tutorial");

        // Split each line into (word, 1) tuples, group by the word
        // (field 0), and sum the counts (field 1).
        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            // Emit one (word, 1) tuple per whitespace-separated word.
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
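Running the program prints one (word, count) tuple per line. With the two sample sentences above, each word appears once, so the output looks like the following (ordering may vary across parallel subtasks):

(Hello,1)
(There!,1)
(Welcome,1)
(to,1)
(the,1)
(Cloudduggu,1)
(Apache,1)
(Flink,1)
(Tutorial,1)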

Apache Flink DataStream API

The Apache Flink DataStream API handles data that arrives continuously, also called a stream. It receives stream data from sources such as files, message queues, and socket streams, and provides transformation functions such as aggregation, filtering, and state updates that we can apply to the stream. After processing, the output is written to a file or to the command-line terminal. The DataStream API can be used with both Java and Scala.

The following is an example of a word count program that receives text typed in another terminal window as a stream over a socket and counts the words in 5-second windows.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCountProgram {

    public static void main(String[] args) throws Exception {

        // Obtain the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read lines from a socket, split them into (word, 1) tuples,
        // key by the word (field 0), and sum the counts (field 1)
        // over tumbling 5-second processing-time windows.
        DataStream<Tuple2<String, Integer>> dataStream = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);

        dataStream.print();
        env.execute("Window WordCount");
    }

    // Splits each sentence into (word, 1) tuples.
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}
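To try the program, first open another terminal window and start a simple text server with netcat by running nc -lk 9999, then start the job. Every sentence typed into the netcat window is split into words, and the job prints the per-word counts for each 5-second window. The hostname localhost and port 9999 must match the socketTextStream arguments above.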