Creating Your Custom Data Connector


Learn how to create a Braineous Custom Data Connector

This guide covers:

  • Implement com.appgallabs.dataplatform.targetSystem.framework.staging.StagingStore

  • Develop the Java standalone Ingestion Client Application

  • Register a data pipe and send source data to multiple target stores: your custom MySQL staging store and a MongoDB staging store

  • Verify all target databases receive the data

1. Prerequisites

To complete this guide, you need:

  • Roughly 15 minutes

  • An IDE

  • JDK 11+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.9.5

Verify Maven is using the Java you expect

If you have multiple JDKs installed, it is not guaranteed that Maven will pick up the JDK you expect, and you could end up with unexpected results. You can verify which JDK Maven uses by running mvn --version.

2. Implement the StagingStore interface

StagingStore interface

package com.appgallabs.dataplatform.targetSystem.framework.staging;

import com.appgallabs.dataplatform.infrastructure.Tenant;
import com.google.gson.JsonObject;

import java.io.Serializable;
import java.util.List;

public interface StagingStore extends Serializable {

    /**
     * This is used to configure your Staging Store.
     *
     * Configuration is specified as a json object.
     *
     * @param configJson
     */
    public void configure(JsonObject configJson);

    public String getName();

    public JsonObject getConfiguration();

    /**
     * Implementation logic for storing the dataset processed by the
     * ingestion engine, delivered as a list of records.
     *
     * @param tenant the tenant that owns this data pipe
     * @param pipeId uniquely identifies the data pipe
     * @param entity the business/domain entity associated with this dataset
     * @param dataSet the records to be stored
     */
    public void storeData(Tenant tenant,
                          String pipeId,
                          String entity,
                          List<Record> dataSet
    );

    public List<Record> getData(Tenant tenant,
                             String pipeId,
                             String entity);
}

In the tutorial, this implementation is located at server/src/main/java/com/appgallabs/dataplatform/tutorial/CustomMySqlStagingStore.java

2.1. "configure" method implementation

@Override
public void configure(JsonObject configJson) {
    try {
        this.configJson = configJson;
        Statement createTableStatement = null;
        try {
            String url = configJson.get("connectionString").getAsString();
            String username = configJson.get("username").getAsString();

            String password = configJson.get("password").getAsString();

            this.connection = DriverManager.getConnection(
                    url, username, password);

            //create schema and tables
            String createTableSql = "CREATE TABLE IF NOT EXISTS staged_data (\n" +
                    "    id int NOT NULL AUTO_INCREMENT,\n" +
                    "    data longtext NOT NULL,\n" +
                    "    PRIMARY KEY (id)\n" +
                    ")";
            createTableStatement = this.connection.createStatement();
            createTableStatement.executeUpdate(createTableSql);

            //System.out.println("Created table in given database...");

        } finally {
            //guard against a NullPointerException if getConnection failed
            if (createTableStatement != null) {
                createTableStatement.close();
            }
        }
    }catch(Exception e){
        logger.error(e.getMessage());

        //report to the pipeline monitoring service
        JsonObject jsonObject = new JsonObject();
        this.ingestionReportingService.reportDataError(jsonObject);
    }
}
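
If DriverManager.getConnection fails, createTableStatement is still null when the finally block runs, which is why the close() call above is guarded. A try-with-resources variant removes that bookkeeping entirely; this is a sketch using the same configuration keys, not the tutorial's actual code:

String url = configJson.get("connectionString").getAsString();
String username = configJson.get("username").getAsString();
String password = configJson.get("password").getAsString();

this.connection = DriverManager.getConnection(url, username, password);

//the statement is closed automatically, even if executeUpdate throws
try (Statement createTableStatement = this.connection.createStatement()) {
    createTableStatement.executeUpdate(
            "CREATE TABLE IF NOT EXISTS staged_data (" +
            "    id int NOT NULL AUTO_INCREMENT," +
            "    data longtext NOT NULL," +
            "    PRIMARY KEY (id)" +
            ")");
}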

2.2. "storeData" method implementation

@Override
public void storeData(Tenant tenant, String pipeId, String entity, List<Record> records) {
    JsonArray dataSet = new JsonArray();
    for(Record record: records){
        JsonObject data = record.getData();
        dataSet.add(data);
    }
    this.storeData(dataSet);
}

private void storeData(JsonArray dataSet) {
    try {
        Statement insertStatement = this.connection.createStatement();
        try {
            //populate table
            int size = dataSet.size();
            for (int i = 0; i < size; i++) {
                JsonElement record = dataSet.get(i);
                String insertSql = "insert into staged_data (data) values ('" + record.toString() + "')";
                insertStatement.addBatch(insertSql);
            }

            insertStatement.executeBatch();

            String query = "SELECT * FROM staged_data;";
            Statement queryStatement = this.connection.createStatement();
            ResultSet rs = queryStatement.executeQuery(query);
            while (rs.next()) {
                String id = rs.getString("id");
                String data = rs.getString("data");
                System.out.println(id);
                System.out.println(data);
                System.out.println("******CUSTOM_MYSQL_DATA_STORE_ROW*********");
            }
            queryStatement.close();
            System.out.println("Connection Closed....");

        } finally {
            insertStatement.close();
            this.connection.close();
        }
        //reached only when the batch insert succeeded
        System.out.println("MYSQL: STORED_SUCCESSFULLY");
    }catch(Exception e){
        e.printStackTrace();
        logger.error(e.getMessage());

        //report to the pipeline monitoring service
        JsonObject jsonObject = new JsonObject();
        this.ingestionReportingService.reportDataError(jsonObject);
    }
}
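
The concatenated INSERT above is sufficient for this tutorial, but it fails (and is open to SQL injection) as soon as a record contains a single quote. A parameterized variant using java.sql.PreparedStatement is safer; this is a sketch, not part of the tutorial code:

String insertSql = "insert into staged_data (data) values (?)";
try (PreparedStatement insertStatement = this.connection.prepareStatement(insertSql)) {
    for (int i = 0; i < dataSet.size(); i++) {
        //bind the serialized record as a parameter; the driver handles escaping
        insertStatement.setString(1, dataSet.get(i).toString());
        insertStatement.addBatch();
    }
    insertStatement.executeBatch();
}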

2.3. Full Implementation

package com.appgallabs.dataplatform.tutorial;

import com.appgallabs.dataplatform.infrastructure.Tenant;
import com.appgallabs.dataplatform.reporting.IngestionReportingService;
import com.appgallabs.dataplatform.targetSystem.framework.staging.Record;

import com.appgallabs.dataplatform.targetSystem.framework.staging.StagingStore;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.sql.ResultSet;

import java.util.ArrayList;
import java.util.List;

public class CustomMySqlStagingStore implements StagingStore {
    private static Logger logger = LoggerFactory.getLogger(CustomMySqlStagingStore.class);

    private Connection connection;
    private JsonObject configJson;

    //TODO: (NOW) - thread it in
    private IngestionReportingService ingestionReportingService;

    @Override
    public void configure(JsonObject configJson) {
        try {
            this.configJson = configJson;
            Statement createTableStatement = null;
            try {
                String url = configJson.get("connectionString").getAsString();
                String username = configJson.get("username").getAsString();

                String password = configJson.get("password").getAsString();

                this.connection = DriverManager.getConnection(
                        url, username, password);

                //create schema and tables
                String createTableSql = "CREATE TABLE IF NOT EXISTS staged_data (\n" +
                        "    id int NOT NULL AUTO_INCREMENT,\n" +
                        "    data longtext NOT NULL,\n" +
                        "    PRIMARY KEY (id)\n" +
                        ")";
                createTableStatement = this.connection.createStatement();
                createTableStatement.executeUpdate(createTableSql);

                //System.out.println("Created table in given database...");

            } finally {
                //guard against a NullPointerException if getConnection failed
                if (createTableStatement != null) {
                    createTableStatement.close();
                }
            }
        }catch(Exception e){
            logger.error(e.getMessage());

            //report to the pipeline monitoring service
            JsonObject jsonObject = new JsonObject();
            this.ingestionReportingService.reportDataError(jsonObject);
        }
    }

    @Override
    public String getName() {
        return this.configJson.get("connectionString").getAsString();
    }

    @Override
    public JsonObject getConfiguration() {
        return this.configJson;
    }

    @Override
    public void storeData(Tenant tenant, String pipeId, String entity, List<Record> records) {
        JsonArray dataSet = new JsonArray();
        for(Record record: records){
            JsonObject data = record.getData();
            dataSet.add(data);
        }
        this.storeData(dataSet);
    }

    @Override
    public List<Record> getData(Tenant tenant, String pipeId, String entity) {
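        //data retrieval is not exercised in this tutorial, so it is left unimplemented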
        return null;
    }

    //----------------------------------------------------------------------------------------------
    private void storeData(JsonArray dataSet) {
        try {
            Statement insertStatement = this.connection.createStatement();
            try {
                //populate table
                int size = dataSet.size();
                for (int i = 0; i < size; i++) {
                    JsonElement record = dataSet.get(i);
                    String insertSql = "insert into staged_data (data) values ('" + record.toString() + "')";
                    insertStatement.addBatch(insertSql);
                }

                insertStatement.executeBatch();

                String query = "SELECT * FROM staged_data;";
                Statement queryStatement = this.connection.createStatement();
                ResultSet rs = queryStatement.executeQuery(query);
                while (rs.next()) {
                    String id = rs.getString("id");
                    String data = rs.getString("data");
                    System.out.println(id);
                    System.out.println(data);
                    System.out.println("******CUSTOM_MYSQL_DATA_STORE_ROW*********");
                }
                queryStatement.close();
                System.out.println("Connection Closed....");

            } finally {
                insertStatement.close();
                this.connection.close();
            }
            //reached only when the batch insert succeeded
            System.out.println("MYSQL: STORED_SUCCESSFULLY");
        }catch(Exception e){
            e.printStackTrace();
            logger.error(e.getMessage());

            //report to the pipeline monitoring service
            JsonObject jsonObject = new JsonObject();
            this.ingestionReportingService.reportDataError(jsonObject);
        }
    }
}

3. Develop the Java standalone Ingestion Client Application

Next, develop a standalone Java client application that registers a data pipe with the Braineous Data Ingestion Engine and sends source data through it.

4. Initialize

Get an instance of the DataPlatformService. Set up your API_KEY and API_SECRET.

DataPlatformService dataPlatformService = DataPlatformService.getInstance();

String apiKey = "ffb2969c-5182-454f-9a0b-f3f2fb0ebf75";
String apiSecret = "5960253b-6645-41bf-b520-eede5754196e";

5. Get the Source Data

Let’s start with a simple JSON array to be used as the data source ingested by the Braineous Data Ingestion Engine.

Source Data

[
  {
    "id" : 1,
    "name": "name_1",
    "age": 46,
    "addr": {
      "email": "name_1@email.com",
      "phone": "123"
    }
  },
  {
    "id": "2",
    "name": "name_2",
    "age": 55,
    "addr": {
      "email": "name_2@email.com",
      "phone": "1234"
    }
  }
]

Java Code

String datasetLocation = "dataset/data.json";
String json = Util.loadResource(datasetLocation);

A dataset can be loaded from any data source such as a database, legacy production data store, live data feed, third-party data source, Kafka stream, etc. In this example the dataset is loaded from a classpath resource located at src/main/resources/dataset/data.json
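
Util.loadResource is a helper provided by the tutorial project. A minimal equivalent for reading a classpath resource into a String might look like the following sketch (an assumption, not the tutorial's actual implementation):

import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public static String loadResource(String path) throws Exception {
    //resolve the resource on the classpath and read it fully as UTF-8 text
    try (InputStream in = Thread.currentThread()
            .getContextClassLoader().getResourceAsStream(path)) {
        if (in == null) {
            throw new IllegalArgumentException("resource not found: " + path);
        }
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }
}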

6. Register a data pipe and send source data to your custom MySQL staging store

Register a data pipe with the Braineous Data Ingestion Engine using the Java Braineous Data Ingestion Client SDK.

Pipe Configuration

{
  "pipeId": "zzza",
  "entity": "abcd",
  "configuration": [
    {
      "stagingStore" : "com.appgallabs.dataplatform.tutorial.CustomMySqlStagingStore",
      "name": "zzza",
      "config": {
        "connectionString": "jdbc:mysql://localhost:3306/braineous_staging_database",
        "username": "root",
        "password": "",
        "jsonpathExpressions": []
      }
    },
    {
      "stagingStore" : "com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStagingStore",
      "name": "zzza",
      "config": {
        "connectionString": "mongodb://localhost:27017",
        "database": "zzza",
        "collection": "data",
        "jsonpathExpressions": []
      }
    }
  ]
}
  • pipeId : As a data source provider, this id uniquely identifies this data pipe with the Braineous Data Pipeline Engine.

  • entity : The business/domain entity that this dataset should be associated with.

  • configuration.stagingStore: the fully qualified class name of the Staging Store driver

  • configuration.name: a user-friendly way to identify the target store

  • configuration.config.connectionString: the connection string for the target store (a JDBC URL for the custom MySQL store, a MongoDB URI for the MongoDB store)

  • configuration.config.database: the MongoDB database on your target store (MongoDB store only)

  • configuration.config.collection: the MongoDB collection on your target store (MongoDB store only)

A data pipe can be configured with multiple target stores/systems for data delivery. In this tutorial, ingested data is delivered to your custom MySQL staging store and to a MongoDB staging store that is part of the core Braineous target delivery engine.

Braineous is built for scale and uninterrupted operation. You can associate thousands of target stores with a pipe and evolve the delivery network over time by updating your pipeId configuration and deploying it without a Braineous restart, as sketched below.
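
For example, delivering the same pipe to an additional MongoDB database only requires one more entry in the configuration array (the values below are illustrative):

{
  "stagingStore" : "com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStagingStore",
  "name": "zzza_audit",
  "config": {
    "connectionString": "mongodb://localhost:27017",
    "database": "zzza_audit",
    "collection": "data",
    "jsonpathExpressions": []
  }
}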

In the next Candidate Release, the Braineous team will add support for more target stores and systems, such as:

  • PostgreSQL

  • MySQL

  • Oracle

  • Snowflake

  • Microservices

  • Airbyte Catalog

Java Code - Register Pipe

String configLocation = "pipe_config/pipe_config.json";
String pipeConfigJson = Util.loadResource(configLocation);
JsonObject configJson = JsonUtil.validateJson(pipeConfigJson).getAsJsonObject();
String pipeId = configJson.get("pipeId").getAsString();
String entity = configJson.get("entity").getAsString();
System.out.println("*****PIPE_CONFIGURATION******");
JsonUtil.printStdOut(configJson);

//configure the DataPipeline Client
Configuration configuration = new Configuration()
        .ingestionHostUrl("http://localhost:8080/")
        .apiKey(apiKey)
        .apiSecret(apiSecret)
        .streamSizeInObjects(0);
dataPlatformService.configure(configuration);

//register pipe
dataPlatformService.registerPipe(configJson);
System.out.println("*****PIPE_REGISTRATION_SUCCESS******");

Pipe Configuration can be provided dynamically at runtime. The source can be a database, a configuration system, a local file system, a network file system, etc. In this example the configuration is loaded from a classpath resource located at src/main/resources/pipe_config/pipe_config.json
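
For example, the same configuration could be read from the local file system instead of the classpath; a sketch, with an illustrative path:

import java.nio.file.Files;
import java.nio.file.Paths;

//load the pipe configuration from an external file at runtime
String pipeConfigJson = Files.readString(Paths.get("/etc/braineous/pipe_config.json"));
JsonObject configJson = JsonUtil.validateJson(pipeConfigJson).getAsJsonObject();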

Java Code - Send Data for ingestion

//parse the source data loaded in section 5 into a JsonElement
JsonElement datasetElement = JsonUtil.validateJson(json);

//send source data through the pipeline
dataPlatformService.sendData(pipeId, entity, datasetElement.toString());
System.out.println("*****DATA_INGESTION_SUCCESS******");

7. Run the Application

cd braineous-1.0.0-cr2/tutorials/create-connector

7.1. Start the Braineous Data Ingestion Server

cd server

./run.sh

7.2. Run the Data Ingestion Client Application

cd client

./run.sh

Expected Client Output:

[INFO] ------------------------------------------------------------------------
*****DATA_SET******
******ARRAY_SIZE: 2**********
[
  {
    "id": 1,
    "name": "name_1",
    "age": 46,
    "addr": {
      "email": "name_1@email.com",
      "phone": "123"
    }
  },
  {
    "id": "2",
    "name": "name_2",
    "age": 55,
    "addr": {
      "email": "name_2@email.com",
      "phone": "1234"
    }
  }
]
**********************
*****PIPE_CONFIGURATION******
{
  "pipeId": "zzza",
  "entity": "abcd",
  "configuration": [
    {
      "stagingStore": "com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStagingStore",
      "name": "zzza",
      "config": {
        "connectionString": "mongodb://localhost:27017",
        "database": "zzza",
        "collection": "data",
        "jsonpathExpressions": []
      }
    },
    {
      "stagingStore": "com.appgallabs.dataplatform.tutorial.CustomMySqlStagingStore",
      "name": "zzza",
      "config": {
        "connectionString": "jdbc:mysql://localhost:3306/braineous_staging_database",
        "username": "root",
        "password": "",
        "jsonpathExpressions": []
      }
    }
  ]
}
**********************
*****PIPE_REGISTRATION_SUCCESS******
***SENDING_DATA_START*****
*****DATA_INGESTION_SUCCESS******

Expected Server Output:

2024-03-30 14:00:42,536 INFO  [org.mon.dri.client] (pool-7-thread-4) MongoClient with metadata {"driver": {"name": "mongo-java-driver|sync", "version": "4.8.2"}, "os": {"type": "Darwin", "name": "Mac OS X", "architecture": "x86_64", "version": "12.2.1"}, "platform": "Java/Amazon.com Inc./11.0.22+7-LTS"} created with settings MongoClientSettings{readPreference=primary, writeConcern=WriteConcern{w=null, wTimeout=null ms, journal=null}, retryWrites=true, retryReads=true, readConcern=ReadConcern{level=null}, credential=null, streamFactoryFactory=null, commandListeners=[], codecRegistry=ProvidersCodecRegistry{codecProviders=[ValueCodecProvider{}, BsonValueCodecProvider{}, DBRefCodecProvider{}, DBObjectCodecProvider{}, DocumentCodecProvider{}, CollectionCodecProvider{}, IterableCodecProvider{}, MapCodecProvider{}, GeoJsonCodecProvider{}, GridFSFileCodecProvider{}, Jsr310CodecProvider{}, JsonObjectCodecProvider{}, BsonCodecProvider{}, EnumCodecProvider{}, com.mongodb.Jep395RecordCodecProvider@1adbccc9]}, clusterSettings={hosts=[localhost:27017], srvServiceName=mongodb, mode=SINGLE, requiredClusterType=UNKNOWN, requiredReplicaSetName='null', serverSelector='null', clusterListeners='[]', serverSelectionTimeout='30000 ms', localThreshold='30000 ms'}, socketSettings=SocketSettings{connectTimeoutMS=10000, readTimeoutMS=0, receiveBufferSize=0, sendBufferSize=0}, heartbeatSocketSettings=SocketSettings{connectTimeoutMS=10000, readTimeoutMS=10000, receiveBufferSize=0, sendBufferSize=0}, connectionPoolSettings=ConnectionPoolSettings{maxSize=100, minSize=0, maxWaitTimeMS=120000, maxConnectionLifeTimeMS=0, maxConnectionIdleTimeMS=0, maintenanceInitialDelayMS=0, maintenanceFrequencyMS=60000, connectionPoolListeners=[], maxConnecting=2}, serverSettings=ServerSettings{heartbeatFrequencyMS=10000, minHeartbeatFrequencyMS=500, serverListeners='[]', serverMonitorListeners='[]'}, sslSettings=SslSettings{enabled=false, invalidHostNameAllowed=false, context=null}, applicationName='null', compressorList=[], uuidRepresentation=UNSPECIFIED, serverApi=null, autoEncryptionSettings=null, contextProvider=null}
2024-03-30 14:00:42,637 INFO  [org.mon.dri.cluster] (cluster-ClusterId{value='6608534a1cdf54173e088644', description='null'}-localhost:27017) Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=13, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=93562834}
2024-03-30 14:00:42,823 INFO  [org.apa.fli.tab.cat.hiv.HiveCatalog] (pool-7-thread-3) Setting hive conf dir as /Users/babyboy/mumma/braineous/infrastructure/apache-hive-3.1.3-bin/conf
2024-03-30 14:00:42,870 INFO  [org.apa.fli.tab.cat.hiv.HiveCatalog] (pool-7-thread-3) Created HiveCatalog 'ffbaaaacaaaaaaaaafaaaabafafafbaebfaa'
2024-03-30 14:00:42,902 INFO  [org.apa.had.hiv.met.HiveMetaStoreClient] (pool-7-thread-3) Trying to connect to metastore with URI thrift://0.0.0.0:9083
2024-03-30 14:00:42,916 INFO  [org.apa.had.hiv.met.HiveMetaStoreClient] (pool-7-thread-3) Opened a connection to metastore, current connections: 3
2024-03-30 14:00:42,989 INFO  [org.apa.had.hiv.met.HiveMetaStoreClient] (pool-7-thread-3) Connected to metastore.
2024-03-30 14:00:42,989 INFO  [org.apa.had.hiv.met.RetryingMetaStoreClient] (pool-7-thread-3) RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=babyboy (auth:SIMPLE) retries=1 delay=1 lifetime=0
2024-03-30 14:00:42,990 INFO  [org.apa.fli.tab.cat.hiv.HiveCatalog] (pool-7-thread-3) Connected to Hive metastore
2024-03-30 14:00:43,096 INFO  [org.apa.fli.tab.cat.CatalogManager] (pool-7-thread-3) Set the current default catalog as [ffbaaaacaaaaaaaaafaaaabafafafbaebfaa] and the current default database as [default].
2808965
{"id":1,"name":"name_1","age":46,"addr":{"email":"name_1@email.com","phone":"123"}}
******CUSTOM_MYSQL_DATA_STORE_ROW*********
2808966
{"id":"2","name":"name_2","age":55,"addr":{"email":"name_2@email.com","phone":"1234"}}
******CUSTOM_MYSQL_DATA_STORE_ROW*********
Connection Closed....
MYSQL: STORED_SUCCESSFULLY
2024-03-30 14:00:43,202 INFO  [com.app.dat.tar.fra.sta.LogicRunner] (pool-7-thread-4) com.appgallabs.dataplatform.targetSystem.framework.staging.InMemoryDB@750c9c8c
2024-03-30 14:00:43,203 INFO  [com.app.dat.tar.fra.sta.LogicRunner] (pool-7-thread-4) PROCESSING: # of records: 2
MONGODB: STORED_SUCCESSFULLY

8. Verify all target collections received the data

8.1. Verify MySQL Target Store

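Connect to the MySQL database configured in the pipe (braineous_staging_database, user root with an empty password in this tutorial) and query the staged_data table; you should see the two ingested records:

mysql -u root braineous_staging_database
SELECT id, data FROM staged_data;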

8.2. Verify MongoDB Target Store

To verify the success of the ingestion and delivery to the configured target databases, use the following MongoDB commands.

Expected Result: You should see two records in a collection called "data" in a database called "zzza", corresponding to the configured values configuration.config.collection and configuration.config.database.

mongosh

mongosh
use zzza
show collections
db.data.find()
db.data.count()
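
Each command’s output should reflect the two ingested records; in particular, db.data.count() should return 2.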