4-26-24 - DuckDB transaction db test

Setting up a transactional database
Author

Ran Li

library(pacman)
p_load(duckdb, dplyr, jsonlite, yaml)
db_name = paste0(Sys.time() %>% as.integer(), ".duckdb")

The goal here is to test DuckDB on the UHC as a transactional database to store pipeline metadata. We will follow this API guide: https://duckdb.org/docs/api/r.html

1. Initialize database

For now, let's keep this database here: //files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/SpatialAnalyticMethods_SAM/pipeline_db.

Let's create a dummy one for testing with a unique (timestamp-based) name.

## db_name
db_path = file.path(
  "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/SpatialAnalyticMethods_SAM/pipeline_db", 
  db_name)
 
## Connect to db
con <- dbConnect(duckdb(), dbdir = db_path, read_only = FALSE)

Let's see what's inside:

duckdb::dbListTables(con)
character(0)
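As a side note, it's good hygiene to close the connection when done; the duckdb R API's `dbDisconnect()` accepts `shutdown = TRUE` to also shut down the embedded database instance. A minimal sketch using a throwaway in-memory database (so it doesn't touch the shared pipeline_db path):

```r
library(duckdb)  # also attaches DBI

# Open a throwaway in-memory database for illustration
con_tmp <- dbConnect(duckdb())  # dbdir defaults to in-memory

dbListTables(con_tmp)  # character(0) on a fresh database

# Close the connection and shut down the embedded instance
dbDisconnect(con_tmp, shutdown = TRUE)
```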

2. Add blocks table

Let's add our first table, which is just the blocks in our system.

  • blocks
    • id
    • name
    • type
    • format
    • tags
    • location
    • version
    • mtime
## Create Table
dbExecute(con, "
  CREATE TABLE blocks (
    id VARCHAR,
    name VARCHAR,
    type VARCHAR,
    format VARCHAR,
    tags VARCHAR,
    location VARCHAR,
    version VARCHAR, 
    mtime TIMESTAMP
  )
")
[1] 0
## Ini data
blocks_df <- data.frame(
  id = '1',
  name = "Example Block",
  type = "Type1",
  format = "Format1",
  tags = "Tag1, Tag2",
  location = "Location1",
  version = "v1.0",
  mtime = Sys.time()  # Using Sys.time() for current date and time
)

dbWriteTable(con, "blocks", blocks_df, append = TRUE, overwrite = FALSE)

Now let's pull information from the blocks table:

dbGetQuery(con, "SELECT * FROM blocks")
  id          name  type  format       tags  location version
1  1 Example Block Type1 Format1 Tag1, Tag2 Location1    v1.0
                mtime
1 2024-04-26 21:01:47

Great. Now we can write and get data!
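When queries need user-supplied values (e.g. a block id), DBI's `params` argument to `dbGetQuery()` is safer than pasting strings into SQL. A minimal sketch against an in-memory DuckDB (the table and values here are illustrative):

```r
library(duckdb)  # also attaches DBI

con <- dbConnect(duckdb())
dbExecute(con, "CREATE TABLE blocks (id VARCHAR, name VARCHAR)")
dbExecute(con, "INSERT INTO blocks VALUES ('1', 'Example Block')")

# Bind the value via params instead of string interpolation
res <- dbGetQuery(
  con,
  "SELECT * FROM blocks WHERE id = ?",
  params = list("1")
)
res$name  # "Example Block"

dbDisconnect(con, shutdown = TRUE)
```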

3. Try Dbplyr

Here we want to interact with our database through the dplyr API. The first step is to operationalize our table and data frame connections. Let's connect to the ‘blocks’ table:

df_blocks = tbl(con, "blocks")

df_blocks
# Source:   table<blocks> [1 x 8]
# Database: DuckDB v0.10.1 [ranli@Windows 10 x64:R 4.3.2/\\files.drexel.edu\colleges\SOPH\Shared\UHC\Projects\SpatialAnalyticMethods_SAM\pipeline_db\1714165305.duckdb]
  id    name          type  format  tags    location version mtime              
  <chr> <chr>         <chr> <chr>   <chr>   <chr>    <chr>   <dttm>             
1 1     Example Block Type1 Format1 Tag1, … Locatio… v1.0    2024-04-26 21:01:47

Now we can write some queries:

df_blocks %>% 
  filter(version == 'v1.0') %>% 
  collect()
# A tibble: 1 × 8
  id    name          type  format  tags    location version mtime              
  <chr> <chr>         <chr> <chr>   <chr>   <chr>    <chr>   <dttm>             
1 1     Example Block Type1 Format1 Tag1, … Locatio… v1.0    2024-04-26 21:01:47
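One handy dbplyr feature when debugging: `show_query()` prints the SQL that a lazy pipeline would send to DuckDB, without executing it. A small sketch (table contents are illustrative):

```r
library(duckdb)  # also attaches DBI
library(dplyr)   # tbl()/filter(); dbplyr must be installed for the SQL backend

con <- dbConnect(duckdb())
dbWriteTable(con, "blocks", data.frame(id = "1", version = "v1.0"))

df_blocks <- tbl(con, "blocks")

# Inspect the generated SQL instead of collecting results
df_blocks %>%
  filter(version == "v1.0") %>%
  show_query()

dbDisconnect(con, shutdown = TRUE)
```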

4. YAML to DB

Our use case for a transactional database is a place to store block and pipeline information. I don't think we will store run information, but let's start simple with blocks, pipelines, and the relationships between them.

Raw Block

Here we have a Raw Data Block and a Seed Data Block. Before we run the pipeline we should clearly define both of these in our YAML file. Let's draft that locally here. We wrote up the raw block metadata as YAML:

id: raw_dsph_phl_nb
type: raw
format: .shp
tags:
  - boundaries
description: Philadelphia boundaries
version: 0.0.1

Let's import that!

## Import YAML
yml = yaml.load_file("block1.yml") 
tags_tmp = yml$tags %>% toJSON() 
df_imported = yml %>% 
  as_tibble() %>% 
  select(-tags) %>% 
  distinct() %>% 
  mutate(tags = tags_tmp,
         mtime = Sys.time()) 
df_imported
# A tibble: 1 × 8
  id              name   type  format location version tags  mtime              
  <chr>           <chr>  <chr> <chr>  <chr>    <chr>   <jso> <dttm>             
1 raw_dsph_phl_nb Phila… raw   .shp   //files… 0.0.1   "[\"… 2024-04-26 17:01:47
## Write to db
dbWriteTable(con, "blocks", df_imported, append = TRUE, overwrite = FALSE)

Let's check that it's in the db:

df_blocks %>% 
  filter(id == 'raw_dsph_phl_nb') %>% 
  collect()
# A tibble: 1 × 8
  id              name   type  format tags  location version mtime              
  <chr>           <chr>  <chr> <chr>  <chr> <chr>    <chr>   <dttm>             
1 raw_dsph_phl_nb Phila… raw   .shp   "[\"… //files… 0.0.1   2024-04-26 21:01:47
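Since the whole point is a transactional store, related writes (e.g. registering several blocks from one pipeline run) can be grouped so they commit or roll back together. DBI's `dbWithTransaction()` commits on success and rolls back if the body errors. A minimal sketch against an in-memory DuckDB (table and values are illustrative):

```r
library(duckdb)  # also attaches DBI

con <- dbConnect(duckdb())
dbExecute(con, "CREATE TABLE blocks (id VARCHAR, version VARCHAR)")

# Both inserts commit together; if either fails, neither is written
dbWithTransaction(con, {
  dbExecute(con, "INSERT INTO blocks VALUES ('a', '0.0.1')")
  dbExecute(con, "INSERT INTO blocks VALUES ('b', '0.0.1')")
})

dbGetQuery(con, "SELECT count(*) AS n FROM blocks")$n  # 2

dbDisconnect(con, shutdown = TRUE)
```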