Creating Your Custom Data Integration Agent

create connector

Learn how to create a Braineous Custom Data Integration Agent

This guide covers:

  • Implement com.appgallabs.dataplatform.targetSystem.framework.staging.IntegrationRunner

  • Develop the Java standalone Ingestion Client Application

  • Register a data pipe and send source data to a MySQL database

  • Verify that all target databases receive the staged data, and that the integration agent processes the staged data and creates a live data set

1. Prerequisites

To complete this guide, you need:

  • Roughly 15 minutes

  • An IDE

  • JDK 11+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.9.5

Verify Maven is using the Java you expect

If you have multiple JDKs installed, it is not certain that Maven will pick up the expected Java, and you could end up with unexpected results. You can verify which JDK Maven uses by running mvn --version.

Download the Braineous-1.0.0-CR3 zip archive.

This tutorial is located under: braineous-1.0.0-cr3/tutorials/create-integration-runner

2. Implement IntegrationRunner interface

IntegrationRunner interface

package com.appgallabs.dataplatform.tutorial;

import com.appgallabs.dataplatform.infrastructure.Tenant;
import com.appgallabs.dataplatform.targetSystem.framework.staging.IntegrationRunner;
import com.appgallabs.dataplatform.targetSystem.framework.staging.Record;
import com.appgallabs.dataplatform.util.JsonUtil;
import com.google.gson.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.List;

/**
 * Custom {@link IntegrationRunner} that copies the {@code name} and {@code age}
 * fields of each staged record's {@code data} object into a MySQL table
 * ({@code liveTable}).
 *
 * <p>The JDBC connection is opened in {@link #preProcess}, used in
 * {@link #process} and closed in {@link #postProcess}. NOTE(review): this
 * presumes the framework invokes the three callbacks in that order on the same
 * instance; confirm against the IntegrationRunner contract.
 */
public class CustomIntegrationRunner implements IntegrationRunner {
    private static final Logger logger = LoggerFactory.getLogger(CustomIntegrationRunner.class);

    // Opened by preProcess, closed by postProcess; null while no connection is held.
    private Connection connection;
    private final String liveTable = "liveTable";

