The Sales Kafka Mega Blog

Part 2: The Sales Producer

TECH WOLF
3 min read · Jun 4, 2022

Architecture of the Sales Producer

Wire Diagram of the Sales Producer

You can find the whole code here:

So let's start a code walkthrough and understand what each part represents.

First let’s have a look into the project structure of the app:

project structure

So here’s the thing, as is clear from the architecture: the data comes from the Sales table and is produced as a JSON message to Kafka, and as soon as the message is sent to Kafka, the same record is saved to the Sales History table.

Somewhat like this. The code for the model would look something like this:

Sales Model
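The model code itself is shown as an image above; as a rough sketch, a Sales entity along these lines would fit the architecture. The field names and types here are assumptions, not the author's exact code.

```java
// A minimal sketch of the Sales entity backing the Sales table.
// Field names are assumptions based on the architecture described above.
public class Sales {
    private String uuid;     // primary key of the row in the Sales table
    private String product;  // item that was sold
    private int quantity;    // units sold
    private double price;    // sale price

    public Sales(String uuid, String product, int quantity, double price) {
        this.uuid = uuid;
        this.product = product;
        this.quantity = quantity;
        this.price = price;
    }

    public String getUuid() { return uuid; }
    public String getProduct() { return product; }
    public int getQuantity() { return quantity; }
    public double getPrice() { return price; }
}
```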

The Sales History Model would look like this:

Sales History Table
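Again as a hedged sketch: judging from the DAO code further down (which binds `uuid`, `MESSAGECREATETIME`, and a `MESSAGEPAYLOAD` CLOB), the SalesHistory entity carries roughly these fields. The exact types are assumptions.

```java
// A minimal sketch of the SalesHistory entity backing the SalesHistory table.
// Field types are assumptions inferred from the DAO's parameter bindings.
public class SalesHistory {
    private String uuid;      // bound to the "uuid" column
    private long timestamp;   // bound to MESSAGECREATETIME
    private String sales;     // JSON payload, stored as a CLOB (MESSAGEPAYLOAD)

    public SalesHistory(String uuid, long timestamp, String sales) {
        this.uuid = uuid;
        this.timestamp = timestamp;
        this.sales = sales;
    }

    public String getUuid() { return uuid; }
    public long getTimestamp() { return timestamp; }
    public String getSales() { return sales; }
}
```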

Now let’s move on to the Data Access Object (DAO):

This is what the DAO code looks like:

Over here:

public List<SalesHistory> getSalesData() {
    return namedParameterJdbcTemplate.query(selectSales, new SalesMapper());
}

The getSalesData method fetches the data from the Sales table every 3 seconds and processes it.

Now, the updateSalesHistory method inserts the fetched and processed data into the SalesHistory table:

public boolean updateSalesHistory(SalesHistory salesHistory) {
    try {
        MapSqlParameterSource params = new MapSqlParameterSource();
        params.addValue("uuid", salesHistory.getUuid());
        params.addValue("MESSAGECREATETIME", salesHistory.getTimestamp());
        // The sales payload is written into a CLOB column.
        params.addValue("MESSAGEPAYLOAD",
                new SqlLobValue(String.valueOf(salesHistory.getSales()), new DefaultLobHandler()),
                Types.CLOB);
        namedParameterJdbcTemplate.update(insertSalesHistory, params);
        return true;
    } catch (Exception e) {
        // Returning false keeps the caller alive on failure; logging the
        // exception here would make troubleshooting much easier.
        return false;
    }
}

The SalesMapper used in getSalesData is a custom RowMapper: it sits between the query and the rest of the pipeline, deciding how each fetched Sales row is turned into the object that is later saved to the SalesHistory table. Here’s the code.
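Since the mapper code appears only as an image, here is a rough sketch of what it could look like. In the real project it would implement Spring's `RowMapper<SalesHistory>` interface; the interface is left off here (and a stand-in SalesHistory is nested inside) so the sketch compiles without Spring on the classpath. The column names and payload format are assumptions.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.UUID;

// Sketch of the custom SalesMapper: turns one row of the Sales table
// into a SalesHistory record ready to be produced to Kafka.
public class SalesMapper {

    // Minimal stand-in for the project's SalesHistory model.
    public static class SalesHistory {
        public String uuid;
        public long timestamp;
        public String sales;
    }

    public SalesHistory mapRow(ResultSet rs, int rowNum) throws SQLException {
        SalesHistory history = new SalesHistory();
        // Stamp each outgoing message with a fresh id and creation time.
        history.uuid = UUID.randomUUID().toString();
        history.timestamp = System.currentTimeMillis();
        // Collapse the sales columns into a simple JSON payload
        // (column names "product" and "quantity" are guesses).
        history.sales = String.format("{\"product\":\"%s\",\"quantity\":%d}",
                rs.getString("product"), rs.getInt("quantity"));
        return history;
    }
}
```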

So, we are done with the microservice part; let’s now turn to the Kafka part:

Let’s start with the configuration:

This is a basic configuration of the producer in the Kafka client (for Java). Once we are done with this configuration, we simply import it into the main application via an annotation.
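The configuration shown above might look roughly like this sketch using spring-kafka. The broker address, serializer choices, and class name are assumptions; adjust them to your setup.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class RootConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Assumed local broker; point this at your own cluster.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```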

@Import({RootConfig.class})
@SpringBootApplication
public class SalesPublisherApplication {

Over here, @Import pulls RootConfig into the application context. Now that the configuration is done, let’s move on to the next part: writing the Kafka producer client.

If you are familiar with Spring Boot, the code is self-explanatory.
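For readers without the image, a producer client along these lines would fit: a thin service around the KafkaTemplate bean from the configuration. The class and topic names are assumptions.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Sketch of the Kafka producer client: wraps the KafkaTemplate bean
// and publishes one sales record per call.
@Service
public class SalesProducer {

    // Topic name is an assumption; the real project may use a different one.
    private static final String TOPIC = "sales";

    private final KafkaTemplate<String, String> kafkaTemplate;

    public SalesProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Keyed by uuid so messages for the same sale land on the same partition.
    public void publish(String uuid, String salesJson) {
        kafkaTemplate.send(TOPIC, uuid, salesJson);
    }
}
```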

Done with the Kafka part, let’s move on to the Scheduler.

Scheduler

Like a controller, Java (via Spring) has the concept of a scheduler. The difference is that a scheduled method fires itself every “n” seconds, whereas a controller only runs on a user request.

Here is the code for the Scheduler, which performs the actions:

The following are the actions performed by the above scheduler every 3 seconds:

Fetch Sales Data → Produce the sales info → Saves to SalesHistory → Delete from Sales
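The pipeline above can be sketched as a scheduled component like this. SalesDao, SalesProducer, and the deleteFromSales method are stand-ins for the project's actual DAO and Kafka client, so this is an illustration of the flow rather than the author's code. Note that @Scheduled only fires if @EnableScheduling is present on a configuration class.

```java
import java.util.List;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Sketch of the scheduler tying the four steps together:
// fetch -> produce -> save to SalesHistory -> delete from Sales.
@Component
public class SalesScheduler {

    private final SalesDao salesDao;          // assumed DAO type
    private final SalesProducer salesProducer; // assumed producer type

    public SalesScheduler(SalesDao salesDao, SalesProducer salesProducer) {
        this.salesDao = salesDao;
        this.salesProducer = salesProducer;
    }

    // fixedRate = 3000 ms matches the "every 3 seconds" cadence above.
    @Scheduled(fixedRate = 3000)
    public void publishSales() {
        List<SalesHistory> rows = salesDao.getSalesData();
        for (SalesHistory row : rows) {
            salesProducer.publish(row.getUuid(), String.valueOf(row.getSales()));
            salesDao.updateSalesHistory(row);        // save to SalesHistory
            salesDao.deleteFromSales(row.getUuid()); // assumed cleanup method
        }
    }
}
```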

I think this is it for Part 2 of the blog. Next we will move on to Part 3:

In Part 3 we will write the app that consumes the data from the Kafka topic and uses it for business processing, and in Part 4 we will surface that data in a UI.

— — — — — — — — — END — — — — — — — — — — — — — — — — —
