Apache Ignite Integrations

The Apache Ignite Integrations Developer Hub

Welcome to the Apache Ignite Integrations developer hub. You'll find comprehensive guides and documentation to help you start working with Apache Ignite Integrations as quickly as possible, as well as support if you get stuck. Let's jump right in!

Get Started    

MQTT Streamer

Overview

This streamer consumes from an MQTT topic and feeds key-value pairs into an IgniteDataStreamer instance, using Eclipse Paho as an MQTT client.

You must provide a stream tuple extractor (either a single-entry or multiple-entries extractor) to process the incoming message and extract the tuple to insert.

Features

This streamer supports:

  • Subscribing to a single topic or multiple topics at once.
  • Specifying the subscriber's QoS for a single topic or for multiple topics.
  • Setting MqttConnectOptions to enable features like last will testament, persistent sessions, etc.
  • Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user does not provide one.
  • (Re-)Connection retries powered by the guava-retrying library. Retry wait and retry stop policies can be configured.
  • Blocking the start() method until the client is connected for the first time.

Example

Here's a trivial code sample showing how to use this streamer:

// Start Ignite.
Ignite ignite = Ignition.start();

// Get a data streamer reference.
IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer("mycache");

// Create an MQTT data streamer  
MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
streamer.setIgnite(ignite);
streamer.setStreamer(dataStreamer);
streamer.setBrokerUrl(brokerUrl);
streamer.setBlockUntilConnected(true);

// Set a single tuple extractor to extract items in the format 'key,value' where key => Int, and value => String
// (using Guava here).
streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
    @Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
        List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload()));

        return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
    }
});

// Consume from multiple topics at once.
streamer.setTopics(Arrays.asList("def", "ghi", "jkl", "mno"));

// Start the MQTT Streamer.
streamer.start();

Refer to the Javadocs of the ignite-mqtt module for more info on the available options.

MQTT Streamer