Apache Ignite Integrations

The Apache Ignite Integrations Developer Hub

Welcome to the Apache Ignite Integrations developer hub. You'll find comprehensive guides and documentation to help you start working with Apache Ignite Integrations as quickly as possible, as well as support if you get stuck. Let's jump right in!

Get Started    

Camel Streamer

Overview

This documentation page focuses on the Apache Camel Streamer, which can also be thought of as a Universal Streamer because it allows you to consume from any technology or protocol supported by Camel into an Ignite Cache.

What is Apache Camel?

If you don't know what Apache Camel is, check out the section at the bottom of the page for a quick introduction.

With this streamer, you can ingest entries straight into an Ignite cache based on:

  • Calls received on a Web Service (SOAP or REST), by extracting the body or headers.
  • Listening on a TCP or UDP channel for messages.
  • The content of files received via FTP or written to the local filesystem.
  • Email messages received via POP3 or IMAP.
  • A MongoDB tailable cursor.
  • An AWS SQS queue.
  • And many others.

This streamer supports two modes of ingestion: direct ingestion and mediated ingestion.

An Ignite Camel Component

There is also a camel-ignite component, if what you are looking is to interact with Ignite Caches, Compute, Events, Messaging, etc. from within a Camel route.

Read on for more details.

Direct Ingestion

Direct Ingestion allows you to consume from any Camel endpoint straight into Ignite, with the help of a Tuple Extractor. We call this direct ingestion.

Here is a code sample:

// Start Apache Ignite.
Ignite ignite = Ignition.start();

// Create an streamer pipe which ingests into the 'mycache' cache.
IgniteDataStreamer<String, String> pipe = ignite.dataStreamer("mycache");

// Create a Camel streamer and connect it.
CamelStreamer<String, String> streamer = new CamelStreamer<>();  
streamer.setIgnite(ignite);  
streamer.setStreamer(pipe);

// This endpoint starts a Jetty server and consumes from all network interfaces on port 8080 and context path /ignite.
streamer.setEndpointUri("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST");

// This is the tuple extractor. We'll assume each message contains only one tuple.
// If your message contains multiple tuples, use a StreamMultipleTupleExtractor.
// The Tuple Extractor receives the Camel Exchange and returns a Map.Entry<?,?> with the key and value.
streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<Exchange, String, String>() {  
    @Override public Map.Entry<String, String> extract(Exchange exchange) {
        String stationId = exchange.getIn().getHeader("X-StationId", String.class);
        String temperature = exchange.getIn().getBody(String.class);
        return new GridMapEntry<>(stationId, temperature);
    }
});

// Start the streamer.
streamer.start();  

Mediated Ingestion

For more sophisticated scenarios, you can also create a Camel route that performs complex processing on incoming messages, e.g. transformations, validations, splitting, aggregating, idempotency, resequencing, enrichment, etc. and ingest only the result into the Ignite cache.

We call this mediated ingestion.

// Create a CamelContext with a custom route that will:
//  (1) consume from our Jetty endpoint.
//  (2) transform incoming JSON into a Java object with Jackson.
//  (3) uses JSR 303 Bean Validation to validate the object.
//  (4) dispatches to the direct:ignite.ingest endpoint, where the streamer is consuming from.
CamelContext context = new DefaultCamelContext();  
context.addRoutes(new RouteBuilder() {  
    @Override
    public void configure() throws Exception {
        from("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST")
            .unmarshal().json(JsonLibrary.Jackson)
            .to("bean-validator:validate")
            .to("direct:ignite.ingest");
    }
});

// Remember our Streamer is now consuming from the Direct endpoint above.
streamer.setEndpointUri("direct:ignite.ingest");

Setting a Response

By default, the response sent back to the caller (if it is a synchronous endpoint) is simply an echo of the original request. If you want to customize​ the response, set a Camel Processor as a responseProcessor:

streamer.setResponseProcessor(new Processor() {  
    @Override public void process(Exchange exchange) throws Exception {
        exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
        exchange.getOut().setBody("OK");
    }
});

Maven Dependency

To make use of the ignite-camel streamer, you need to add the following dependency:

<dependency>
	<groupId>org.apache.ignite</groupId>
	<artifactId>ignite-camel</artifactId>
	<version>${ignite.version}</version>
</dependency>

It will also drag in camel-core as a transitive dependency.

Don't forget to add the Camel component dependencies!

Make sure to also add the dependencies to the Camel components that you'll be using in the streamer.

About Apache Camel

Apache Camel is an enterprise integration framework that revolves around the idea of the well-known Enterprise Integration Patterns popularised by Gregor Hohpe and Bobby Woolf – such as channels, pipes, filters, splitters, aggregators, routers, resequences​, etc. – which you piece with one another like a Lego puzzle to create integration routes that connect systems together.

To date, there are over 200 components for Camel, many of which are adapters for different technologies like JMS, SOAP, HTTP, Files, FTP, POP3, SMTP, SSH; including cloud services like Amazon Web Services, Google Compute Engine, Salesforce; social networks like Twitter, Facebook; and even new generation databases like MongoDB, Cassandra; and data processing technologies like Hadoop (HDFS, HBase) and Spark.

Camel runs in a variety of environments, also supported by Ignite: standalone Java, OSGi, Servlet containers, Spring Boot, JEE application servers, etc. and it's fully modular, so you only deploy the components you'll actually be using and nothing else.

Check out What is Camel? for more information.

Camel Streamer