Auditing data transformation

Written on 2015-06-03

Auditing data transformation can be simply described as gathering metadata about the transformation process. The most basic metadata would be a timestamp, an atomic transformation description, the data volume on input and on output, and the time elapsed.

If you only work with R interactively, you may see this as little more than a fancy tool. For automated, scheduled R jobs, however, it can be quite helpful to have traceability at a finer grain than a binary success or failure once the script has run, for example by logging each query against the data.
Similar features have been available in ETL tools for decades.

Implementation

I've addressed this in my dtq package.
It silently eavesdrops on every call to [ for any data.table.
It can be thought of as a preconfigured and faster version of base::trace, tailored for data transformation auditing.

I believe it should be possible to port the idea to magrittr's %>% call if somebody needs auditing there.

Examples

Populate example sales data.

library(dtq)
set.seed(1)
DT <- data.table(
  user = 1:10,
  group = letters[1:5],
  time = as.POSIXct(seq(from=1.4e9L+1L,to=1.5e9L,by=1e3), origin="1970-01-01"),
  sales = rnorm(10*5*1e5, 5)
)
knitr::kable(head(DT, 2))
| user | group | time | sales |
|---|---|---|---|
| 1 | a | 2014-05-13 17:53:21 | 4.373546 |
| 2 | b | 2014-05-13 18:10:01 | 5.183643 |

Basic example

Order data, various aggregations.

## just to clarify below usage of unnamed args in `[`
# DT[ i, j, by, keyby ]

# first sale by user in each month
DT[order(time), head(.SD, 1), .(user, year(time), month(time))]
# total sales by user in each month
DT[, .(sales = sum(sales)), .(user, year(time), month(time))]
# total sales by group of users in each month, returning ordered and keyed data
DT[, .(sales = sum(sales)),, .(group, year(time), month(time))]

Let's preview the transformation metadata recorded by dtq.

knitr::kable(dtl(print = TRUE))
| seq | dtq_id | dtq_seq | src | query | timestamp | env | elapsed | in_rows | out_rows |
|---|---|---|---|---|---|---|---|---|---|
| 1 | 1 | 1 | DT | [i = order(time), j = head(.SD, 1), by = .(user, year(time), month(time))] | 2015-06-04 00:17:05 | R_GlobalEnv | 5.026816 | 5e+06 | 390 |
| 2 | 2 | 1 | DT | [j = .(sales = sum(sales)), by = .(user, year(time), month(time))] | 2015-06-04 00:17:09 | R_GlobalEnv | 3.543426 | 5e+06 | 390 |
| 3 | 3 | 1 | DT | [j = .(sales = sum(sales)), keyby = .(group, year(time), month(time))] | 2015-06-04 00:17:12 | R_GlobalEnv | 3.439573 | 5e+06 | 195 |

Complex example

Populate budget data to combine with the sales data.

BUDGET <- CJ(
  group = letters[1:5],
  year = 2014:2016,
  month = 1:12
) # cross of dimensions
BUDGET[, budget := rnorm(3*12*5, 12*1e4, 12*1e3)] # budget value measure

For more complex processing I've found it very useful to organize a single conceptual query on the data into a single chain of [ calls.

In the example below I calculate year-to-date sales and budget values, and the ratio of current year-to-date sales to last year's year-to-date sales.

# column names
measures <- c("sales","budget")
ytd_measures <- paste("ytd", measures, sep="_")
ly_ytd_measures <- paste("ly", ytd_measures, sep="_")
# query
DT[, .(sales = sum(sales)),, .(group, year = year(time), month = month(time))
   ][BUDGET, `:=`(budget = i.budget, budget_ratio = sales / i.budget)
     ][, c(ytd_measures) := lapply(.SD, cumsum), .(group, year), .SDcols = measures
       ][, c(ly_ytd_measures) := shift(.SD), .(group, month), .SDcols = ytd_measures
         ][, ytd_sales_vs_ly_ytd_sales := ytd_sales / ly_ytd_sales
           ]

Check dtq logs again.

knitr::kable(dtl(print = TRUE))
| seq | dtq_id | dtq_seq | src | query | timestamp | env | elapsed | in_rows | out_rows |
|---|---|---|---|---|---|---|---|---|---|
| 1 | 1 | 1 | DT | [i = order(time), j = head(.SD, 1), by = .(user, year(time), month(time))] | 2015-06-04 00:17:05 | R_GlobalEnv | 5.0268161 | 5000000 | 390 |
| 2 | 2 | 1 | DT | [j = .(sales = sum(sales)), by = .(user, year(time), month(time))] | 2015-06-04 00:17:09 | R_GlobalEnv | 3.5434259 | 5000000 | 390 |
| 3 | 3 | 1 | DT | [j = .(sales = sum(sales)), keyby = .(group, year(time), month(time))] | 2015-06-04 00:17:12 | R_GlobalEnv | 3.4395734 | 5000000 | 195 |
| 4 | 4 | 1 | BUDGET | [j = :=(budget, rnorm(3 * 12 * 5, 12 * 10000, 12 * 1000))] | 2015-06-04 00:17:12 | R_GlobalEnv | 0.0015612 | 180 | 180 |
| 5 | 5 | 1 | DT | [j = .(sales = sum(sales)), keyby = .(group, year = year(time), month = month(time))] | 2015-06-04 00:17:16 | R_GlobalEnv | 3.3816404 | 5000000 | 195 |
| 6 | 5 | 2 | DT | [i = BUDGET, j = :=(budget = i.budget, budget_ratio = sales/i.budget)] | 2015-06-04 00:17:16 | R_GlobalEnv | 0.0026196 | 195 | 195 |
| 7 | 5 | 3 | DT | [j = :=(c(ytd_measures), lapply(.SD, cumsum)), by = .(group, year), .SDcols = measures] | 2015-06-04 00:17:16 | R_GlobalEnv | 0.0018303 | 195 | 195 |
| 8 | 5 | 4 | DT | [j = :=(c(ly_ytd_measures), shift(.SD)), by = .(group, month), .SDcols = ytd_measures] | 2015-06-04 00:17:16 | R_GlobalEnv | 0.0044990 | 195 | 195 |
| 9 | 5 | 5 | DT | [j = :=(ytd_sales_vs_ly_ytd_sales, ytd_sales/ly_ytd_sales)] | 2015-06-04 00:17:16 | R_GlobalEnv | 0.0012220 | 195 | 195 |

Minimal overhead

The time that dtq logging adds to a transformation is minimal.
For time-consuming queries the overhead is unnoticeable, close to 0%.
For the most pessimistic, unrealistic query, for(i in seq_len(1e3)) data.table(a=1L)[,.(a)], it adds around 30% overhead.

Steps performed by dtq on each data.table [ call:

  • gathering information
    • current call
    • top environment of the current call
    • nrow on input and output
    • proc.time on start and end, or get_nanotime if available
  • append gathered info to log storage
    • c function is used to append current log to existing logs
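
The steps listed above can be illustrated with a small base R sketch. This is not dtq's actual implementation: `audit_log` and `audited()` are hypothetical names, and the wrapper has to be called explicitly instead of intercepting `[` the way dtq does. It only shows how little work is needed per call.

```r
# Minimal sketch, NOT dtq's code: `audit_log` and `audited()` are
# hypothetical names used only for illustration.
audit_log <- list()

audited <- function(x, transform) {
  start <- proc.time()[["elapsed"]]            # proc.time on start
  out <- transform(x)                          # the transformation itself
  elapsed <- proc.time()[["elapsed"]] - start  # proc.time on end
  entry <- list(
    timestamp = Sys.time(),
    call = match.call(),                               # current call
    env = environmentName(topenv(parent.frame())),     # top environment of the caller
    in_rows = nrow(x),                                 # nrow on input
    out_rows = nrow(out),                              # nrow on output
    elapsed = elapsed
  )
  # append the current entry to the existing log with c()
  audit_log <<- c(audit_log, list(entry))
  out
}

df <- data.frame(group = c("a", "a", "b"), sales = c(1, 2, 3))
agg <- audited(df, function(d) aggregate(sales ~ group, data = d, FUN = sum))
str(audit_log[[1]])
```

Everything derived from the raw entries (sequences, formatted queries) can be computed later when the log is read, which is why it does not add to the cost of the transformation itself.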

All other information (sources, sequences, queries) is extracted from the dtq logs when they are accessed through dtl(), so it adds no overhead during data transformation.
dtl() is a function for querying the R6 class log storage.

Using in package

It is common to organize your scripts into a package, so you may want to track data transformations from within your package.
This is handled by default; you don't need anything more than library(dtq) at script initialization.
You can also restrict logging to a particular package or to the global environment, which is useful when some of your dependencies also use data.table and would otherwise be logged together with your own queries.

Example use cases

Storing dtq logs in rds/csv/db

Traceability is good, but once you close the R session the dtq logs are gone.
The simplest option is write.table with append=TRUE at the end of the script.
Use dtl(print = TRUE) to get rid of the list-type columns that store the R6 or call objects.
If you want to keep R data types, use saveRDS on dtl() instead.
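
A minimal sketch of both options, using base R only. The `logs` data.frame stands in for the result of dtl(print = TRUE) and reuses a few values from the log shown earlier; the file names are hypothetical and the files are written to a temporary directory.

```r
# `logs` is a stand-in for dtl(print = TRUE); in a real script you would
# use the dtq output here instead of building the data.frame by hand.
logs <- data.frame(
  seq = 1:2,
  src = c("DT", "DT"),
  elapsed = c(5.026816, 3.543426),
  in_rows = c(5e6, 5e6),
  out_rows = c(390, 390)
)

# hypothetical file location
csv_file <- file.path(tempdir(), "dtq_logs.csv")
new_file <- !file.exists(csv_file)
# write the header only when the file is created, append on later runs
write.table(logs, csv_file, sep = ",", row.names = FALSE,
            col.names = new_file, append = !new_file)

# saveRDS keeps the R data types, so the object can be restored as it was
rds_file <- file.path(tempdir(), "dtq_logs.rds")
saveRDS(logs, rds_file)
restored <- readRDS(rds_file)
```

Writing the header only on the first run keeps the csv readable with read.csv after any number of appended runs.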

Performance tuning

You can easily analyse your logs and detect the most time-consuming sub-queries, compare their timing to the volume of in/out rows, etc.
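
As a rough illustration, the sketch below takes the five steps of the chained query logged earlier (dtq_id 5), copied by hand into a plain data.frame, and ranks them by elapsed time. The derived column names `share` and `sec_per_1e6_in_rows` are my own and are not part of dtq.

```r
# Values copied from the log of the chained query shown earlier (dtq_id 5);
# in practice this would be a subset of dtl(print = TRUE).
chain <- data.frame(
  dtq_seq = 1:5,
  elapsed = c(3.3816404, 0.0026196, 0.0018303, 0.0044990, 0.0012220),
  in_rows = c(5e6, 195, 195, 195, 195),
  out_rows = c(195, 195, 195, 195, 195)
)

# share of the total chain time spent in each step
chain$share <- chain$elapsed / sum(chain$elapsed)
# time normalized by input volume: seconds per million input rows
chain$sec_per_1e6_in_rows <- chain$elapsed / chain$in_rows * 1e6

# most time-consuming steps first
slowest <- chain[order(chain$elapsed, decreasing = TRUE), ]
print(slowest)
```

Here the first step, the aggregation over 5 million rows, accounts for nearly all of the elapsed time, so it is the only step worth tuning.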

Apply business rules on metadata

Your currency exchange source system is committed to providing daily exchange rates at 5-minute intervals.

  • You expect exactly 288 rows on the input (24 hours × 12 intervals per hour). If that is not the case, you catch it and send an email directly from R to the source system support.
  • In your high-level processing you expect to have 5 different KPIs on the output. If that is not the case, you catch it and send an email to yourself.
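
Such rules could be sketched roughly as below. `notify()` and `check_rows()` are hypothetical helpers, the addresses are placeholders, and `notify()` only prints a message where a real script would call a mailer of your choice. The row counts are illustrative values, not real logs.

```r
# Hypothetical helper: prints instead of sending an email.
notify <- function(to, msg) message("to ", to, ": ", msg)

# Compare an actual row count from the log against the expected one
# and notify the given recipient on mismatch. Returns TRUE when the rule holds.
check_rows <- function(actual, expected, to, what) {
  ok <- isTRUE(actual == expected)
  if (!ok) {
    notify(to, paste0(what, ": expected ", expected, " rows, got ", actual))
  }
  ok
}

# illustrative log entry; in practice taken from the dtq logs
log_entry <- list(in_rows = 287, out_rows = 5)

input_ok <- check_rows(log_entry$in_rows, 288,
                       "support@example.com", "exchange rates input")
output_ok <- check_rows(log_entry$out_rows, 5,
                        "me@example.com", "KPI output")
```

With these values the input rule fails and triggers a notification, while the output rule passes silently.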

PS.

I would appreciate it if someone could recommend a commenting service for GitHub Jekyll blogs that does not track users the way Disqus does. Self-hosted options are welcome, of course.