Thursday, March 31, 2016

Save MapR Streams messages into MapR-DB JSON

In this article you will learn how to create a MapR Streams Consumer that saves all the messages into a MapR-DB JSON Table.

Install and Run the sample MapR Streams application

The steps to install and run the applications are the same as those described in the following article:

Start by running the default producer and consumer in your environment with the following commands:

Producer:

$ java -cp $(mapr classpath):./mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run producer

Consumer:

$ java -cp $(mapr classpath):./mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run consumer

Save messages into MapR-DB JSON

The DBConsumer class is a copy of the Consumer class with small changes to save the messages coming from the /sample-stream:fast-messages topic into a MapR-DB table named /apps/fast-messages.

1- Add the MapR-DB Maven dependency to your project

Edit the pom.xml file and add the following entry inside the dependencies element:

   <dependency>
      <groupId>com.mapr.db</groupId>
      <artifactId>maprdb</artifactId>
      <version>5.1.0-mapr</version>
   </dependency>

This adds support for the MapR-DB JSON Java API, including the OJAI Document interface used in the following steps.

2- Create and Get a JSON Table

To save the messages, the application must access a JSON table; to do this, call the MapRDB.getTable(TABLE_PATH) method. If the table does not exist, create it with MapRDB.createTable(TABLE_PATH).

This is what the DBConsumer.getTable(TABLE_PATH) method does:

  private static Table getTable(String tablePath) {
    if ( ! MapRDB.tableExists(tablePath)) {
      return MapRDB.createTable(tablePath);
    } else {
      return MapRDB.getTable(tablePath);
    }
  }

When the DBConsumer starts, it calls the getTable("/apps/fast-messages") method:

  Table fastMessagesTable = getTable("/apps/fast-messages");

The table fastMessagesTable is now available to the consumer.

3- Save messages into the JSON Table

Messages can be saved into the table using the MapR-DB JSON Java API.

The producer sends each message as a JSON string, which the consumer converts into a JSON object named msg. This object can be used to create an OJAI Document:

  Document messageDocument = MapRDB.newDocument(msg);

To be saved into MapR-DB, a document must have an _id field. In this example, let's use the message number generated by the producer (JSON field k).

  messageDocument.setId( Integer.toString(messageDocument.getInt("k")));

Let’s now save the document into the table:

  fastMessagesTable.insertOrReplace( messageDocument );       

Each time the producer runs, its message counter restarts at 0, so the same _id values are generated again and the existing documents in the table must be replaced. This is why the insertOrReplace method is used.
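To make the replacement behavior concrete, here is a minimal sketch that does not use MapR-DB at all: a hypothetical in-memory table keyed by _id. The class name InsertOrReplaceSketch and its methods are illustrative assumptions, not part of the MapR-DB API. It simulates two producer runs whose counter k restarts at 0 and shows that insertOrReplace keeps one document per _id, while a plain insert, in this model, rejects an _id that already exists.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a JSON table keyed by _id.
// It only models insert vs. insertOrReplace; it is NOT the MapR-DB API.
public class InsertOrReplaceSketch {

    private final Map<String, Map<String, Object>> rows = new HashMap<>();

    // Model of a plain insert: refuses to overwrite an existing _id.
    public void insert(Map<String, Object> doc) {
        String id = (String) doc.get("_id");
        if (rows.containsKey(id)) {
            throw new IllegalStateException("document already exists: " + id);
        }
        rows.put(id, doc);
    }

    // Model of insertOrReplace: creates the document or overwrites it.
    public void insertOrReplace(Map<String, Object> doc) {
        rows.put((String) doc.get("_id"), doc);
    }

    public int size() {
        return rows.size();
    }

    public Map<String, Object> findById(String id) {
        return rows.get(id);
    }

    // Builds a message document and derives _id from the k field,
    // mirroring messageDocument.setId(Integer.toString(k)).
    public static Map<String, Object> message(int k, double t) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("type", "test");
        doc.put("t", t);
        doc.put("k", k);
        doc.put("_id", Integer.toString(k));
        return doc;
    }

    public static void main(String[] args) {
        InsertOrReplaceSketch table = new InsertOrReplaceSketch();
        // Two producer runs: the counter k restarts at 0 each time.
        for (int run = 1; run <= 2; run++) {
            for (int k = 0; k < 3; k++) {
                table.insertOrReplace(message(k, run * 1000.0 + k));
            }
        }
        // Still 3 documents: the second run replaced the first one.
        System.out.println(table.size());
        System.out.println(table.findById("1").get("t"));
        // In this model, a plain insert of an existing _id is rejected.
        try {
            table.insert(message(0, 0.0));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running main prints 3, then 2001.0 (the value written by the second run), then the rejection message for the duplicate _id.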

Let’s run the new consumer.

4- Run the DBConsumer

To run the DBConsumer, pass the dbconsumer parameter as follows:

Consumer:

$ java -cp $(mapr classpath):./mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run dbconsumer

Note that the DBConsumer uses a new consumer group, so that every message is read by both consumers (Consumer and DBConsumer).
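The effect of using a separate group can be illustrated with a simplified, single-partition model in plain Java. This is a hypothetical sketch, not the MapR Streams or Kafka consumer API: each group id (the names consumer-group and db-consumer-group are made up for the example) keeps its own read offset, so both groups receive every message, while a second poll from the same group returns nothing new.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of consumer-group semantics on a
// single-partition topic: each group tracks its own read offset.
public class ConsumerGroupSketch {

    // The topic log: messages in the order they were produced.
    private final List<String> log = new ArrayList<>();
    // Next offset to read, tracked separately for each consumer group.
    private final Map<String, Integer> groupOffsets = new HashMap<>();

    public void produce(String message) {
        log.add(message);
    }

    // Returns every message this group has not read yet and advances
    // only this group's offset; other groups are unaffected.
    public List<String> poll(String groupId) {
        int from = groupOffsets.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        groupOffsets.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        ConsumerGroupSketch topic = new ConsumerGroupSketch();
        for (int k = 0; k < 3; k++) {
            topic.produce("{\"type\":\"test\",\"k\":" + k + "}");
        }
        // Two different groups: each one receives all three messages.
        System.out.println(topic.poll("consumer-group").size());
        System.out.println(topic.poll("db-consumer-group").size());
        // A second poll by the same group finds nothing new.
        System.out.println(topic.poll("consumer-group").size());
    }
}
```

Running main prints 3, 3 and 0. In the real consumer API the group is set with the group.id property, and consumers that share a group split the topic partitions between them instead of each receiving every message, which is why a distinct group is needed here.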

5- Query the messages saved into MapR-DB

Messages are saved into the /apps/fast-messages table; let's use the MapR DBShell to query the data. On your cluster, run the following commands as the mapr user:

$ mapr dbshell
maprdb mapr:> find /apps/fast-messages --id 100
{"_id":"100","type":"test","t":64986.787,"k":{"$numberLong":100}}

Conclusion

In this very simple example, the DBConsumer takes each message and saves it as a simple JSON document in a MapR-DB JSON table. The table can then be used by any type of application, or queried with Apache Drill (1.6 or later) for analytics.

In a real application, the messages would probably be modified, enriched and/or aggregated, and the results then saved into a MapR-DB table. The goal of this sample is simply to show how easy it is to integrate MapR Streams and MapR-DB.

There are also other ways to achieve the same result, for example:

  • Spark Streaming
  • 3rd Party ETL and Tools
