Discover how to access your piped data in the Apache Hive based Data Lake

This guide covers:

  • Get the Source Data

  • Register a data pipe and send the data to the configured Data Pipeline

  • Verify the data is stored in the Apache Hive based Data Lake

1. Prerequisites

To complete this guide, you need:

  • Roughly 15 minutes

  • An IDE

  • JDK 11+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.9.5

Verify Maven is using the Java you expect

If you have multiple JDKs installed, Maven may not pick up the one you expect, which can lead to unexpected results. You can verify which JDK Maven uses by running mvn --version.

Download the Braineous-1.0.0-CR2 zip archive

This tutorial is located under: braineous-1.0.0-cr2/tutorials/datalake

2. Initialize

Get an instance of the DataPlatformService and set up your API_KEY and API_SECRET.

DataPlatformService dataPlatformService = DataPlatformService.getInstance();

String apiKey = "ffb2969c-5182-454f-9a0b-f3f2fb0ebf75";
String apiSecret = "5960253b-6645-41bf-b520-eede5754196e";
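Hard-coding the key and secret is fine for a local tutorial run. For anything shared, you may prefer to read them from the environment. The following is a minimal sketch of that idea; the variable names BRAINEOUS_API_KEY and BRAINEOUS_API_SECRET and the require helper are assumptions made for illustration and are not part of the Braineous SDK.

```java
import java.util.Map;

public class Credentials {
    // Hypothetical environment variable names; not defined by Braineous.
    static final String KEY_VAR = "BRAINEOUS_API_KEY";
    static final String SECRET_VAR = "BRAINEOUS_API_SECRET";

    /** Returns the trimmed value for name, or fails fast if it is missing or blank. */
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        String apiKey = require(env, KEY_VAR);
        String apiSecret = require(env, SECRET_VAR);
        // Pass apiKey and apiSecret to the client configuration; never print the secret.
        System.out.println("Loaded credentials for key ending in "
                + apiKey.substring(Math.max(0, apiKey.length() - 4))
                + " (secret length " + apiSecret.length() + ")");
    }
}
```

Failing fast on a missing variable keeps a misconfigured run from reaching the ingestion server with empty credentials.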

3. Get the Source Data

Let’s start with a simple JSON array as the data source to be ingested by the Braineous Data Ingestion Engine.

Source Data

[
  {
    "id" : 1,
    "name": "name_1",
    "age": 46,
    "addr": {
      "email": "name_1@email.com",
      "phone": "123"
    }
  },
  {
    "id": "2",
    "name": "name_2",
    "age": 55,
    "addr": {
      "email": "name_2@email.com",
      "phone": "1234"
    }
  }
]

Java Code

String datasetLocation = "dataset/data.json";
String json = Util.loadResource(datasetLocation);

A dataset can be loaded from any data source, such as a database, legacy production data store, live data feed, third-party data source, or Kafka stream. In this example, the dataset is loaded from a classpath resource located at src/main/resources/dataset/data.json.
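The sample dataset nests contact details under addr, and in the query output shown later in this guide those fields appear as the columns addr.email and addr.phone. The sketch below shows one way such dotted column names can be produced from a nested record. It uses plain java.util maps and is only an illustration of the idea; it is not the Braineous implementation, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenSketch {
    /** Copies nested maps into a single level, joining keys with a dot. */
    static Map<String, Object> flatten(Map<String, Object> source) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto("", source, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> source,
                                    Map<String, Object> target) {
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Recurse into nested objects, carrying the parent key as prefix.
                flattenInto(key, (Map<String, Object>) value, target);
            } else {
                target.put(key, value);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> addr = new LinkedHashMap<>();
        addr.put("email", "name_1@email.com");
        addr.put("phone", "123");

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", 1);
        record.put("name", "name_1");
        record.put("age", 46);
        record.put("addr", addr);

        // Prints: [id, name, age, addr.email, addr.phone]
        System.out.println(flatten(record).keySet());
    }
}
```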

4. Register a data pipe and send source data to configured data pipeline

Register a data pipe with the Braineous Data Ingestion Engine using the Java Braineous Data Ingestion Client SDK.

Pipe Configuration

{
  "pipeId": "jbdxyz",
  "entity": "ybyyby",
  "configuration": [
    {
      "stagingStore" : "com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStagingStore",
      "name": "jbdxyz",
      "config": {
        "connectionString": "mongodb://localhost:27017",
        "database": "jbdxyz",
        "collection": "data",
        "jsonpathExpressions": []
      }
    }
  ]
}

  • pipeId: As a data source provider, you use this ID to uniquely identify the data pipe with the Braineous Data Pipeline Engine.

  • entity: The business/domain entity that this dataset should be associated with.

  • configuration.stagingStore: The Staging Store driver

  • configuration.name: A user-friendly way to identify the target store

  • configuration.config.connectionString: MongoDB database connection string for your target store

  • configuration.config.database: MongoDB database on your target store

  • configuration.config.collection: MongoDB database collection on your target store

A data pipe can be configured with multiple target stores/systems associated with the same data pipe for data delivery.
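To make the multiple-store case concrete, here is a rough sketch that assembles a pipe configuration with two entries in the configuration array. The field names and driver class come from the configuration above. The second store (jbdxyz_copy) is a made-up example, the helper names are hypothetical, and the string building assumes the values need no JSON escaping; a real project would more likely use a JSON library.

```java
import java.util.ArrayList;
import java.util.List;

public class MultiStoreConfig {
    // Driver class name taken from the pipe configuration shown above.
    static final String MONGO_DRIVER =
            "com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStagingStore";

    /** Builds one entry of the "configuration" array for a MongoDB store. */
    static String mongoStore(String name, String connectionString,
                             String database, String collection) {
        // Assumes the arguments contain no characters that need JSON escaping.
        return String.format(
                "{\"stagingStore\":\"%s\",\"name\":\"%s\",\"config\":{"
                        + "\"connectionString\":\"%s\",\"database\":\"%s\","
                        + "\"collection\":\"%s\",\"jsonpathExpressions\":[]}}",
                MONGO_DRIVER, name, connectionString, database, collection);
    }

    /** Wraps the store entries in a pipe configuration document. */
    static String pipeConfig(String pipeId, String entity, List<String> stores) {
        return String.format(
                "{\"pipeId\":\"%s\",\"entity\":\"%s\",\"configuration\":[%s]}",
                pipeId, entity, String.join(",", stores));
    }

    public static void main(String[] args) {
        List<String> stores = new ArrayList<>();
        stores.add(mongoStore("jbdxyz", "mongodb://localhost:27017", "jbdxyz", "data"));
        // Hypothetical second store: same driver, different database name.
        stores.add(mongoStore("jbdxyz_copy", "mongodb://localhost:27017", "jbdxyz_copy", "data"));
        System.out.println(pipeConfig("jbdxyz", "ybyyby", stores));
    }
}
```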

In the next Candidate Release, the Braineous team will add support for more target stores and systems, such as:

  • PostgreSQL

  • MySQL

  • Oracle

  • Snowflake

  • Microservices

  • Airbyte Catalog

Java Code - Register Pipe

String configLocation = "pipe_config/pipe_config.json";
String pipeConfigJson = Util.loadResource(configLocation);
JsonObject configJson = JsonUtil.validateJson(pipeConfigJson).getAsJsonObject();
String pipeId = configJson.get("pipeId").getAsString();
String entity = configJson.get("entity").getAsString();
System.out.println("*****PIPE_CONFIGURATION******");
JsonUtil.printStdOut(configJson);

// Configure the DataPipeline client
Configuration configuration = new Configuration()
        .ingestionHostUrl("http://localhost:8080/")
        .apiKey(apiKey)
        .apiSecret(apiSecret)
        .streamSizeInObjects(0);
dataPlatformService.configure(configuration);

//register pipe
dataPlatformService.registerPipe(configJson);
System.out.println("*****PIPE_REGISTRATION_SUCCESS******");

The pipe configuration can be provided dynamically at runtime. The source can be a database, a configuration system, the local file system, a network file system, etc. In this example, the pipe configuration is loaded from a classpath resource located at src/main/resources/pipe_config/pipe_config.json.
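As an illustration of supplying the configuration at runtime, the sketch below reads the JSON from a file whose location is chosen when the program starts. The pipe.config system property and the RuntimePipeConfig class are assumptions made for this example; they are not part of Braineous. Only standard java.nio calls are used.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class RuntimePipeConfig {
    /** Reads the pipe configuration JSON from a file chosen at runtime. */
    static String load(Path path) throws IOException {
        if (!Files.isRegularFile(path)) {
            throw new IOException("Pipe configuration not found: " + path);
        }
        return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical system property; falls back to the tutorial's resource file.
        String location = System.getProperty(
                "pipe.config", "src/main/resources/pipe_config/pipe_config.json");
        String pipeConfigJson = load(Paths.get(location));
        System.out.println(pipeConfigJson);
    }
}
```

The resulting string could then be parsed and registered the same way as the classpath version in the code above.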

Java Code - Send Data for ingestion

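// Parse the source JSON loaded in step 3. This line is not in the original
// snippet; it is assumed from how JsonUtil.validateJson is used above.
JsonElement datasetElement = JsonUtil.validateJson(json);

// Send source data through the pipeline
dataPlatformService.sendData(pipeId, entity, datasetElement.toString());
System.out.println("*****DATA_INGESTION_SUCCESS******");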

Java Code - Verify the data stored in Apache Hive based Data Lake

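System.out.println("********DATALAKE_ASSERTION_PHASE_STARTED....***********");
// Wait 15 seconds before querying the Data Lake
Thread.sleep(15000);

// Resolve the table for this pipe and entity, then query it
String table = JobManagerUtil.getTable(apiKey, pipeId, entity);
String selectSql = "select * from " + table;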
dataPlatformService.print(
        pipeId,
        entity,
        selectSql
);
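The fixed 15-second sleep above presumably gives the pipeline time to deliver the data before the query runs. A common alternative is to poll a readiness check with a timeout. The sketch below shows that pattern in plain Java. The readiness condition is a stand-in; this guide does not show a Braineous call for checking whether data has arrived, so wiring one in is left as an assumption.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class PollingSketch {
    /**
     * Re-checks the condition every interval until it holds or the timeout elapses.
     * Returns true if the condition became true in time, false otherwise.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            Thread.sleep(interval.toMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Stand-in condition: becomes true after roughly 300 ms.
        // A real check (hypothetical here) would ask whether the table has rows.
        BooleanSupplier ready = () -> System.currentTimeMillis() - start >= 300;
        boolean ok = waitUntil(ready, Duration.ofSeconds(15), Duration.ofMillis(100));
        System.out.println(ok ? "ready" : "timed out");
    }
}
```

Compared with a fixed sleep, polling returns as soon as the condition holds and reports a timeout explicitly instead of querying too early.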

Braineous server output

baebfaa.jbdxyz.ybyyby' (26d9548a5795925914bba8f0a73ae5dd) to 'http://127.0.0.1:8081'.
2024-04-02 23:15:56,792 INFO  [org.apa.fli.tab.cat.hiv.HiveCatalog] (executor-thread-0) Setting hive conf dir as ./services/hive_3.1.3/conf
2024-04-02 23:15:56,822 INFO  [org.apa.fli.tab.cat.hiv.HiveCatalog] (executor-thread-0) Created HiveCatalog 'ffbaaaacaaaaaaaaafaaaabafafafbaebfaa'
2024-04-02 23:15:56,825 INFO  [org.apa.had.hiv.met.HiveMetaStoreClient] (executor-thread-0) Trying to connect to metastore with URI thrift://0.0.0.0:9083
2024-04-02 23:15:56,835 INFO  [org.apa.had.hiv.met.HiveMetaStoreClient] (executor-thread-0) Opened a connection to metastore, current connections: 3
2024-04-02 23:15:56,839 INFO  [org.apa.had.hiv.met.HiveMetaStoreClient] (executor-thread-0) Connected to metastore.
2024-04-02 23:15:56,839 INFO  [org.apa.had.hiv.met.RetryingMetaStoreClient] (executor-thread-0) RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=babyboy (auth:SIMPLE) retries=1 delay=1 lifetime=0
2024-04-02 23:15:56,840 INFO  [org.apa.fli.tab.cat.hiv.HiveCatalog] (executor-thread-0) Connected to Hive metastore
2024-04-02 23:15:56,849 INFO  [org.apa.fli.tab.cat.CatalogManager] (executor-thread-0) Set the current default catalog as [ffbaaaacaaaaaaaaafaaaabafafafbaebfaa] and the current default database as [default].
select * from ffbaaaacaaaaaaaaafaaaabafafafbaebfaa.jbdxyz.ybyyby
2024-04-02 23:15:57,467 INFO  [org.apa.fli.cli.pro.res.RestClusterClient] (Flink-RestClusterClient-IO-thread-1) Submitting job 'collect' (8b788595a1c3160dac40109afcf9648c).
2024-04-02 23:16:01,295 INFO  [org.apa.fli.cli.pro.res.RestClusterClient] (Flink-RestClusterClient-IO-thread-4) Successfully submitted job 'collect' (8b788595a1c3160dac40109afcf9648c) to 'http://127.0.0.1:8081'.
********DATA**********
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                             id |                           name |                            age |                     addr.email |                     addr.phone |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                              1 |                         name_1 |                             46 |               name_1@email.com |                            123 |
| +I |                              2 |                         name_2 |                             55 |               name_2@email.com |                           1234 |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
2 rows in set
**********************
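The select statement in the log above targets a three-part name: catalog, pipeId, and entity. In this sample run, the catalog name matches the API key with every non-letter character (digits and hyphens) replaced by the letter a. The sketch below reproduces that observation. It is inferred from this one log only; JobManagerUtil.getTable may apply other rules, so treat the method as a hypothetical illustration.

```java
public class TableNameSketch {
    /**
     * Rebuilds the table name seen in the sample log. Inferred from a single
     * run; the real JobManagerUtil.getTable may behave differently.
     */
    static String tableName(String apiKey, String pipeId, String entity) {
        // Every character that is not an ASCII letter becomes 'a'.
        String catalog = apiKey.replaceAll("[^A-Za-z]", "a");
        return catalog + "." + pipeId + "." + entity;
    }

    public static void main(String[] args) {
        String table = tableName(
                "ffb2969c-5182-454f-9a0b-f3f2fb0ebf75", "jbdxyz", "ybyyby");
        // Prints: select * from ffbaaaacaaaaaaaaafaaaabafafafbaebfaa.jbdxyz.ybyyby
        System.out.println("select * from " + table);
    }
}
```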

5. Run the Tutorial

cd braineous-1.0.0-cr2/tutorials/datalake
./run.sh

Expected Client Output:

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
*****DATA_SET******
******ARRAY_SIZE: 2**********
[
  {
    "id": 1,
    "name": "name_1",
    "age": 46,
    "addr": {
      "email": "name_1@email.com",
      "phone": "123"
    }
  },
  {
    "id": "2",
    "name": "name_2",
    "age": 55,
    "addr": {
      "email": "name_2@email.com",
      "phone": "1234"
    }
  }
]
**********************
*****PIPE_CONFIGURATION******
{
  "pipeId": "jbdxyz",
  "entity": "ybyyby",
  "configuration": [
    {
      "stagingStore": "com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStagingStore",
      "name": "jbdxyz",
      "config": {
        "connectionString": "mongodb://localhost:27017",
        "database": "jbdxyz",
        "collection": "data",
        "jsonpathExpressions": []
      }
    }
  ]
}
**********************
*****PIPE_REGISTRATION_SUCCESS******
***SENDING_DATA_START*****
*****DATA_INGESTION_SUCCESS******
********DATALAKE_ASSERTION_PHASE_STARTED....***********
***SENDING_PRINT_QUERY*****
