Apache Ignite Integrations

Kafka Streamer

Overview

The Apache Ignite Kafka Streamer module provides streaming from Kafka to an Ignite cache.
Either of the following two methods can be used to achieve such streaming:

  • using Kafka Connect functionality with the Ignite sink connector;
  • importing the Kafka Streamer module into your Maven project and instantiating KafkaStreamer for data streaming.

Streaming Data via Kafka Connect

IgniteSinkConnector exports data from Kafka to an Ignite cache by polling data from Kafka topics and writing it to the cache you specify.
The connector can be found in the 'optional/ignite-kafka' module. It and its dependencies have to be on the classpath of the running Kafka instance, as described in the following subsection.
For more information on Kafka Connect, see Kafka Documentation.

Setting up and Running

  1. Put the following jar files on Kafka's classpath
ignite-kafka-x.x.x.jar <-- with IgniteSinkConnector
ignite-core-x.x.x.jar
cache-api-1.0.0.jar
ignite-spring-1.5.0-SNAPSHOT.jar
spring-aop-4.1.0.RELEASE.jar
spring-beans-4.1.0.RELEASE.jar
spring-context-4.1.0.RELEASE.jar
spring-core-4.1.0.RELEASE.jar
spring-expression-4.1.0.RELEASE.jar
commons-logging-1.1.1.jar
  2. Prepare worker configurations, e.g.,
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
  3. Prepare connector configurations, e.g.,
# connector
name=my-ignite-connector
connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
tasks.max=2
topics=someTopic1,someTopic2

# cache
cacheName=myCache
cacheAllowOverwrite=true
igniteCfg=/some-path/ignite.xml
singleTupleExtractorCls=my.company.MyExtractor

where cacheName is the name of the cache specified in '/some-path/ignite.xml', into which the data from 'someTopic1,someTopic2' will be pulled and stored. Set cacheAllowOverwrite to true if you want to enable overwriting existing values in the cache. If you need to parse the incoming data and decide on your own key and value, implement StreamSingleTupleExtractor and specify it as singleTupleExtractorCls, as shown above.
You can also set cachePerNodeDataSize and cachePerNodeParOps to adjust the per-node buffer size and the maximum number of parallel stream operations for a single node.

See example-ignite.xml in tests for a simple cache configuration file example.

  4. Start the connector, for instance, in standalone mode as follows,
bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
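A custom extractor like the one referenced in the connector configuration above implements Ignite's StreamSingleTupleExtractor interface; for the sink connector, the incoming message is a Kafka Connect SinkRecord. The sketch below shows only the extraction logic, with plain Strings standing in for the record's fields so it compiles without the Ignite and Kafka jars; in a real project the class (my.company.MyExtractor in the configuration above) would implement the interface and read the topic, key, and value from the SinkRecord.

```java
import java.util.AbstractMap;
import java.util.Map;

// Sketch of the logic behind a custom extractor. In a real project the class
// would implement org.apache.ignite.stream.StreamSingleTupleExtractor and
// receive a org.apache.kafka.connect.sink.SinkRecord; plain Strings stand in
// for the record's fields here so the snippet compiles without the
// Ignite/Kafka jars on the classpath.
public class MyExtractorSketch {
    /** Derive a cache entry from a record's topic, key, and value. */
    static Map.Entry<String, String> extract(String topic, String key, String value) {
        // example policy: prefix the cache key with the source topic
        return new AbstractMap.SimpleEntry<>(topic + ":" + key, value);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> entry = extract("someTopic1", "k1", "v1");
        System.out.println(entry.getKey() + " -> " + entry.getValue()); // prints "someTopic1:k1 -> v1"
    }
}
```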

Checking the Flow

To perform a very basic functionality check, you can do the following,

  1. Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
  2. Start the Kafka server
bin/kafka-server-start.sh config/server.properties
  3. Provide some data input to the Kafka server
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
k1,v1
  4. Start the connector
bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
  5. Check that the value is in the cache, for example via the REST API,
http://node1:8080/ignite?cmd=size&cacheName=cache1
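The REST check can also be scripted. A minimal sketch that composes the size request, with the host, port, and cache name taken from the example above; actually issuing the request assumes a node with the HTTP REST endpoint (ignite-rest-http module) enabled:

```java
import java.net.URI;

// Sketch: compose the Ignite REST "size" request used to verify that the
// entry landed in the cache. Host, port, and cache name come from the example
// above and are assumptions about your deployment.
public class RestSizeCheck {
    static URI sizeRequest(String host, int port, String cacheName) {
        return URI.create("http://" + host + ":" + port + "/ignite?cmd=size&cacheName=" + cacheName);
    }

    public static void main(String[] args) {
        System.out.println(sizeRequest("node1", 8080, "cache1")); // prints the request URL from the step above
    }
}
```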

Streaming Data with the Ignite Kafka Streamer Module

If you are using Maven to manage the dependencies of your project, you first need to add the Kafka Streamer module dependency like this (replace '${ignite.version}' with the actual Ignite version you are interested in):

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                        http://maven.apache.org/xsd/maven-4.0.0.xsd">
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>org.apache.ignite</groupId>
            <artifactId>ignite-kafka</artifactId>
            <version>${ignite.version}</version>
        </dependency>
        ...
    </dependencies>
    ...
</project>

Assuming you have a cache with String keys and String values, the streamer can be started as follows:

KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

// a null cache name refers to the default cache; pass your cache name otherwise
try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
    // allow overwriting cache data
    stmr.allowOverwrite(true);
    
    kafkaStreamer.setIgnite(ignite);
    kafkaStreamer.setStreamer(stmr);
    
    // set the topic
    kafkaStreamer.setTopic(someKafkaTopic);

    // set the number of threads to process Kafka streams
    kafkaStreamer.setThreads(4);
    
    // set Kafka consumer configurations
    kafkaStreamer.setConsumerConfig(kafkaConsumerConfig);
    
    // set extractor
    kafkaStreamer.setSingleTupleExtractor(strExtractor);
    
    kafkaStreamer.start();
}
finally {
    kafkaStreamer.stop();
}

For detailed information on Kafka consumer properties, refer to the Kafka documentation: http://kafka.apache.org/documentation.html
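The kafkaConsumerConfig object passed to setConsumerConfig(...) above is not shown in the snippet. A hypothetical sketch of assembling such settings follows; the property names are those of the classic ZooKeeper-based Kafka consumer (matching the setup in this guide), and the exact set depends on your Kafka version, so treat every value as illustrative.

```java
import java.util.Properties;

// Hypothetical sketch of the settings behind the kafkaConsumerConfig used
// above. Property names follow the classic ZooKeeper-based Kafka consumer;
// all values are illustrative assumptions, not required settings.
public class ConsumerConfigSketch {
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181"); // assumed local ZooKeeper
        props.setProperty("group.id", "ignite-streamer-group");   // illustrative consumer group
        props.setProperty("zookeeper.session.timeout.ms", "400");
        props.setProperty("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProperties().getProperty("group.id")); // prints "ignite-streamer-group"
    }
}
```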
