Apache Beam : Kafka to MongoDb

TECH WOLF
5 min readMay 24, 2023

--

This is a small blog on dumping the data from Kafka (source) to MongoDb (Sink).

User Story

The requirement here is, we already have a lot of record coming to different Kafka topic and what we want is any information coming to the topic need to be dumped into the mongoDb data lake with a little of transformation.

Description

Any record coming to Kafka topic need to to be ingested into mongoDb. While the record are being dumped for any error/exception occurred go to DLQ topic. All TECH error in DLQ topic would be reprocessed automatically and put into respective Kafka topic using a scheduler app. The records with business error to go into mongodB table (DLQ) table. The records to be modified manually and then would be put for reprocess manually.

Architecture

Architecture

Building producer app for Post and Customer

Framework: Spring Boot , Database: mySql (tentative)

Producer Apps: 1. bbird-post-prodcuer-app 2. bbird-customer-producer-app.

bbird-post-prodcuer-app

Topic: bbird.post.topic
Database: mySql
partitions: 3

bbird-customer-producer-app:

Topic: bbird.customer.topic
Database: mySql
partitions: 3
DLQ Topic for customer:
Topic: bbird.dlq.customer.topic

The associated code for the producer and dataflow job(APcahe Beam ) is below:

You can go through the code of kafka producer. If you are little bit familier with kafka then it’s cake walk. Otherwise refer my blog on kafka producer:

Now, the most important part Apache Beam:

Apache Beam:

As per the official documentation : Apache Beam is an open source unified programming model to define and execute data processing pipelines, including ETL, batch and stream (continuous) processing.Beam Pipelines are defined using one of the provided SDKs and executed in one of the Beam’s supported runners (distributed processing back-ends) including Apache Flink, Apache Samza, Apache Spark, and Google Cloud Dataflow.

In the example above we have tested the code with DirectRunner on local. But you can use any of the runner above on cloud.

Now the question comes why Apache Beam. We could have gone with other many options like there are lot of tools from Kafka. If we want Kafka Topic A as source and Kafka Topic B as Sink then KStream could have been used or if we want to dump record from kafka to some Db then KafkaConnecter could have been an option. Then why Beam???

Well over the time I have been using Apache Beam and the other frameworks as well but some of the advantages that I have seen choosing beam over other system are

  1. It can operate on both bounded (batch) and unbounded (realtime streaming) data with the same efficency. No other or verfy few had the similar capablity.
  2. Windowing is possible for Apache Beam. This is very useful while dealing with infinite dataset (streaming data). Although so far I have never used this in the projects I have worked on. But here is the offical documentation to refer: https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/transforms/windowing/Window.html#:~:text=with%20windows%20works.-,Windowing,placed%20into%20a%20related%20window.
  3. All short of parallel processing by a distributed system is taken by the framework. So extra code headache is gone using Beam.

There are many other keep exploring and if you find one add to the comment.

A general Apache Beam code consists of these three:

  • Pipeline — A pipeline is a user-constructed graph of transformations that defines the desired data processing operations.
  • PCollection — A PCollection is a data set or data stream. The data that a pipeline processes is part of a PCollection.
  • PTransform — A PTransform (or transform) represents a data processing operation, or a step, in your pipeline. A transform is applied to zero or more PCollection objects, and produces zero or more PCollection objects.

A Beam pipeline is a graph (specifically, a directed acyclic graph) of all the data and computations in your data processing task. This includes reading input data, transforming that data, and writing output data. A pipeline is constructed by a user in their SDK of choice. Then, the pipeline makes its way to the runner either through the SDK directly or through the Runner API’s RPC interface. For example, this diagram shows a branching pipeline:

For the scenario in the example above our Pipeline consists of:

Kafka (bbird.post.topic) as source → MongoDb (Sink)

In case of any failure the record ends up in the dlq topic. Please refer the Architecture to have detailed view.

A PCollection is an unordered bag of elements. Each PCollection is a potentially distributed, homogeneous data set or data stream, and is owned by the specific Pipeline object for which it is created. Multiple pipelines cannot share a PCollection. Beam pipelines process PCollections, and the runner is responsible for storing these elements.

A PCollection generally contains “big data” (too much data to fit in memory on a single machine). Sometimes a small sample of data or an intermediate result might fit into memory on a single machine, but Beam’s computational patterns and transforms are focused on situations where distributed data-parallel computation is required. Therefore, the elements of a PCollection cannot be processed individually, and are instead processed uniformly in parallel.

A PTransform (or transform) represents a data processing operation, or a step, in your pipeline. A transform is usually applied to one or more input PCollection objects. Transforms that read input are an exception; these transforms might not have an input PCollection.

You provide transform processing logic in the form of a function object (colloquially referred to as “user code”), and your user code is applied to each element of the input PCollection (or more than one PCollection). Depending on the pipeline runner and backend that you choose, many different workers across a cluster might execute instances of your user code in parallel. The user code that runs on each worker generates the output elements that are added to zero or more output PCollection objects.

Above is the code for the example explained. Have a look and comment in case of any doubt. Will try to explain. It will be a good learning for me as well.

--

--