Data Transformation on a Data Pipeline
This guide covers:
- Get the Source Data
- Register a data pipe and send source data to configured target MongoDB database
- Verify the target database received the data after transformation using JSONPath expressions
1. Prerequisites
To complete this guide, you need:
- Roughly 15 minutes
- An IDE
- JDK 11+ installed with JAVA_HOME configured appropriately
- Apache Maven 3.9.5
Verify Maven is using the Java you expect
If you have multiple JDKs installed, Maven may not pick up the one you expect,
and you could end up with unexpected results.
You can verify which JDK Maven uses by running mvn --version.
Download the Braineous-1.0.0-CR3 zip archive.
This tutorial is located under: braineous-1.0.0-cr3/tutorials/data-transformation
2. Initialize
Get an instance of the DataPlatformService. Set up your API_KEY and API_SECRET.
Please refer to 'Step 9' of the Getting Started guide:
https://bugsbunnyshah.github.io/braineous/get-started/
DataPlatformService dataPlatformService = DataPlatformService.getInstance();
String apiKey = "ffb2969c-5182-454f-9a0b-f3f2fb0ebf75";
String apiSecret = "5960253b-6645-41bf-b520-eede5754196e";
3. Get the Source Data
Let’s start with a simple JSON array to use as the data source ingested by the Braineous Data Ingestion Engine.
Source Data
[
  {
    "id": 1,
    "name": "name_1",
    "age": 46,
    "addr": {
      "email": "name_1@email.com",
      "phone": "123"
    }
  },
  {
    "id": "2",
    "name": "name_2",
    "age": 55,
    "addr": {
      "email": "name_2@email.com",
      "phone": "1234"
    }
  }
]
Java Code
String datasetLocation = "dataset/data.json";
String json = Util.loadResource(datasetLocation);
A dataset can be loaded from any data source, such as a database, legacy production data store,
live data feed, third-party data source, or Kafka stream. In this example, the dataset is loaded from a classpath
resource located at src/main/resources/dataset/data.json.
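The tutorial does not show how Util.loadResource works internally. As a rough illustration of loading a classpath resource into a String, the sketch below uses only the JDK. The class name ClasspathResourceLoader and its methods are hypothetical stand-ins, not part of the Braineous SDK.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the tutorial's Util.loadResource; not the Braineous implementation.
class ClasspathResourceLoader {

    // Reads the whole stream and decodes it as UTF-8 text.
    static String readAll(InputStream in) throws IOException {
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }

    // Looks the resource up on the classpath, e.g. "dataset/data.json".
    static String load(String location) throws IOException {
        ClassLoader loader = ClasspathResourceLoader.class.getClassLoader();
        try (InputStream in = loader.getResourceAsStream(location)) {
            if (in == null) {
                throw new IOException("Resource not found on classpath: " + location);
            }
            return readAll(in);
        }
    }

    public static void main(String[] args) throws IOException {
        // Uses an in-memory stream so the demo runs without any resource file.
        InputStream sample = new ByteArrayInputStream("[]".getBytes(StandardCharsets.UTF_8));
        System.out.println(readAll(sample)); // prints []
    }
}
```

With src/main/resources on the classpath, ClasspathResourceLoader.load("dataset/data.json") would return the file content as a String.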
4. Register a data pipe and send source data to configured target MongoDB database
Register a data pipe with the Braineous Data Ingestion Engine using the Java Braineous Data Ingestion Client SDK.
Pipe Configuration
{
  "pipeId": "xmlyxyy",
  "entity": "xmlyxyy",
  "configuration": [
    {
      "stagingStore": "com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStagingStore",
      "name": "xmlyxyy",
      "config": {
        "connectionString": "mongodb://localhost:27017",
        "database": "xmlyxyy",
        "collection": "data",
        "jsonpathExpressions": [
          "$.name",
          "$.addr"
        ]
      }
    }
  ]
}
- pipeId: As a data source provider, this ID uniquely identifies this data pipe with the Braineous Data Pipeline Engine.
- entity: The business/domain entity that this dataset should be associated with.
- configuration.stagingStore: The Staging Store driver
- configuration.name: A user-friendly way to identify the target store
- configuration.config.connectionString: MongoDB connection string for your target store
- configuration.config.database: MongoDB database on your target store
- configuration.config.collection: MongoDB collection on your target store
- configuration.config.jsonpathExpressions: JSONPath expressions that transform the payload into one your system is compatible with. In this example, $.name and $.addr are selected, so the delivered records contain only the name and addr fields.
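To make the effect of jsonpathExpressions concrete, here is a minimal sketch of the selection that "$.name" and "$.addr" describe. It handles only top-level "$.field" expressions over a Map, which is a small subset of JSONPath, and it is an assumption-laden illustration rather than how Braineous evaluates expressions. The class JsonPathProjection is hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: supports top-level "$.field" expressions, not full JSONPath,
// and is not the Braineous implementation.
class JsonPathProjection {

    // Returns a new record containing only the top-level fields named by the expressions.
    static Map<String, Object> project(Map<String, Object> record, List<String> expressions) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (String expression : expressions) {
            String field = expression.startsWith("$.") ? expression.substring(2) : "";
            if (field.isEmpty() || field.contains(".") || field.contains("[")) {
                throw new IllegalArgumentException("Unsupported expression: " + expression);
            }
            if (record.containsKey(field)) {
                result.put(field, record.get(field));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> addr = new LinkedHashMap<>();
        addr.put("email", "name_1@email.com");
        addr.put("phone", "123");

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", 1);
        record.put("name", "name_1");
        record.put("age", 46);
        record.put("addr", addr);

        Map<String, Object> transformed = project(record, Arrays.asList("$.name", "$.addr"));
        System.out.println(transformed.keySet()); // prints [name, addr]
    }
}
```

The id and age fields are dropped because no expression selects them, while the nested addr object is carried over whole.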
A data pipe can be configured with multiple target stores/systems for data delivery.
The current release supports the following target stores:
- Snowflake
- MySQL
- ElasticSearch
- MongoDB
- ClickHouse
In future releases, the Braineous team will add support for more target stores and systems, such as:
- PostgreSQL
- Oracle
- Amazon Redshift
Java Code - Register Pipe
String configLocation = "pipe_config/pipe_config.json";
String pipeConfigJson = Util.loadResource(configLocation);
JsonObject configJson = JsonUtil.validateJson(pipeConfigJson).getAsJsonObject();
String pipeId = configJson.get("pipeId").getAsString();
String entity = configJson.get("entity").getAsString();
System.out.println("*****PIPE_CONFIGURATION******");
JsonUtil.printStdOut(configJson);
//configure the DataPipeline Client
Configuration configuration = new Configuration().
ingestionHostUrl("http://localhost:8080/").
apiKey(apiKey).
apiSecret(apiSecret).
streamSizeInObjects(0);
dataPlatformService.configure(configuration);
//register pipe
dataPlatformService.registerPipe(configJson);
System.out.println("*****PIPE_REGISTRATION_SUCCESS******");
Pipe Configuration can be provided dynamically at runtime. The source can be a
database, a configuration system, a local file system, a network file system, etc.
In this example, the pipe configuration is loaded from a classpath
resource located at src/main/resources/pipe_config/pipe_config.json.
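As one way to picture "provided dynamically at runtime", the sketch below assembles the same single-target MongoDB pipe configuration from runtime values instead of a file. PipeConfigBuilder and the MONGO_URL environment variable are hypothetical names chosen for illustration; only the JSON field names and the staging store class name come from the configuration shown earlier. A real application would more likely use a JSON library than string concatenation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: builds a single-target pipe configuration as a JSON string.
class PipeConfigBuilder {

    static final String MONGO_DRIVER =
            "com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStagingStore";

    // Escapes backslashes and double quotes only; control characters are not handled.
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    static String mongoPipeConfig(String pipeId, String entity, String connectionString,
                                  String database, String collection,
                                  List<String> jsonpathExpressions) {
        String expressions = jsonpathExpressions.stream()
                .map(PipeConfigBuilder::quote)
                .collect(Collectors.joining(","));
        return "{"
                + "\"pipeId\":" + quote(pipeId) + ","
                + "\"entity\":" + quote(entity) + ","
                + "\"configuration\":[{"
                + "\"stagingStore\":" + quote(MONGO_DRIVER) + ","
                + "\"name\":" + quote(pipeId) + ","
                + "\"config\":{"
                + "\"connectionString\":" + quote(connectionString) + ","
                + "\"database\":" + quote(database) + ","
                + "\"collection\":" + quote(collection) + ","
                + "\"jsonpathExpressions\":[" + expressions + "]"
                + "}}]}";
    }

    public static void main(String[] args) {
        // MONGO_URL is an assumed variable name; the fallback matches the tutorial's value.
        String connectionString =
                System.getenv().getOrDefault("MONGO_URL", "mongodb://localhost:27017");
        String json = mongoPipeConfig("xmlyxyy", "xmlyxyy", connectionString,
                "xmlyxyy", "data", Arrays.asList("$.name", "$.addr"));
        System.out.println(json);
    }
}
```

The resulting string could then take the place of pipeConfigJson in the registration code above, where it is passed to JsonUtil.validateJson.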
Java Code - Send Data for ingestion
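//send source data through the pipeline
//datasetElement is presumably the source JSON from step 3 in parsed form; its declaration is not shown in this excerpt
dataPlatformService.sendData(pipeId, entity, datasetElement.toString());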
System.out.println("*****DATA_INGESTION_SUCCESS******");
5. Run the Tutorial
cd braineous-1.0.0-cr3/tutorials/data-transformation
./run.sh
Expected Client Output:
[INFO] ------------------------------------------------------------------------
*****DATA_SET******
******ARRAY_SIZE: 2**********
[
{
"id": 1,
"name": "name_1",
"age": 46,
"addr": {
"email": "name_1@email.com",
"phone": "123"
}
},
{
"id": "2",
"name": "name_2",
"age": 55,
"addr": {
"email": "name_2@email.com",
"phone": "1234"
}
}
]
**********************
*****PIPE_CONFIGURATION******
{
"pipeId": "xmlyxyy",
"entity": "xmlyxyy",
"configuration": [
{
"stagingStore": "com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStagingStore",
"name": "xmlyxyy",
"config": {
"connectionString": "mongodb://localhost:27017",
"database": "xmlyxyy",
"collection": "data",
"jsonpathExpressions": [
"$.name",
"$.addr"
]
}
}
]
}
**********************
*****PIPE_REGISTRATION_SUCCESS******
***SENDING_DATA_START*****
*****DATA_INGESTION_SUCCESS******
********DATALAKE_ASSERTION_PHASE_STARTED....***********
***SENDING_PRINT_QUERY*****
Expected Server Output:
baebfaa.xmlyxyy.xmlyxyy' (2916dac023e65786f5b5664ec6230aa0) to 'http://127.0.0.1:8081'.
2024-04-05 12:32:10,221 INFO [org.apa.fli.tab.cat.hiv.HiveCatalog] (executor-thread-0) Setting hive conf dir as /Users/babyboy/mumma/braineous/infrastructure/apache-hive-3.1.3-bin/conf
2024-04-05 12:32:10,256 INFO [org.apa.fli.tab.cat.hiv.HiveCatalog] (executor-thread-0) Created HiveCatalog 'ffbaaaacaaaaaaaaafaaaabafafafbaebfaa'
2024-04-05 12:32:10,271 INFO [org.apa.had.hiv.met.HiveMetaStoreClient] (executor-thread-0) Trying to connect to metastore with URI thrift://0.0.0.0:9083
2024-04-05 12:32:10,277 INFO [org.apa.had.hiv.met.HiveMetaStoreClient] (executor-thread-0) Opened a connection to metastore, current connections: 6
2024-04-05 12:32:10,283 INFO [org.apa.had.hiv.met.HiveMetaStoreClient] (executor-thread-0) Connected to metastore.
2024-04-05 12:32:10,306 INFO [org.apa.had.hiv.met.RetryingMetaStoreClient] (executor-thread-0) RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=babyboy (auth:SIMPLE) retries=1 delay=1 lifetime=0
2024-04-05 12:32:10,308 INFO [org.apa.fli.tab.cat.hiv.HiveCatalog] (executor-thread-0) Connected to Hive metastore
2024-04-05 12:32:10,345 INFO [org.apa.fli.tab.cat.CatalogManager] (executor-thread-0) Set the current default catalog as [ffbaaaacaaaaaaaaafaaaabafafafbaebfaa] and the current default database as [default].
select * from ffbaaaacaaaaaaaaafaaaabafafafbaebfaa.xmlyxyy.xmlyxyy
2024-04-05 12:32:10,683 INFO [org.apa.fli.cli.pro.res.RestClusterClient] (Flink-RestClusterClient-IO-thread-1) Submitting job 'collect' (bf31592b931b9d3e9f805cfb71ee2d6a).
2024-04-05 12:32:14,787 INFO [org.apa.fli.cli.pro.res.RestClusterClient] (Flink-RestClusterClient-IO-thread-4) Successfully submitted job 'collect' (bf31592b931b9d3e9f805cfb71ee2d6a) to 'http://127.0.0.1:8081'.
********DATA**********
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op | id | name | age | addr.email | addr.phone |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I | 1 | name_1 | 46 | name_1@email.com | 123 |
| +I | 2 | name_2 | 55 | name_2@email.com | 1234 |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
2 rows in set
**********************
2024-04-05 12:33:18,876 INFO [org.apa.kaf.cli.NetworkClient] (Thread-3) [Consumer clientId=consumer-test-group-1, groupId=test-group] Node -1 disconnected.
2024-04-05 12:33:54,990 INFO [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Node -1 disconnected.
2024-04-05 12:40:25,022 INFO [org.apa.kaf.cli.NetworkClient] (Thread-7) [Consumer clientId=consumer-test-group-2, groupId=test-group] Node -1 disconnected.
6. Verify target received a transformed payload
To verify the success of the ingestion and delivery to the configured target database, use the following MongoDB commands.
Expected Result: You should see two records added to a collection called "data"
in a database called "xmlyxyy", corresponding to the configured value of
configuration.config.database.
The age and id fields should not be included; however, the data lake will capture all data.
mongosh
use xmlyxyy
show collections
db.data.find()
xmlyxyy> show collections
data
xmlyxyy> db.data.find()
[
{
_id: ObjectId("6610277b0151ce1d8e9e9a30"),
name: 'name_1',
addr: { email: 'name_1@email.com', phone: '123' }
},
{
_id: ObjectId("6610277b0151ce1d8e9e9a31"),
name: 'name_2',
addr: { email: 'name_2@email.com', phone: '1234' }
}
]