In this scenario we will first look at the source code of a continuous file-writing application, which writes some data to a file every second.
Here is the complete Java program.
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

public class App {
    public static void main(String[] args) {
        // Fire the task once per second, starting now.
        new Timer().scheduleAtFixedRate(new TimerTask() {
            public void run() {
                try {
                    // Generate a random 10-digit mobile number and a 2-digit cell ID.
                    long mobileNumber = (long) Math.floor(Math.random() * 9000000000L) + 1000000000L;
                    int cellId = (int) Math.floor(Math.random() * 99) + 10;

                    // Append the record to the main data file.
                    File file = new File("/home/hadoop/lookup_example/data.txt");
                    FileWriter outFile = new FileWriter(file, true);
                    final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
                    final Date date = new Date();
                    PrintWriter out = new PrintWriter(outFile);
                    out.println(mobileNumber + "\t" + cellId); // + "\t" + dateFormat.format(date)
                    out.close();

                    // Numbers in the 5000000000-6000000000 range also go into the lookup file.
                    if (mobileNumber > 5000000000L && mobileNumber < 6000000000L) {
                        File lookupFile = new File("/home/hadoop/lookup_example/lookupfile.txt");
                        FileWriter lookupOutFile = new FileWriter(lookupFile, true);
                        PrintWriter lookupOut = new PrintWriter(lookupOutFile);
                        lookupOut.println(mobileNumber);
                        lookupOut.close();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, new Date(), 1000);
    }
}

This program writes data to data.txt and lookupfile.txt. data.txt contains our main data stream, while lookupfile.txt is appended to from time to time, whenever the generated number falls inside the specified range, because we need some data in the lookup file as well.
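For reference, data.txt fills up with tab-separated lines like the ones below (the values are random, so yours will differ). The mobile number in the first column is the key we will join on later, and lookupfile.txt receives only the numbers that start with 5, one per line:

5738291046	42
1934857201	87
5102938475	13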
Next we will move on to the Flink project.
To create a Flink project from the command line, just type the following:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=0.10.1
This will also prompt you to fill in the groupId and artifactId parameters. After the project has been created successfully, build a program like this.
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Record from the lookup file: val1 is the mobile number, val2 is a placeholder.
class class1 {
    String val1;
    String val2;

    class1(String v1, String v2) {
        val1 = v1;
        val2 = v2;
    }

    public String toString() {
        return val1;
    }
}

// Record from the main data file: val1 is the mobile number, val2 is the cell ID.
class class2 {
    String val1;
    String val2;

    class2(String v1, String v2) {
        val1 = v1;
        val2 = v2;
    }

    public String toString() {
        return val1 + "," + val2;
    }
}

// Key the main stream by mobile number.
class MyKeySelector implements KeySelector<class2, String> {
    @Override
    public String getKey(class2 record) throws Exception {
        return record.val1;
    }
}

// Key the lookup stream by mobile number.
class MyKeySelector2 implements KeySelector<class1, String> {
    @Override
    public String getKey(class1 record) throws Exception {
        return record.val1;
    }
}

public class lookup {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Monitor the lookup file and emit every newly appended number.
        DataStream<class1> lookupData = env
                .readFileStream("/home/hadoop/lookup_example/lookupfile.txt", 1000,
                        WatchType.PROCESS_ONLY_APPENDED)
                .flatMap(new SplitterLookup());
        lookupData.writeAsText("/home/hadoop/lookup_example/out_1.txt", FileSystem.WriteMode.OVERWRITE);

        // Monitor the main data file and parse each tab-separated line.
        DataStream<class2> dataStream = env
                .readFileStream("/home/hadoop/lookup_example/data.txt", 1000,
                        WatchType.PROCESS_ONLY_APPENDED)
                .flatMap(new Splitter());

        // Join the two streams on mobile number over 1-second tumbling windows.
        DataStream<String> d1 = dataStream.join(lookupData)
                .where(new MyKeySelector())
                .equalTo(new MyKeySelector2())
                .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
                .apply(new MyJoinFunction());

        d1.writeAsText("/home/hadoop/lookup_example/out.txt", FileSystem.WriteMode.OVERWRITE);

        env.execute("Lookup Example");
    }

    // Combine a matching pair of records into one CSV line.
    public static class MyJoinFunction implements JoinFunction<class2, class1, String> {
        @Override
        public String join(class2 data, class1 lookupRecord) throws Exception {
            return data.val1 + "," + data.val2 + "," + lookupRecord.val1 + "," + lookupRecord.val2;
        }
    }

    // Turn each line of the lookup file into class1 records, one per token.
    public static class SplitterLookup implements FlatMapFunction<String, class1> {
        @Override
        public void flatMap(String sentence, Collector<class1> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new class1(word, " "));
            }
        }
    }

    // Parse a tab-separated "mobileNumber\tcellId" line from data.txt.
    public static class Splitter implements FlatMapFunction<String, class2> {
        @Override
        public void flatMap(String sentence, Collector<class2> out) throws Exception {
            String[] values = sentence.split("\t");
            out.collect(new class2(values[0], values[1]));
        }
    }
}
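To try the job out, package the project and submit it to a running Flink cluster. The jar name below is a placeholder; the actual name depends on the artifactId and version you chose when creating the project:

mvn clean package
bin/flink run target/<your-artifactid>-<version>.jar

One thing to keep in mind: the join is evaluated per 1-second tumbling window, so a record from data.txt only shows up in out.txt if a matching mobile number arrives from lookupfile.txt within the same window.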
The main Flink documentation mentions that a normal flat file can be used as a DataStream source, but that is not enough here: a plain file source reads the file once and then finishes. You have to use readFileStream in order to achieve a continuous read from the file.
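The snippet below is a minimal sketch of the difference, using the same Flink 0.10 streaming API as above (the class name ReadModes is just for illustration):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;

public class ReadModes {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Reads the file once; the source finishes when it reaches the end of the file.
        DataStream<String> once = env.readTextFile("/home/hadoop/lookup_example/data.txt");

        // Polls the file every 1000 ms and emits only newly appended lines,
        // so the stream keeps running as App keeps writing.
        DataStream<String> continuous = env.readFileStream(
                "/home/hadoop/lookup_example/data.txt", 1000,
                WatchType.PROCESS_ONLY_APPENDED);

        once.print();
        continuous.print();
        env.execute("Read modes");
    }
}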