PRISM step 1: Preprocessing PRISM .bil into .tif for zonal statistics

Pipeline ID: 1 - pre_process_prism__prism_zonal_stats_v1
Author: Ran Li

Published: April 5, 2024

The overall goal of this pipeline is to pre-process raw PRISM data for our zonal statistics calculation. This involves converting the raw .bil rasters to .tif, reprojecting them to Albers Equal Area (EPSG:5070), and QCing and logging the results.

Setup

Our setup involves first loading dependencies.

Setup Code
## Dependencies
library(pacman) 
pacman::p_load(tidyverse, terra, glue, sfarrow, leaflet, furrr, sf, purrr, cli, tictoc)

Let's also set up the CCUH block paths associated with this pipeline. We have one upstream block (raw PRISM data) and one downstream block (standardized PRISM data).

Declare CCUH Block paths
## Declare CCUH Blocks
upstream_block = "//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data"
downstream_block = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/SpatialAnalyticMethods_SAM/freeze/ccuh_prism_zonal_stats_v1/1-preprocessed-prism-raster-tif"
 
## Local cache blocks
upstream_cache = "tmp/PRISM_data"
downstream_cache = "tmp/raster"

Note that we also declare local cache paths: most raster operations run faster on local data, so we temporarily cache the data locally for processing before moving results to the block locations for long-term storage.

Inventory of existing blocks

Before we start processing, let's compare what has already been processed in our local cached downstream directory against all available raw PRISM data, to see which upstream files we still need to cache and process.

Downstream cache

Inventory

Let's first check our local cached results.

Inventory cached results
## Remove invalid .tif rasters
list.files(
  downstream_cache, 
  pattern = '.tif$',
  recursive =  T,
  full.names = T) %>% 
  walk(~{
    if (file.size(.x) < 1000000) {
      cli_alert_warning("Removing invalid .tif file: {.x}")
      file.remove(.x)
    }
  })

## Inventory cached results
df_downstream_cache = tibble(
  downstream_cache = list.files(
    downstream_cache, 
    pattern = '.tif$',
    recursive =  T,
    full.names = T)
) %>% 
  mutate(
    file_name =  downstream_cache %>% basename() %>% str_remove('.tif'),
    downstream_cache_file_size = file.size(downstream_cache)
  ) %>% 
  select(file_name, everything()) 


head(df_downstream_cache)
file_name downstream_cache downstream_cache_file_size
prism_ppt_us_30s_20060101 tmp/raster/prism_ppt_us_30s_20060101/prism_ppt_us_30s_20060101.tif 28344874
prism_ppt_us_30s_20060102 tmp/raster/prism_ppt_us_30s_20060102/prism_ppt_us_30s_20060102.tif 36914510
prism_ppt_us_30s_20060103 tmp/raster/prism_ppt_us_30s_20060103/prism_ppt_us_30s_20060103.tif 34767718
prism_ppt_us_30s_20060104 tmp/raster/prism_ppt_us_30s_20060104/prism_ppt_us_30s_20060104.tif 22980798
prism_ppt_us_30s_20060105 tmp/raster/prism_ppt_us_30s_20060105/prism_ppt_us_30s_20060105.tif 15020724
prism_ppt_us_30s_20060106 tmp/raster/prism_ppt_us_30s_20060106/prism_ppt_us_30s_20060106.tif 14369374

QC

Here we want to make sure that the raster file sizes don't change over time (a small bug we noticed during processing).

Code
## Get most recent log
db = arrow::open_dataset("tmp/raster_log")
df_log_previous = db %>% 
  distinct() %>% 
  collect() %>% 
  filter(log_index == max(log_index)) %>% 
  select(file_name, 
         prev_size = downstream_cache_file_size)
  
## Check difference in sizes
df_qc_tif_size = df_downstream_cache %>% 
  select(file_name, 
         size = downstream_cache_file_size) %>% 
  left_join(df_log_previous, by = "file_name") %>% 
  mutate(same_size = size == prev_size)
df_invalid = df_qc_tif_size %>% 
  filter(!same_size)

if (nrow(df_invalid) > 0) cli_abort("Some .tif files have changed size")

Upstream Blocks

Here we inventory the upstream blocks on the server as well as the upstream cached files.

Code
## Inventory Upstream block
df_upstream = tibble(
  upstream_block = list.files(
    upstream_block, 
    pattern = '.bil$',
    recursive =  T,
    full.names = T)
) %>% 
  mutate(
    file_name =  upstream_block %>% basename() %>% str_remove('.bil')
  )  %>% 
  select(file_name, everything()) 

head(df_upstream)
file_name upstream_block
prism_ppt_us_30s_20060101 //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060101.bil
prism_ppt_us_30s_20060102 //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060102.bil
prism_ppt_us_30s_20060103 //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060103.bil
prism_ppt_us_30s_20060104 //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060104.bil
prism_ppt_us_30s_20060105 //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060105.bil
prism_ppt_us_30s_20060106 //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060106.bil

Upstream Cache

Code
## Inventory Upstream cache
df_upstream_cache =  tibble(
  upstream_cache = list.files(
    upstream_cache, 
    pattern = '.bil$',
    recursive =  T,
    full.names = T)
) %>% 
  mutate(
    file_name =  upstream_cache %>% basename() %>% str_remove('.bil')
  )  %>% 
  select(file_name, everything()) 

head(df_upstream_cache)
file_name upstream_cache

Finally, let's build the list of files that still need processing.

Op. Files to process

Now that we have the already converted .tif files and a list of raw .bil files, we can check what still needs converting.

Code
df_prism_files = df_upstream %>% 
  filter(!str_detect(file_name,'vpd')) %>% 
  left_join(df_downstream_cache, by = "file_name") %>% 
  left_join(df_upstream_cache, by = "file_name") %>% 
  select(file_name, upstream_block, upstream_cache, downstream_cache)

df_files_to_process =  df_prism_files %>% 
  filter(is.na(downstream_cache))

head(df_prism_files)
file_name upstream_block upstream_cache downstream_cache
prism_ppt_us_30s_20060101 //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060101.bil NA tmp/raster/prism_ppt_us_30s_20060101/prism_ppt_us_30s_20060101.tif
prism_ppt_us_30s_20060102 //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060102.bil NA tmp/raster/prism_ppt_us_30s_20060102/prism_ppt_us_30s_20060102.tif
prism_ppt_us_30s_20060103 //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060103.bil NA tmp/raster/prism_ppt_us_30s_20060103/prism_ppt_us_30s_20060103.tif
prism_ppt_us_30s_20060104 //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060104.bil NA tmp/raster/prism_ppt_us_30s_20060104/prism_ppt_us_30s_20060104.tif
prism_ppt_us_30s_20060105 //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060105.bil NA tmp/raster/prism_ppt_us_30s_20060105/prism_ppt_us_30s_20060105.tif
prism_ppt_us_30s_20060106 //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060106.bil NA tmp/raster/prism_ppt_us_30s_20060106/prism_ppt_us_30s_20060106.tif

Currently there are 23,740 PRISM files and all of them have already been processed, so this pipeline run has nothing to process; we just QC.
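
As a quick sanity check, these counts can be read straight off the inventory data frames built above (a minimal sketch; whether the 23,740 figure refers to df_upstream or the vpd-filtered df_prism_files is an assumption here):

Code
## Sanity-check the inventory counts
nrow(df_upstream)          # all raw .bil files found upstream
nrow(df_prism_files)       # after dropping the vpd variable
nrow(df_files_to_process)  # files still lacking a cached .tif (0 on this run)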

Process from .bil to .tif

Cache upstream files

Note that if you are converting a large number of files, it is best to cache them locally before processing; for a handful of files this isn't really worth it. So let's set up a guardrail: if we are trying to process more than 10 files, abort with a message suggesting that the files first be downloaded locally.

Code
if (nrow(df_files_to_process) > 10){
  cli_abort("You are trying to process more than 10 files, consider downloading the files locally")
}
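
The caching step itself is run manually, but a minimal sketch might look like the following. A .bil raster ships with sidecar files (.hdr, .prj, etc.), so the whole bundle must be copied; the cache_bil_locally helper and its sidecar-matching pattern are assumptions for illustration, not part of the original pipeline:

Code
## Sketch (hypothetical helper): copy a .bil bundle into the local cache
cache_bil_locally = function(upstream_block_tmp, cache_root = upstream_cache){
  ## Copy every file sharing the raster's base name (.bil plus sidecars)
  base      = tools::file_path_sans_ext(basename(upstream_block_tmp))
  src_files = list.files(dirname(upstream_block_tmp),
                         pattern = paste0("^", base, "\\."),
                         full.names = T)
  if (!dir.exists(cache_root)) dir.create(cache_root, recursive = T)
  file.copy(src_files, cache_root, overwrite = F)
  invisible(file.path(cache_root, paste0(base, ".bil")))
}

## Usage: walk(df_files_to_process$upstream_block, cache_bil_locally)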

Processing function

Now we set up the conversion function.

Pre-process: Function
## Test parameters (for interactive development of the function below; removed immediately)
row = df_files_to_process %>% slice(1)
file_name_tmp = row$file_name
upstream_block_tmp = row$upstream_block
upstream_cache_tmp = row$upstream_cache
downstream_cache_tmp = row$downstream_cache
rm(row, file_name_tmp, upstream_block_tmp, upstream_cache_tmp, downstream_cache_tmp)

## Function To convert .bil to .tif
crated_convert_prism_to_tif = function(file_name_tmp, 
                                       upstream_block_tmp, 
                                       upstream_cache_tmp , 
                                       downstream_cache_tmp){
  
  ## Stop if done
  if ( file.exists(downstream_cache_tmp)) return("Already done")
  
  ## Import upstream raster (upstream_cache_tmp is NA when the file is not
  ## cached; file.exists(NA) is FALSE, so we fall back to the server block)
  if (file.exists(upstream_cache_tmp)){
    prism_rast = terra::rast(upstream_cache_tmp)
  } else {
    prism_rast = terra::rast(upstream_block_tmp)
  }

  ## Transform to Albers USA
  prism_rast_albers = prism_rast %>% terra::project("EPSG:5070")
  
  ## Export
  out_dir = file.path('tmp','raster', file_name_tmp)
  if (!dir.exists(out_dir)) dir.create(out_dir, recursive = T)
  out_path = file.path(out_dir, paste0(file_name_tmp, '.tif'))

  prism_rast_albers |> terra::writeRaster(filename=out_path, overwrite=TRUE) 
  
  cli::cli_alert_success("Finished processing and export: {file_name_tmp}")

}

crated_convert_prism_to_tif__possibly <- possibly(
  crated_convert_prism_to_tif, 
  otherwise = NULL, 
  quiet = FALSE)
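
After a run it can be worth spot-checking one converted raster. A minimal sketch, using a sample path from the inventory above:

Code
## Sketch: spot-check one converted .tif
r = terra::rast("tmp/raster/prism_ppt_us_30s_20060101/prism_ppt_us_30s_20060101.tif")
terra::crs(r, describe = T)$code  # expect "5070" (Albers Equal Area)
terra::global(r, "notNA")         # count of non-missing cells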

Processing

Parallel runs: convert .bil to .tif
if ( between(nrow(df_files_to_process), 1, 10) ){
  ## pmap() matches columns to function arguments by name, so rename the
  ## inventory columns to the *_tmp parameter names first
  df_files_to_process %>% 
    rename(file_name_tmp = file_name,
           upstream_block_tmp = upstream_block,
           upstream_cache_tmp = upstream_cache,
           downstream_cache_tmp = downstream_cache) %>% 
    purrr::pmap(crated_convert_prism_to_tif__possibly)
}

if (nrow(df_files_to_process) > 10){
  ## Configure parallel
  # plan(multisession, workers = 6)
  # opts <- furrr_options(globals = FALSE, seed = TRUE)
  # 
  # ## Run (same column renaming as the serial branch above)
  # tic()
  # df_files_to_process %>% 
  #   rename(file_name_tmp = file_name,
  #          upstream_block_tmp = upstream_block,
  #          upstream_cache_tmp = upstream_cache,
  #          downstream_cache_tmp = downstream_cache) %>% 
  #   furrr::future_pmap(
  #     crated_convert_prism_to_tif__possibly,
  #     .options = opts)
  # toc()
}

QC

Here we run some tests on the results:

  • VALID: completed processing - every raw .bil has a .tif result
  • VALID: successful conversion - every .tif is > 1 MB
  • VALID: file sizes match the previous log
Code
df_downstream_cache_final = tibble(
  downstream_cache = list.files(
    downstream_cache, 
    pattern = '.tif$',
    recursive =  T,
    full.names = T)
) %>% 
  mutate(
    file_name =  downstream_cache %>% basename() %>% str_remove('.tif'),
    downstream_cache_file_size = file.size(downstream_cache)
  ) %>% 
  select(file_name, everything()) 
Test: completed processing - every raw .bil has a .tif result
## Check that every raw file has a processed result
df_qc = df_upstream %>% 
  filter(!str_detect(file_name, 'vpd')) %>% # vpd is excluded from processing
  select(file_name) %>% 
  mutate(upstream = T) %>% 
  left_join(df_downstream_cache_final %>% 
              select(file_name) %>% 
              mutate(downstream = T), by = "file_name") %>% 
  mutate(
    ## unmatched files get NA from the join; count them as not processed
    downstream = coalesce(downstream, FALSE),
    valid = upstream == downstream
  ) 
df_invalid = df_qc %>% 
  filter(!valid)

if (nrow(df_invalid) > 0) cli_abort("Some files not processed")
Test: successful conversion - every .tif is > 1 MB
df_invalid = df_downstream_cache_final %>% 
  filter(downstream_cache_file_size < 1000000)

if (nrow(df_invalid) > 0) cli_abort("Some .tif files are too small")
Test: file sizes match the previous log
## Get most recent log
db = arrow::open_dataset("tmp/raster_log")
df_log_previous = db %>% 
  distinct() %>% 
  collect() %>% 
  filter(log_index == max(log_index)) %>% 
  select(file_name, 
         prev_size = downstream_cache_file_size)

## Check difference in sizes
df_qc_tif_size = df_downstream_cache_final %>% 
  select(file_name, 
         size = downstream_cache_file_size) %>% 
  left_join(df_log_previous, by = "file_name") %>% 
  mutate(same_size = size == prev_size)

df_invalid = df_qc_tif_size %>%
  filter(!same_size)

if (nrow(df_invalid) > 0) cli_abort("Some .tif files have changed size")

Log processing history

Here we log the .tif file sizes that feed the QC checks above.

Code
if (F){
  
  { ## Log results if we want
    db = arrow::open_dataset("tmp/raster_log")
    if (nrow(db) == 0 ) prev_index = 0
    if (nrow(db) > 0) prev_index = collect(count(db, log_index)) %>% 
        pull(log_index) %>% max()
    index_tmp = prev_index+1
    out_dir = glue("tmp/raster_log/log_index={index_tmp}")
    out_path = file.path(out_dir, 'data-0.parquet')
    if (!dir.exists(out_dir)) dir.create(out_dir, recursive = T)
    df_downstream_cache_final |> arrow::write_parquet(out_path)
    cli_alert_success("Logged: raster metadata!")
    
  }
  
  if (nrow(df_files_to_process) > 0){
    out_tmp = glue("tmp/processing_log/processing_log_{index_tmp}.csv")
    df_files_to_process %>% write.csv(out_tmp)
    cli_alert_success("Logged: processing metadata!")
  }
}