Continuing from the previous blog, where we discussed moving Postgres RDS database tables to Snowflake with AWS Data Pipeline and a simple psql export, the next set of challenges is moving semi-structured and unstructured data to Snowflake.
As part of the ETA project, some of the data is stored in DynamoDB and Elasticsearch. Three DynamoDB tables contain the latest truckload and less-than-truckload data; they are temporary storage that lasts for 1-2 days, and a microservice uses them to keep the data fresh for client consumption. Elasticsearch, on the other hand, contains all API hit rates, including customers tracking a truck's location. Tracking a truck is really the heart of ETA, since the actual location comes from the carrier network connected to our APIs. This visibility via logs is a treasure.
Since part of the architecture already solves the problem of ingestion to Snowflake using AWS Data Pipeline, the next part is really exporting the data (DynamoDB and Elasticsearch) to an S3 bucket.
DynamoDB Exports
DynamoDB —> App Importer —> S3 Bucket
For this process, we wrote a simple shell script, called by a Kubernetes CronJob, to export the data from DynamoDB to an S3 bucket. Because this process exports the entire table (we have 3 important tables for the ETA project) and takes around ~4 hours, it needs to start at a specific time. AWS Data Pipeline is scheduled around that window as well and then executes the Snowflake ingest.
Tools we've used:
A very simple shell script, along the lines of the sketch below.
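As an illustration, assuming the AWS CLI is available in the cron container, something like this covers the export; the table and bucket names are placeholders, not our production resources.

```sh
#!/usr/bin/env bash
# Hypothetical sketch: scan one DynamoDB table and drop the result into a dated S3 prefix.
# A real export of a very large table would page with --starting-token or use
# parallel scans (--segment / --total-segments) instead of one big scan.
set -euo pipefail

TABLE_NAME="${1:?usage: export_table.sh <table-name>}"
BUCKET="s3://example-eta-exports"        # hypothetical bucket
EXPORT_DATE="$(date -u +%Y-%m-%d)"

# Full table scan (the AWS CLI paginates automatically), compressed on the way to S3
aws dynamodb scan --table-name "$TABLE_NAME" --output json \
  | gzip \
  | aws s3 cp - "$BUCKET/dynamodb/$TABLE_NAME/$EXPORT_DATE/export.json.gz"
```

The Kubernetes CronJob simply runs this script on a fixed schedule so the ~4-hour export finishes before the AWS Data Pipeline trigger fires.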
Elasticsearch Exports
The other piece, which is the most interesting and challenging part, is moving log data as large as 8 TB daily when we started (circa 2018); I would not be surprised if it is now around ~25-28 TB per day.
Logs are also challenging in themselves: each microservice adds its own attributes, which makes the Logstash parsing a constantly moving target.
Below is the data pipeline for logs.
App(s) —> Log4j-Kafka —> Kafka —> Kafka-Logstash —> Elasticsearch
We are able to stream a large volume of messages to Elasticsearch in real time and diagnose issues quickly when clients report problems. On a good day, we ingest up to ~20M messages per minute.
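For readers unfamiliar with the Kafka-Logstash hop, a minimal, illustrative Logstash pipeline looks something like this; the broker addresses, topic name, and index pattern are hypothetical, not our actual configuration.

```
input {
  kafka {
    bootstrap_servers => "kafka-1:9092,kafka-2:9092"   # hypothetical brokers
    topics            => ["eta-app-logs"]              # hypothetical topic
    codec             => "json"
  }
}

filter {
  # Each microservice adds its own attributes, so parsing stays permissive;
  # here we only normalize the event timestamp.
  date {
    match => ["timestamp", "ISO8601"]
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "eta-logs-%{+YYYY.MM.dd}"                 # daily indices
  }
}
```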
Elasticsearch export is a complicated problem:
The 10K result-window limit: it takes forever to export the data (see the sketch after this list).
Exports sometimes time out, so we need a retry mechanism.
The cluster is constantly in use rendering quick Kibana reports.
Logs are continuously being ingested, as described above.
If we ever strain the Elasticsearch cluster, it causes downtime and makes our ingestion lag.
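As a generic illustration of getting around the 10K result-window limit (not necessarily the tool we built), the scroll API lets you page through an index in bulk; the host, index name, and page size below are hypothetical.

```sh
#!/usr/bin/env bash
# Generic scroll-based export sketch; host, index, and page size are placeholders.
set -euo pipefail

ES_HOST="http://elasticsearch:9200"
INDEX="eta-logs-2018.09.01"
PAGE=0

# Open a scroll context with the first page of results
curl -s -X POST "$ES_HOST/$INDEX/_search?scroll=5m" \
  -H 'Content-Type: application/json' \
  -d '{"size": 5000, "query": {"match_all": {}}}' > "page_$PAGE.json"
SCROLL_ID="$(jq -r '._scroll_id' "page_$PAGE.json")"

# Keep paging until Elasticsearch returns an empty batch; wrapping each curl
# call in a retry helper handles the occasional timeout.
while [ "$(jq '.hits.hits | length' "page_$PAGE.json")" -gt 0 ]; do
  PAGE=$((PAGE + 1))
  curl -s -X POST "$ES_HOST/_search/scroll" \
    -H 'Content-Type: application/json' \
    -d "{\"scroll\": \"5m\", \"scroll_id\": \"$SCROLL_ID\"}" > "page_$PAGE.json"
  SCROLL_ID="$(jq -r '._scroll_id' "page_$PAGE.json")"
done
```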
NOTE:
We found a way to export the data using a simple tool architecture on top of the pipeline above. Please reach out if you want to know more.
External Stage usage:
An external stage points at an S3-like location, letting you run queries with Snowflake's query engine and retrieve information directly from the source storage location. We can use it for query checks, transformations, and more. This is incredibly useful for our downstream loading.
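For example, a minimal external stage definition and an ad-hoc query against it might look like this; the bucket, stage name, and JSON fields are hypothetical.

```sql
-- Hypothetical bucket, stage name, and credentials; a storage integration works as well.
CREATE OR REPLACE STAGE LOGS_STAGE
  URL = 's3://example-eta-exports/logs/'
  CREDENTIALS = (AWS_KEY_ID = '<key>' AWS_SECRET_KEY = '<secret>')
  FILE_FORMAT = (TYPE = 'JSON');

-- Query the raw files in place with Snowflake's engine, before loading anything
SELECT $1:service::string AS service,
       $1:message::string AS message
FROM @LOGS_STAGE/2018/09/01/
LIMIT 100;
```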
Stored Procedure:
A stored procedure is a set of instructions/program/code used to manipulate data in Snowflake. We use a stored procedure to insert the data into the final logs table based on the partition date.
At the time, Snowflake stored procedures only supported JavaScript; they have expanded the language options since then.
The code is long but really simple: given a start and end date, it walks each day, builds the S3 URL partition, and checks whether data exists there. It then reads from the external stage to filter and transform the data based on the needs of the final table.
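A trimmed-down sketch of that idea, with hypothetical stage, table, and field names (LOGS_STAGE, LOGS_FINAL), not the production procedure:

```sql
CREATE OR REPLACE PROCEDURE LOAD_LOGS_BY_PARTITION(START_DATE STRING, END_DATE STRING)
  RETURNS STRING
  LANGUAGE JAVASCRIPT
AS
$$
  var loaded = [];
  var day = new Date(START_DATE);
  var end = new Date(END_DATE);

  while (day <= end) {
    // Build the S3-style partition prefix for this day, e.g. 2018/09/01/
    var y = day.getUTCFullYear();
    var m = ("0" + (day.getUTCMonth() + 1)).slice(-2);
    var d = ("0" + day.getUTCDate()).slice(-2);
    var partition = y + "/" + m + "/" + d + "/";

    // Check whether any files exist under this partition of the external stage
    var files = snowflake.createStatement({
      sqlText: "LIST @LOGS_STAGE/" + partition
    }).execute();

    if (files.getRowCount() > 0) {
      // Filter/transform through the stage and load into the final table
      snowflake.createStatement({
        sqlText: "COPY INTO LOGS_FINAL " +
                 "FROM (SELECT $1:timestamp::timestamp_ntz, $1:service::string, $1:message::string " +
                 "      FROM @LOGS_STAGE/" + partition + ") " +
                 "FILE_FORMAT = (TYPE = 'JSON')"
      }).execute();
      loaded.push(partition);
    }

    day.setUTCDate(day.getUTCDate() + 1);
  }

  return "Loaded partitions: " + loaded.join(", ");
$$;
```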
Different Warehouses per Ingestion
The warehouse size depends on the size of the S3 files: for datasets under 25 GB we use a small warehouse, and anything larger, up to 100 GB, gets a medium or large warehouse depending on the criticality of the data and reports.
The stored procedure above runs on an X-Large warehouse, and the DynamoDB ingest runs on a Large warehouse.
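As an illustration, separate warehouses per ingestion path can be defined and sized independently; the warehouse names here are hypothetical, and the sizes follow the text above.

```sql
-- Illustrative only; names are examples, not our exact setup.
CREATE WAREHOUSE IF NOT EXISTS DYNAMODB_INGEST_WH
  WAREHOUSE_SIZE = 'LARGE'
  AUTO_SUSPEND   = 300      -- suspend after 5 idle minutes so we only pay while loading
  AUTO_RESUME    = TRUE;

CREATE WAREHOUSE IF NOT EXISTS LOGS_SP_WH
  WAREHOUSE_SIZE = 'XLARGE'
  AUTO_SUSPEND   = 300
  AUTO_RESUME    = TRUE;

-- Point the session (or scheduled job) at the right warehouse before ingesting
USE WAREHOUSE LOGS_SP_WH;
```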
For this part, we really just needed to solve a few things:
1) Extract/export data out of DynamoDB in a consistent and repeatable way.
2) Extract/export data out of Elasticsearch, which is log data ingested at 8 TB per day.
3) Ingest all of it concurrently into Snowflake and provide a daily snapshot of the data.
The combination of ETL tools, apps, and Snowflake internals made it possible to move structured, semi-structured, and unstructured data into Snowflake. The design and architecture we implemented made it all possible.
Ready to start your Snowflake journey? Contact our technical team today!