Setup Code
The overall goal of this pipeline is to pre-process raw PRISM data for our zonal statistics calculation; this involves converting rasters from the raw .bil format to the more performant .tif format.
Our setup involves first loading dependencies.

## Dependencies
library(pacman)
pacman::p_load(tidyverse, terra, glue, sfarrow, leaflet, furrr, sf, purrr, cli, tictoc)
Lets also set up the CCUH blocks paths associated with this pipeline. We have one upstream block (raw PRISM data) and one downstream block (standardized PRISM data).
## Declare CCUH Blocks
upstream_block = "//files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data"
downstream_block = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/SpatialAnalyticMethods_SAM/freeze/ccuh_prism_zonal_stats_v1/1-preprocessed-prism-raster-tif"

## Local cache blocks
upstream_cache = "tmp/PRISM_data"
downstream_cache = "tmp/raster"
Note that we also have local cache paths: most raster operations run faster on local data, so we cache the data locally for processing before moving results to the block locations for long-term storage.
Before we start processing, let's compare what is already processed in our local cached downstream directory against all available raw PRISM data, so we can see which upstream files we still need to cache and process. Let's first check our local cached results.
## Remove invalid .tif rasters (anything under ~1 MB is a failed or partial write)
list.files(
  downstream_cache, pattern = '\\.tif$',
  recursive = T,
  full.names = T) %>%
  walk(~{
    if (file.size(.x) < 1000000) {
      cli_alert_warning("Removing invalid .tif file: {.x}")
      file.remove(.x)
    }
  })
## Inventory cached results
df_downstream_cache = tibble(
  downstream_cache = list.files(
    downstream_cache, pattern = '\\.tif$',
    recursive = T,
    full.names = T)
  ) %>%
  mutate(
    file_name = downstream_cache %>% basename() %>% str_remove('\\.tif$'),
    downstream_cache_file_size = file.size(downstream_cache)
  ) %>%
  select(file_name, everything())
head(df_downstream_cache)
file_name | downstream_cache | downstream_cache_file_size |
---|---|---|
prism_ppt_us_30s_20060101 | tmp/raster/prism_ppt_us_30s_20060101/prism_ppt_us_30s_20060101.tif | 28344874 |
prism_ppt_us_30s_20060102 | tmp/raster/prism_ppt_us_30s_20060102/prism_ppt_us_30s_20060102.tif | 36914510 |
prism_ppt_us_30s_20060103 | tmp/raster/prism_ppt_us_30s_20060103/prism_ppt_us_30s_20060103.tif | 34767718 |
prism_ppt_us_30s_20060104 | tmp/raster/prism_ppt_us_30s_20060104/prism_ppt_us_30s_20060104.tif | 22980798 |
prism_ppt_us_30s_20060105 | tmp/raster/prism_ppt_us_30s_20060105/prism_ppt_us_30s_20060105.tif | 15020724 |
prism_ppt_us_30s_20060106 | tmp/raster/prism_ppt_us_30s_20060106/prism_ppt_us_30s_20060106.tif | 14369374 |
Here we want to make sure that the raster file sizes don't change over time; this guards against a small bug we noticed in processing.
## Get most recent log
db = arrow::open_dataset("tmp/raster_log")

df_log_previous = db %>%
  distinct() %>%
  collect() %>%
  filter(log_index == max(log_index)) %>%
  select(file_name,
         prev_size = downstream_cache_file_size)
## Check difference in sizes
df_qc_tif_size = df_downstream_cache %>%
  select(file_name,
         size = downstream_cache_file_size) %>%
  left_join(df_log_previous, by = "file_name") %>%
  mutate(same_size = size == prev_size)

df_invalid = df_qc_tif_size %>%
  filter(!same_size)

if (nrow(df_invalid) > 0) cli_abort("Some .tif files have changed size")
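If this check ever trips, it helps to see exactly which files changed before aborting. A minimal sketch using the objects above; note that files with no previous log entry get an NA for same_size, and filter(!same_size) silently drops those NA rows, so we surface them here too:

## Hypothetical: inspect offending files, surfacing NA (new/unlogged) rows as well
df_qc_tif_size %>%
  filter(!same_size | is.na(same_size))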
Here we have upstream blocks on the server as well as upstream cached files.
## Inventory Upstream block
df_upstream = tibble(
  upstream_block = list.files(
    upstream_block, pattern = '\\.bil$',
    recursive = T,
    full.names = T)
  ) %>%
  mutate(
    file_name = upstream_block %>% basename() %>% str_remove('\\.bil$')
  ) %>%
  select(file_name, everything())
head(df_upstream)
file_name | upstream_block |
---|---|
prism_ppt_us_30s_20060101 | //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060101.bil |
prism_ppt_us_30s_20060102 | //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060102.bil |
prism_ppt_us_30s_20060103 | //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060103.bil |
prism_ppt_us_30s_20060104 | //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060104.bil |
prism_ppt_us_30s_20060105 | //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060105.bil |
prism_ppt_us_30s_20060106 | //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060106.bil |
## Inventory Upstream cache
df_upstream_cache = tibble(
  upstream_cache = list.files(
    upstream_cache, pattern = '\\.bil$',
    recursive = T,
    full.names = T)
  ) %>%
  mutate(
    file_name = upstream_cache %>% basename() %>% str_remove('\\.bil$')
  ) %>%
  select(file_name, everything())
head(df_upstream_cache)
file_name | upstream_cache |
---|---|
The upstream cache is currently empty; nothing is cached locally. Finally, let's build the list of files that still need processing: now that we have the already converted .tif files and the list of raw .bil files, we can check what still needs converting.
df_prism_files = df_upstream %>%
  filter(!str_detect(file_name, 'vpd')) %>%
  left_join(df_downstream_cache, by = "file_name") %>%
  left_join(df_upstream_cache, by = "file_name") %>%
  select(file_name, upstream_block, upstream_cache, downstream_cache)

df_files_to_process = df_prism_files %>%
  filter(is.na(downstream_cache))
head(df_prism_files)
file_name | upstream_block | upstream_cache | downstream_cache |
---|---|---|---|
prism_ppt_us_30s_20060101 | //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060101.bil | NA | tmp/raster/prism_ppt_us_30s_20060101/prism_ppt_us_30s_20060101.tif |
prism_ppt_us_30s_20060102 | //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060102.bil | NA | tmp/raster/prism_ppt_us_30s_20060102/prism_ppt_us_30s_20060102.tif |
prism_ppt_us_30s_20060103 | //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060103.bil | NA | tmp/raster/prism_ppt_us_30s_20060103/prism_ppt_us_30s_20060103.tif |
prism_ppt_us_30s_20060104 | //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060104.bil | NA | tmp/raster/prism_ppt_us_30s_20060104/prism_ppt_us_30s_20060104.tif |
prism_ppt_us_30s_20060105 | //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060105.bil | NA | tmp/raster/prism_ppt_us_30s_20060105/prism_ppt_us_30s_20060105.tif |
prism_ppt_us_30s_20060106 | //files.drexel.edu/encrypted/SOPH/UHC/Schinasi_HEAT/PRISM_Data/ppt/2006/prism_ppt_us_30s_20060106.bil | NA | tmp/raster/prism_ppt_us_30s_20060106/prism_ppt_us_30s_20060106.tif |
Currently there are 23740 PRISM files and all of them have already been processed, so this pipeline run has nothing to process; we just QC.
Note that if you are trying to convert a large number of files, it is best to cache the files locally before processing; for a few files this isn't really worth it. So let's set up a guardrail: if we are trying to process more than 10 files, the run aborts and suggests downloading the files locally first (a caching sketch follows the check below).
if (nrow(df_files_to_process) > 10){
cli_abort("You are trying to process more than 10 files, consider downloading the files locally")
}
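"Downloading" here just means copying raw files from the network block into upstream_cache. A minimal sketch of what that could look like; this is a hypothetical helper, not part of the pipeline, and it assumes each PRISM .bil ships with sidecar files (.hdr, .prj, etc.) that share its base name and must travel with it:

## Hypothetical helper: copy one .bil (plus its sidecar files) into the local cache
cache_prism_file = function(block_path, cache_dir = upstream_cache){
  stem = basename(tools::file_path_sans_ext(block_path))
  ## grab the .bil and every sidecar sharing its base name
  to_copy = list.files(dirname(block_path),
                       pattern = paste0('^', stem),
                       full.names = TRUE)
  if (!dir.exists(cache_dir)) dir.create(cache_dir, recursive = TRUE)
  file.copy(to_copy, cache_dir, overwrite = FALSE)
}

# walk(df_files_to_process$upstream_block, cache_prism_file)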
Now let's set up the conversion function.
## test parameters
row = df_files_to_process %>% slice(1)
file_name_tmp = row$file_name
upstream_block_tmp = row$upstream_block
upstream_cache_tmp = row$upstream_cache
downstream_cache_tmp = row$downstream_cache
rm(row, file_name_tmp, upstream_block_tmp, upstream_cache_tmp, downstream_cache_tmp)
## Function to convert .bil to .tif
crated_convert_prism_to_tif = function(file_name_tmp,
                                       upstream_block_tmp,
                                       upstream_cache_tmp,
                                       downstream_cache_tmp){

  ## Stop if done
  if (file.exists(downstream_cache_tmp)) return("Already done")

  ## Import upstream raster
  if (file.exists(upstream_cache_tmp)){
    prism_rast = terra::rast(upstream_cache_tmp)
  } else {
    prism_rast = terra::rast(upstream_block_tmp)
  }

  ## Transform to Albers USA
  prism_rast_albers = prism_rast %>% terra::project("EPSG:5070")

  ## Export
  out_dir = file.path('tmp', 'raster', file_name_tmp)
  if (!dir.exists(out_dir)) dir.create(out_dir, recursive = T)
  out_path = file.path(out_dir, paste0(file_name_tmp, '.tif'))

  prism_rast_albers |> terra::writeRaster(filename = out_path, overwrite = TRUE)

  cli::cli_alert_success("Finished processing and export: {file_name_tmp}")
}
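Before running in bulk, a quick smoke test on one file can confirm the output lands in EPSG:5070. A hedged sketch, assuming df_files_to_process is non-empty (in this run it is empty, so this is illustrative only):

## Hypothetical single-file smoke test
row = df_files_to_process %>% slice(1)
crated_convert_prism_to_tif(row$file_name, row$upstream_block,
                            row$upstream_cache, row$downstream_cache)
## read the export back and describe its CRS (should report EPSG 5070)
terra::crs(terra::rast(file.path('tmp', 'raster', row$file_name,
                                 paste0(row$file_name, '.tif'))),
           describe = TRUE)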
crated_convert_prism_to_tif__possibly <- possibly(
  crated_convert_prism_to_tif,
  otherwise = NULL,
  quiet = FALSE)
if (between(nrow(df_files_to_process), 1, 10)){
  ## pmap() matches columns to function arguments by name,
  ## so rename the columns to the *_tmp argument names first
  purrr::pmap(
    df_files_to_process %>%
      rename(file_name_tmp = file_name,
             upstream_block_tmp = upstream_block,
             upstream_cache_tmp = upstream_cache,
             downstream_cache_tmp = downstream_cache),
    crated_convert_prism_to_tif__possibly
  )
}
if (nrow(df_files_to_process) > 10){
  ## Configure parallel
  # plan(multisession, workers = 6)
  # opts <- furrr_options(globals = FALSE, seed = TRUE)
  #
  # ## Run (future_pwalk() passes each row's columns as the function's arguments)
  # tic()
  # furrr::future_pwalk(
  #   df_files_to_process %>%
  #     rename(file_name_tmp = file_name,
  #            upstream_block_tmp = upstream_block,
  #            upstream_cache_tmp = upstream_cache,
  #            downstream_cache_tmp = downstream_cache),
  #   crated_convert_prism_to_tif__possibly,
  #   .options = opts
  # )
  # toc()
}
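The worker count of 6 is a judgment call. A common pattern, sketched here with future::availableCores() (an assumption on our part, not something this pipeline currently does), is to size the pool to the machine while leaving headroom:

## Sketch: size the worker pool to the machine, leaving cores for the OS
# n_workers = max(1, future::availableCores() - 2)
# plan(multisession, workers = n_workers)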
Here we run some QC tests on the results.
## Inventory final cached results
df_downstream_cache_final = tibble(
  downstream_cache = list.files(
    downstream_cache, pattern = '\\.tif$',
    recursive = T,
    full.names = T)
  ) %>%
  mutate(
    file_name = downstream_cache %>% basename() %>% str_remove('\\.tif$'),
    downstream_cache_file_size = file.size(downstream_cache)
  ) %>%
  select(file_name, everything())
## QC: every upstream file should have a downstream .tif
df_qc = df_upstream %>%
  select(file_name) %>%
  mutate(upstream = T) %>%
  left_join(df_downstream_cache_final %>%
              select(file_name) %>%
              mutate(downstream = T), by = "file_name") %>%
  ## unmatched files get NA for downstream; count NA as not processed
  mutate(valid = coalesce(downstream, FALSE))

df_invalid = df_qc %>%
  filter(!valid)

if (nrow(df_invalid) > 0) cli_abort("Some files not processed")
df_invalid = df_downstream_cache_final %>%
  filter(downstream_cache_file_size < 1000000)

if (nrow(df_invalid) > 0) cli_abort("Some .tif files are too small")
## Get most recent log
db = arrow::open_dataset("tmp/raster_log")

df_log_previous = db %>%
  distinct() %>%
  collect() %>%
  filter(log_index == max(log_index)) %>%
  select(file_name,
         prev_size = downstream_cache_file_size)

## Check difference in sizes
df_qc_tif_size = df_downstream_cache_final %>%
  select(file_name,
         size = downstream_cache_file_size) %>%
  left_join(df_log_previous, by = "file_name") %>%
  mutate(same_size = size == prev_size)

df_invalid = df_qc_tif_size %>%
  filter(!same_size)

if (nrow(df_invalid) > 0) cli_abort("Some .tif files have changed size")
Finally, we log file sizes so the next run's QC has a baseline.
if (F){
  ## Log results if we want
  {
    db = arrow::open_dataset("tmp/raster_log")
    if (nrow(db) == 0) prev_index = 0
    if (nrow(db) > 0) prev_index = collect(count(db, log_index)) %>%
        pull(log_index) %>%
        max()
    index_tmp = prev_index + 1
    out_dir = glue("tmp/raster_log/log_index={index_tmp}")
    out_path = file.path(out_dir, 'data-0.parquet')
    if (!dir.exists(out_dir)) dir.create(out_dir, recursive = T)
    df_downstream_cache_final |> arrow::write_parquet(out_path)
    cli_alert_success("Logged: raster metadata!")
  }

  if (nrow(df_files_to_process) > 0){
    out_tmp = glue("tmp/processing_log/processing_log_{index_tmp}.csv")
    df_files_to_process %>% write.csv(out_tmp)
    cli_alert_success("Logged: processing metadata!")
  }
}
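To confirm the write, one could re-open the dataset and check that the newest log_index is present; a small sketch reusing the same arrow calls as above:

## Hypothetical check: list log indices, newest first
arrow::open_dataset("tmp/raster_log") %>%
  count(log_index) %>%
  collect() %>%
  arrange(desc(log_index))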