
Introduction To DataSet API

By now, you are familiar with the basic concepts of Apache Flink. In this segment, you will look at the DataSet API of Apache Flink, which is used for batch processing.

Before starting with the DataSet API, let’s set up your system to run Flink programs in Java. Refer to the following video to install Maven on your system.

Note:

You can use the link below to download Apache Maven to your local machine.
https://ds-de-flink.s3.amazonaws.com/apache-maven-3.6.3-bin.zip

Now that you have installed Maven on your system, the following video walks through the steps to create a project.

The attached document below lists the steps explained in the above videos.

You can find the link to Apache Maven installation files at the start of this segment.
You can follow the document below to install Apache Maven on your Windows machine.

In the following video, you will learn about datasets, tuples and the anatomy of Apache Flink programs.

Note:

When creating the input file of email IDs, enter each email ID on a separate line.

Let’s summarise what you have learnt in the video above.

DataSet

  • A DataSet is a finite collection of data objects. DataSets are immutable, i.e., they cannot be altered once created.
  • DataSets can contain duplicate elements.
  • Data is read from the source into a DataSet, and every transformation step creates a new DataSet.
  • A DataSet is declared as follows: DataSet<String> words = …

Tuple

  • A tuple is a finite collection of data attributes.
  • Tuples hold different attributes of a single data object.
  • Apache Flink defines the tuple classes Tuple0, Tuple1, Tuple2 … up to Tuple25, where the number is the count of fields.
  • A DataSet of tuples is declared as follows: DataSet<Tuple2<String, Integer>> wordCounts = …

Anatomy of a Flink Program

  • Obtain an execution environment
  • Load the initial data from the data source
  • Specify transformations on this data
  • Specify the data sink
  • Trigger the program execution

The following code is used in the above video. This code checks the validity of an email address. Refer to the comments added in the code for better understanding.

    package upgrd;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.core.fs.FileSystem;

    /**
     * Skeleton for a Flink Batch Job.
     *
     * <p>For a tutorial how to write a Flink batch application, check the
     * tutorials and examples on the <a href="https://flink.apache.org/docs/stable/">Flink Website</a>.
     *
     * <p>To package your application into a JAR file for execution,
     * change the main class in the POM.xml file to this class (simply search for 'mainClass')
     * and run 'mvn clean package' on the command line.
     */
    public class BatchEmailChecker {

        public static void main(String[] args) throws Exception {
            // set up the batch execution environment
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Reading Parameters from command line
            ParameterTool parameters = ParameterTool.fromArgs(args);
            String inputFilePath = parameters.getRequired("input");
            String outputFilePath = parameters.getRequired("output");

            // Read data from source
            DataSet<String> text = env.readTextFile(inputFilePath);

            // Apply transformation
            DataSet<Tuple2<String, Boolean>> emailValidation = text
                    .map(new MapFunction<String, Tuple2<String, Boolean>>() {
                        @Override
                        public Tuple2<String, Boolean> map(String input) {
                            return new Tuple2<String, Boolean>(input, Utilities.isValidEmail(input));
                        }
                    });

            // Write data to the Sink
            emailValidation.writeAsCsv(outputFilePath, "\n", ",", FileSystem.WriteMode.OVERWRITE);

            // Execute the job
            env.execute("Email Checker");
        }
    }
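The program above calls Utilities.isValidEmail, but the helper itself is not shown in this segment. The sketch below is one hypothetical way such a helper could look, using only the Java standard library. The regular expression is an assumption chosen for illustration and is deliberately simple; the helper shipped with the course project files may apply different rules.

```java
import java.util.regex.Pattern;

// Hypothetical stand-in for the Utilities helper used by BatchEmailChecker.
// The real helper is not shown in this segment; this is only a sketch.
// To use it with the job above, add "package upgrd;" so both classes share a package.
final class Utilities {

    // Assumed pattern: a local part, "@", one or more domain labels,
    // and a final label of at least two letters.
    // It does not cover every address permitted by RFC 5322.
    private static final Pattern EMAIL = Pattern.compile(
            "^[A-Za-z0-9._%+-]+@[A-Za-z0-9-]+(\\.[A-Za-z0-9-]+)*\\.[A-Za-z]{2,}$");

    private Utilities() {
        // static helper, not meant to be instantiated
    }

    // Returns true when the trimmed input matches the pattern; null counts as invalid.
    public static boolean isValidEmail(String input) {
        return input != null && EMAIL.matcher(input.trim()).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidEmail("alice@example.com")); // true
        System.out.println(isValidEmail("not-an-email"));      // false
    }
}
```

With a helper like this, each line of the input file is mapped to a Tuple2 holding the original text and a Boolean flag, which is what writeAsCsv then writes out as one comma-separated row per email ID.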

Note:

To run the above program on your machine, you can either create a new project and add this Java file to it, or use the folder attached below, which contains all the project files.

Additional Reading

  • DataSet API – The official documentation explaining the basics of the DataSet API.
