library(duckdb)
library(dplyr, warn.conflicts = FALSE)
library(dbplyr, warn.conflicts = FALSE)
There are a lot of conversations — understandably — on the use of Apache Parquet, Apache Arrow and DuckDB.
Apache Parquet is an open source columnar storage file format designed for fast I/O.
Apache Arrow is a standardized in-memory columnar representation of data: it allows “zero copy” sharing (pointers are passed around at a low level instead of copying), and it acts as an OLAP (Online Analytical Processing) runtime with deferred or lazy materialization (queries are optimized internally for us).
DuckDB is an implementation (i.e., an engine) of an OLAP database.
These three open source technologies, combined with the resources a personal laptop now has, have lowered the need to use a classic RDBMS for out-of-memory tasks in data science work, and in some cases maybe even the need for a distributed computing framework like Spark.
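To make that concrete before the main exercise, here is a tiny, self-contained sketch (toy data, not part of the FCC workflow below) of how the pieces fit together: the arrow package writes a Parquet file, and DuckDB queries it in place without loading everything into R.
library(arrow)   # only needed for this toy illustration

toy_parquet <- tempfile(fileext = ".parquet")
arrow::write_parquet(mtcars, toy_parquet)   # columnar storage on disk

con_toy <- DBI::dbConnect(duckdb::duckdb())
DBI::dbGetQuery(
  con_toy,
  sprintf("SELECT cyl, avg(mpg) AS avg_mpg FROM read_parquet('%s') GROUP BY cyl",
          toy_parquet)
)
DBI::dbDisconnect(con_toy)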
Let’s see an example of how they can be used to convert a table in a PostgreSQL database to a parquet file:
Libraries and data (optional) needed
We will use FCC NBM raw data from the December 2023 release. You can learn more about it on the FCC website.
The table we will be converting has a size of 102 GB (without indexes), has 888,176,676 rows and 12 columns, and is the result of importing around 440 CSVs.
Getting your credentials
Since this exercise is all about converting data that is currently stored in PostgreSQL (using R), the first step in connecting to your PG database server is getting your credentials. We are assuming here that you have a .pgpass file located in your home directory (~/.pgpass).
The code below will assume that you have this ~/.pgpass set and that it contains a one-line connection string.
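A standard .pgpass entry is a single line with five colon-separated fields, which is the shape the parsing below expects:
# ~/.pgpass
hostname:port:database:username:password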
get_cred <- function(path_pgpass) {
  pgpass <- readLines(path_pgpass)
  cred <- unlist(strsplit(pgpass, ":"))
  names(cred) <- c("host", "port", "db", "user", "pwd")
  return(cred)
}

cred <- get_cred("~/.pgpass")
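Optionally, a quick sanity check that the five fields came through (this assumes a single-line .pgpass with no extra colons):
# all five fields should be present and non-empty
stopifnot(length(cred) == 5, all(nzchar(cred)))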
Use DuckDB magic to convert it!
Well, the magic is a four-step process:
- Connect to DuckDB
- Get DuckDB’s postgres extension
- Connect to your DB (with the credentials we have set)
- Use DuckDB COPY specifying where and how you want it to be partitioned
This will be wrapped in one function:
# yes I am terrible at naming
from_elephant_to_duck <- function(table_name, path_for_parquet, part1, part2) {
  # 1. Connect to duckDB
  con <- DBI::dbConnect(duckdb())
  DBI::dbExecute(con, sprintf("SET temp_directory ='%s';", tempdir()))
  # cleaning up after the function
  on.exit(DBI::dbDisconnect(con), add = TRUE)

  # 2. install and load PG extension
  DBI::dbExecute(con, "INSTALL postgres")
  DBI::dbExecute(con, "LOAD postgres")

  # 3. Connect, "attach" to your PG server
  attach_string <- sprintf(
    "ATTACH 'dbname=%s user=%s password=%s host=%s' AS db (TYPE POSTGRES, READ_ONLY)",
    cred["db"],
    cred["user"],
    cred["pwd"],
    cred["host"]
  )
  DBI::dbExecute(con, attach_string)

  # 4. Copy to a parquet
  copy_string <- sprintf(
    "COPY
       (SELECT *
        FROM db.%s)
     TO '%s' (FORMAT 'parquet', PARTITION_BY(%s, %s))",
    table_name,
    path_for_parquet,
    part1, part2
  )
  DBI::dbExecute(con, copy_string)

  return(invisible(path_for_parquet))
}
# note: an improvement on this function would be to take cred as an argument
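Before launching an hour-long COPY, a quick pre-flight check can be reassuring. This optional sketch (not part of the original workflow) simply confirms that DuckDB's postgres extension can attach and read a few rows from the table we will convert below:
# optional pre-flight check: can DuckDB see the PG table at all?
con <- DBI::dbConnect(duckdb())
DBI::dbExecute(con, "INSTALL postgres")
DBI::dbExecute(con, "LOAD postgres")
DBI::dbExecute(con, sprintf(
  "ATTACH 'dbname=%s user=%s password=%s host=%s' AS db (TYPE POSTGRES, READ_ONLY)",
  cred["db"], cred["user"], cred["pwd"], cred["host"]
))
DBI::dbGetQuery(con, "SELECT * FROM db.staging.dec23 LIMIT 5")
DBI::dbDisconnect(con)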
Partitioning
Deciding how to partition a parquet is both a data and a business decision. In this case, state_abbr and technology are a good tradeoff between the overall size of each parquet file and fast performance of common filtering and grouping operations on this data.
Let’s do it and make some quick comparisons
start <- Sys.time()
from_elephant_to_duck("staging.dec23", "dec23", "state_abbr", "technology")
end <- Sys.time()
end - start
# Time difference of 58.92108 mins
On a relatively new MacBook over a Wi-Fi internet connection (the connection speed probably being the limiting factor here), it took a little less than an hour to run from_elephant_to_duck.
We can also compare our 102 GB to the size of the parquet files (of course, PG offers additional perks!):
du -sh dec23/
# 14 G
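Staying in R, you can also peek at the Hive-style layout that PARTITION_BY produced (the data file names below are illustrative) and recompute the on-disk size:
# peek at the partitioned layout
head(list.files("dec23", recursive = TRUE), 2)
# e.g. "state_abbr=AK/technology=10/data_0.parquet" (exact file names may vary)

# approximate total size of the parquet files, in GB
parquet_files <- list.files("dec23", recursive = TRUE, full.names = TRUE)
sum(file.info(parquet_files)$size) / 1024^3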
Finally, just for the fun of it, let’s run a quick query:
start <- Sys.time()
con <- DBI::dbConnect(duckdb::duckdb(), shutdown = TRUE, dbdir = tempdir())

reading_string <-
  sprintf("read_parquet('%s/*/*/*.parquet', hive_partitioning = true)",
          "dec23")
fcc <- dplyr::tbl(con, reading_string)

# check number of rows
fcc |>
  summarize(tot_rows = count(location_id)) |>
  collect()
# A tibble: 1 × 1
#    tot_rows
#       <dbl>
# 1 888176676
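Since Arrow came up earlier, it is worth noting that the same partitioned directory can also be opened with the arrow package, which likewise gives a lazy, dplyr-compatible table. A minimal sketch (requires the arrow package; hive partitions are detected automatically):
fcc_arrow <- arrow::open_dataset("dec23")
fcc_arrow |>
  summarize(tot_rows = n()) |>
  collect()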
# let's try one that is a bit more involved
start <- Sys.time()
# this will count every location_id by state_abbr and frn
# that have low_latency
q1 <- fcc |>
  filter(low_latency == TRUE) |>
  summarize(
    count_location = n_distinct(location_id),
    .by = c(state_abbr, frn)
  )
rez_q1 <- collect(q1)
end <- Sys.time()
end - start
# Time difference of 4.262549 mins
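Because q1 is a lazy dbplyr query, you can also inspect the SQL that gets sent to DuckDB before disconnecting (handy for sanity-checking the translation):
# print the SQL generated by dbplyr for DuckDB (output not shown)
dplyr::show_query(q1)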
DBI::dbDisconnect(con)
Impressive, isn’t it? We will probably dig a bit deeper into those technologies in future blog posts, so check back soon!