Auditing data transformation
Auditing data transformation can be simply described as gathering metadata about the transformation process. The most basic metadata would be a timestamp, an atomic transformation description, data volume on input, data volume on output, and time elapsed.
If you work with R only interactively you may find it little more than a fancy tool. On the other hand, for automated scheduled R jobs it may be quite helpful to have traceability at a finer grain of processing than a binary success or failure after the script is executed, for example by logging each query against the data.
Similar features have been available in ETL tools for decades.
Implementation
I've addressed this in my dtq package.
It silently eavesdrops on every call to `[` for any data.table.
It can be perceived as a preconfigured and faster version of base::trace tailored for data transformation auditing.
I believe it should be possible to port the idea to magrittr's %>% call if somebody needs auditing there.
Examples
Populate example sales data.
library(dtq)
set.seed(1)
DT <- data.table(
user = 1:10,
group = letters[1:5],
time = as.POSIXct(seq(from=1.4e9L+1L,to=1.5e9L,by=1e3), origin="1970-01-01"),
sales = rnorm(10*5*1e5, 5)
)
knitr::kable(head(DT, 2))
user | group | time | sales |
---|---|---|---|
1 | a | 2014-05-13 17:53:21 | 4.373546 |
2 | b | 2014-05-13 18:10:01 | 5.183643 |
Basic example
Order data, various aggregations.
## just to clarify below usage of unnamed args in `[`
# DT[ i, j, by, keyby ]
# first sale by user in each month
DT[order(time), head(.SD, 1), .(user, year(time), month(time))]
# total sales by user in each month
DT[, .(sales = sum(sales)), .(user, year(time), month(time))]
# total sales by group of users in each month, returning ordered and keyed data
DT[, .(sales = sum(sales)),, .(group, year(time), month(time))]
Let's preview the transformation metadata recorded by dtq.
knitr::kable(dtl(print = TRUE))
seq | dtq_id | dtq_seq | src | query | timestamp | env | elapsed | in_rows | out_rows |
---|---|---|---|---|---|---|---|---|---|
1 | 1 | 1 | DT | [i = order(time), j = head(.SD, 1), by = .(user, year(time), month(time))] | 2015-06-04 00:17:05 | R_GlobalEnv | 5.026816 | 5e+06 | 390 |
2 | 2 | 1 | DT | [j = .(sales = sum(sales)), by = .(user, year(time), month(time))] | 2015-06-04 00:17:09 | R_GlobalEnv | 3.543426 | 5e+06 | 390 |
3 | 3 | 1 | DT | [j = .(sales = sum(sales)), keyby = .(group, year(time), month(time))] | 2015-06-04 00:17:12 | R_GlobalEnv | 3.439573 | 5e+06 | 195 |
Complex example
Populate budget data to combine with the sales data.
BUDGET <- CJ(
group = letters[1:5],
year = 2014:2016,
month = 1:12
) # cross of dimensions
BUDGET[, budget := rnorm(3*12*5, 12*1e4, 12*1e3)] # budget value measure
For more complex processing I've found it very useful to organize a single conceptual query on data into a single chain of `[` calls.
In the example below I will calculate year-to-date sales and budget values, and the ratio of current year-to-date sales to last year's year-to-date sales.
# column names
measures <- c("sales","budget")
ytd_measures <- paste("ytd", measures, sep="_")
ly_ytd_measures <- paste("ly", ytd_measures, sep="_")
# query
DT[, .(sales = sum(sales)),, .(group, year = year(time), month = month(time))
][BUDGET, `:=`(budget = i.budget, budget_ratio = sales / i.budget)
][, c(ytd_measures) := lapply(.SD, cumsum), .(group, year), .SDcols = measures
][, c(ly_ytd_measures) := shift(.SD), .(group, month), .SDcols = ytd_measures
][, ytd_sales_vs_ly_ytd_sales := ytd_sales / ly_ytd_sales
]
Check the dtq logs again.
knitr::kable(dtl(print = TRUE))
seq | dtq_id | dtq_seq | src | query | timestamp | env | elapsed | in_rows | out_rows |
---|---|---|---|---|---|---|---|---|---|
1 | 1 | 1 | DT | [i = order(time), j = head(.SD, 1), by = .(user, year(time), month(time))] | 2015-06-04 00:17:05 | R_GlobalEnv | 5.0268161 | 5000000 | 390 |
2 | 2 | 1 | DT | [j = .(sales = sum(sales)), by = .(user, year(time), month(time))] | 2015-06-04 00:17:09 | R_GlobalEnv | 3.5434259 | 5000000 | 390 |
3 | 3 | 1 | DT | [j = .(sales = sum(sales)), keyby = .(group, year(time), month(time))] | 2015-06-04 00:17:12 | R_GlobalEnv | 3.4395734 | 5000000 | 195 |
4 | 4 | 1 | BUDGET | [j = `:=`(budget, rnorm(3 * 12 * 5, 12 * 10000, 12 * 1000))] | 2015-06-04 00:17:12 | R_GlobalEnv | 0.0015612 | 180 | 180 |
5 | 5 | 1 | DT | [j = .(sales = sum(sales)), keyby = .(group, year = year(time), month = month(time))] | 2015-06-04 00:17:16 | R_GlobalEnv | 3.3816404 | 5000000 | 195 |
6 | 5 | 2 | DT | [i = BUDGET, j = `:=`(budget = i.budget, budget_ratio = sales/i.budget)] | 2015-06-04 00:17:16 | R_GlobalEnv | 0.0026196 | 195 | 195 |
7 | 5 | 3 | DT | [j = `:=`(c(ytd_measures), lapply(.SD, cumsum)), by = .(group, year), .SDcols = measures] | 2015-06-04 00:17:16 | R_GlobalEnv | 0.0018303 | 195 | 195 |
8 | 5 | 4 | DT | [j = `:=`(c(ly_ytd_measures), shift(.SD)), by = .(group, month), .SDcols = ytd_measures] | 2015-06-04 00:17:16 | R_GlobalEnv | 0.0044990 | 195 | 195 |
9 | 5 | 5 | DT | [j = `:=`(ytd_sales_vs_ly_ytd_sales, ytd_sales/ly_ytd_sales)] | 2015-06-04 00:17:16 | R_GlobalEnv | 0.0012220 | 195 | 195 |
Minimal overhead
Time added to a transformation by dtq logging is minimal.
For time-consuming queries it is unnoticeable, about 0% overhead.
For the most pessimistic, unrealistic query, for(i in seq_len(1e3)) data.table(a=1L)[,.(a)], it adds around 30% overhead.
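One way to check such figures on your own machine is to time the same loop with and without a logging step. The sketch below uses only base R on a data.frame; `logged()` and `audit_log` are hypothetical stand-ins for what a logger does on each call, not dtq's real mechanism, so the result only illustrates the method.

```r
# Hypothetical log storage and wrapper (not part of dtq)
audit_log <- new.env()
audit_log$entries <- list()

logged <- function(expr) {
  start <- proc.time()[["elapsed"]]
  res <- expr  # forces the lazily evaluated query
  audit_log$entries <- c(audit_log$entries,
                         list(proc.time()[["elapsed"]] - start))
  res
}

df <- data.frame(a = 1L)
n <- 5e3

# the same cheap query repeated n times, without and with logging
plain <- system.time(
  for (i in seq_len(n)) df[, "a", drop = FALSE]
)[["elapsed"]]
audited <- system.time(
  for (i in seq_len(n)) logged(df[, "a", drop = FALSE])
)[["elapsed"]]

# relative overhead in percent
overhead_pct <- 100 * (audited - plain) / plain
```

Timings this short are noisy, so in practice you would repeat the measurement several times and look at the median.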
Processes evaluated by dtq on each data.table `[` call:
- gathering information
  - current call
  - top environment of the current call
  - nrow on input and output
  - proc.time on start and end, or get_nanotime if available
- append gathered info to log storage
  - the c function is used to append the current log to existing logs
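The steps above can be sketched in base R. This is not dtq's actual code: `audited()` and `log_store` are hypothetical names, the wrapper is called explicitly instead of intercepting `[`, and only proc.time is used for timing.

```r
# Hypothetical log storage, kept in an environment so it can be updated in place
log_store <- new.env()
log_store$logs <- list()

# Run fun(x, ...) and record metadata about the call
audited <- function(x, fun, ...) {
  cl <- sys.call()                                # current call
  env <- environmentName(topenv(parent.frame()))  # top environment of the caller
  in_rows <- nrow(x)                              # volume on input
  start <- proc.time()[["elapsed"]]
  res <- fun(x, ...)
  elapsed <- proc.time()[["elapsed"]] - start
  entry <- list(list(call = cl, env = env, timestamp = Sys.time(),
                     elapsed = elapsed, in_rows = in_rows,
                     out_rows = nrow(res)))        # volume on output
  log_store$logs <- c(log_store$logs, entry)      # append with c()
  res
}

# example: mean mpg by number of cylinders on the built-in mtcars data
mpg_by_cyl <- function(d) aggregate(mpg ~ cyl, data = d, FUN = mean)
out <- audited(mtcars, mpg_by_cyl)
str(log_store$logs[[1]][c("env", "in_rows", "out_rows")])
```

Storing only raw values at call time keeps the per-call cost low; anything that can be derived later is left for the moment the log is read.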
All other information (sources, sequences, queries) is extracted from the dtq logs when they are accessed with dtl(), so it does not add any overhead during data transformation.
dtl() is a function to query the R6 class log storage.
Using in package
It is common to organize your scripts into a package, so you may want to track data transformation from within your package.
This is handled by default; you don't need anything more than library(dtq) on script init.
You can also restrict logging to a particular package or the global env, which is useful if some of your dependencies also use data.table and would by default be logged together with your own queries.
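To illustrate how such a restriction can work in principle, a logger can compare the name of the caller's top environment with a list of allowed names. `should_log()` and its `include` argument are hypothetical and do not reflect dtq's actual options.

```r
# Decide whether a call should be logged, based on where it originates.
# `include` holds environment names as returned by environmentName(),
# e.g. "R_GlobalEnv" or a package name. Hypothetical helper, not part of dtq.
should_log <- function(env, include = "R_GlobalEnv") {
  environmentName(topenv(env)) %in% include
}

should_log(globalenv())            # TRUE: call made from the global env
should_log(asNamespace("stats"))   # FALSE: call made from inside a package
should_log(asNamespace("stats"), include = c("R_GlobalEnv", "stats"))  # TRUE
```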
Example use cases
Storing dtq logs in rds/csv/db
Traceability is good, but if you close the R session the dtq logs will be gone.
The simplest option is write.table with append=TRUE at the end of the script.
Use dtl(print = TRUE) to get rid of the list-type columns which store the R6 or call objects.
If you want to keep R data types you should use saveRDS on dtl() instead.
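A minimal sketch of both approaches, using a small hand-made data.frame in place of real dtq output. The `logs` object, its columns, the `append_logs()` helper and the file names are assumptions for illustration.

```r
# Stand-in for logs that contain a list-type column (here: unevaluated calls)
logs <- data.frame(seq = 1:2, elapsed = c(5.03, 3.54),
                   in_rows = c(5e6, 5e6), out_rows = c(390, 390))
logs$call <- list(quote(DT[order(time)]), quote(DT[, sum(sales)]))

# 1. Flat file: turn list columns into text first, then append
flat <- logs
flat$call <- vapply(flat$call,
                    function(x) paste(deparse(x), collapse = " "),
                    character(1))

append_logs <- function(x, file) {
  first_write <- !file.exists(file)
  # write the header only when the file is created
  write.table(x, file, sep = ",", row.names = FALSE,
              col.names = first_write, append = !first_write)
}

csv_file <- tempfile(fileext = ".csv")
append_logs(flat, csv_file)  # first run creates the file with a header
append_logs(flat, csv_file)  # later runs only add rows

# 2. rds: keeps R data types, including the list column
rds_file <- tempfile(fileext = ".rds")
saveRDS(logs, rds_file)
restored <- readRDS(rds_file)
```

Note that saveRDS overwrites the file, so accumulating logs across sessions in rds means reading the old file, combining and saving again.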
Performance tuning
You can easily analyse your logs and detect the most time-consuming sub-queries, compare their timing to the volume of in/out rows, etc.
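As a small illustration, the snippet below takes a few rows copied from the log output shown earlier in this post, puts them into a plain data.frame and ranks them. The `rows_per_sec` column is just one possible measure and is not something dtq provides.

```r
# Rows 1, 2, 3, 5 and 6 from the log table above
logs <- data.frame(
  seq      = c(1, 2, 3, 5, 6),
  elapsed  = c(5.0268161, 3.5434259, 3.4395734, 3.3816404, 0.0026196),
  in_rows  = c(5000000, 5000000, 5000000, 5000000, 195),
  out_rows = c(390, 390, 195, 195, 195)
)

# throughput: input rows processed per second
logs$rows_per_sec <- logs$in_rows / logs$elapsed

# most time-consuming sub-queries first
slowest <- logs[order(logs$elapsed, decreasing = TRUE), ]
head(slowest, 3)
```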
Apply business rules on metadata
Your currency exchange source system is committed to providing daily exchange rates at 5-minute intervals.
- You expect exactly 288 rows on the input. If that is not true, you catch it and send an email directly from R to the source system support.
- In your high-level processing you expect to have 5 different KPIs on output. If that is not true, you catch it and send an email to yourself.
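The two rules above could be checked against the logged row counts roughly as follows. The `logs` data.frame, its `step` column and the numbers in it are made up for illustration, `check_rules()` is a hypothetical helper, and the mailing step is left out.

```r
# Hypothetical log of one job run; values are invented for this example
logs <- data.frame(
  step     = c("load_fx_rates", "join_sales", "calc_kpis"),
  in_rows  = c(288, 1000, 1000),
  out_rows = c(288, 1000, 4)
)

# Return a character vector of violated rules (empty if all pass)
check_rules <- function(logs) {
  problems <- character(0)
  fx_in <- logs$in_rows[logs$step == "load_fx_rates"]
  if (!identical(as.numeric(fx_in), 288)) {  # 24 hours * 12 intervals per hour
    problems <- c(problems,
                  paste0("expected 288 exchange rate rows, got ", toString(fx_in)))
  }
  kpi_out <- logs$out_rows[logs$step == "calc_kpis"]
  if (!identical(as.numeric(kpi_out), 5)) {
    problems <- c(problems,
                  paste0("expected 5 KPIs on output, got ", toString(kpi_out)))
  }
  problems
}

problems <- check_rules(logs)
# here you would notify the responsible person; this sketch only prints
if (length(problems)) message(paste(problems, collapse = "\n"))
```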
PS.
I would appreciate it if someone could recommend a commenting service for GitHub Jekyll blogs that does not include tracking features like those in Disqus. Self-hosted options are welcome too.