Spring Kafka Streams example


Build a data streaming pipeline using Kafka Streams and Quarkus

In typical data warehousing systems, data is first accumulated and then processed. But with the advent of new technologies, it is now possible to process data as and when it arrives. We call this real-time data processing. In real-time processing, data streams through pipelines; i.e., moving from one system to another. Data gets generated from static sources (like databases) or real-time systems (like transactional applications), and then gets filtered, transformed, and finally stored in a database or pushed to several other systems for further processing. The other systems can then follow the same cycle—i.e., filter, transform, store, or push to other systems.

In this article, we will build a Quarkus application that streams and processes data in real-time using Kafka Streams. As we go through the example, you will learn how to apply Kafka concepts such as joins, windows, processors, state stores, punctuators, and interactive queries. By the end of the article, you will have the architecture for a realistic data streaming pipeline in Quarkus.

The traditional messaging system

As developers, we are tasked with updating a message-processing system that was originally built using a relational database and a traditional message broker. Here's the data flow for the messaging system:

  1. Data from two different systems arrives in two different messaging queues. Each record in one queue has a corresponding record in the other queue. Each record has a unique key.
  2. When a data record arrives in one of the message queues, the system uses the record's unique key to determine whether the database already has an entry for that record. If it does not find a record with that unique key, the system inserts the record into the database for processing.
  3. If the same data record arrives in the second queue within a few seconds, the application triggers the same logic. It checks whether a record with the same key is present in the database. If the record is present, the application retrieves the data and processes the two data objects.
  4. If the data record doesn't arrive in the second queue within 50 seconds after arriving in the first queue, then another application processes the record in the database.

As you might imagine, this scenario worked well before the advent of data streaming, but it does not work so well today.

The data streaming pipeline

Our task is to build a new message system that executes data streaming operations with Kafka. This type of application is capable of processing data in real-time, and it eliminates the need to maintain a database for unprocessed records. Figure 1 illustrates the data flow for the new application:

Figure 1: A flow diagram of the data-streaming pipeline's architecture.

In the next sections, we'll go through the process of building a data streaming pipeline with Kafka Streams in Quarkus. You can get the complete source code from the article's GitHub repository. Before we start coding the architecture, let's discuss joins and windows in Kafka Streams.

Joins and windows in Kafka Streams

Kafka allows you to join records that arrive on two different topics. You are probably familiar with the concept of joins in a relational database, where the data is static and available in two tables. In Kafka, joins work differently because the data is always streaming.

We'll look at the types of joins in a moment, but the first thing to note is that joins happen for data collected over a duration of time. Kafka calls this type of collection windowing. Various types of windows are available in Kafka. For our example, we will use a tumbling window.
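Before moving on, it may help to see the arithmetic behind a tumbling window: the stream is chopped into fixed-size, non-overlapping buckets, and each record timestamp falls into exactly one of them. The sketch below is illustrative plain Java (the class and method names are mine, not Kafka's API):

```java
// Illustrative arithmetic, not Kafka's API: a tumbling window of size W
// assigns each timestamp to exactly one window [start, start + W).
class TumblingWindow {
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 5_000; // 5-second windows, as used in the join example below
        System.out.println(windowStart(1_200, size)); // 0
        System.out.println(windowStart(5_000, size)); // 5000
        System.out.println(windowStart(9_999, size)); // 5000
    }
}
```

Two records land in the same window exactly when this computed start is equal for both timestamps.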

Inner joins

Now, let's consider how an inner join works. Assume that two separate data streams arrive in two different Kafka topics, which we will call the left and right topics. A record arriving in one topic has another relevant record (with the same key but a different value) that is also arriving in the other topic. The second record arrives after a brief time delay. As shown in Figure 2, we create a Kafka stream for each of the topics.

Figure 2: A diagram of an inner join for two topics.

The inner join on the left and right streams creates a new data stream. When it finds a matching record (with the same key) on both the left and right streams, Kafka emits a new record at time t2 in the new stream. Because the B record did not arrive on the right stream within the specified time window, Kafka Streams won't emit a new record for B.
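To make the inner-join semantics concrete, here is a minimal plain-Java simulation of one window: two maps stand in for the records buffered from the left and right streams, and a joined record is emitted only for keys present on both sides. All names here are hypothetical; real Kafka Streams joins are stateful and windowed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class InnerJoinSketch {
    // Emits "left=<l>, right=<r>" only for keys present in BOTH window buffers.
    static Map<String, String> innerJoin(Map<String, String> left, Map<String, String> right) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : left.entrySet()) {
            String r = right.get(e.getKey());
            if (r != null) {
                out.put(e.getKey(), "left=" + e.getValue() + ", right=" + r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // A arrived on both streams inside the window; B only on the left.
        Map<String, String> left = Map.of("A", "V1", "B", "V3");
        Map<String, String> right = Map.of("A", "V2");
        System.out.println(innerJoin(left, right)); // {A=left=V1, right=V2}
    }
}
```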

Outer joins

Next, let's look at how an outer join works. Figure 3 shows the data flow for the outer join in our example:

Figure 3: A diagram of an outer join.

If we don't use the "group by" clause when we join two streams in Kafka Streams, then the join operation will emit three records. Streams in Kafka do not wait for the entire window; instead, they start emitting records whenever the condition for an outer join is true. So, when Record A on the left stream arrives at time t1, the join operation immediately emits a new record. At time t2, the Kafka stream receives data from the right stream. The join operation immediately emits another record with the values from both the left and right records.

You would see different outputs if you used the groupByKey and reduce functions on these Kafka streams. In that case, the streams would wait for the window to complete the duration, perform the join, and then emit the data, as previously shown in Figure 3.

Understanding how inner and outer joins work in Kafka Streams helps us find the best way to implement the data flow that we want. In this case, it is clear that we need to perform an outer join. This type of join allows us to retrieve records that appear in both the left and right topics, as well as records that appear in only one of them.
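As a rough illustration of the outer join's emit-as-you-go behavior, the following plain-Java sketch replays arrivals in time order and emits a record on every arrival, pairing it with whatever the other side has seen so far (or "null"). The class and record layout are my own invention for the example:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OuterJoinSketch {
    // Replays arrivals in time order; each arrival emits a record immediately,
    // paired with whatever the other side has seen so far (else "null").
    static List<String> replay(List<String[]> arrivals) { // each entry: {side, key, value}
        Map<String, String> left = new HashMap<>();
        Map<String, String> right = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String[] a : arrivals) {
            (a[0].equals("L") ? left : right).put(a[1], a[2]);
            emitted.add(a[1] + " -> left=" + left.get(a[1]) + ", right=" + right.get(a[1]));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> out = replay(List.of(
                new String[]{"L", "A", "V1"},   // t1: only the left value exists yet
                new String[]{"R", "A", "V2"})); // t2: both sides are known
        out.forEach(System.out::println);
        // A -> left=V1, right=null
        // A -> left=V1, right=V2
    }
}
```

Note how key A produces two emissions, one per arrival, which is exactly why the pipeline later reduces to the last value per key.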

With that background out of the way, let's begin building our Kafka-based data streaming pipeline.

Note: We can use Quarkus extensions for Spring Web and Spring DI (dependency injection) to code in the Spring Boot style using Spring-based annotations.

Step 1: Perform the outer join

To perform the outer join, we first create a class called KafkaStreaming, then add the function startStreamStreamOuterJoin():

@RestController
public class KafkaStreaming {
    private KafkaStreams streamsOuterJoin;
    private final String LEFT_STREAM_TOPIC = "left-stream-topic";
    private final String RIGHT_STREAM_TOPIC = "right-stream-topic";
    private final String OUTER_JOIN_STREAM_OUT_TOPIC = "stream-stream-outerjoin";
    private final String PROCESSED_STREAM_OUT_TOPIC = "processed-topic";
    private final String KAFKA_APP_ID = "outerjoin";
    private final String KAFKA_SERVER_NAME = "localhost:9092";

    @RequestMapping("/startstream/")
    public void startStreamStreamOuterJoin() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, KAFKA_APP_ID);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_NAME);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> leftSource = builder.stream(LEFT_STREAM_TOPIC);
        KStream<String, String> rightSource = builder.stream(RIGHT_STREAM_TOPIC);

        // TODO 1 - Add state store

        // do the outer join
        // change the value to be a mix of both streams value
        // have a moving window of 5 seconds
        // output the last value received for a specific key during the window
        // push the data to OUTER_JOIN_STREAM_OUT_TOPIC topic
        leftSource.outerJoin(rightSource,
                    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
                    JoinWindows.of(Duration.ofSeconds(5)))
                .groupByKey()
                .reduce(((key, lastValue) -> lastValue))
                .toStream()
                .to(OUTER_JOIN_STREAM_OUT_TOPIC);

        // build the streams topology
        final Topology topology = builder.build();

        // TODO - 2: Add processor code later

        streamsOuterJoin = new KafkaStreams(topology, props);
        streamsOuterJoin.start();
    }
}

When we do a join, we create a new value that combines the data in the left and right topics. If any record with a key is missing in the left or right topic, then the new value will have the string null as the value for the missing record. Also, the Kafka Streams reduce function returns the last aggregated value for all of the keys.

Note: The TODO 1 and TODO - 2 comments are placeholders for code that we will add in the upcoming sections.

The data flow so far

Figure 4 illustrates the following data flow:

  1. When a record with key A and value V1 comes into the left stream at time t1, Kafka Streams applies an outer join operation. At this point, the application creates a new record with key A and the value left=V1, right=null.
  2. When a record with key A and value V2 arrives in the right topic, Kafka Streams again applies an outer join operation. This creates a new record with key A and the value left=V1, right=V2.
  3. When the reduce function is evaluated at the end of the duration window, the Kafka Streams API emits the last value that was computed, per the unique record key. In this case, it emits a record with key A and a value of left=V1, right=V2 into the new stream.
  4. The new stream pushes the record to the stream-stream-outerjoin topic.
Figure 4: The data streaming pipeline so far.
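The effect of groupByKey().reduce((key, lastValue) -> lastValue) can be mimicked with a plain map, where later values for a key overwrite earlier ones, so only the last value per key survives the window. A hypothetical sketch:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LastValueReduce {
    // Mimics groupByKey().reduce((key, lastValue) -> lastValue): for each key,
    // only the most recent value seen during the window survives.
    static Map<String, String> lastValuePerKey(List<String[]> records) { // {key, value}
        Map<String, String> state = new LinkedHashMap<>();
        for (String[] r : records) {
            state.put(r[0], r[1]); // later values overwrite earlier ones
        }
        return state;
    }

    public static void main(String[] args) {
        Map<String, String> result = lastValuePerKey(List.of(
                new String[]{"A", "left=V1, right=null"},
                new String[]{"A", "left=V1, right=V2"}));
        System.out.println(result); // {A=left=V1, right=V2}
    }
}
```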

Next, we will add the state store and processor code.

Step 2: Add the Kafka Streams processor

We need to process the records that are being pushed to the topic by the outer join operation. Kafka Streams provides a Processor API that we can use to write custom logic for record processing. To start, we define a custom processor, DataProcessor, and add it to the streams topology in the KafkaStreaming class:

public class DataProcessor implements Processor<String, String> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        if (value.contains("null")) {
            // TODO 3: - let's process later
        } else {
            processRecord(key, value);
            // forward the processed data to processed-topic topic
            context.forward(key, value);
        }
        context.commit();
    }

    @Override
    public void close() {
    }

    private void processRecord(String key, String value) {
        // your own custom logic. I just print
        System.out.println("==== Record Processed ==== key: " + key + " and value: " + value);
    }
}

The record is processed, and if the value does not contain the string null, it is forwarded to the sink topic (that is, the processed-topic topic). Next, we wire the topology to define the source topic (i.e., the stream-stream-outerjoin topic), add the processor, and finally add a sink (i.e., the processed-topic topic). Once it's done, we can add this piece of code to the TODO - 2 section of the KafkaStreaming class:

// add another stream that reads data from OUTER_JOIN_STREAM_OUT_TOPIC topic
topology.addSource("Source", OUTER_JOIN_STREAM_OUT_TOPIC);

// add a processor to the stream so that each record is processed
topology.addProcessor("StateProcessor",
        new ProcessorSupplier<String, String>() {
            public Processor<String, String> get() {
                return new DataProcessor();
            }
        },
        "Source");

topology.addSink("Sink", PROCESSED_STREAM_OUT_TOPIC, "StateProcessor");

Note that all we do is define the source topic (the stream-stream-outerjoin topic), add an instance of our custom DataProcessor class, and then add the sink topic (the processed-topic topic). The context.forward() call in the custom processor sends the record to the sink topic.

Figure 5 shows the architecture that we have built so far.

Figure 5: A diagram of the architecture in progress.

Step 3: Add the punctuator and StateStore

If you looked closely at the DataProcessor class, you probably noticed that we are only processing records that have both of the required (left-stream and right-stream) values. We also need to process records that have just one of the values, but we want to introduce a delay before processing these records. In some cases, the other value will arrive in a later time window, and we don't want to process the records prematurely.

State store

In order to delay processing, we need to hold incoming records in a store of some kind, rather than an external database. Kafka Streams lets us store data in a state store. We can use this type of store to hold recently received input records, track rolling aggregates, de-duplicate input records, and more.


Once we start holding records that have a missing value from either topic in a state store, we can use punctuators to process them. We schedule a punctuator function via the processor context's schedule() method, which then invokes the punctuate() logic at a fixed interval.
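A rough in-memory model of this hold-then-flush behavior follows (a hypothetical class, with a map standing in for the state store and a direct method call standing in for the scheduled punctuator):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PunctuatorSketch {
    final Map<String, String> stateStore = new LinkedHashMap<>(); // stand-in for the KeyValueStore
    final List<String> forwarded = new ArrayList<>();             // stand-in for context.forward()

    // A complete record is forwarded immediately; an incomplete one
    // (one side still "null") is parked in the state store.
    void onRecord(String key, String value) {
        if (value.contains("null")) {
            stateStore.put(key, value);
        } else {
            forwarded.add(key + ":" + value);
        }
    }

    // Stand-in for the scheduled punctuate(): drain everything still parked.
    void punctuate() {
        stateStore.forEach((k, v) -> forwarded.add(k + ":" + v));
        stateStore.clear();
    }

    public static void main(String[] args) {
        PunctuatorSketch p = new PunctuatorSketch();
        p.onRecord("A", "left=V1, right=V2");   // complete: forwarded right away
        p.onRecord("B", "left=V3, right=null"); // incomplete: parked
        p.punctuate();                          // "timer" fires: B is forwarded
        p.forwarded.forEach(System.out::println);
    }
}
```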

Add the state store

Adding the following code to the KafkaStreaming class adds a state store. Place this code where you see the TODO 1 comment:

// build the state store that will eventually store all unprocessed items
Map<String, String> changelogConfig = new HashMap<>();
StoreBuilder<KeyValueStore<String, String>> stateStore = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STORE_NAME),
            Serdes.String(),
            Serdes.String())
        .withLoggingEnabled(changelogConfig);
.....
// add the state store in the topology builder
topology.addStateStore(stateStore, "StateProcessor");

We have defined a state store that stores the key and value as a string. We've also enabled logging, which is useful if the application dies and restarts. In that case, the state store won't lose data.

We'll modify the processor's process() method to put records with a missing value from either topic in the state store for later processing. Place the following code where you see the TODO 3 comment in the DataProcessor class:

if (value.contains("null")) {
    if (kvStore.get(key) != null) {
        // this means that the other value arrived first
        // you have both the values now and can process the record
        String newvalue = value.concat(" ").concat(kvStore.get(key));
        processRecord(key, newvalue);
        // remove the entry from the statestore (if any left or right record came first as an event)
        kvStore.delete(key);
        context.forward(key, newvalue);
    } else {
        // add to state store as either left or right data is missing
        System.out.println("Incomplete value: " + value + " detected. Putting into statestore for later processing");
        kvStore.put(key, value);
    }
}

Add the punctuator

Next, we add the punctuator to the custom processor we've just created. For this, we update the DataProcessor's init() method to the following:

private KeyValueStore<String, String> kvStore;

@Override
public void init(ProcessorContext context) {
    this.context = context;
    kvStore = (KeyValueStore) context.getStateStore(STORE_NAME);

    // schedule the punctuate() method every 50 seconds based on wall-clock time
    this.context.schedule(Duration.ofSeconds(50), PunctuationType.WALL_CLOCK_TIME,
        new Punctuator() {
            @Override
            public void punctuate(long timestamp) {
                System.out.println("Scheduled punctuator called at " + timestamp);
                KeyValueIterator<String, String> iter = kvStore.all();
                while (iter.hasNext()) {
                    KeyValue<String, String> entry = iter.next();
                    System.out.println("  Processed key: " + entry.key + " and value: " + entry.value + " and sending to processed-topic topic");
                    context.forward(entry.key, entry.value.toString());
                    kvStore.put(entry.key, null);
                }
                iter.close();
                // commit the current processing progress
                context.commit();
            }
        });
}

We've set the punctuate logic to be invoked every 50 seconds. The code retrieves the entries in the state store and processes them, and the forward() function then sends each processed record to the processed-topic topic. Lastly, we delete the record from the state store.

Figure 6 shows the complete data streaming architecture:

Figure 6: A diagram of the complete application with the state store and punctuators added.

Interactive queries

We are finished with the basic data streaming pipeline, but what if we wanted to be able to query the state store? In this case, we could use interactive queries in the Kafka Streams API to make the application queryable. See the article's GitHub repository for more about interactive queries in Kafka Streams.


You can use the streaming pipeline that we developed in this article to do any of the following:

  • Process records in real-time.
  • Store data without depending on a database or cache.
  • Build a modern, event-driven architecture.

I hope the example application and instructions will help you with building and processing data streaming pipelines. You can get the source code for the example application from this article's GitHub repository.

Last updated: September 23, 2020
Source: https://developers.redhat.com/blog/2020/09/28/build-a-data-streaming-pipeline-using-kafka-streams-and-quarkus


In this tutorial, I would like to show you how to do real-time data processing by using Kafka Streams with Spring Boot.

Stream Processing:

In the good old days, we used to collect data, store it in a database, and do nightly processing on it. That is called batch processing!

In this microservices era, we get a continuous, never-ending stream of data, and sometimes delaying its processing can have a severe impact on our business. For example, consider an application like Netflix or YouTube: based on the movies and videos we browse, these applications show immediate recommendations, which provides a much better user experience and helps the business. Similarly, when a bank receives credit card transactions, it might want to check for fraudulent activity and block the card immediately if any is found! The credit card provider would not want to defer this check to nightly processing.


Stream processing is real-time, continuous data processing. Let's see how we can achieve simple real-time stream processing using Kafka Streams with Spring Boot.


A basic knowledge on Kafka is required. Read the below articles if you are new to this topic.

Sample Application:

To demo this real-time stream processing, let's consider a simple application which contains 3 microservices.

  • Producer: This microservice produces some data.
    • In the real world, the producer could be your browser or some user action sending movie browsing history, credit card transactions, etc.
    • In this demo, I generate numbers sequentially from 1 to N, one every second, just to keep things simple to understand.
  • Processor: This microservice consumes the data, does some processing on it, and writes it back to another topic.
    • In the real world, this could be the movie recommendation engine for Netflix.
    • In this demo, I skip all the odd numbers and find the square of the even numbers.
  • Consumer: This microservice consumes the processed data.
    • In the real world, this could be your browser again, getting the latest recommendations based on your movie browsing.
    • In this demo, I consume the data and print it on the console.

Producer, Processor and Consumer are 3 different applications connected via 2 different Kafka topics as shown below.
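Before wiring in Kafka, the whole three-stage flow can be simulated with two in-memory queues standing in for the topics. This is only a mental model (class and method names are mine), not the Spring Cloud Stream implementation:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class PipelineSketch {
    // Producer -> processor -> consumer over two in-memory "topics"
    // (plain queues standing in for numbers and squaredNumbers).
    static List<Long> run(int n) {
        Deque<Long> numbersTopic = new ArrayDeque<>();
        Deque<Long> squaredNumbersTopic = new ArrayDeque<>();

        // Producer: numbers 1..n onto the source topic
        for (long i = 1; i <= n; i++) numbersTopic.add(i);

        // Processor: skip odd numbers, square the even ones, write to the sink topic
        while (!numbersTopic.isEmpty()) {
            long v = numbersTopic.poll();
            if (v % 2 == 0) squaredNumbersTopic.add(v * v);
        }

        // Consumer: read the processed data
        return new ArrayList<>(squaredNumbersTopic);
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // [4, 16]
    }
}
```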

Kafka Set up:

Take a look at the article Kafka – Local Infrastructure Setup Using Docker Compose to set up a Kafka cluster. Once done, create 2 topics.

    • numbers (Topic 1 / Source topic)
    • squaredNumbers (Topic 2 / Sink topic)

Spring Boot – Project Set up:

  • Create a simple spring boot application with below dependencies.

  • I create a multi-module Maven project with the structure shown below, where each Maven module is a Spring Boot application. I share the link for this project at the end of this article.

Java Functional Interface:

Spring Cloud Function simplifies the development of these applications by using the Java functional interfaces below.

Application Type    | Java Functional Interface
Kafka Producer      | Supplier
Kafka Consumer      | Consumer
Kafka Processor     | Function

Kafka Stream Producer:

Working with Kafka Streams in Spring Boot is very easy! Spring Boot does all the heavy lifting with its auto-configuration. I create a simple bean which will produce a number every second.

  • If the bean type is Supplier, Spring Boot treats it as a producer.
  • I use Flux as it is going to be a data stream.

Now the important question is: where will the data be written? If you remember, we created a topic for this – numbers. We configure that via application.yaml as shown below.

  • spring.cloud.stream.function.definition is where you provide the list of bean names (; separated).
  • spring.cloud.stream.bindings.numberProducer-out-0.destination configures where the data has to go!
    • out indicates that Spring Boot has to write the data into the Kafka topic. As you would have guessed, to read the data, simply use in.
  • Spring does its own serialization/deserialization. I skip that and go with Kafka-native serialization and deserialization, set with these properties:
    • spring.cloud.stream.bindings.numberProducer-out-0.producer.use-native-encoding
    • spring.cloud.stream.kafka.bindings.numberProducer-out-0.producer.configuration.value
  • Then I configure the Kafka broker address.
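Stripped of the Flux and binder plumbing, the producer bean boils down to a plain java.util.function.Supplier. The sketch below is a simplified stand-in (no Spring, no scheduling, and the class name is hypothetical), just to show the idea:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

class NumberProducerSketch {
    // The essence of the producer bean: each call yields the next number.
    // (In the real app this would be wrapped in a Flux and polled every second.)
    static Supplier<Long> numberProducer() {
        AtomicLong counter = new AtomicLong();
        return counter::incrementAndGet;
    }

    public static void main(String[] args) {
        Supplier<Long> producer = numberProducer();
        System.out.println(producer.get()); // 1
        System.out.println(producer.get()); // 2
    }
}
```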

Kafka Stream Consumer:

As you saw above, Spring Boot does all the heavy lifting. This is all I have to do to consume the data.

  • Create a bean of type Consumer to consume the data from a Kafka topic.
  • KStream<String, Long>
    • Key type is String
    • Value type is Long
  • We simply print the consumed data.
  • application.yaml
    • As usual I update the spring cloud function bean name
    • We assume that squaredNumbers topic is created already and we consume the data from the topic.
    • To consume the data I use in

Kafka Stream Processor:

A processor is both a producer and a consumer. It consumes the data from one topic and produces data for another topic.

In our case, we have to do the following

  • consume the data from the numbers topic
  • remove the odd numbers
  • square the even numbers
  • write back into another topic

Let's create the processor by using the corresponding functional interface in Java, which is Function<T, R>.

  • We consume the data, which is KStream<String, Long>.
  • We do some processing.
  • Then we return the KStream<String, Long>. Note that the return type could be anything; it does not have to be the same as the input type.
  • application.yaml
    • We use an in binding to consume the data and an out binding to write it.
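The processor's core transformation can be sketched with a plain java.util.function.Function, here applied to a List instead of a KStream (filter out the odd numbers, square the rest). The class name is mine:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class SquareProcessorSketch {
    // The processor's core logic as a plain Function, applied to a List
    // instead of a KStream: drop odd numbers, square the even ones.
    static final Function<List<Long>, List<Long>> process =
            numbers -> numbers.stream()
                    .filter(n -> n % 2 == 0)
                    .map(n -> n * n)
                    .collect(Collectors.toList());

    public static void main(String[] args) {
        System.out.println(process.apply(List.of(1L, 2L, 3L, 4L, 5L))); // [4, 16]
    }
}
```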

Kafka Stream Processing:

At this point, everything seems ready. Start the applications. Once the apps are connected to the Kafka brokers, we can see the console output as shown below.

Bonus: Kafka + Spring Boot – Event Driven:

When we have multiple microservices with different data sources, data consistency among the microservices is a big challenge. You can take a look at this article how the problem is solved using Kafka for Spring Boot Microservices – here.


We were able to successfully demonstrate real-time data processing by using Kafka Streams with Spring Boot.

Learn more about Kafka + SpringBoot.

Check out other Streaming solutions – Redis Stream With Spring Boot – Real Time Data Processing

The source code is available here.

Happy learning 🙂

Source: https://www.vinsguru.com/kafka-stream-with-spring-boot/

Kafka Streams with Spring Cloud Streams will help you understand stream processing in general and apply it to Kafka Streams Programming using Spring Boot.

This course uses the Kafka Streams library compatible with Spring Cloud 2020. All the source code and examples used in this course have been tested by the author on Confluent Platform 6.0.0, which is compatible with Apache Kafka 2.6 open-source distribution.

This is a fully example-driven course, and you will be working with multiple examples during the entire session. We will make extensive use of IntelliJ IDEA as the preferred development IDE and Apache Maven and Gradle as the preferred build tools. However, based on your prior experience, you should be able to work with any other IDE designed for Spring application development and any other build tool designed for Java applications.

This course also makes use of Log4J2 to teach you industry-standard log implementation in your application. We will be using JUnit5, which is the latest version of JUnit, to implement unit test cases.

Working examples and exercises are the most critical tools for sharpening your skills. This course includes programming assignments as and when appropriate. These exercises will help you validate your understanding of the concepts and apply your learning to solve programming problems.

The code bundles for this course are available at https://github.com/PacktPublishing/Kafka-Streams-with-Spring-Cloud-Stream

Source: https://www.packtpub.com/product/kafka-streams-with-spring-cloud-stream-video/9781801811422

Roy Tutorials


In this tutorial I will show you how to work with Apache Kafka Streams to build real-time data processing with STOMP over WebSocket, using Spring Boot and Angular 8. We will see how to build push notifications using Apache Kafka, Spring Boot, and Angular 8. We need to provide some basic things that Kafka Streams requires, such as the cluster information, the application id, the topic to consume, the Serdes to use, and so on. I will show you how to build the application using both the Gradle and Maven build tools.

I will not make the content lengthy by explaining Kafka Streams in detail, but you can always find very good documentation here.

Let’s look at some of these basic things that Kafka Streams requires:

Cluster Information

By default, the binder will try to connect to a cluster that is running on localhost:9092. If that is not the case, you can override that by using the available configuration properties.

Application ID

In a Kafka Streams application, application.id is a mandatory field. Without it, you cannot start a Kafka Streams application. By default, the binder will generate an application ID and assign it to the processor. It uses the function bean name as a prefix.

Topic to consume from

You need to provide a topic from where Kafka will consume the stream of messages.

Serialization and Deserialization (Serdes)

Kafka Streams uses a special class called Serde to deal with data marshaling. It is essentially a wrapper around a deserializer on the inbound and a serializer on the outbound. Normally, you have to tell Kafka Streams what Serde to use for each consumer. Binder, however, infers this information by using the parametric types provided as part of Kafka Streams.
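In spirit, a Serde is just a serializer/deserializer pair. The following is a deliberately minimal stand-in for a String Serde (the real org.apache.kafka.common.serialization.Serde interface has more methods and configuration hooks; the class name here is my own):

```java
import java.nio.charset.StandardCharsets;

class StringSerdeSketch {
    // A Serde pairs a serializer (outbound) with a deserializer (inbound).
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("greeting");   // outbound: object -> bytes on the wire
        System.out.println(deserialize(wire)); // inbound: bytes -> object again
    }
}
```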


At least Java 8, Eclipse 4.12, Spring Boot 2.2.2, Maven 3.6.1, Gradle 5.6, Spring Kafka 2.3.4, Angular 8

How to setup and work with Apache Kafka in Windows Environment

How to create new Angular project in Windows

Server Application

Create Project

Let’s create a project either maven or gradle based in Eclipse IDE. The name of the project is spring-apache-kafka-streams-websocket-stomp-server.

If you are creating gradle based project then you can use below build.gradle script:

If you are creating a maven based project then you can use the below pom.xml file:

Application Properties

We will create application.properties file under classpath directory src/main/resources to configure some basic settings for Kafka.

Producer will produce messages into roytuts-input topic. Kafka stream processor will consume the message from roytuts-input topic and write into roytuts-output topic. Next consumer will consume messages from roytuts-output topic.

Finally will write to /topic/greeting.

Create Topics

We will create two topics in Kafka for consuming and publishing messages.

To create a topic we need to add a bean of type NewTopic. If the topic already exists then this bean is ignored.

The topics will be created during application start up.

Kafka Streams

Kafka stream processor will consume from input topic and do some business processing on input data and write to output topic. Though in this example the processor just reads the messages and writes to topic but ideally your application will do some business processing on the input data.

Configure WebSocket and Stomp

The above class is annotated with @Configuration to indicate that it is a Spring configuration class.

The class is also annotated with @EnableWebSocketMessageBroker, which enables WebSocket message handling backed by a message broker.

The configureMessageBroker() method overrides the default method in the WebSocketMessageBrokerConfigurer interface to configure the message broker.

It starts by calling enableSimpleBroker() to enable a simple memory-based message broker that carries the greeting messages back to the client on destinations prefixed with /topic.

The registerStompEndpoints() method registers the STOMP endpoint, enabling SockJS fallback options so that alternate transports may be used if WebSocket is not available.

The SockJS client will attempt to connect to the endpoint and use the best transport available (websocket, xhr-streaming, xhr-polling, etc.).

Related Posts:

Greeting Service

This class will generate different greet messages depending upon the time of the day. I have also appended a randomly generated string, so we get a different random string appended to the actual greet message each time, which tells us that we are really receiving a new message every time.

Send Message

Spring's KafkaTemplate is auto-configured, and it can be autowired directly into a bean to send a message.

You will find different overloaded variants of the send() method, and you can choose according to your needs.

In your real application the source of data ideally would be different, such as some feed URL, external web service or anything else but in this example I am pushing data every 3 seconds using scheduler to simulate the data feed.

The data or messages are sent to topic roytuts-input.

Consume Message

Now we will consume the messages which were written to roytuts-output topic by Kafka stream processor.

With the Apache Kafka infrastructure, a bean method can be annotated with @KafkaListener to create a listener endpoint on a topic.

Finally we will send to stomp topic /topic/greeting.

Create Main Class

A main class means a class having the main method that starts the application. So in our Spring Boot application, the main class is enough to run the application.

Testing the Application

Make sure your ZooKeeper server and Kafka broker are running before you run the main class.

Running the main class will produce the stream of messages in the console:

We are done with the server application on building real time data processing using Apache Kafka Streams.

Now we will see how to create client application in Angular 8 to see push notifications continuously on browser.

Client Application

Create Project

As I said in prerequisites section how to create Angular project in Windows environment, so first create an Angular project. The name of the project is spring-apache-kafka-streams-websocket-stomp-client-angular.

Installing Required Modules

Install the required modules with the following commands:

The  is required to connect over STOMP.

The  is required to establish connection with WebSocket server.

The  is required to directly access DOM elements in the HTML page.

To avoid  issue we need to install  module.

Update index.html

We need to declare window in the src/index.html file to avoid the below issue:

The complete content of src/index.html file is given below:

Update app.component.html

We will update the src/app/app.component.html file to put a div tag where greeting message will be updated.

Update app.component.ts

We will update src/app/app.component.ts file to consume the message over STOMP.

We set the page title in the ngOnInit() method by implementing the OnInit interface.

We establish connection to the WebSocket server, client socket subscribe to the topic  destination, where the server will publish greeting messages and finally we update the div (having a class ) on HTML page.

We are here to just get the message from server as a push notification towards clients.

Testing the Application

With your server running, now run the client application by executing the command ng serve.

Your application opens at http://localhost:4200 and you will see the message being updated every 3 seconds.

Apache Kafka Streams for building Real Time Data Processing with STOMP over Websocket using Spring and Angular

In the above image the highlighted random string will be continuously changing.

Source Code


Thanks for reading.

Tagged Publish Subscribe

Author: Soumitra

Hi, I am Soumitra, the owner of roytuts.com. Sharing my knowledge is my passion, and I have been writing blogs on various technologies since 2014. If you like my tutorials, you may also want to like my Facebook page and follow me on Twitter and GitHub.
Source: https://roytuts.com/apache-kafka-streams-for-building-real-time-data-processing-with-stomp-over-websocket-using-spring-and-angular/


Spring Cloud Stream Kafka Streams first steps

Spring Cloud Stream is the solution provided by Spring to build applications connected to shared messaging systems.

It offers an abstraction (the binding) that works the same whatever underneath implementation we use (the binder):

  • Apache Kafka
  • Rabbit MQ
  • Kafka Streams
  • Amazon Kinesis
  • ...

You can also check out Spring Cloud Stream Kafka step by step, where I got a simple example working using the Kafka binder.

Let's try this time a simple example using Kafka Streams binder! 🤩

First steps

A bit of documentation to start with:

(this demo uses Spring Cloud 2020.0.3, which brings in Kafka Streams 2.7.1)


We want to implement this flow:


  1. We receive messages with key = username and value = { score: number } from topic pub.scores
  2. We have to calculate the total score received by username on fixed windows of 10 seconds and send it to topic pub.totals
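As a dependency-free illustration of step 2, the fixed-window arithmetic can be sketched in plain Java (this is not the Kafka Streams API; the usernames and timestamps are made up): each event lands in the 10-second window that contains its timestamp, and scores are summed per (username, window) pair.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the windowed aggregation: each event falls into the
// 10-second window containing its timestamp, and totals are accumulated
// per (username, windowStart) pair.
public class FixedWindowSketch {
    static final long WINDOW_SIZE_MS = 10_000L;

    // Window start is the timestamp rounded down to a multiple of the window size.
    static long windowStart(long timestampMs) {
        return (timestampMs / WINDOW_SIZE_MS) * WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        // key: "username@windowStart", value: running total score
        Map<String, Integer> totals = new HashMap<>();
        // {timestampMs, score} pairs for a hypothetical user "ana"
        long[][] events = { {1_000, 5}, {4_000, 3}, {12_000, 7} };
        for (long[] e : events) {
            String key = "ana@" + windowStart(e[0]);
            totals.merge(key, (int) e[1], Integer::sum);
        }
        // First two events share window [0, 10000): total 8; third is in [10000, 20000): total 7
        System.out.println(totals.get("ana@0"));     // 8
        System.out.println(totals.get("ana@10000")); // 7
    }
}
```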

Integration Test

First we create a project using this spring initializr configuration and we add Kafka Streams binder dependency spring-cloud-stream-binder-kafka-streams.

Using testcontainers and docker-compose with a Kafka container, we write the following integration test:

This test will obviously fail, but it should work once we have finished our implementation.

Note that we have to send another message after the window has expired to force Kafka Streams to close the window. It is the only way for Kafka Streams to be sure that there are no more messages left for that window. In other words, what we are really implementing is:
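The note above can be sketched in plain Java (an illustration, not Kafka Streams code): a window [start, start+10s) is only reported closed once a later record pushes stream time past its end, so the last open window never closes without a subsequent message.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of why a later record is needed to close a window: a window
// [start, start+10s) is complete only once stream time (the max timestamp
// observed so far) moves at or past the window end.
public class WindowCloseSketch {
    static final long WINDOW_SIZE_MS = 10_000L;

    // Returns the window-start times that close as events arrive in order.
    static List<Long> closedWindows(long[] timestamps) {
        List<Long> closed = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        Long openWindow = null; // start of the currently open window, if any
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = (ts / WINDOW_SIZE_MS) * WINDOW_SIZE_MS;
            if (openWindow != null && streamTime >= openWindow + WINDOW_SIZE_MS) {
                closed.add(openWindow); // a newer record moved stream time past the end
                openWindow = null;
            }
            if (openWindow == null) openWindow = start;
        }
        return closed; // the last window never closes without a later record
    }

    public static void main(String[] args) {
        // Events at 1s and 4s open window [0, 10s); the event at 12s closes it.
        System.out.println(closedWindows(new long[]{1_000, 4_000, 12_000})); // [0]
        // Without the 12s event, nothing is ever emitted:
        System.out.println(closedWindows(new long[]{1_000, 4_000})); // []
    }
}
```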


Kafka Streams binder configuration

Next we configure the Kafka Streams binder:

With this configuration:

  • Spring Cloud Stream will create a Kafka Streams binder connected to localhost:9094
  • We need to create a @Bean named totalScoreProcessor that implements the Function<KStream, KStream> interface
    • This @Bean will connect a KStream subscribed to the pub.scores topic to another KStream publishing to the pub.totals topic
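A plausible application.yml matching these bullets might look as follows; the binding names follow Spring Cloud Stream's <functionName>-in-0/-out-0 naming convention, and the applicationId value here is an assumption:

```yaml
spring:
  cloud:
    function:
      definition: totalScoreProcessor
    stream:
      bindings:
        totalScoreProcessor-in-0:
          destination: pub.scores
        totalScoreProcessor-out-0:
          destination: pub.totals
      kafka:
        streams:
          binder:
            brokers: "localhost:9094"
            applicationId: total-score-app  # assumption: any unique application id
```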

You can find all the available configuration properties documented in Kafka Streams Properties.

TotalScoreProcessor first implementation

We can start with a simple implementation of a TotalScoreProcessor that, for every ScoreEvent received, generates a TotalScoreEvent with the same value:
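The original snippet is not reproduced above; as a dependency-free analogue of its shape, here is the same 1:1 mapping written with java.util.function.Function and java.util.stream instead of KStream (ScoreEvent and TotalScoreEvent mirror the event names in the post, but their fields are assumptions):

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Dependency-free analogue of the first, non-windowed TotalScoreProcessor:
// every incoming ScoreEvent is mapped 1:1 to a TotalScoreEvent with the same value.
public class PassthroughSketch {
    record ScoreEvent(String username, int score) {}
    record TotalScoreEvent(String username, int totalScore) {}

    // Mirrors the shape of a @Bean Function<KStream<...>, KStream<...>>
    static Function<Stream<ScoreEvent>, Stream<TotalScoreEvent>> totalScoreProcessor() {
        return scores -> scores.map(s -> new TotalScoreEvent(s.username(), s.score()));
    }

    public static void main(String[] args) {
        List<TotalScoreEvent> out = totalScoreProcessor()
            .apply(Stream.of(new ScoreEvent("ana", 5), new ScoreEvent("ana", 3)))
            .collect(Collectors.toList());
        System.out.println(out.size());              // 2
        System.out.println(out.get(0).totalScore()); // 5
    }
}
```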

💡 We are using Spring Cloud Stream's default serialization/deserialization of Kotlin data classes to Json. In order for this to work we need to add com.fasterxml.jackson.module:jackson-module-kotlin dependency.

This implementation does not fulfill our goal yet; just execute MyApplicationIntegrationTest and see it still failing! 😓

TotalScoreProcessor test using kafka-streams-test-utils

Following the Test Pyramid principle, we should use integration tests for the simple test cases and unit tests (or at least less "integrated" tests) for the more complicated ones.

To create these less "integrated" tests we can use kafka-streams-test-utils.

They will be faster and more reliable (not needing Kafka) and with some cool features like "advance time" to simulate messages published at different instants in time.

Here is one way to create a TopologyTestDriver from kafka-streams-test-utils to test our TotalScoreProcessor:

And then we can write tests like this one:

Final implementation

After a few iterations your TotalScoreProcessor implementation should look similar to this:

Note that we use the suppression operator to emit nothing for a window until it closes and then emit the final result. Without it, we would get an output message for each input message.
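A plain-Java sketch of the difference (an illustration of the semantics, not the Streams API): without suppression each input record emits an updated running total, while with suppression only the final total is emitted when the window closes.

```java
import java.util.ArrayList;
import java.util.List;

// Contrast of emission behavior for one 10-second window receiving scores 5, 3, 7.
public class SuppressSketch {

    // Without suppression: an updated total is emitted for every input record.
    static List<Integer> emitEveryUpdate(int[] scores) {
        List<Integer> emitted = new ArrayList<>();
        int total = 0;
        for (int s : scores) {
            total += s;
            emitted.add(total); // intermediate result per message
        }
        return emitted;
    }

    // With suppression: nothing is emitted until the window closes, then the final total.
    static List<Integer> emitFinalOnly(int[] scores) {
        int total = 0;
        for (int s : scores) total += s;
        return List.of(total); // single final result on window close
    }

    public static void main(String[] args) {
        int[] scores = {5, 3, 7};
        System.out.println(emitEveryUpdate(scores)); // [5, 8, 15]
        System.out.println(emitFinalOnly(scores));   // [15]
    }
}
```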

Now you can play around with Kafka Streams DSL and do more complicated stuff!

Happy coding!


Source: https://dev.to/adevintaspain/spring-cloud-stream-kafka-stream-binder-first-steps-1pch

This repository contains a collection of applications written using Spring Cloud Stream. All the applications are self contained. They can be run against either Kafka or RabbitMQ middleware technologies. You have the option of running the samples against local or Docker containerized versions of Kafka and Rabbit. For convenience, files are provided as part of each application wherever it is applicable. For this reason, Docker Compose is required and it’s recommended to use the latest version. These compose files bring up the middleware (kafka or Rabbit) and other necessary components for running each app. If you bring up Kafka or RabbitMQ in Docker containers, please make sure that you bring them down while in the same sample directory. You can read the README that is part of each sample and follow along the instructions to run them.

You can build the entire set of samples from the root of the repository. However, the recommended approach is to pick the sample that you are interested in, go to that particular app, and follow the instructions in its README.

Following is the list of the sample applications provided:

Source samples

  • Source with dynamic destinations (Kafka and Rabbit)

Processor samples

  • Basic StreamListener sample (Kafka and Rabbit)

  • Transformer sample (Kafka and Rabbit)

  • Reactive processor sample (Kafka and Rabbit)

  • Sensor average calculation using reactive patterns (Kafka and Rabbit)

Multi IO sample

  • Sample with multiple input/output bindings (Kafka and Rabbit)

Multi Binder samples

  • Multi binder - Input with Kafka and output with Rabbit

  • Multi binder - Same binder type but different clusters (Kafka only, but can be extended for Rabbit as well)


  • Kinesis produce consume sample

Kafka Streams samples

A collection of various applications in stream processing using Spring Cloud Stream support for Kafka Streams binding.

  • Kafka Streams word count

  • Kafka Streams branching

  • Kafka Streams DLQ

  • Kafka Streams aggregation

  • Kafka Streams Interactive query basic

  • Kafka Streams Interactive query advanced

  • Kafka Streams product tracker

  • Kafka Streams KTable join

  • Kafka Streams and normal Kafka binder together

Testing samples

  • Sample with embedded Kafka

  • General testing patterns in Spring Cloud Stream

Source: https://github.com/spring-cloud/spring-cloud-stream-samples


Spring boot kafka stream example


Externalizing application configuration is a key challenge in developing great applications. Spring Boot provides a variety of options to expose application configuration. In this tutorial, we explore the basics of application.properties, @Value and @ConfigurationProperties.

    1. Spring Kafka - Batch Listener Example: Starting with version 1.1 of Spring Kafka, @KafkaListener methods can be configured to receive a batch of consumer records from the consumer poll operation. The following example shows how to set up a batch listener using Spring Kafka, Spring Boot, and Maven.

    2. In this tutorial I will show you how to work with Apache Kafka Streams for building Real Time Data Processing with STOMP over Websocket using Spring Boot and Angular 8. We will see how to build push notifications using Apache Kafka, Spring Boot and Angular 8.

    3. Example of configuring Kafka Streams within a Spring Boot application, including an example of SSL configuration - KafkaStreamsConfig.java.

    4. Spring Kafka and Spring Boot Configuration Example. by MemoryNotFound · Published March 8, 2018 · Updated March 8, 2018. In the following tutorial we demonstrate how to configure Spring Kafka with Spring Boot. Spring Boot uses sensible default to configure Spring Kafka. We can override these defaults using the application.yml property file.

    5. Health Checks with Spring Boot. 2020-09-21 20:00:00 +0000. Monitoring and observability are essential in distributed environments and they rely on effective health checking mechanisms that can be observed at runtime. In this article, we will build health check functions in Spring Boot applications and make them observable by capturing useful ...

    6. In this tutorial, I will show you how to upload and download files with Spring Boot Rest APIs to/from a static folder. We also use the Spring Web MultipartFile interface to handle HTTP multi-part requests. This Spring Boot App works with: - Angular 8 Client / Angular 10 Client / Angular 11 Client / Angular […]

    7. Summary • Scalable pipelines composed of Spring Boot cloud native applications • Spring Cloud Stream provides the programming model • Transparently mapping to Kafka-native concepts • Spring Cloud Data Flow provides the orchestration model

    8. In the Spring Boot JPA Auditing Example, we will look at how Spring Data JPA helps manage audit information. Using the AuditorAware interface, Spring Data JPA provides a mechanism for capturing audit information. Such functionality is quite important while building Spring Boot Microservices for production usage. Previously, we have also seen how Spring Boot Hibernate Envers integration helps ...

    Spring Boot Kafka Consume JSON Messages Example: Above we created an items-topic from the Kafka CLI; now we are going to send some JSON messages from the Kafka producer console and listen to the items-topic from the Spring Boot application to read messages as soon as the producer publishes them.

    1. Now that our OrderService is up and running, it's time to make it a little more robust and decoupled. In this blog post we're gonna put Kafka in between the OrderResource controller and our Spring Boot back-end system and use Spring Cloud Stream to ease development: Upon creation of a JHipster application you will be given an option to select the Asynchronous messages using Apache Kafka ...

    2. Dec 13, 2015 · This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra. Part 1 - Overview. Part 2 - Setting up Kafka. Part 3 - Writing a Spring Boot Kafka Producer. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra. Part 5 - Displaying Cassandra Data With Spring Boot.

  • In this tutorial series, we will be discussing how to stream log4j application logs to Apache Kafka using the maven artifact kafka-log4j-appender. To keep the application logging configuration simple, we will be doing Spring Boot configurations and stream log4j logs to Apache Kafka. We have already installed and configured Apache Kafka in our local system in my last article - Apache Kafka With ...


Source: https://qvnv.stefanostefani.it/spring-boot-kafka-stream-example.html
