Creating Your Custom Data Connector


Learn how to create a Braineous Custom Data Connector

This guide covers:

  • Implement "com.appgallabs.dataplatform.targetSystem.framework.StoreDriver"

  • Develop the Java standalone Ingestion Client Application

  • Register a data pipe and send source data to multiple target databases (MongoDB and MySQL)

  • Verify all target databases receive the data

1. Prerequisites

To complete this guide, you need:

  • Roughly 15 minutes

  • An IDE

  • JDK 11+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.9.5

Verify Maven is using the Java you expect

If you have multiple JDKs installed, Maven may not pick up the JDK you expect and you could end up with unexpected results. You can verify which JDK Maven uses by running mvn --version.
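For example:

mvn --version

The output shows the Maven version together with the Java version and Java home Maven resolved, so you can confirm it matches the JDK you intend to build with.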

Download the Create a Custom Data Connector Tutorial zip archive

2. Implement "StoreDriver" interface

StoreDriver interface

package com.appgallabs.dataplatform.targetSystem.framework;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;

public interface StoreDriver {

    /**
     * This method is used to configure your Store Driver.
     *
     * Configuration is specified as a json object.
     *
     * Here is a sample configuration
     *
     *     {
     *       "storeDriver" : "com.appgallabs.dataplatform.targetSystem.core.driver.MySqlStoreDriver",
     *       "name": "scenario1_store_mysql",
     *       "config": {
     *         "connectionString": "jdbc:mysql://localhost:3306/braineous_staging_database",
     *         "username": "root",
     *         "password": ""
     *       },
     *       "jsonpathExpression": "jsonpath:1"
     *     }
     *
     * @param configJson
     */
    public void configure(JsonObject configJson);

    /**
     * Implementation logic for storing the dataset processed by the
     * ingestion engine sent as an array of JsonObjects
     *
     * @param dataSet
     */
    public void storeData(JsonArray dataSet);
}

In the tutorial, this implementation is located at server/src/main/java/com/appgallabs/dataplatform/tutorial/MySqlStoreDriver.java
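
The configure and storeData snippets below reference the logger, connection, and configJson fields declared on the class. A minimal class skeleton, matching the full implementation in section 2.3, looks like this:

package com.appgallabs.dataplatform.tutorial;

import com.appgallabs.dataplatform.targetSystem.framework.StoreDriver;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;

public class MySqlStoreDriver implements StoreDriver {
    private static Logger logger = LoggerFactory.getLogger(MySqlStoreDriver.class);

    //JDBC connection established in configure() and reused by storeData()
    private Connection connection;

    //driver configuration passed in by the ingestion engine
    private JsonObject configJson;

    @Override
    public void configure(JsonObject configJson) {
        //see section 2.1
    }

    @Override
    public void storeData(JsonArray dataSet) {
        //see section 2.2
    }
}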

2.1. "configure" method implementation

@Override
    public void configure(JsonObject configJson) {
        try {
            this.configJson = configJson;

            Statement createTableStatement = null;
            try {
                String url = configJson.get("connectionString").getAsString();
                String username = configJson.get("username").getAsString();

                String password = configJson.get("password").getAsString();

                this.connection = DriverManager.getConnection(
                        url, username, password);
                System.out.println(
                        "Connection Established successfully");

                //create schema and tables
                String createTableSql = "CREATE TABLE IF NOT EXISTS staged_data (\n" +
                        "    id int NOT NULL AUTO_INCREMENT,\n" +
                        "    data varchar(255) NOT NULL,\n" +
                        "    PRIMARY KEY (id)\n" +
                        ")";
                createTableStatement = this.connection.createStatement();
                createTableStatement.executeUpdate(createTableSql);

                System.out.println("Created table in given database...");

            } finally {
                if (createTableStatement != null) {
                    createTableStatement.close();
                }

                System.out.println("****MYSQL_CONNECTOR_SUCCESSFULLY_REGISTERED*****");
                System.out.println("****BRING_THE_HEAT (lol)*****");
            }
        }catch(Exception e){
            logger.error(e.getMessage());
            //TODO: (CR2) report to the pipeline monitoring service
        }
    }

2.2. "storeData" method implementation

@Override
    public void storeData(JsonArray dataSet) {
        try {
            try {
                String query = "select * from staged_data";

                //populate table
                int size = dataSet.size();
                for (int i = 0; i < size; i++) {
                    JsonElement record = dataSet.get(i);
                    String insertSql = "insert into staged_data (data) values ('" + record.toString() + "')";
                    Statement insertStatement = this.connection.createStatement();
                    insertStatement.executeUpdate(insertSql);
                    insertStatement.close();
                }

                Statement queryStatement = this.connection.createStatement();
                ResultSet rs = queryStatement.executeQuery(query);
                while (rs.next()) {
                    String id = rs.getString("id");
                    String data = rs.getString("data");
                    System.out.println(id);
                    System.out.println(data);
                    System.out.println("***************");
                }
                queryStatement.close();

                System.out.println("Connection Closed....");
            } finally {
                this.connection.close();
            }
        }catch(Exception e){
            logger.error(e.getMessage());
            //TODO: (CR2) report to the pipeline monitoring service
        }
    }
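
Note that building the insert statement by concatenating record.toString() into the SQL will break if a record contains a single quote and leaves the driver open to SQL injection. As a hardening step beyond the tutorial code, the insert loop could use a parameterized PreparedStatement instead; a minimal sketch (requires the java.sql.PreparedStatement import):

//sketch: parameterized insert into the same staged_data table
String insertSql = "insert into staged_data (data) values (?)";
try (PreparedStatement insertStatement = this.connection.prepareStatement(insertSql)) {
    for (int i = 0; i < dataSet.size(); i++) {
        JsonElement record = dataSet.get(i);
        //bind the serialized record as a parameter instead of concatenating it into the SQL
        insertStatement.setString(1, record.toString());
        insertStatement.executeUpdate();
    }
}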

2.3. "Full Implementation"

package com.appgallabs.dataplatform.tutorial;

import com.appgallabs.dataplatform.targetSystem.framework.StoreDriver;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.sql.ResultSet;

public class MySqlStoreDriver implements StoreDriver {
    private static Logger logger = LoggerFactory.getLogger(MySqlStoreDriver.class);

    private Connection connection;
    private JsonObject configJson;

    @Override
    public void configure(JsonObject configJson) {
        try {
            this.configJson = configJson;

            Statement createTableStatement = null;
            try {
                String url = configJson.get("connectionString").getAsString();
                String username = configJson.get("username").getAsString();

                String password = configJson.get("password").getAsString();

                this.connection = DriverManager.getConnection(
                        url, username, password);
                System.out.println(
                        "Connection Established successfully");

                //create schema and tables
                String createTableSql = "CREATE TABLE IF NOT EXISTS staged_data (\n" +
                        "    id int NOT NULL AUTO_INCREMENT,\n" +
                        "    data varchar(255) NOT NULL,\n" +
                        "    PRIMARY KEY (id)\n" +
                        ")";
                createTableStatement = this.connection.createStatement();
                createTableStatement.executeUpdate(createTableSql);

                System.out.println("Created table in given database...");

            } finally {
                if (createTableStatement != null) {
                    createTableStatement.close();
                }

                System.out.println("****MYSQL_CONNECTOR_SUCCESSFULLY_REGISTERED*****");
                System.out.println("****BRING_THE_HEAT (lol)*****");
            }
        }catch(Exception e){
            logger.error(e.getMessage());
            //TODO: (CR2) report to the pipeline monitoring service
        }
    }

    @Override
    public void storeData(JsonArray dataSet) {
        try {
            try {
                String query = "select * from staged_data";

                //populate table
                int size = dataSet.size();
                for (int i = 0; i < size; i++) {
                    JsonElement record = dataSet.get(i);
                    String insertSql = "insert into staged_data (data) values ('" + record.toString() + "')";
                    Statement insertStatement = this.connection.createStatement();
                    insertStatement.executeUpdate(insertSql);
                    insertStatement.close();
                }

                Statement queryStatement = this.connection.createStatement();
                ResultSet rs = queryStatement.executeQuery(query);
                while (rs.next()) {
                    String id = rs.getString("id");
                    String data = rs.getString("data");
                    System.out.println(id);
                    System.out.println(data);
                    System.out.println("***************");
                }
                queryStatement.close();

                System.out.println("Connection Closed....");
            } finally {
                this.connection.close();
            }
        }catch(Exception e){
            logger.error(e.getMessage());
            //TODO: (CR2) report to the pipeline monitoring service
        }
    }
}

3. Develop the Java standalone Ingestion Client Application

Let’s start with a simple JSON array that will be used as the data source ingested by the Braineous Data Ingestion Engine.

3.1. Java Code: Setup Dataset to be ingested

String user1 = UUID.randomUUID().toString();
        String user2 = UUID.randomUUID().toString();
        //setup source data for ingestion
        String sourceData = "[\n" +
                "  {\n" +
                "    \"id\" : 1,\n" +
                "    \"name\": \""+user1+"\",\n" +
                "    \"age\": 50,\n" +
                "    \"addr\": {\n" +
                "      \"email\": \"joe_1@email.com\",\n" +
                "      \"phone\": \"123456\"\n" +
                "    }\n" +
                "  },\n" +
                "  {\n" +
                "    \"id\": \"2\",\n" +
                "    \"name\": \""+user2+"\",\n" +
                "    \"age\": 51,\n" +
                "    \"addr\": {\n" +
                "      \"email\": \"joe_2@email.com\",\n" +
                "      \"phone\": \"1234567\"\n" +
                "    }\n" +
                "  }\n" +
                "]";

4. Register a data pipe and send source data to multiple target databases (MongoDB and MySQL)

Register a data pipe with the Braineous Data Ingestion Engine using the Java Braineous Data Ingestion Client SDK.

4.1. Pipe Configuration

{
  "pipeId": "mysql_mongodb_fan_out_to_target",
  "configuration": [
    {
      "storeDriver": "com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStoreDriver",
      "name": "scenario1_store_mongodb",
      "config": {
        "connectionString": "mongodb://localhost:27017",
        "database": "scenario1_store",
        "collection": "data"
      },
      "jsonpathExpression": "jsonpath:1"
    },
    {
      "storeDriver": "com.appgallabs.dataplatform.tutorial.MySqlStoreDriver",
      "name": "scenario1_store_mysql",
      "config": {
        "connectionString": "jdbc:mysql://localhost:3306/braineous_staging_database",
        "username": "root",
        "password": ""
      },
      "jsonpathExpression": "jsonpath:1"
    }
  ]
}
  • pipeId: as a data source provider, this id uniquely identifies the data pipe with the Braineous Data Pipeline Engine.

  • configuration.storeDriver: the fully qualified class name of the target store driver (the built-in MongoDBStoreDriver in the first entry, the custom MySqlStoreDriver in the second)

  • name: a user-friendly way to identify the target store

  • config.connectionString: the target database connection string (MongoDB in the first entry, MySQL in the second)

  • config.username: Database User

  • config.password: Database Password

A single data pipe can be configured with multiple target stores/systems for data delivery.

In the next Candidate Release, the Braineous team will add support for more target stores and systems, such as:

  • PostgreSQL

  • MySQL

  • Oracle

  • Snowflake

  • Microservices

  • Airbyte Catalog

4.2. Java Code - Register Pipe

//setup data pipe configuration json
        String dataPipeConfiguration = "{\n" +
                "  \"pipeId\": \""+pipeId+"\",\n" +
                "  \"configuration\": [\n" +
                "    {\n" +
                "      \"storeDriver\" : \"com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStoreDriver\",\n" +
                "      \"name\": \"scenario1_store_mongodb\",\n" +
                "      \"config\": {\n" +
                "        \"connectionString\": \"mongodb://localhost:27017\",\n" +
                "        \"database\": \"scenario1_store\",\n" +
                "        \"collection\": \"data\"\n" +
                "      },\n" +
                "      \"jsonpathExpression\": \"jsonpath:1\"\n" +
                "    },\n" +
                "    {\n" +
                "      \"storeDriver\" : \"com.appgallabs.dataplatform.tutorial.MySqlStoreDriver\",\n" +
                "      \"name\": \"scenario1_store_mysql\",\n" +
                "      \"config\": {\n" +
                "        \"connectionString\": \"jdbc:mysql://localhost:3306/braineous_staging_database\",\n" +
                "        \"username\": \"root\",\n" +
                "        \"password\": \"\"\n" +
                "      },\n" +
                "      \"jsonpathExpression\": \"jsonpath:1\"\n" +
                "    }\n" +
                "  ]\n" +
                "}";

4.3. Java Code - Send Data for ingestion

//send source data through the pipeline
String pipeId = configJson.get("pipeId").getAsString();
String entity = "books";
DataPipeline.sendData(pipeId, entity, sourceData);
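
In the full implementation (section 4.4), the DataPipeline client is configured with the ingestion host before the pipe is registered and data is sent:

//configure the DataPipeline Client to point at the local Braineous ingestion server
Configuration configuration = new Configuration().
        streamSizeInBytes(80).
        ingestionHostUrl("http://localhost:8080");
DataPipeline.configure(configuration);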

4.4. "Full Implementation"

package com.appgallabs.dataplatform.tutorial;

import com.appgallabs.dataplatform.client.sdk.api.Configuration;
import com.appgallabs.dataplatform.client.sdk.api.DataPipeline;
import com.appgallabs.dataplatform.client.sdk.api.RegisterPipeException;
import com.appgallabs.dataplatform.util.JsonUtil;

import com.google.gson.JsonObject;

import java.util.UUID;

public class DataIngestionTutorial {

    public static void main(String[] args) throws RegisterPipeException, InterruptedException {
        //configure the DataPipeline Client
        Configuration configuration = new Configuration().
                streamSizeInBytes(80).
                ingestionHostUrl("http://localhost:8080");
        DataPipeline.configure(configuration);

        String pipeId = "mysql_mongodb_fan_out_to_target";

        //setup data pipe configuration json
        String dataPipeConfiguration = "{\n" +
                "  \"pipeId\": \""+pipeId+"\",\n" +
                "  \"configuration\": [\n" +
                "    {\n" +
                "      \"storeDriver\" : \"com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStoreDriver\",\n" +
                "      \"name\": \"scenario1_store_mongodb\",\n" +
                "      \"config\": {\n" +
                "        \"connectionString\": \"mongodb://localhost:27017\",\n" +
                "        \"database\": \"scenario1_store\",\n" +
                "        \"collection\": \"data\"\n" +
                "      },\n" +
                "      \"jsonpathExpression\": \"jsonpath:1\"\n" +
                "    },\n" +
                "    {\n" +
                "      \"storeDriver\" : \"com.appgallabs.dataplatform.tutorial.MySqlStoreDriver\",\n" +
                "      \"name\": \"scenario1_store_mysql\",\n" +
                "      \"config\": {\n" +
                "        \"connectionString\": \"jdbc:mysql://localhost:3306/braineous_staging_database\",\n" +
                "        \"username\": \"root\",\n" +
                "        \"password\": \"\"\n" +
                "      },\n" +
                "      \"jsonpathExpression\": \"jsonpath:1\"\n" +
                "    }\n" +
                "  ]\n" +
                "}";
        JsonObject configJson = JsonUtil.validateJson(dataPipeConfiguration).getAsJsonObject();
        JsonUtil.printStdOut(configJson);


        String user1 = UUID.randomUUID().toString();
        String user2 = UUID.randomUUID().toString();
        //setup source data for ingestion
        String sourceData = "[\n" +
                "  {\n" +
                "    \"id\" : 1,\n" +
                "    \"name\": \""+user1+"\",\n" +
                "    \"age\": 50,\n" +
                "    \"addr\": {\n" +
                "      \"email\": \"joe_1@email.com\",\n" +
                "      \"phone\": \"123456\"\n" +
                "    }\n" +
                "  },\n" +
                "  {\n" +
                "    \"id\": \"2\",\n" +
                "    \"name\": \""+user2+"\",\n" +
                "    \"age\": 51,\n" +
                "    \"addr\": {\n" +
                "      \"email\": \"joe_2@email.com\",\n" +
                "      \"phone\": \"1234567\"\n" +
                "    }\n" +
                "  }\n" +
                "]";
        JsonUtil.printStdOut(JsonUtil.validateJson(sourceData));

        //register pipe
        JsonObject response = DataPipeline.registerPipe(dataPipeConfiguration);
        JsonUtil.printStdOut(response);

        //send source data through the pipeline
        pipeId = configJson.get("pipeId").getAsString();
        String entity = "books";
        DataPipeline.sendData(pipeId, entity, sourceData);
    }
}

4.5. Run the Application

4.5.1. Start the Braineous Data Ingestion Server

cd server

./run.sh

4.5.2. Run the Data Ingestion Client Application

cd client

./run.sh

5. Verify all target databases receive the data

5.1. Verify MySQL Target Store

(Screenshot: verify mysql)
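
To check the MySQL target store from the command line, you can query the staged_data table created by MySqlStoreDriver, assuming the root user with an empty password from the pipe configuration:

mysql -u root braineous_staging_database
select * from staged_data;

Expected Result: You should see one row per ingested record, with the record JSON stored in the data column.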

5.2. Verify MongoDB Target Store

To verify the success of the ingestion and delivery to the configured target databases, use the following MongoDB commands.

Expected Result: You should see two records added to a collection called "data" in a database called "scenario1_store".

mongosh

mongosh
use scenario1_store
show collections
db.data.find()
db.data.count()