
Understanding the Promise of Big Data Processing with Apache Beam

Recently, there has been a war going on between several big data processing frameworks such as Apache Spark, Apache Flink, Apache Storm, etc., but it is hard to say which one is better, since these frameworks are evolving at a very fast pace and come with their own pros and cons. Therefore, a need arises for a framework that provides a clean abstraction over the distributed programming model, so that we can focus on application logic rather than on the underlying frameworks. Apache Beam, introduced by Google, came with the promise of a unifying API for distributed programming. In this blog, we will take a deeper look into Apache Beam and its various components.

Apache Beam

Apache Beam is a unified programming model that handles both stream and batch data in the same way. We can create a pipeline with the Beam SDK (in Java, Python, or Go) that can run on top of any supported execution engine, namely Apache Spark, Apache Flink, Apache Apex, Apache Samza, Apache Gearpump, and Google Cloud Dataflow (with more to join in the future). With Apache Beam, you apply the same operations whether the data is bounded, coming from a batch source such as an HDFS file, or unbounded, coming from a streaming source such as Kafka.


How does a Beam driver program work?

First, create a Pipeline object and set the pipeline execution options (including which runner to use: Apache Spark, Apache Apex, etc.). Second, create a PCollection from some external storage or from in-memory data. Then apply PTransforms to transform each element in the PCollection and produce an output PCollection. You can filter, group, analyze, or otherwise process the data. Finally, write the final PCollection to an external storage system using the IO libraries. When we run this driver program, it creates a workflow graph out of the pipeline, which is then executed as an asynchronous job on the underlying runner.

// Create the pipeline options
PipelineOptions options = PipelineOptionsFactory.create();
// Then create the pipeline using the pipeline options
Pipeline p = Pipeline.create(options);
// Create the PCollection by reading from a file
PCollection<String> lines = p.apply("TextFile", TextIO.read().from("protocol://inputPath"));
// Apply a transformation
PCollection<String> output = lines.apply(Some transformation);
// Write the final PCollection to some external sink
output.apply(TextIO.write().to("protocol://outputPath"));
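The runner does not have to be hard-coded; it can be selected when the pipeline is launched. A minimal sketch, assuming the chosen runner's artifact is on the classpath:

// Read pipeline options (including the runner) from command-line arguments,
// e.g. --runner=SparkRunner or --runner=FlinkRunner; the DirectRunner is used by default.
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);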

Apache Beam Abstractions

Beam Transformations

ParDo

The ParDo transformation applies a processing function to each element in a PCollection to produce zero, one, or more elements in the resulting PCollection. You can filter, format, compute, extract, and type-convert elements in a PCollection using ParDo. To apply a ParDo, you create a class extending DoFn with a method annotated with @ProcessElement; this method contains the processing logic to apply to each element of the PCollection and emits the result.

PipelineOptions options = PipelineOptionsFactory.create();
// Then create the pipeline using the pipeline options
Pipeline p = Pipeline.create(options);
// Create a PCollection from a text file
PCollection<String> words = p.apply("TextFile", TextIO.read().from("protocol://inputPath"));
// The DoFn that computes the length of each element in the input PCollection
static class WordLengthFn extends DoFn<String, Integer> {
  @ProcessElement
  public void processElement(@Element String word, OutputReceiver<Integer> out) {
    out.output(word.length());
  }
}
// Apply a ParDo to the PCollection "words" to compute the length of each word
PCollection<Integer> wordLengths = words.apply(ParDo.of(new WordLengthFn()));

GroupByKey

GroupByKey groups all values associated with a particular key. Suppose we have the data below, where the key is the month and the value is the name of a person whose birthday falls in that month, and we want to group all people whose birthdays fall in the same month.

[Figure: example input and grouped output for GroupByKey, keyed by month]

To use GroupByKey on unbounded data, you can use windowing or triggers to perform the grouping on the finite set of data that falls within a particular window. For example, if you have defined a fixed window size of 3 minutes, then all data that arrives within each 3-minute span will be grouped by key.
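A minimal sketch of the birthday example above, with the in-memory input data assumed purely for illustration:

// Build a keyed PCollection of (month, name) pairs from in-memory data (hypothetical values)
PCollection<KV<String, String>> birthdays = p.apply(Create.of(
    KV.of("January", "Ravi"),
    KV.of("March", "Asha"),
    KV.of("January", "Meera")));
// Group all names that share the same month key
PCollection<KV<String, Iterable<String>>> birthdaysByMonth =
    birthdays.apply(GroupByKey.<String, String>create());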

CoGroupByKey

CoGroupByKey joins two or more sets of data that share the same key. For example, suppose you have one file that contains a person's name as the key with a phone number as the value, and a second file that has the person's name as the key with an email address as the value. Joining these two files on person name using CoGroupByKey results in a new PCollection that has the person's name as the key with the phone number and email address as values. As discussed for GroupByKey, for unbounded data we have to use windowing and triggers to aggregate data with CoGroupByKey.
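A minimal sketch of the phone/email join described above, assuming phones and emails are existing PCollection<KV<String, String>> inputs keyed by person name:

// Tags identify each input within the join result
final TupleTag<String> phoneTag = new TupleTag<>();
final TupleTag<String> emailTag = new TupleTag<>();
// phones and emails are assumed PCollections keyed by person name
PCollection<KV<String, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(phoneTag, phones)
        .and(emailTag, emails)
        .apply(CoGroupByKey.create());
// For each person, read back the grouped phone numbers and email addresses
PCollection<String> contacts = joined.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    String name = c.element().getKey();
    Iterable<String> phoneNumbers = c.element().getValue().getAll(phoneTag);
    Iterable<String> emailAddresses = c.element().getValue().getAll(emailTag);
    c.output(name + ": " + phoneNumbers + " " + emailAddresses);
  }
}));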


Flatten

Flatten merges a list of PCollections into a single PCollection.

// Flatten takes a list of PCollections and returns a single PCollection
PCollection<String> pc1 = …
PCollection<String> pc2 = …
PCollection<String> pc3 = …
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> mergedPCollections = collections.apply(Flatten.<String>pCollections());

Partition

In contrast to Flatten, Partition splits a single PCollection into multiple smaller PCollections according to a partition function that the user provides.

// Partition takes the desired number of result partitions and a PartitionFn
PCollection<Student> students = …
// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile = students.apply(
    Partition.of(10, new PartitionFn<Student>() {
      public int partitionFor(Student student, int numPartitions) {
        return student.getPercentile() * numPartitions / 100;
      }
    }));

Pipeline IO

Apache Beam comes bundled with numerous IO libraries to integrate with various external sources, such as file-based, messaging, and database systems, for reading and writing data. You can also write your own custom IO connectors.

Read transforms read from an external source such as a file, a database, or Kafka to create a PCollection:

PCollection<String> lines = p.apply(TextIO.read().from("protocol://inputPath"));

Write transforms write the data in a PCollection to an external sink such as a database or filesystem:

output.apply(TextIO.write().to("protocol://outputPath"));
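Other connectors follow the same pattern as TextIO. A minimal sketch of reading from Kafka with the KafkaIO connector, where the broker address and topic name are placeholders and the beam-sdks-java-io-kafka module is assumed to be on the classpath:

PCollection<KV<Long, String>> kafkaRecords = p.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker:9092")          // placeholder broker address
        .withTopic("some-topic")                      // placeholder topic name
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata());                          // drop Kafka metadata, keep KV pairs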

Windowing

Windowing divides data based on the timestamp of each element. Windowing becomes especially important when you are working with unbounded data and aggregating transformations like GroupByKey and CoGroupByKey, because to aggregate data by key you need a finite set of data. A combined example follows the list of window types below.

You can define different kinds of windows to divide the elements of your PCollection:

  • Fixed Time Windows:

    This is the simplest kind: fixed-size, non-overlapping windows. Consider a window of 5-minute duration: all elements in the unbounded PCollection with timestamps between 0:00 and 0:05 are captured in the first window, all elements between 0:05 and 0:10 are captured in the second window, and so on.

    PCollection<String> words = …
    PCollection<String> fixedWindowed = words.apply(
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));
  • Sliding Time Windows:

    Sliding time windows are overlapping windows that have a window duration, which defines the time interval of data we want to capture, and a period, which defines how often a new window starts. For example, a sliding window with a window duration of 5 minutes and a period of 20 seconds means each window captures five minutes' worth of data, but a new window starts every twenty seconds.

    PCollection<String> words = …
    PCollection<String> slidingWindowed = words.apply(
        Window.<String>into(SlidingWindows.of(Duration.standardMinutes(5))
            .every(Duration.standardSeconds(20))));
  • Per-Session Windows:

    A session window function defines windows that contain elements within a certain gap duration of another element. Session windowing applies on a per-key basis and is useful for data that is irregularly distributed with respect to time. In the code snippet below, sessions are separated by a gap of at least 10 minutes.

    PCollection<String> words = …
    PCollection<String> sessionWindowed = words.apply(
        Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(10))));
  • Single Global Window:

    By default, all elements in your PCollection are assigned to a single global window. You can use the single global window with both bounded and unbounded data.

    PCollection<String> items = …
    PCollection<String> singleWindowItems = items.apply(
        Window.<String>into(new GlobalWindows()));
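To tie windowing back to keyed aggregation, here is a minimal sketch, with the input PCollection and its key/value types assumed, that applies 3-minute fixed windows before counting elements per key:

// events is an assumed unbounded PCollection of (key, value) pairs with event timestamps
PCollection<KV<String, Long>> events = …
PCollection<KV<String, Long>> countsPerKeyPerWindow = events
    // Assign each element to a 3-minute fixed window based on its timestamp
    .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(3))))
    // Count elements per key within each window (a GroupByKey-style aggregation)
    .apply(Count.perKey());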

Conclusion:

No doubt, Apache Beam is the future of parallel processing, and its "write once, execute anywhere" approach is making it ever more popular across the big data ecosystem. Currently, support and integration with backend execution runners is limited, but in the future more and more frameworks will be integrated, making distributed programming more stable and unified.
