arkdb
arkdb copied to clipboard
Archive and unarchive databases as flat text files
arkdb 
The goal of arkdb
is to provide a convenient way to move data from
large compressed text files (tsv, csv, etc) into any DBI-compliant
database connection (e.g. MYSQL, Postgres, SQLite; see
DBI), and move tables out of such
databases into text files. The key feature of arkdb
is that files are
moved between databases and text files in chunks of a fixed size,
allowing the package functions to work with tables that would be much
too large to read into memory all at once. There is also functionality
for filtering and applying transformation to data as it is extracted
from the database.
The arkdb
package is easily extended to use custom read and write
methods allowing you to dictate your own output formats. See
R/streamable_table.R
for examples that include using:
- Base c/tsv
- Apache arrow’s parquet
- The
readr
package for c/tsv
Links
- A more detailed introduction to package design and use can be found in the package Vignette
- Online versions of package documentation
Installation
You can install arkdb from GitHub with:
# install.packages("devtools")
devtools::install_github("cboettig/arkdb")
Basic use
library(arkdb)
# additional libraries just for this demo
library(dbplyr)
library(dplyr)
library(fs)
Creating an archive of a database
Consider the nycflights
database in SQLite:
tmp <- tempdir() # Or can be your working directory, "."
db <- dbplyr::nycflights13_sqlite(tmp)
#> Caching nycflights db at /tmp/RtmpKGu2Ay/nycflights13.sqlite
#> Creating table: airlines
#> Creating table: airports
#> Creating table: flights
#> Creating table: planes
#> Creating table: weather
Create an archive of the database:
dir <- fs::dir_create(fs::path(tmp, "nycflights"))
ark(db, dir, lines = 50000)
#> Exporting airlines in 50000 line chunks:
#> ...Done! (in 0.006583929 secs)
#> Exporting airports in 50000 line chunks:
#> ...Done! (in 0.02108455 secs)
#> Exporting flights in 50000 line chunks:
#> ...Done! (in 8.810824 secs)
#> Exporting planes in 50000 line chunks:
#> ...Done! (in 0.02794719 secs)
#> Exporting weather in 50000 line chunks:
#> ...Done! (in 0.6644697 secs)
Unarchive
Import a list of compressed tabular files (i.e. *.csv.bz2
) into a
local SQLite database:
files <- fs::dir_ls(dir)
new_db <- DBI::dbConnect(RSQLite::SQLite(), fs::path(tmp, "local.sqlite"))
unark(files, new_db, lines = 50000)
#> Importing /tmp/RtmpKGu2Ay/nycflights/airlines.tsv.bz2 in 50000 line chunks:
#> ...Done! (in 0.0117662 secs)
#> Importing /tmp/RtmpKGu2Ay/nycflights/airports.tsv.bz2 in 50000 line chunks:
#> ...Done! (in 0.02637362 secs)
#> Importing /tmp/RtmpKGu2Ay/nycflights/flights.tsv.bz2 in 50000 line chunks:
#> ...Done! (in 6.802646 secs)
#> Importing /tmp/RtmpKGu2Ay/nycflights/planes.tsv.bz2 in 50000 line chunks:
#> ...Done! (in 0.03848696 secs)
#> Importing /tmp/RtmpKGu2Ay/nycflights/weather.tsv.bz2 in 50000 line chunks:
#> ...Done! (in 0.3772023 secs)
Using filters
This package can also be used to generate slices of data that are required for analytical or operational purposes. In the example below we archive to disk only the flight data that occured in the month of December. It is recommended to use filters on a single table at a time.
ark(db, dir, lines = 50000, tables = "flights", filter_statement = "WHERE month = 12")
Using callbacks
It is possible to use a callback to perform just-in-time data
transformations before ark writes your data object to disk in your
preferred format. In the example below, we write a simple transformation
to convert the flights data arr_delay
field, from minutes, to hours.
It is recommended to use callbacks on a single table at a time. A
callback function can be anything you can imagine so long as it returns
a data.frame that can be written to disk.
mins_to_hours <- function(data) {
data$arr_delay <- data$arr_delay/60
data
}
ark(db, dir, lines = 50000, tables = "flights", callback = mins_to_hours)
ark() in parallel
There are two strategies for using ark
in parallel. One is to loop
over the tables, re-using the ark function per table in parallel. The
other, introduced in 0.0.15, is to use the “window-parallel” method
which loops over chunks of your table. This is particularly useful if
your tables are very large and can speed up the process significantly.
Note: window-parallel
currently only works in conjunction with
streamable_parquet
# Strategy 1: Parallel over tables
library(arkdb)
library(future.apply)
plan(multisession)
# Any streamable_table method is acceptable
future_lapply(vector_of_tables, function(x) ark(db, dir, lines, tables = x))
# Strategy 2: Parallel over chunks of a table
library(arkdb)
library(future.apply)
plan(multisession)
ark(
db,
dir,
streamable_table = streamable_parquet(), # required for window-parallel
lines = 50000,
tables = "flights",
method = "window-parallel"
)
# Strategy 3: Parallel over tables and chunks of tables
library(arkdb)
library(future.apply)
# 16 core machine for example
plan(list(tweak(multisession, n = 4), tweak(multisession, n = 4)))
# 4 tables at a time, 4 threads per table
future_lapply(vector_of_tables, function(x) {
ark(
db,
dir,
streamable_table = streamable_parquet(), # required for window-parallel
lines = 50000,
tables = x,
method = "window-parallel")
}
)
ETLs with arkdb
The arkdb
package can also be used to create a number of ETL pipelines
involving text archives or databases given its ability to filter, and
use callbacks. In the example below, we leverage duckdb
to read a
fictional folder of files by US state, filter by var_filtered
, apply a
callback transformation transform_fun
to var_transformed
save as
parquet, and then load a folder of parquet files for analysis with
Apache Arrow.
library(arrow)
library(duckdb)
db <- dbConnect(duckdb::duckdb())
transform_fun <- function(data) {
data$var_transformed <- sqrt(data$var_transformed)
data
}
for(state in c("DC", state.abb)) {
path <- paste0("path/to/archives/", state, ".gz")
ark(
db,
dir = paste0("output/", state),
streamable_table = streamable_parquet(), # parquet files of nline rows
lines = 100000,
# See: https://duckdb.org/docs/data/csv
tables = sprintf("read_csv_auto('%s')", path),
compress = "none", # Compression meaningless for parquet as it's already compressed
overwrite = T,
filenames = state, # Overload tablename
filter_statement = "WHERE var_filtered = 1",
callback = transform_fun
)
}
# The result is trivial to read in with arrow
ds <- open_dataset("output", partitioning = "state")
Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms.