Wednesday, February 24, 2016

Read a File as a Stream, Look Up Some Data, and Write the Results to a File.

This topic may feel a bit difficult to understand at first. The following diagram will help you understand the concept behind this project.
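Roughly, the setup looks like this (a simple text sketch; the file names match the code below):

writer app ──► data.txt ────────► readFileStream ──┐
           └─► lookupfile.txt ──► readFileStream ──┴──► join ──► out.txt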

A simple Java application runs a continuous file-write program; this acts as a continuous data stream to Flink. The lookup file contains some reference data, and we need to check whether there are matching records in both files; those matching records are then written to the output file.

In this scenario we will first look at the source code of the continuous file-write application. This application writes some data to the data file every second.

This is the complete Java program:

import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

/**
 * Continuously appends random records to data.txt, and to lookupfile.txt
 * whenever the generated number falls in the lookup range.
 */
public class App
{
    public static void main(String[] args)
    {
        // Fire once per second, starting now; each tick appends one record.
        new Timer().scheduleAtFixedRate(new TimerTask()
        {
            public void run()
            {
                try
                {
                    // A random 10-digit mobile number and a random cell ID.
                    long mobileNumber = (long) Math.floor(Math.random() * 9000000000L) + 1000000000L;
                    int cellID = (int) Math.floor(Math.random() * 99) + 10;

                    // Append the record to the main data file.
                    File file = new File("/home/hadoop/lookup_example/data.txt");
                    FileWriter outFile = new FileWriter(file, true);
                    PrintWriter out = new PrintWriter(outFile);
                    out.println(mobileNumber + "\t" + cellID);
                    out.close();

                    // Numbers in the 5xxxxxxxxx range also go into the lookup file,
                    // so the lookup side always has some matching data.
                    if (mobileNumber > 5000000000L && mobileNumber < 6000000000L)
                    {
                        File lookupFile = new File("/home/hadoop/lookup_example/lookupfile.txt");
                        FileWriter lookupOutFile = new FileWriter(lookupFile, true);
                        PrintWriter lookupOut = new PrintWriter(lookupOutFile);
                        lookupOut.println(mobileNumber);
                        lookupOut.close();
                    }
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
            }
        }, new Date(), 1000);
    }
}
This program writes data to data.txt and lookupfile.txt. data.txt is the file that contains our main data stream, while lookupfile.txt is updated from time to time, whenever a generated value falls within the specified range, because we need some data in the lookup file as well.
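The values are random, so the exact contents vary, but after a few seconds the files might look something like this (the numbers below are made up; data.txt is tab-separated):

data.txt:
5734921056	47
1882340519	23
5102938475	88

lookupfile.txt:
5734921056
5102938475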

Next, we move on to the Flink project. To create a Flink project from the command line, run the following command:

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=0.10.1
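This command prompts you to fill in the groupId and artifactId parameters. If you prefer to skip the prompts, you can pass everything on the command line; the groupId and artifactId values below are only examples:

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=0.10.1 -DgroupId=com.example -DartifactId=lookup-example -Dversion=0.1 -Dpackage=com.example -DinteractiveMode=false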

After the project has been created successfully, build a program like this:


import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Holds one record parsed from the lookup file.
class class1 {
    String val1;
    String val2;

    class1(String v1, String v2) {
        val1 = v1;
        val2 = v2;
    }
}

// Holds one record parsed from the main data file.
class class2 {
    String val1;
    String val2;

    class2(String v1, String v2) {
        val1 = v1;
        val2 = v2;
    }

    @Override
    public String toString() {
        return val1 + "," + val2;
    }
}

// Extracts the join key (the mobile number) from a main-data record.
class MyKeySelector implements KeySelector<class2, String> {

    @Override
    public String getKey(class2 value) throws Exception {
        return value.val1;
    }
}

// Extracts the join key (the mobile number) from a lookup record.
class MyKeySelector2 implements KeySelector<class1, String> {

    @Override
    public String getKey(class1 value) throws Exception {
        return value.val1;
    }
}

public class lookup {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Monitor the lookup file every second and pick up newly appended lines only.
        DataStream<class1> lookupData = env
                .readFileStream("/home/hadoop/lookup_example/lookupfile.txt", 1000, WatchType.PROCESS_ONLY_APPENDED)
                .flatMap(new SplitterLookup());

        lookupData.writeAsText("/home/hadoop/lookup_example/out_1.txt", FileSystem.WriteMode.OVERWRITE);

        // Monitor the main data file in the same way.
        DataStream<class2> dataStream = env
                .readFileStream("/home/hadoop/lookup_example/data.txt", 1000, WatchType.PROCESS_ONLY_APPENDED)
                .flatMap(new Splitter());

        // Join the two streams on the mobile number within one-second tumbling windows.
        DataStream<String> d1 = dataStream.join(lookupData)
                .where(new MyKeySelector())
                .equalTo(new MyKeySelector2())
                .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
                .apply(new MyFlatJoinFunction());

        d1.writeAsText("/home/hadoop/lookup_example/out.txt", FileSystem.WriteMode.OVERWRITE);
        env.execute("File Lookup Join");
    }

    // Emits one joined record for each pair of matching keys in a window.
    public static class MyFlatJoinFunction implements JoinFunction<class2, class1, String> {

        @Override
        public String join(class2 first, class1 second) throws Exception {
            return first.val1 + "," + first.val2 + "," + second.val1 + "," + second.val2;
        }
    }

    // Parses lines from the lookup file: one mobile number per line.
    public static class SplitterLookup implements FlatMapFunction<String, class1> {

        @Override
        public void flatMap(String sentence, Collector<class1> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new class1(word, " "));
            }
        }
    }

    // Parses tab-separated lines from the main data file: mobile number and cell ID.
    public static class Splitter implements FlatMapFunction<String, class2> {

        @Override
        public void flatMap(String sentence, Collector<class2> out) throws Exception {
            String[] values = sentence.split("\t");
            out.collect(new class2(values[0], values[1]));
        }
    }
}
The main Flink documentation mentions that a normal flat file can be used with DataStreams, but that is not correct for this purpose; you have to use readFileStream in order to achieve a continuous read from the file.
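As a minimal sketch of the difference, using the same environment and paths as in the listing above:

// readTextFile reads the current contents once and the source then finishes.
DataStream<String> once = env.readTextFile("/home/hadoop/lookup_example/data.txt");

// readFileStream keeps monitoring the file and emits lines appended later.
DataStream<String> continuous = env
        .readFileStream("/home/hadoop/lookup_example/data.txt", 1000, WatchType.PROCESS_ONLY_APPENDED);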


  • It is advisable to create classes to store the records read from the files; this also gives easy access to the individual values.
  • You need to create a KeySelector for each stream; these are used to match the two keys, decide whether the records belong together, and write the result to the file.
  • You also need to use a window to gather the data together before the join can be processed.
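To try the whole pipeline, build the project and submit the job to a running Flink cluster while the writer application is running. The jar name depends on the artifactId and version you chose during project creation, so the one below is only an example:

mvn clean package
flink run -c lookup target/lookup-example-0.1.jar

Matching records should then start appearing in /home/hadoop/lookup_example/out.txt.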