    /**
     * Opens the MySQL connection and creates the live table if it does not exist.
     * Failures are logged and not rethrown.
     *
     * @param tenant not used by this implementation
     * @param pipeId not used by this implementation
     * @param entity not used by this implementation
     */
    @Override
    public void preProcess(Tenant tenant, String pipeId, String entity) {
        try {
            String url = "jdbc:mysql://localhost:3306/braineous_staging_database";
            String username = "root";
            String password = "";

            this.connection = DriverManager.getConnection(
                    url, username, password);

            //create schema and tables
            String createTableSql = "CREATE TABLE IF NOT EXISTS "+this.liveTable+" (\n" +
                    "    id int NOT NULL AUTO_INCREMENT,\n" +
                    "    name longtext NOT NULL,\n" +
                    "    age longtext NOT NULL,\n" +
                    "    PRIMARY KEY (id)\n" +
                    ")";

            // try-with-resources: the statement only exists (and is only closed)
            // once the connection was obtained, so a connection failure can no
            // longer be masked by a NullPointerException in a finally block.
            try (Statement createTableStatement = this.connection.createStatement()) {
                createTableStatement.executeUpdate(createTableSql);
            }
        } catch (Exception e) {
            // pass the exception itself so the stack trace is preserved
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Inserts one row (name, age) into the live table for every record, as a
     * single JDBC batch. Failures are logged and not rethrown.
     *
     * @param tenant  not used by this implementation
     * @param pipeId  not used by this implementation
     * @param entity  not used by this implementation
     * @param records records whose JSON form carries a {@code data} object with
     *                {@code name} and {@code age} members
     */
    @Override
    public void process(Tenant tenant, String pipeId, String entity, List<Record> records) {
        if (this.connection == null) {
            logger.error("No database connection available; preProcess did not complete successfully");
            return;
        }
        if (records == null || records.isEmpty()) {
            return;
        }

        // Values are bound as parameters rather than concatenated into the SQL,
        // so ingested data containing quotes cannot break or alter the statement.
        String insertSql = "insert into "+this.liveTable+" (name,age) values (?,?)";
        try (java.sql.PreparedStatement insertStatement = this.connection.prepareStatement(insertSql)) {
            //populate table
            for (Record record : records) {
                JsonObject recordJson = record.toJson();
                JsonUtil.printStdOut(recordJson);

                JsonObject data = recordJson.get("data").getAsJsonObject();
                insertStatement.setString(1, data.get("name").getAsString());
                insertStatement.setString(2, data.get("age").getAsString());
                insertStatement.addBatch();
            }

            insertStatement.executeBatch();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Closes the connection opened by {@link #preProcess}, if any.
     * Failures are logged and not rethrown.
     *
     * @param tenant not used by this implementation
     * @param pipeId not used by this implementation
     * @param entity not used by this implementation
     */
    @Override
    public void postProcess(Tenant tenant, String pipeId, String entity) {
        if (this.connection == null) {
            return;
        }
        try {
            this.connection.close();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            // never reuse a closed (or failed-to-close) connection
            this.connection = null;
        }
    }
}

In the tutorial, this implementation is located at server/src/main/java/com/appgallabs/dataplatform/tutorial/CustomIntegrationRunner.java

2.1. "preProcess" callback implementation

/**
 * Opens the MySQL connection and creates the live table if it does not exist.
 * Failures are logged and not rethrown. The tenant, pipeId and entity
 * arguments are not used by this implementation.
 */
@Override
    public void preProcess(Tenant tenant, String pipeId, String entity) {
        try {
            String url = "jdbc:mysql://localhost:3306/braineous_staging_database";
            String username = "root";
            String password = "";

            this.connection = DriverManager.getConnection(
                    url, username, password);

            //create schema and tables
            String createTableSql = "CREATE TABLE IF NOT EXISTS "+this.liveTable+" (\n" +
                    "    id int NOT NULL AUTO_INCREMENT,\n" +
                    "    name longtext NOT NULL,\n" +
                    "    age longtext NOT NULL,\n" +
                    "    PRIMARY KEY (id)\n" +
                    ")";

            // try-with-resources: the statement only exists (and is only closed)
            // once the connection was obtained, so a connection failure can no
            // longer be masked by a NullPointerException in a finally block.
            try (Statement createTableStatement = this.connection.createStatement()) {
                createTableStatement.executeUpdate(createTableSql);
            }
        } catch (Exception e) {
            // pass the exception itself so the stack trace is preserved
            logger.error(e.getMessage(), e);
        }
    }

2.2. "process" callback implementation

/**
 * Inserts one row (name, age) into the live table for every record, as a
 * single JDBC batch. Each record's JSON form is expected to carry a "data"
 * object with "name" and "age" members. Failures are logged and not rethrown.
 * The tenant, pipeId and entity arguments are not used by this implementation.
 */
@Override
    public void process(Tenant tenant, String pipeId, String entity, List<Record> records) {
        if (this.connection == null) {
            logger.error("No database connection available; preProcess did not complete successfully");
            return;
        }
        if (records == null || records.isEmpty()) {
            return;
        }

        // Values are bound as parameters rather than concatenated into the SQL,
        // so ingested data containing quotes cannot break or alter the statement.
        String insertSql = "insert into "+this.liveTable+" (name,age) values (?,?)";
        try (java.sql.PreparedStatement insertStatement = this.connection.prepareStatement(insertSql)) {
            //populate table
            for (Record record : records) {
                JsonObject recordJson = record.toJson();
                JsonUtil.printStdOut(recordJson);

                JsonObject data = recordJson.get("data").getAsJsonObject();
                insertStatement.setString(1, data.get("name").getAsString());
                insertStatement.setString(2, data.get("age").getAsString());
                insertStatement.addBatch();
            }

            insertStatement.executeBatch();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

2.3. "postProcess" callback implementation

/**
 * Closes the connection held in this.connection, if any. Failures are logged
 * and not rethrown. The tenant, pipeId and entity arguments are not used by
 * this implementation.
 */
@Override
    public void postProcess(Tenant tenant, String pipeId, String entity) {
        if (this.connection == null) {
            // nothing was opened (or it was already closed)
            return;
        }
        try {
            this.connection.close();
        } catch (Exception e) {
            // pass the exception itself so the stack trace is preserved
            logger.error(e.getMessage(), e);
        } finally {
            // never reuse a closed (or failed-to-close) connection
            this.connection = null;
        }
    }

2.4. Full Implementation

package com.appgallabs.dataplatform.tutorial;

import com.appgallabs.dataplatform.infrastructure.Tenant;
import com.appgallabs.dataplatform.targetSystem.framework.staging.IntegrationRunner;
import com.appgallabs.dataplatform.targetSystem.framework.staging.Record;
import com.appgallabs.dataplatform.util.JsonUtil;
import com.google.gson.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.List;

/**
 * Custom {@link IntegrationRunner} that copies the {@code name} and {@code age}
 * fields of each staged record's {@code data} object into a MySQL table
 * ({@code liveTable}).
 *
 * <p>The JDBC connection is opened in {@link #preProcess}, used in
 * {@link #process} and closed in {@link #postProcess}. NOTE(review): this
 * presumes the framework invokes the three callbacks in that order on the same
 * instance; confirm against the IntegrationRunner contract.
 */
public class CustomIntegrationRunner implements IntegrationRunner {
    private static final Logger logger = LoggerFactory.getLogger(CustomIntegrationRunner.class);

    // Opened by preProcess, closed by postProcess; null while no connection is held.
    private Connection connection;
    private final String liveTable = "liveTable";

    /**
     * Opens the MySQL connection and creates the live table if it does not exist.
     * Failures are logged and not rethrown.
     *
     * @param tenant not used by this implementation
     * @param pipeId not used by this implementation
     * @param entity not used by this implementation
     */
    @Override
    public void preProcess(Tenant tenant, String pipeId, String entity) {
        try {
            String url = "jdbc:mysql://localhost:3306/braineous_staging_database";
            String username = "root";
            String password = "";

            this.connection = DriverManager.getConnection(
                    url, username, password);

            //create schema and tables
            String createTableSql = "CREATE TABLE IF NOT EXISTS "+this.liveTable+" (\n" +
                    "    id int NOT NULL AUTO_INCREMENT,\n" +
                    "    name longtext NOT NULL,\n" +
                    "    age longtext NOT NULL,\n" +
                    "    PRIMARY KEY (id)\n" +
                    ")";

            // try-with-resources: the statement only exists (and is only closed)
            // once the connection was obtained, so a connection failure can no
            // longer be masked by a NullPointerException in a finally block.
            try (Statement createTableStatement = this.connection.createStatement()) {
                createTableStatement.executeUpdate(createTableSql);
            }
        } catch (Exception e) {
            // pass the exception itself so the stack trace is preserved
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Inserts one row (name, age) into the live table for every record, as a
     * single JDBC batch. Failures are logged and not rethrown.
     *
     * @param tenant  not used by this implementation
     * @param pipeId  not used by this implementation
     * @param entity  not used by this implementation
     * @param records records whose JSON form carries a {@code data} object with
     *                {@code name} and {@code age} members
     */
    @Override
    public void process(Tenant tenant, String pipeId, String entity, List<Record> records) {
        if (this.connection == null) {
            logger.error("No database connection available; preProcess did not complete successfully");
            return;
        }
        if (records == null || records.isEmpty()) {
            return;
        }

        // Values are bound as parameters rather than concatenated into the SQL,
        // so ingested data containing quotes cannot break or alter the statement.
        String insertSql = "insert into "+this.liveTable+" (name,age) values (?,?)";
        try (java.sql.PreparedStatement insertStatement = this.connection.prepareStatement(insertSql)) {
            //populate table
            for (Record record : records) {
                JsonObject recordJson = record.toJson();
                JsonUtil.printStdOut(recordJson);

                JsonObject data = recordJson.get("data").getAsJsonObject();
                insertStatement.setString(1, data.get("name").getAsString());
                insertStatement.setString(2, data.get("age").getAsString());
                insertStatement.addBatch();
            }

            insertStatement.executeBatch();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Closes the connection opened by {@link #preProcess}, if any.
     * Failures are logged and not rethrown.
     *
     * @param tenant not used by this implementation
     * @param pipeId not used by this implementation
     * @param entity not used by this implementation
     */
    @Override
    public void postProcess(Tenant tenant, String pipeId, String entity) {
        if (this.connection == null) {
            return;
        }
        try {
            this.connection.close();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            // never reuse a closed (or failed-to-close) connection
            this.connection = null;
        }
    }
}

3. Develop the Java standalone Ingestion Client Application

Let’s start with a simple JSON array to be used as the data source to be ingested by the Braineous Data Ingestion Engine.

4. Initialize

Get an instance of the DataPlatformService. Set up your API_KEY and API_SECRET. Please refer to 'Step 9' of the Getting Started guide: https://bugsbunnyshah.github.io/braineous/get-started/

// Obtain the DataPlatformService instance used by the rest of the client code.
DataPlatformService dataPlatformService = DataPlatformService.getInstance();

// Example credentials; replace them with the API_KEY / API_SECRET set up for
// your own installation (see the Getting Started guide).
String apiKey = "ffb2969c-5182-454f-9a0b-f3f2fb0ebf75";
String apiSecret = "5960253b-6645-41bf-b520-eede5754196e";

5. Get the Source Data

Let’s start with a simple JSON array to be used as the data source to be ingested by the Braineous Data Ingestion Engine.

Source Data

[
  {
    "id" : 1,
    "name": "name_1",
    "age": 46,
    "addr": {
      "email": "name_1@email.com",
      "phone": "123"
    }
  },
  {
    "id": "2",
    "name": "name_2",
    "age": 55,
    "addr": {
      "email": "name_2@email.com",
      "phone": "1234"
    }
  }
]

Java Code

// Classpath-relative location of the source dataset
// (src/main/resources/dataset/data.json in this tutorial).
String datasetLocation = "dataset/data.json";
// Presumably returns the resource's content as text; confirm against Util.loadResource.
String json = Util.loadResource(datasetLocation);

A dataset can be loaded from any data source such as a database, legacy production data store, live data feed, third-party data source, Kafka stream, etc. In this example the dataset is loaded from a classpath resource located at src/main/resources/dataset/data.json

6. Register a data pipe and send source data to your MySQL staging store

Register a data pipe with the Braineous Data Ingestion Engine using the Java Braineous Data Ingestion Client SDK.

Pipe Configuration

{
  "pipeId": "yyya",
  "entity": "abc",
  "configuration": [
    {
      "stagingStore" : "com.appgallabs.dataplatform.targetSystem.core.driver.MySqlStagingStore",
      "integrationAgent": "com.appgallabs.dataplatform.tutorial.CustomIntegrationRunner",
      "name": "yyya",
      "config": {
        "connectionString": "jdbc:mysql://localhost:3306/braineous_staging_database",
        "username": "root",
        "password":  "",
        "staging_table": "staging_store",
        "jsonpathExpressions": []
      }
    }
  ]
}
  • pipeId : As a data source provider, this id identifies this data pipe uniquely with the Braineous Data Pipeline Engine.

  • entity : The business/domain entity that this dataset should be associated with.

  • configuration.stagingStore: The Staging Store driver

  • configuration.integrationAgent: The Data Integration Agent that can be provided to process staged data

  • configuration.name: a user-friendly way to identify the target store

  • configuration.config.connectionString: MySQL database connection string for your target store

  • configuration.config.username: MySQL user

  • configuration.config.password: The MySQL user’s password

  • configuration.config.staging_table: MySQL staging table

  • configuration.config.jsonpathExpressions: Data Transformation based on JSONPath specification: https://www.ietf.org/archive/id/draft-goessner-dispatch-jsonpath-00.html

A data pipe can be configured with multiple target stores/systems associated with the same data pipe for data delivery. In this tutorial, ingested data will be delivered to your MySQL staging store, and the integration agent will make it live data.

Braineous is built for no interruption and scale. You can associate thousands of target stores and evolve the delivery network over time by updating your pipeId configuration and deploying it without restarting Braineous.

The current release supports the following target stores:

  • Snowflake

  • MySQL

  • ElasticSearch

  • MongoDB

  • ClickHouse

In future releases, the Braineous team will add support for more target stores and systems such as:

  • Postgresql

  • Oracle

  • Amazon RedShift

Java Code - Register Pipe

// Load the pipe configuration from the classpath and parse it as JSON.
String configLocation = "pipe_config/pipe_config.json";
String pipeConfigJson = Util.loadResource(configLocation);
JsonObject configJson = JsonUtil.validateJson(pipeConfigJson).getAsJsonObject();
// pipeId and entity are read here and reused later when sending data.
String pipeId = configJson.get("pipeId").getAsString();
String entity = configJson.get("entity").getAsString();
System.out.println("*****PIPE_CONFIGURATION******");
JsonUtil.printStdOut(configJson);

//configure the DataPipeline Client
// NOTE(review): the meaning of streamSizeInObjects(0) is not shown here; confirm
// against the Configuration documentation.
Configuration configuration = new Configuration().
ingestionHostUrl("http://localhost:8080/").
apiKey(apiKey).
apiSecret(apiSecret).
streamSizeInObjects(0);
dataPlatformService.configure(configuration);

//register pipe
dataPlatformService.registerPipe(configJson);
System.out.println("*****PIPE_REGISTRATION_SUCCESS******");

Pipe Configuration can be provided dynamically at runtime. The source can be a database, a configuration system, local file system, network file system, etc. In this example the pipe configuration is loaded from a classpath resource located at src/main/resources/pipe_config/pipe_config.json

Java Code - Send Data for ingestion

//send source data through the pipeline
// NOTE(review): datasetElement is not defined in any snippet shown in this guide;
// presumably it is the parsed form of the `json` string loaded from
// dataset/data.json. Confirm against the client source in the tutorial.
dataPlatformService.sendData(pipeId, entity,datasetElement.toString());
System.out.println("*****DATA_INGESTION_SUCCESS******");

7. Run the Application

cd braineous-1.0.0-cr3/tutorials/create-integration-runner

7.1. Start the Braineous Data Ingestion Server

cd server

./run.sh

7.2. Run the Data Ingestion Client Application

cd client

./run.sh

Expected Client Output:

*****DATA_SET******
******ARRAY_SIZE: 2**********
[
  {
    "id": 1,
    "name": "name_1",
    "age": 46,
    "addr": {
      "email": "name_1@email.com",
      "phone": "123"
    }
  },
  {
    "id": "2",
    "name": "name_2",
    "age": 55,
    "addr": {
      "email": "name_2@email.com",
      "phone": "1234"
    }
  }
]
**********************
*****PIPE_CONFIGURATION******
{
  "pipeId": "yyya",
  "entity": "abc",
  "configuration": [
    {
      "stagingStore": "com.appgallabs.dataplatform.targetSystem.core.driver.MySqlStagingStore",
      "integrationAgent": "com.appgallabs.dataplatform.tutorial.CustomIntegrationRunner",
      "name": "yyya",
      "config": {
        "connectionString": "jdbc:mysql://localhost:3306/braineous_staging_database",
        "username": "root",
        "password": "",
        "staging_table": "staging_store",
        "jsonpathExpressions": []
      }
    }
  ]
}
**********************
*****PIPE_REGISTRATION_SUCCESS******
***SENDING_DATA_START*****
******PAYLOAD**********
[{"id":1,"name":"name_1","age":46,"addr":{"email":"name_1@email.com","phone":"123"}},{"id":"2","name":"name_2","age":55,"addr":{"email":"name_2@email.com","phone":"1234"}}]
***********************
*****DATA_INGESTION_SUCCESS******

Expected Server Output:

MYSQL: DATA_STORED_SUCCESSFULLY
{
  "recordMetaData": {
    "metadata": {
      "objectHash": "55F1BCC5BAF4BFD841E8E345D35F737B",
      "timestamp": 1722187621,
      "tenant": "ffb2969c-5182-454f-9a0b-f3f2fb0ebf75",
      "pipeId": "yyya",
      "entity": "abc",
      "kafka_offset": 7
    }
  },
  "data": {
    "id": 1,
    "name": "name_1",
    "age": 46,
    "addr": {
      "email": "name_1@email.com",
      "phone": "123"
    }
  }
}
**********************
{
  "recordMetaData": {
    "metadata": {
      "objectHash": "8F3B1B7001BFE2AD0AA72D975BDE78ED",
      "timestamp": 1722187621,
      "tenant": "ffb2969c-5182-454f-9a0b-f3f2fb0ebf75",
      "pipeId": "yyya",
      "entity": "abc",
      "kafka_offset": 7
    }
  },
  "data": {
    "id": "2",
    "name": "name_2",
    "age": 55,
    "addr": {
      "email": "name_2@email.com",
      "phone": "1234"
    }
  }
}
**********************

7.3. Verify MySQL Target Staging Store

staging store

7.4. Verify MySQL Live Data

live data