EDA 4-7-2024 - Hive, Arrow and Big Data in R

Harnessing Hive and Arrow for Big Data Mastery in R
Author

Ran Li

Published

April 7, 2024

Intro

In the data science realm, R users frequently encounter the challenge of managing datasets too large to fit into memory. Apache Hive and Apache Arrow offer a solution, enabling efficient data storage and processing. Hive, with its SQL-like interface, allows for scalable management of big data across distributed storage, while Arrow facilitates fast in-memory data processing and interoperability between systems. This post introduces how R users can leverage Hive’s partitioning to organize data effectively and use Arrow for enhanced querying capabilities, providing a practical approach to handling big data challenges within the R ecosystem.

Here let's create a Hive-partitioned dataset. Imagine we are working on a desktop with 8 GB of RAM, but the data we are wrangling totals 500 GB, split into smaller chunks, such as PRISM data or HCUP data. We can process the data in 8 GB chunks, but how can we save our results in a way that lets software query the end result as a single database? That is where Apache Hive, a core technology in distributed computing, comes in to save the day!

Let's do an example of Hive partitioning with gapminder, a common teaching dataset.

## Load
library(pacman)
p_load(gapminder, tidyverse, glue, arrow, sfarrow, sf, leaflet)
data("gapminder") 
df_gapminder = gapminder %>%
  mutate(across(c(country, continent), as.character))

## Display subset
set.seed(123)
sample_n(df_gapminder, size = 10)
country continent year lifeExp pop gdpPercap
Denmark Europe 1982 74.630 5117810 21688.040
Egypt Africa 1982 56.006 45681811 3503.730
Brazil Americas 2002 71.006 179914212 8131.213
Finland Europe 1997 77.130 5134406 23723.950
Burkina Faso Africa 1962 37.814 4919632 722.512
Malaysia Asia 1957 52.102 7739235 1810.067
Norway Europe 1957 73.440 3491938 11653.973
Senegal Africa 1962 41.454 3430243 1654.989
Puerto Rico Americas 1972 72.160 2847132 9123.042
Reunion Africa 1987 71.913 562035 5303.377

This dataset is not big - it only has 1,704 rows. But its structure is common to many urban health datasets: each row is a specific instance of country and year, with some metadata (continent) and data (lifeExp, pop, gdpPercap). So we can use it as a stand-in for a large dataset. Say the unit was not country but census tract/L3, and the time step was not year but day - in that case the data would be too big to handle in-memory, so we would need to query it as a database.

Raw data

Let's assume this was a huge dataset with billions of rows, where the raw data was stored partitioned by some attribute such as country, date, or geographic unit. Our analysts would then usually process each of the country-year files in-memory and save the result.

For example, for gapminder it could be that the raw data was stored by continent, and this is what our analysts start with. Let's generate that first!

## Create the output folder and write one raw file per continent
dir.create('raw_data', showWarnings = FALSE)

df_gapminder %>%
  group_by(continent) %>%
  group_walk(~{
    continent_name <- .y$continent
    raw_out <- glue('raw_data/gapminder_{continent_name}_raw.parquet')
    # group_walk() drops the grouping column, so add continent back before writing
    df_result = .x %>% mutate(continent = continent_name)
    arrow::write_parquet(df_result, raw_out)
  })

Now we have our raw data in a way that is commonly stored!
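
For reference, listing that folder should now show one parquet file per continent, something like:

list.files('raw_data/')
[1] "gapminder_Africa_raw.parquet"   "gapminder_Americas_raw.parquet"
[3] "gapminder_Asia_raw.parquet"     "gapminder_Europe_raw.parquet"
[5] "gapminder_Oceania_raw.parquet"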

Regular workflow

So let's look at how our original workflow would work. Analysts would process each raw dataset individually and save the results individually. We will iterate through these just to show that processing is done similarly for each dataset, and save the results in /cleaned_data_legacy.

## Create the legacy output folder
dir.create('cleaned_data_legacy', showWarnings = FALSE)

list.files('raw_data/', full.names = TRUE) %>% 
  walk(~{
    
    # Import raw data
    df_raw_data = arrow::read_parquet(.x)
    
    ## Some processing
    df_cleaned = df_raw_data %>% 
      mutate(new_variable = 'cleaned_value')
    
    ## Save cleaned results
    out = glue('cleaned_data_legacy/gapminder_{unique(df_cleaned$continent)}_cleaned.parquet')
    df_cleaned %>% arrow::write_parquet(sink = out)
    
  })

Now it's done! We have our traditional results stored in this manner.

Hive Partitioned

While the workflow above works, it still operates in-memory: the amount of data you can process, both at the individual table level and across the whole data resource, is limited by the amount of RAM you have access to. Here we show how, with very little change in workflow (just save your cleaned files in a more structured way), we can enable out-of-memory analytics!

When saving data in a Hive-partitioned style, the naming convention and structure of your directories and files play a critical role in how Hive and other big data tools interpret and access your data. There is only one rule … your folders should use key=value format for naming partitioned directories (e.g., continent=Asia).

For example, you can save gapminder by continent like this:

gapminder_by_continent/
├── continent=Africa/
│   └── data.parquet
├── continent=Asia/
│   └── data.parquet
├── continent=Europe/
│   └── data.parquet
...

Some things to remember:

  • Only rule: your folders should use key=value format for naming partitioned directories (e.g., continent=Asia).

  • What you name your parent folder (gapminder_by_continent) or data files (data.parquet) doesn't matter, so choose meaningful names for organization and clarity.

  • You can nest as many layers as you want: just add nested folders (see the example layout below).
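
For instance, a hypothetical two-level partition by continent and year would just add another key=value layer:

gapminder_by_continent_year/
├── continent=Africa/
│   ├── year=2002/
│   │   └── data.parquet
│   └── year=2007/
│       └── data.parquet
├── continent=Asia/
│   └── ...
...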

Below we process the raw data and save it in Hive-partitioned style.

list.files('raw_data/', full.names = TRUE) %>% 
  walk(~{
    
    # Import raw data
    df_raw_data = arrow::read_parquet(.x)
    
    ## Some processing
    df_cleaned = df_raw_data %>% 
      mutate(new_variable = 'cleaned_value')
    
    # Create Hive partition folder (key=value naming)
    continent_tmp = unique(df_raw_data$continent)
    partition_out = glue("cleaned_data_hive/continent={continent_tmp}")
    dir.create(partition_out, recursive = TRUE, showWarnings = FALSE)
    
    ## Write partitioned data
    data_out <- file.path(partition_out, "data.parquet")
    arrow::write_parquet(df_cleaned, data_out)
    
  })

The result of this is a folder for each partition with the data in each folder.
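
As an aside, arrow can also write this layout for you in one call: arrow::write_dataset() accepts a partitioning argument and creates the key=value folders automatically (the output folder name below, cleaned_data_hive_auto, is just illustrative).

## Write a Hive-partitioned dataset in a single call (alternative to the loop above)
df_gapminder %>%
  mutate(new_variable = 'cleaned_value') %>%
  arrow::write_dataset(path = 'cleaned_data_hive_auto',
                       format = 'parquet',
                       partitioning = 'continent')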

User-facing product: Hive-partitioned dataset

The benefit of just storing your data in this style is that you can now use big data tools such as Apache Arrow to handle massive amounts of data that could not fit in a single machine's RAM. Let's see how we can query our partitioned dataset.

Apache Arrow is an industry-standard big data tool. You can connect to structured data such as individual files (.csv, .parquet) or distributed formats such as Hive-partitioned folders as databases, and run queries from many languages including R, SQL, Python, JavaScript, and Julia.

Since we structured our cleaned data in a Hive-partitioned layout, we can now use it as a database. Let's connect to it!

## Open the database
db = arrow::open_dataset("cleaned_data_hive/")

## View available columns
db
FileSystemDataset with 5 Parquet files
country: string
year: int32
lifeExp: double
pop: int32
gdpPercap: double
continent: string
new_variable: string

Now that we are connected we can start running queries. Let's pull the data for the United States and Austria and calculate the average lifeExp over all years.

#| code-summary: 'Querying our Hive-partitioned data'
db %>% 
  filter(country%in%c("Austria", "United States")) %>% 
  group_by(country) %>% 
  summarize(meanLifeExp = mean(lifeExp)) %>% 
  collect()
country meanLifeExp
United States 73.47850
Austria 73.10325

Just like that, without changing our workflow, we can now effectively access and query our dataset without having to worry about memory limitations!
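
One extra benefit is partition pruning: when a filter uses the partition key itself (continent here), Arrow can skip the non-matching continent= folders entirely instead of scanning them. A minimal sketch:

## Filtering on the partition key lets Arrow skip whole continent= folders
db %>% 
  filter(continent == 'Asia') %>% 
  summarize(mean_gdpPercap = mean(gdpPercap)) %>% 
  collect()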

Summary

This workflow greatly enhances the amount of data analysts can effectively work with. Specifically, it provides:

  • scalability by distribution

  • flexibility in how you partition

  • parallelism

    • files stored like this are great for parallel compute (see the sketch below)!
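
For example, here is a minimal sketch of fanning work out over the partition folders, assuming the furrr package (with its future backend) is available; the per-partition step is just a placeholder:

## Process each Hive partition in parallel
p_load(furrr)
plan(multisession, workers = 4)

list.dirs('cleaned_data_hive', recursive = FALSE) %>% 
  future_walk(~{
    # Each worker reads only its own partition into memory
    df_part = arrow::open_dataset(.x) %>% collect()
    # ... per-partition computation and saving would go here ...
  })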

Use Cases

Hive partitioned spatial databases

A really interesting application of Hive partitioning is geographic boundaries, where we want scalability (boundary files can get big) and parallelizability (we often want to run parallel processes over spatial units). Let's explore how Hive partitioning can work for this application.

We currently have a folder that contains tracts for each state as GeoParquet files at this path: //files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/SpatialAnalyticMethods_SAM/clean/tract_2010_albers/geoparquet. Let's see if we can use Arrow to query this spatial database.

## Connect to database
db_path = '//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/SpatialAnalyticMethods_SAM/clean/tract_2010_albers/geoparquet'
db = arrow::open_dataset(db_path)

## Query for boundaries for tract '42101000500'
result = db %>% 
  filter(GEOID == '42101000500') %>% 
  collect()
result
STATEFP COUNTYFP TRACTCE GEOID NAME NAMELSAD MTFCC FUNCSTAT ALAND AWATER INTPTLAT INTPTLON geometry
42 101 000500 42101000500 5 Census Tract 5 G5020 S 428760 0 +39.9519533 -075.1581771 <raw WKB geometry blob, truncated>
## Convert to sf, set the source CRS (EPSG:5070, Conus Albers), and transform to WGS84 for mapping
sfa = result %>% 
  sf::st_as_sf() %>% 
  sf::st_set_crs(5070) %>% 
  sf::st_transform(4326)


## Map
sfa %>% 
  leaflet() %>% 
  addTiles() %>% 
  addPolygons()

Indeed, we can utilize Apache Arrow to query spatial databases. The main drawback is that, unlike sfarrow, plain arrow does not bring in the coordinate metadata automatically, so we need to set the coordinates manually. Hopefully in the future sfarrow will support an sfarrow::sf_open_dataset() function so we can avoid that one manual step!