This streamer consumes from an MQTT topic and feeds key-value pairs into an IgniteDataStreamer instance, using Eclipse Paho as an MQTT client.
You must provide a stream tuple extractor (either a single-entry or multiple-entries extractor) to process the incoming message and extract the tuple to insert.
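The difference between the two extractor kinds is easiest to see in the parsing step alone. The sketch below is library-free and only illustrative: the class `PayloadParsing`, its method names, and the `key,value` and `;`-separated payload formats are assumptions, not part of ignite-mqtt. In a real extractor, equivalent logic would run on the bytes of the incoming MQTT message.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of ignite-mqtt: shows only the payload-parsing step.
class PayloadParsing {
    // Single-entry case: one message carries exactly one "key,value" pair.
    static Map.Entry<Integer, String> parseSingle(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        int comma = text.indexOf(',');
        if (comma < 0)
            throw new IllegalArgumentException("Expected 'key,value' but got: " + text);
        int key = Integer.parseInt(text.substring(0, comma).trim());
        // Everything after the first comma is treated as the value.
        return new AbstractMap.SimpleImmutableEntry<>(key, text.substring(comma + 1));
    }

    // Multiple-entries case: one message carries several pairs separated by ';'.
    static Map<Integer, String> parseMultiple(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        Map<Integer, String> entries = new LinkedHashMap<>();
        for (String pair : text.split(";")) {
            if (pair.isEmpty())
                continue; // Skip empty segments such as an empty payload.
            Map.Entry<Integer, String> e = parseSingle(pair.getBytes(StandardCharsets.UTF_8));
            entries.put(e.getKey(), e.getValue());
        }
        return entries;
    }
}
```

A single-entry extractor would return the result of something like `parseSingle`, while a multiple-entries extractor would return the whole map so that one message yields several cache entries.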
This streamer supports:
- Subscribing to a single topic or multiple topics at once.
- Specifying the subscriber's QoS for a single topic or for multiple topics.
- Setting MqttConnectOptions to enable features such as Last Will and Testament (LWT) and persistent sessions.
- Specifying the client ID. If none is provided, a random one is generated and kept across reconnections.
- (Re-)Connection retries powered by the guava-retrying library. Retry wait and retry stop policies can be configured.
- Blocking the start() method until the client is connected for the first time.
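The retry support listed above rests on two independent decisions: how long to wait before the next attempt (the wait policy) and when to give up (the stop policy). The following is a library-free sketch of that idea under stated assumptions. It is not the guava-retrying API, and `RetrySketch`, `callWithRetry`, and the parameter names are hypothetical.

```java
import java.util.function.IntPredicate;
import java.util.function.IntToLongFunction;
import java.util.function.Supplier;

// Hypothetical illustration of wait/stop policies; this is NOT the guava-retrying API.
class RetrySketch {
    /**
     * Calls the action until it succeeds or the stop policy says to give up.
     * waitMillis maps the number of failed attempts so far to a sleep time in milliseconds;
     * shouldStop receives the same number and returns true to abort.
     */
    static <T> T callWithRetry(Supplier<T> action, IntToLongFunction waitMillis, IntPredicate shouldStop) {
        int failed = 0;
        while (true) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                failed++;
                if (shouldStop.test(failed))
                    throw e; // Stop policy reached: surface the last failure.
            }
            try {
                Thread.sleep(waitMillis.applyAsLong(failed));
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting to retry", ie);
            }
        }
    }
}
```

With this shape, a linearly growing wait capped at 30 seconds could be written as `n -> Math.min(30_000L, 1_000L * n)`, a limit of ten attempts as `n -> n >= 10`, and a policy that never gives up as `n -> false`.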
Here's a trivial code sample showing how to use this streamer:
// Start Ignite.
Ignite ignite = Ignition.start();
// Get a data streamer reference.
IgniteDataStreamer<Integer, String> dataStreamer = ignite.dataStreamer("mycache");
// Create an MQTT data streamer
MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
streamer.setIgnite(ignite);
streamer.setStreamer(dataStreamer);
// brokerUrl is the URI of your MQTT broker, for example "tcp://localhost:1883".
streamer.setBrokerUrl(brokerUrl);
streamer.setBlockUntilConnected(true);
// Set a single tuple extractor to extract items in the format 'key,value' where key => Int, and value => String
// (using Guava here).
streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
    @Override
    public Map.Entry<Integer, String> extract(MqttMessage msg) {
        // Decode the payload explicitly as UTF-8 rather than relying on the platform default charset.
        List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload(), StandardCharsets.UTF_8));
        return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
    }
});
// Consume from multiple topics at once.
streamer.setTopics(Arrays.asList("def", "ghi", "jkl", "mno"));
// Start the MQTT Streamer.
streamer.start();
Refer to the Javadocs of the ignite-mqtt module for more info on the available options.