EDA 4-15-2024 - DIN Access Benchmarks

Author

Ran Li

Published

April 15, 2024

Intro

There are two kinds of bottlenecks in the DIN workflow:

  • Computational bottlenecks
    • computations take a long time
    • may need heavy infrastructure (clusters)
  • Data access bottlenecks
    • the cleaned data (.csv) is too big to load and work with in memory

This notebook describes how we can overcome these bottlenecks by using big-data dataframe engines such as Apache Arrow and Polars to work out-of-memory. Importantly, these workflows all use the same tidyverse API as dplyr for writing queries, which means almost zero retooling while greatly expanding computing capacity. Note that we largely skip the traditional workflow of importing everything into memory: it is very slow and the full data may not even fit into RAM on most workstations (the Benchmarking code section at the end includes a rough timing comparison).

One of the bedrocks of these out-of-memory workflows is that data should be kept in columnar storage such as parquet. For this demo we used Arrow to stream-convert the AR and BR DIN mortality datasets into a partitioned parquet store laid out like a Hadoop distributed file system (HDFS). Below is how this file system is structured; a minimal sketch of the conversion itself follows the tree.

hdfs/
├── iso2=AR/
│ ├── year=2005/
│ │ └── part-0.parquet
│ ├── ...
│ └── year=2020/
│ └── part-0.parquet
├── iso2=BR/
│ ├── year=2000/
│ │ └── part-0.parquet
│ ├── ...
│ └── year=2020/
│ └── part-0.parquet
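
A minimal sketch of that conversion, assuming the cleaned .csv files live at a hypothetical path_csv and already contain iso2 and year columns:

Code
## Stream-convert the cleaned CSVs into a Hive-partitioned parquet store
## without ever loading the full data into RAM.
## `path_csv` is hypothetical; `path_hdfs` matches the tree above.
arrow::open_dataset(path_csv, format = "csv") |>
  arrow::write_dataset(path_hdfs,
                       format = "parquet",
                       partitioning = c("iso2", "year"))  # one folder per iso2/year value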

We will go over a few methods and benchmarks including:

  • Streaming computations
  • Rust/C++ in-memory benchmarks
  • parquet: HDFS vs OBT (one big table) benchmarks
    • streaming
    • loading

Packages to test

Let's load the packages we will benchmark:

  • Streaming and in-memory
    • tidypolars: a Rust-based dataframe library with a tidyverse API
  • Streaming only
    • Apache Arrow: a highly interoperable C++ dataframe library with a tidyverse API
  • In-memory only
    • duckplyr: dplyr + DuckDB
    • dtplyr: a tidyverse API for working with data.table
Setup Code
library(pacman)
p_load(arrow, tidypolars, duckplyr, dtplyr) 

Connect to File System

An alternative to working in memory is to stream computations over the data on disk. The first step is not importing the data but connecting to it, preferably in columnar storage such as parquet, which is designed for this type of work.

Connect to File System for streaming
## Arrow
db_arrow = arrow::open_dataset(path_hdfs)

## Polars
db_polars = pl$scan_parquet(file.path(path_hdfs, "**/*.parquet"))

## Check metadata
dim(db_arrow)
[1] 17706120      235
Connect to File System for streaming
db_polars
polars LazyFrame
 $describe_optimized_plan() : Show the optimized query plan.

Naive plan:

  Parquet SCAN 37 files: first file: tmp\hdfs\iso2=AR\year=2005\part-0.parquet
  PROJECT */235 COLUMNS

Notice that when we print the connection, it doesn't return a dataframe but rather a connection object. This is because the data is not loaded into memory; instead we are connected to the data source, similar to how you would connect to a database.

But we still have access to some metadata: we can see the table is ~17.7 million rows with 235 columns.
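
To see this laziness in action, here is a minimal sketch: building a query returns instantly because nothing executes until we materialize it with collect().

Code
## Build a lazy query against the connection: no data is read yet
lazy_query = db_arrow %>%
  filter(iso2 == "AR") %>%
  group_by(year) %>%
  summarize(n_deaths = n())

lazy_query             ## prints the query plan, not the data
# collect(lazy_query)  ## only collect() actually streams rows from disk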

Streaming computations

Here we will run our queries over the whole partitioned file system of AR and BR records.

Construct Benchmark queries

Let's define four queries!

Query 1: Group by summarize min/max
q1_arrow =  db_arrow %>% 
  group_by(iso2) %>% 
  summarize(
    min_year = min(year),
    max_year = max(year))

q1_polars = db_polars %>% 
  group_by(iso2) %>% 
  summarize(
    min_year = min(year),
    max_year = max(year))
Query 2: Group by count
q2_arrow = db_arrow %>% 
  group_by(iso2, DINMALE, year) %>% 
  summarize(n_deaths = n())

q2_polars = db_polars %>% 
  group_by(iso2, DINMALE, year) %>% 
  summarize(n_deaths = n()) 
Query 3: Multi-table query with joins
q3_arrow = db_arrow %>% 
  count(iso2, DINCOD_FINAL1,
        name = 'n_deaths_ghe1') %>% 
  left_join(
    count(db_arrow,
          iso2,
          name = 'n_deaths_iso2'
    )) %>%
  mutate(pct_death = (n_deaths_ghe1/n_deaths_iso2)*100 )


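## Note: the as.numeric() casts below are specific to the polars version.
## polars returns unsigned-integer counts, and casting to double keeps the
## joined count columns and the percentage division in a common numeric type.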
q3_polars = db_polars %>% 
  count(iso2, DINCOD_FINAL1,
        name = 'n_deaths_ghe1') %>% 
  mutate(n_deaths_ghe1 = as.numeric(n_deaths_ghe1)) %>% 
  left_join(
    count(db_polars,
          iso2,
          name = 'n_deaths_iso2'
    ) %>% 
      mutate(n_deaths_iso2 = as.numeric(n_deaths_iso2))) %>%
  mutate(pct_death = (n_deaths_ghe1/n_deaths_iso2)*100 )
Query 4: Select, type casting, group means, counts, min/max
q4_arrow =  db_arrow %>%
  select(SALID1, SALID2, YEAR, MONTH, DINAGE, DINICD, iso2) %>%
  mutate(
    DINAGE_num = as.numeric(DINAGE),
    MONTH_num = as.numeric(MONTH)
  ) %>%
  group_by(SALID2) %>%
  summarize(
    average_age = mean(DINAGE_num, na.rm = TRUE),
    count = n(),
    min_month = min(MONTH_num, na.rm = TRUE),
    max_month = max(MONTH_num, na.rm = TRUE),
    .groups = 'drop'
  )

q4_polars =  db_polars %>%
  select(SALID1, SALID2, YEAR, MONTH, DINAGE, DINICD, iso2) %>% 
  mutate(DINAGE_num = as.numeric(DINAGE),
         MONTH_num = as.numeric(MONTH)) %>%
  group_by(SALID2) %>%
  summarize(
    average_age = mean(DINAGE_num),
    count = n(),
    min_month = min(MONTH_num),
    max_month = max(MONTH_num),
    .groups = 'drop'
  )

Run benchmarks

Query 1: Summarize min/max
if (!file.exists(out_result1)){
  result1 = bench::mark( ## start 10:55PM
    arrow = {collect(q1_arrow)},
    polars = {collect(q1_polars)},
    check = FALSE,
    iterations = 5
  )
  result1 %>% saveRDS(out_result1)
} else {
  result1 = readRDS(out_result1)
}
Query 2: Group by count
if (!file.exists(out_result2)){
  result2 = bench::mark(
    arrow = {collect(q2_arrow)},
    polars = {collect(q2_polars)},
    check = FALSE,
    iterations = 5
  )
  result2 %>% saveRDS(out_result2)
} else {
  result2 = readRDS(out_result2)
}
Query 3: Multi-table query
if (!file.exists(out_result3)){
  result3 = bench::mark(
    arrow = {collect(q3_arrow)} ,
    polars = {collect(q3_polars)},
    check = FALSE,
    iterations = 5
  )
  result3 %>% saveRDS(out_result3)
} else {
  result3 = readRDS(out_result3)
}
Query 4: Select, type casting, group means, counts, min/max
if (!file.exists(out_result4)){
  result4 = bench::mark(
    arrow = {collect(q4_arrow)},
    polars = {collect(q4_polars)},
    check = FALSE,
    iterations = 5
  )
  result4 %>% saveRDS(out_result4)
} else {
  result4 = readRDS(out_result4)
}
Compile benchmarks
df_results1 = result1 %>% 
  unnest(c(time, gc)) %>%
  mutate(query = "Query 1")

df_results2 = result2 %>%
  unnest(c(time, gc)) %>%
  mutate(query = "Query 2")

df_results3 = result3 %>%
  unnest(c(time, gc)) %>%
  mutate(query = "Query 3")

df_results4 = result4 %>%
  unnest(c(time, gc)) %>%
  mutate(query = "Query 4")

df_results1_4 = bind_rows(df_results1, df_results2, df_results3, df_results4)

Benchmark results and Takeaways

Let's take a look at the streaming results.

Streaming Benchmark Results
df_results1_4 %>%
  ggplot(aes(x = expression, y = time, color = expression)) +
  labs(title = "Streaming Benchmark Results",
       x = "Dataframe engine",
       y = "Time (ms)") +
  geom_point() +
  facet_wrap(~query)

Takeaways

  • When streaming (out-of-memory computations), Arrow is consistently more performant
  • From a user-experience perspective, Arrow's tidyverse binding is a little more polished than Polars' because it has been around longer and supports more tidyverse functions. So for the moment it may be easier to use.
  • From a flexibility perspective, tidypolars may not be as polished, but you also gain access to the full polars API via r-polars. This gives the workflow a lot of flexibility, as the polars API is very powerful and much more accessible than the lower-level Arrow API (see the sketch below).
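
For example, dropping below the tidyverse layer into the raw polars API looks like this. A minimal sketch of Query 1 rewritten against r-polars (method names may vary slightly across r-polars versions):

Code
pl$scan_parquet(file.path(path_hdfs, "**/*.parquet"))$
  group_by("iso2")$
  agg(
    pl$col("year")$min()$alias("min_year"),
    pl$col("year")$max()$alias("max_year")
  )$
  collect()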

In-memory computations

Sometimes, however, we can fit the entire dataset into memory. Even then, the choice of dataframe engine significantly affects speed. Here we test polars, data.table, and duckplyr.

Repeat other benchmarks

This benchmark is adapted from tidypolars.

In-memory Benchmark with iris
## Setup Iris tables
large_iris <- data.table::rbindlist(rep(list(iris), 100000))
tibble_iris = as_tibble(large_iris)
large_iris_pl <- as_polars_lf(large_iris)
large_iris_dt <- lazy_dt(large_iris)
format(nrow(large_iris), big.mark = ",")
[1] "15,000,000"
In-memory Benchmark with iris
## Benchmark
out_results_mem1 = "tmp/results_mem1.rds"
if (!file.exists(out_results_mem1)){
  
  results_mem1 = bench::mark(
    polars = {
      large_iris_pl |>
        select(starts_with(c("Sep", "Pet"))) |>
        mutate(
          petal_type = ifelse((Petal.Length / Petal.Width) > 3, "long", "large")
        ) |> 
        filter(between(Sepal.Length, 4.5, 5.5)) |> 
        compute()
    },
    dplyr = {
      tibble_iris |>
        select(starts_with(c("Sep", "Pet"))) |>
        mutate(
          petal_type = ifelse((Petal.Length / Petal.Width) > 3, "long", "large")
        ) |>
        filter(between(Sepal.Length, 4.5, 5.5))
    },
    duckplyr = {
      as_duckplyr_df(tibble_iris) |>
        select(starts_with(c("Sep", "Pet"))) |>
        mutate(
          petal_type = ifelse((Petal.Length / Petal.Width) > 3, "long", "large")
        ) |>
        filter(between(Sepal.Length, 4.5, 5.5))
    },
    dtplyr = {
      large_iris_dt |>
        select(starts_with(c("Sep", "Pet"))) |>
        mutate(
          petal_type = ifelse((Petal.Length / Petal.Width) > 3, "long", "large")
        ) |>
        filter(between(Sepal.Length, 4.5, 5.5)) |> 
        as.data.frame()
    },
    check = FALSE,
    iterations = 10
  )
  results_mem1 %>% saveRDS(out_results_mem1)
} else {
  results_mem1 = readRDS(out_results_mem1)
}


## Table
results_mem1
expression        min    median    itr/sec  mem_alloc     gc/sec
polars        130.7ms  233.46ms  4.5840645     1.29MB  0.4584064
dplyr           2.49s     2.67s  0.3694028     1.34GB  0.6649251
duckplyr        2.53s      2.9s  0.3458887     1.36GB  0.5880108
dtplyr       583.37ms  697.57ms  1.4555396     1.72GB  3.2021870
In-memory Benchmark with iris
## Viz
results_mem1 %>%
  unnest(c(time, gc)) %>% 
  mutate(expression = as.character(expression)) %>%
  ggplot(aes(x = expression, y = time, color = expression)) +
  labs(title = "In memory Benchmarks",
       x = "Dataframe engine",
       y = "Time (ms)") +
  geom_point() 

DIN Benchmarks

Let's first load some DIN data into memory to work with.

Load 5 years of AR DIN data (2008-2012) into memory
data_ar = db_arrow %>% 
  filter(iso2 == "AR") %>% 
  filter(between(year, 2008, 2012)) %>% 
  collect() 

dim(data_ar)
[1] 1070138     235
Load 5 years of AR DIN data (2008-2012) into memory
data_ar %>% slice(1:5)
(Output: the first 5 rows of the 235-column table. The columns span ID and demographic fields (SALID1, SALID2, SALID1OCC, SALID2OCC, YEAR, MONTH, DINAGE, DINMALE, ...), cause-of-death fields (DINICD, DINCOD_*, ICD_IT1 through ICD_IT100, GHE3_IT1 through GHE3_IT100), inclusion flags, and the partition columns iso2 and year. The full printout is omitted here because it is too wide to render legibly.)

Let's convert this into the various dataframe representations we want to work with.

Code
## Polars
df_polars = as_polars_df(data_ar)

## data.tables
df_dt <- lazy_dt(data_ar)

## duckplyr
df_duck = duckplyr::as_duckplyr_df(data_ar)

Let's use a query similar to Query 4 from the streaming results.

In-memory Benchmark
if (!file.exists(out_result5)){
  result5 = bench::mark( 
    polars_memory = {
      df_polars   %>%
        select( SALID1, SALID2, YEAR, MONTH, 
                DINAGE, DINICD, iso2,
                SALID1OCC, DINCOD_ICD_FINAL) %>%
        mutate(
          DINAGE_num = as.numeric(DINAGE),
          MONTH_num = as.numeric(MONTH),
          YEAR_num = as.numeric(YEAR)
        ) %>%
        # Add a new variable based on conditions.
        mutate(
          AgeGroup = case_when(
            DINAGE_num < 18 ~ "Child",
            DINAGE_num < 65 ~ "Adult",
            TRUE ~ "Senior"
          ),
          ICD_Severity = if_else(DINCOD_ICD_FINAL == "B909", "High", "Low")
        )  %>%
        # Grouping by multiple columns to calculate metrics.
        group_by(SALID2, AgeGroup, ICD_Severity) %>%
        # Summary calculations with added complexity.
        summarize(
          average_age = mean(DINAGE_num, na.rm = TRUE),
          count = n(),
          min_month = min(MONTH_num, na.rm = TRUE),
          max_month = max(MONTH_num, na.rm = TRUE),
          # diversity_index = n_distinct(SALID1),
          avg_year = mean(YEAR_num, na.rm = TRUE),
          .groups = 'drop'
        )
    },
    dplyr = {
      as_tibble(data_ar)  %>%
        select( SALID1, SALID2, YEAR, MONTH, 
                DINAGE, DINICD, iso2,
                SALID1OCC, DINCOD_ICD_FINAL) %>%
        mutate(
          DINAGE_num = as.numeric(DINAGE),
          MONTH_num = as.numeric(MONTH),
          YEAR_num = as.numeric(YEAR)
        ) %>%
        # Add a new variable based on conditions.
        mutate(
          AgeGroup = case_when(
            DINAGE_num < 18 ~ "Child",
            DINAGE_num < 65 ~ "Adult",
            TRUE ~ "Senior"
          ),
          ICD_Severity = if_else(DINCOD_ICD_FINAL == "B909", "High", "Low")
        )  %>%
        # Grouping by multiple columns to calculate metrics.
        group_by(SALID2, AgeGroup, ICD_Severity) %>%
        # Summary calculations with added complexity.
        summarize(
          average_age = mean(DINAGE_num, na.rm = TRUE),
          count = n(),
          min_month = min(MONTH_num, na.rm = TRUE),
          max_month = max(MONTH_num, na.rm = TRUE),
          # diversity_index = n_distinct(SALID1),
          avg_year = mean(YEAR_num, na.rm = TRUE),
          .groups = 'drop'
        )
    },
    dt_memory = {
      df_dt  %>%
        select( SALID1, SALID2, YEAR, MONTH, 
                DINAGE, DINICD, iso2,
                SALID1OCC, DINCOD_ICD_FINAL) %>%
        mutate(
          DINAGE_num = as.numeric(DINAGE),
          MONTH_num = as.numeric(MONTH),
          YEAR_num = as.numeric(YEAR)
        ) %>%
        # Add a new variable based on conditions.
        mutate(
          AgeGroup = case_when(
            DINAGE_num < 18 ~ "Child",
            DINAGE_num < 65 ~ "Adult",
            TRUE ~ "Senior"
          ),
          ICD_Severity = if_else(DINCOD_ICD_FINAL == "B909", "High", "Low")
        )  %>%
        # Grouping by multiple columns to calculate metrics.
        group_by(SALID2, AgeGroup, ICD_Severity) %>%
        # Summary calculations with added complexity.
        summarize(
          average_age = mean(DINAGE_num, na.rm = TRUE),
          count = n(),
          min_month = min(MONTH_num, na.rm = TRUE),
          max_month = max(MONTH_num, na.rm = TRUE),
          # diversity_index = n_distinct(SALID1),
          avg_year = mean(YEAR_num, na.rm = TRUE),
          .groups = 'drop'
        ) |> 
      as.data.frame()
      },
    duckplyr_memory = {
      df_duck %>%
        select( SALID1, SALID2, YEAR, MONTH, 
                DINAGE, DINICD, iso2,
                SALID1OCC, DINCOD_ICD_FINAL) %>%
        mutate(
          DINAGE_num = as.numeric(DINAGE),
          MONTH_num = as.numeric(MONTH),
          YEAR_num = as.numeric(YEAR)
        ) %>%
        # Add a new variable based on conditions.
        mutate(
          AgeGroup = case_when(
            DINAGE_num < 18 ~ "Child",
            DINAGE_num < 65 ~ "Adult",
            TRUE ~ "Senior"
          ),
          ICD_Severity = if_else(DINCOD_ICD_FINAL == "B909", "High", "Low")
        )  %>%
        # Grouping by multiple columns to calculate metrics.
        group_by(SALID2, AgeGroup, ICD_Severity) %>%
        # Summary calculations with added complexity.
        summarize(
          average_age = mean(DINAGE_num, na.rm = TRUE),
          count = n(),
          min_month = min(MONTH_num, na.rm = TRUE),
          max_month = max(MONTH_num, na.rm = TRUE),
          # diversity_index = n_distinct(SALID1),
          avg_year = mean(YEAR_num, na.rm = TRUE),
          .groups = 'drop'
        )
    },
    check = FALSE,
    iterations = 10
  )
  result5 %>% saveRDS(out_result5)
} else {
  result5 = readRDS(out_result5)
}

Now let's take a look at the in-memory results.

Code
## Table
result5
expression         min  median   itr/sec  mem_alloc     gc/sec
polars_memory    283ms   331ms  2.984482     5.21MB  5.6705163
dplyr            606ms   672ms  1.458353    264.2MB  0.2916706
dt_memory        573ms   691ms  1.382882   239.15MB  0.1382882
duckplyr_memory  606ms   760ms  1.351704   264.12MB  0.5406816
Code
## Viz
result5 %>%
  unnest(c(time, gc)) %>% 
  mutate(expression = as.character(expression)) %>%
  ggplot(aes(x = expression, y = time, color = expression)) +
  labs(title = "In memory Benchmarks",
       x = "Dataframe engine",
       y = "Time (ms)") +
  geom_point() 

Recommendations

Consider streaming computations if RAM is limited

  • Arrow will cover the basics and is currently more performant and user friendly, so start here
  • If you need more flexibility in your data manipulation (e.g. operations not yet supported in Arrow), then leverage tidypolars with r-polars.

Consider distributed file storage for batch processing

  • Many analysts process data in subsets. Simply storing results in an organized, partitioned fashion lets others access the results as a single entity without needing to pull everything into memory (see the sketch below)
  • This fits well with distributed processing while allowing collective access
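
A minimal sketch of this pattern, assuming each analyst's processed batch (my_subset, hypothetical) carries the iso2 and year partitioning columns:

Code
## Write one analyst's batch into the shared partitioned store;
## others can then open_dataset() the store and query it as a single entity.
my_subset |>
  arrow::write_dataset(path_hdfs,
                       format = "parquet",
                       partitioning = c("iso2", "year"))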

Consider Polars for in-memory computations

  • Our basic benchmarks show polars is more performant in memory
  • Consider including polars when benchmarking big-data ETL pipelines (see the sketch below)
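
Swapping polars into an existing dplyr pipeline usually only requires changing the constructor and materializing at the end. A minimal sketch with a hypothetical tibble df:

Code
## Same dplyr verbs, polars engine underneath (via tidypolars)
df_pl = as_polars_lf(df)   # lazy polars frame

df_pl %>%
  group_by(SALID2) %>%
  summarize(n_deaths = n()) %>%
  compute()                # materialize as a polars DataFrame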

Benchmarking code

library(arrow)
library(data.table)
library(dplyr)
library(tictoc)


## Distributed file system
tic()
con = open_dataset("//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/Wellcome_Trust/Data Methods Core/Data/Mortality Data/DIN/Archive/apache_demo_4-17-24/hdfs")
con %>% 
  filter(iso2 == 'BR') %>% 
  count(YEAR, DINMALE, DINIAGECAT,
        name = 'n_deaths') %>% 
  collect()
toc()

## Traditional files
tic()
df = fread("//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/Wellcome_Trust/Data Methods Core/Data/Mortality Data/DIN/DINBR_20240109.csv")
df %>% 
  filter(iso2 == 'BR') %>% 
  count(YEAR, DINMALE, DINIAGECAT,
        name = 'n_deaths') 
toc()

This benchmark was run on a high-powered PC with 64 GB of RAM (which the traditional workflow requires).