Writing Parquet files with Rust to save on cloud storage costs
FinOps measures come in all shapes and sizes. Some may take many work hours to implement, while others may only take one brave engineer and some curiosity. And while this is not the story of an actual economic gain, it is the story of an interesting experiment that might lead to one in the future.
In this blog post, I'll go through a snippet of code I've written recently that shows just how efficient Parquet files can be at saving on cloud storage costs. We'll briefly talk about the Parquet file format, and then go straight into the code, which I've decided to write in Rust.
The Apache Parquet file format is, by its own definition, "an open source, column-oriented data file format designed for efficient data storage and retrieval. It provides high performance compression and encoding schemes to handle complex data in bulk and is supported in many programming languages and analytics tools". In the snippet of code I'll show you shortly, I've focused on the "high performance compression" aspect of it, and how it can be used to save on cloud storage.
The premise of the code is simple: generate a large CSV file, generate a Parquet file with the same data as the CSV, and compare the two. I've thrown in some compression on the Parquet file as well, with not much of a positive effect, as you will soon see.
The setup
Rust can't handle Parquet files with its standard library (like most languages), so we must first pull in some dependencies that we'll need along the way. Here's what our Cargo.toml file ended up with:
[package]
name = "parquet-playground"
version = "0.1.0"
edition = "2021"

[dependencies]
bytesize = "1.3.0" # Pretty print file sizes
chrono = "0.4.38" # Generate Utc timestamp
flate2 = "1.0.30" # GZip Compression Utility
polars = { version = "0.41.2", features = [ # DataFrame library
    "lazy",      # Provide lazy loading utilities
    "parquet",   # Parquet features
    "streaming", # Buffered reading so we don't load the CSV entirely in memory
    "csv",       # CSV reading features
] }
rand = "0.8.5" # RNG engine
ulid = "1.1.2" # ULID generation for our fake data
The code
I think the meaning of every line in the snippet is fairly clear, so I'll refrain from further commentary and let the code speak for itself.
use bytesize::ByteSize;
use flate2::{write::GzEncoder, Compression};
use std::{
    fs::File,
    io::{copy, BufReader, BufWriter, Error, Write},
    time::{Instant, SystemTime},
};

use chrono::{DateTime, Utc};
use polars::{datatypes::Field, prelude::*};
use rand::{seq::SliceRandom, thread_rng, Rng};
use ulid::Ulid;
#[derive(Debug)]
enum TxStatus {
    SUCCESS,
    FAILURE,
    PENDING,
}
// Generate account IDs to fill our CSV with not totally random data
fn generate_account_ids() -> Vec<String> {
    let mut accounts = Vec::<String>::new();
    for _ in 0..100_000 {
        accounts.push(Ulid::new().to_string());
    }
    accounts
}
// We build the schema of our data so that the parquet file can be optimized with it
fn build_schema() -> Schema {
    // TRANSF_ID,ORIGIN_ID,DEST_ID,VALUE,STATUS,TIMESTAMP
    Schema::from_iter(vec![
        Field::new("TRANSF_ID", DataType::String),
        Field::new("ORIGIN_ID", DataType::String),
        Field::new("DEST_ID", DataType::String),
        Field::new("VALUE", DataType::Float32),
        Field::new("STATUS", DataType::String),
        Field::new(
            "TIMESTAMP",
            DataType::Datetime(TimeUnit::Milliseconds, Some(String::from("UTC"))),
        ),
    ])
}
fn main() -> Result<(), Error> {
    let mut rand_engine = thread_rng();
    let account_ids: Vec<String> = generate_account_ids();
    let possible_status = vec![TxStatus::SUCCESS, TxStatus::FAILURE, TxStatus::PENDING];

    // Create a timer to measure execution time
    let timer = Instant::now();

    // Generate the CSV file that we will build the Parquet file from
    let csv_file = File::create("input.csv")?;
    let mut csv_file_writer = BufWriter::new(csv_file);
    csv_file_writer.write_all("TRANSF_ID,ORIGIN_ID,DEST_ID,VALUE,STATUS,TIMESTAMP\n".as_bytes())?;
    for _ in 0..1_000_000 {
        let tx_id = Ulid::new().to_string();
        let origin_id = account_ids.choose(&mut rand_engine).unwrap();
        let dest_id = account_ids.choose(&mut rand_engine).unwrap();
        let value = rand_engine.gen_range(0.01..1_000_000.00);
        let status = possible_status.choose(&mut rand_engine).unwrap();
        let tx_timestamp = SystemTime::now();
        let tx_timestamp: DateTime<Utc> = tx_timestamp.into();
        csv_file_writer.write_all(
            format!(
                "{},{},{},{},{:?},{}\n",
                &tx_id,
                &origin_id,
                &dest_id,
                value,
                &status,
                tx_timestamp.to_rfc3339()
            )
            .as_bytes(),
        )?;
    }
    csv_file_writer.flush()?;

    let csv_file = File::open("input.csv")?;
    println!(
        "CSV file size is: {}",
        ByteSize::b(csv_file.metadata()?.len())
    );
    println!("Wrote CSV file in {:?}", timer.elapsed());

    // Write the uncompressed Parquet file
    let timer = Instant::now();
    let schema = build_schema();
    // Lazily load the CSV data frame, setting the schema to be used
    let csv_lazy_data_frame = LazyCsvReader::new("input.csv")
        .with_has_header(true)
        .with_schema(Some(schema.into()))
        .finish()
        .unwrap();
    let parquet_file = File::create("parquet_output.parquet")?;
    let mut parquet_file_writer = BufWriter::new(parquet_file);
    ParquetWriter::new(&mut parquet_file_writer)
        .finish(&mut csv_lazy_data_frame.with_streaming(true).collect().unwrap())
        .unwrap();
    // Make sure everything buffered reaches disk before we measure and re-read the file
    parquet_file_writer.flush()?;
    let parquet_file = File::open("parquet_output.parquet")?;
    println!(
        "Uncompressed Parquet file size is: {}",
        ByteSize::b(parquet_file.metadata()?.len())
    );
    println!("Wrote Uncompressed Parquet file in {:?}", timer.elapsed());

    // Write the compressed Parquet file
    let timer = Instant::now();
    let compressed_parquet_file = File::create("compressed_parquet_output.gz")?;
    let mut gz_encoder = GzEncoder::new(compressed_parquet_file, Compression::default());
    copy(&mut BufReader::new(parquet_file), &mut gz_encoder)?;
    gz_encoder.finish()?;
    let compressed_parquet_file = File::open("compressed_parquet_output.gz")?;
    println!(
        "Compressed Parquet file size is: {}",
        ByteSize::b(compressed_parquet_file.metadata()?.len())
    );
    println!("Wrote Compressed Parquet file in {:?}", timer.elapsed());

    Ok(())
}
As you can see, the operations are clearly separated: write the CSV file, generate the Parquet file, and generate the compressed Parquet file.
The results
Running in release mode with cargo run --release, we achieve the following results (on average):
CSV file size is: 143.2 MB
Wrote CSV file in 911.760223ms
Uncompressed Parquet file size is: 28.3 MB
Wrote Uncompressed Parquet file in 696.527488ms
Compressed Parquet file size is: 28.3 MB
Wrote Compressed Parquet file in 751.252494ms
There is a 115 MB difference between the original CSV file and its Parquet counterpart. That's an 80% reduction in file size!
As you can see, compressing the Parquet file made no noticeable difference in file size, even though there is a difference of a few bytes when you look at the exact byte count.
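If you do want to experiment with compression, a more natural place to apply it is inside the Parquet writer itself, which compresses each column chunk rather than the finished file as a whole. Below is a minimal sketch of what that could look like with Polars, assuming the ParquetWriter::with_compression option and the Zstd codec variant available in recent Polars releases; the output file name is made up for illustration, and I haven't measured the resulting sizes here.

// Hypothetical variation: let the Parquet writer compress the column chunks itself
// (Zstd in this sketch) instead of gzipping the whole file afterwards.
use polars::prelude::*;
use std::{fs::File, io::BufWriter};

fn write_zstd_parquet(df: &mut DataFrame) -> PolarsResult<()> {
    let file = File::create("parquet_zstd_output.parquet")?;
    ParquetWriter::new(BufWriter::new(file))
        .with_compression(ParquetCompression::Zstd(None)) // None picks the default Zstd level
        .finish(df)?;
    Ok(())
}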
In terms of cloud cost reduction, of course you can't only consider file storage; there are many other elements that affect how much you're billed. But in terms of storage alone, if you had 20 TB of structured data stored in an S3 bucket, it would cost you around 471 USD a month. If you applied this transformation, those 20 TB could go down to roughly 3.8 TB, which would cost you about 89 USD.
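For transparency, here's the back-of-the-envelope arithmetic behind those figures. It's a rough sketch that assumes S3 Standard pricing of about 0.023 USD per GB-month (the first-50-TB tier in us-east-1 at the time of writing); your region, storage class, and request patterns will change the numbers.

// Rough monthly S3 Standard storage cost, assuming ~0.023 USD per GB-month
fn monthly_s3_cost_usd(terabytes: f64) -> f64 {
    const PRICE_PER_GB_MONTH: f64 = 0.023;
    terabytes * 1024.0 * PRICE_PER_GB_MONTH
}

fn main() {
    println!("20 TB of CSV:       ~{:.0} USD/month", monthly_s3_cost_usd(20.0)); // ~471
    println!("~3.8 TB as Parquet: ~{:.0} USD/month", monthly_s3_cost_usd(3.8)); // ~89
}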
Again, these are mere estimates based on the data from the experiment I've developed!
Anyway, I see a lot of potential in Parquet files: they can be easily ingested by data analytics tools, and if you can get past the relative lack of information on how to create and consume them, I think they're worth investing your time in.
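As a quick illustration of that ingestion story, here's a small sketch that reads the Parquet file written above back through Polars' lazy API and runs an aggregation on it. It only assumes the same polars features already listed in Cargo.toml, and the aggregation is just an arbitrary example.

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Lazily scan the Parquet file written earlier and sum the transfer values per status
    let totals = LazyFrame::scan_parquet("parquet_output.parquet", ScanArgsParquet::default())?
        .group_by([col("STATUS")])
        .agg([col("VALUE").sum().alias("TOTAL_VALUE")])
        .collect()?;
    println!("{totals}");
    Ok(())
}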
Afterthought note: I also tried compressing the CSV with the Gzip utility, and the result was a ~55 MB file. So no, simply compressing the CSV isn't as effective as the Parquet transformation.