A common scenario is to split a data set into subsets and then apply
the same analysis to each part. In the context of pipelines, this means that
we would like to apply the same pipeline to each data
subset. In addition, we may then want to combine parts of the individual
outputs. As we will see, pipeflow provides a built-in function to handle this scenario.
Let’s first define our pipeline, which, to keep matters simple, just fits a linear model and outputs the model coefficients.
library(pipeflow)
pip <- pipe_new(
"my-pipeline"
) |>
pipe_add(
"fit",
function(
data = ~data,
xVar = "x",
yVar = "y"
) {
lm(paste(yVar, "~", xVar), data = data)
}
) |>
pipe_add(
"coefs",
function(
fit = ~fit
) {
coefficients(fit)
},
keepOut = TRUE
)
So our pipeline looks like this:
pip
# step depends out keepOut group state
# <char> <list> <list> <lgcl> <char> <char>
# 1: data [NULL] FALSE data New
# 2: fit data [NULL] FALSE fit New
# 3: coefs fit [NULL] TRUE coefs New
Or graphically:
library(visNetwork)
do.call(visNetwork, args = c(pip$get_graph(), list(height = 100))) |>
visHierarchicalLayout(direction = "LR")
We use the iris data set as our working example.
head(iris)
# Sepal.Length Sepal.Width Petal.Length Petal.Width Species
# 1 5.1 3.5 1.4 0.2 setosa
# 2 4.9 3.0 1.4 0.2 setosa
# 3 4.7 3.2 1.3 0.2 setosa
# 4 4.6 3.1 1.5 0.2 setosa
# 5 5.0 3.6 1.4 0.2 setosa
# 6 5.4 3.9 1.7 0.4 setosa
First, we apply the pipeline to the whole data set.
pip$set_data(iris)
pip$set_params(list(xVar = "Sepal.Length", yVar = "Sepal.Width"))
pip$run()
# INFO [2024-12-22 21:50:28.386] Start run of 'my-pipeline' pipeline:
# INFO [2024-12-22 21:50:28.387] Step 1/3 data
# INFO [2024-12-22 21:50:28.389] Step 2/3 fit
# INFO [2024-12-22 21:50:28.392] Step 3/3 coefs
# INFO [2024-12-22 21:50:28.393] Finished execution of steps.
# INFO [2024-12-22 21:50:28.393] Done.
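For orientation, the two steps compute nothing more than an ordinary lm fit followed by a call to coefficients. The following base R sketch reproduces that computation outside the pipeline. The helper name fit_coefs is hypothetical and not part of pipeflow.

```r
# Hypothetical helper mirroring the "fit" and "coefs" steps; not part of pipeflow.
fit_coefs <- function(data, xVar = "x", yVar = "y") {
  # Build the model formula from the column names, as the "fit" step does
  fit <- lm(paste(yVar, "~", xVar), data = data)
  # Extract the model coefficients, as the "coefs" step does
  coefficients(fit)
}

# Same parameters as passed to pip$set_params() above
whole <- fit_coefs(iris, xVar = "Sepal.Length", yVar = "Sepal.Width")
names(whole)
```

Because the function only depends on its data argument, it can be applied unchanged to any subset of the data, which is the property the rest of this section relies on.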
Next, we want to apply the pipeline to each species separately. One
way to do this is to use R’s split function: we split the data by the
Species column and then run the pipeline for each subset. For example:
run_pipeline <- function(data) {
pip$set_data(data)
pip$run()
pip$collect_out()
}
results <- lapply(split(iris, iris$Species), FUN = run_pipeline)
# INFO [2024-12-22 21:50:28.463] Start run of 'my-pipeline' pipeline:
# INFO [2024-12-22 21:50:28.464] Step 1/3 data
# INFO [2024-12-22 21:50:28.466] Step 2/3 fit
# INFO [2024-12-22 21:50:28.469] Step 3/3 coefs
# INFO [2024-12-22 21:50:28.471] Finished execution of steps.
# INFO [2024-12-22 21:50:28.471] Done.
# INFO [2024-12-22 21:50:28.489] Start run of 'my-pipeline' pipeline:
# INFO [2024-12-22 21:50:28.490] Step 1/3 data
# INFO [2024-12-22 21:50:28.492] Step 2/3 fit
# INFO [2024-12-22 21:50:28.495] Step 3/3 coefs
# INFO [2024-12-22 21:50:28.496] Finished execution of steps.
# INFO [2024-12-22 21:50:28.497] Done.
# INFO [2024-12-22 21:50:28.499] Start run of 'my-pipeline' pipeline:
# INFO [2024-12-22 21:50:28.500] Step 1/3 data
# INFO [2024-12-22 21:50:28.502] Step 2/3 fit
# INFO [2024-12-22 21:50:28.505] Step 3/3 coefs
# INFO [2024-12-22 21:50:28.506] Finished execution of steps.
# INFO [2024-12-22 21:50:28.506] Done.
results
# $setosa
# $setosa$coefs
# (Intercept) Sepal.Length
# -0.5694327 0.7985283
#
#
# $versicolor
# $versicolor$coefs
# (Intercept) Sepal.Length
# 0.8721460 0.3197193
#
#
# $virginica
# $virginica$coefs
# (Intercept) Sepal.Length
# 1.4463054 0.2318905
Unfortunately, this approach required additional code that runs outside the pipeline framework. Let’s now see how to handle this scenario within the pipeline framework.
As a reminder, our pipeline looks like this:
pip
# step depends out keepOut group state
# <char> <list> <list> <lgcl> <char> <char>
# 1: data <data.frame[50x5]> FALSE data Done
# 2: fit data <lm[12]> FALSE fit Done
# 3: coefs fit 1.4463054,0.2318905 TRUE coefs Done
To deal with split data sets, we use the built-in function set_data_split.
This function actually transforms the pipeline:
pip$set_data_split(split(iris, iris$Species))
pip
# step depends out keepOut group state
# <char> <list> <list> <lgcl> <char> <char>
# 1: data.setosa [NULL] FALSE setosa New
# 2: fit.setosa data.setosa <lm[12]> FALSE setosa Outdated
# 3: coefs.setosa fit.setosa 1.4463054,0.2318905 TRUE setosa Outdated
# 4: data.versicolor [NULL] FALSE versicolor New
# 5: fit.versicolor data.versicolor <lm[12]> FALSE versicolor Outdated
# 6: coefs.versicolor fit.versicolor 1.4463054,0.2318905 TRUE versicolor Outdated
# 7: data.virginica [NULL] FALSE virginica New
# 8: fit.virginica data.virginica <lm[12]> FALSE virginica Outdated
# 9: coefs.virginica fit.virginica 1.4463054,0.2318905 TRUE virginica Outdated
As we can see, the pipeline has now been replicated for each data subset.
do.call(visNetwork, args = pip$get_graph()) |>
visHierarchicalLayout(direction = "LR", sortMethod = "directed")
Note that set_data_split accepts any list of data frames, not just the
output of split. Now let’s run this pipeline.
pip$run()
# INFO [2024-12-22 21:50:28.740] Start run of 'my-pipeline' pipeline:
# INFO [2024-12-22 21:50:28.741] Step 1/9 data.setosa
# INFO [2024-12-22 21:50:28.743] Step 2/9 fit.setosa
# INFO [2024-12-22 21:50:28.746] Step 3/9 coefs.setosa
# INFO [2024-12-22 21:50:28.748] Step 4/9 data.versicolor
# INFO [2024-12-22 21:50:28.750] Step 5/9 fit.versicolor
# INFO [2024-12-22 21:50:28.752] Step 6/9 coefs.versicolor
# INFO [2024-12-22 21:50:28.754] Step 7/9 data.virginica
# INFO [2024-12-22 21:50:28.756] Step 8/9 fit.virginica
# INFO [2024-12-22 21:50:28.759] Step 9/9 coefs.virginica
# INFO [2024-12-22 21:50:28.760] Finished execution of steps.
# INFO [2024-12-22 21:50:28.761] Done.
pip$collect_out()
# $setosa
# (Intercept) Sepal.Length
# -0.5694327 0.7985283
#
# $versicolor
# (Intercept) Sepal.Length
# 0.8721460 0.3197193
#
# $virginica
# (Intercept) Sepal.Length
# 1.4463054 0.2318905
The output is the same as before, but it was obtained without writing additional code outside the pipeline framework.
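Since set_data_split accepts any list of data frames, the subsets need not come from split. The following base R sketch first shows the shape of what split returns and then builds a comparable list by hand. The subset names short and long and the cut-off at the median are purely illustrative, and the commented-out call assumes that the list names are used as step suffixes in the same way as the species names above.

```r
# What split() returns: a named list with one data frame per factor level
by_species <- split(iris, iris$Species)
names(by_species)
sapply(by_species, nrow)

# A hand-built list of data frames with the same shape (names are illustrative)
cutoff <- median(iris$Sepal.Length)
by_length <- list(
  short = iris[iris$Sepal.Length <= cutoff, ],
  long  = iris[iris$Sepal.Length >  cutoff, ]
)
sapply(by_length, nrow)

# Assumed usage, mirroring how the output of split() is passed:
# pip$set_data_split(by_length)
```

The two subsets together cover every row of iris exactly once, so no observation is dropped or counted twice.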
Finally, as a side note, there is another built-in function named split,
which can be used to split the pipeline into its independent parts. While
this works for any pipeline, in our working example it naturally splits
the pipeline into the parts defined by the data split.
pip$split()
# [[1]]
# step depends out keepOut group state
# <char> <list> <list> <lgcl> <char> <char>
# 1: data.setosa <data.frame[50x5]> FALSE setosa Done
# 2: fit.setosa data.setosa <lm[12]> FALSE setosa Done
# 3: coefs.setosa fit.setosa -0.5694327, 0.7985283 TRUE setosa Done
#
# [[2]]
# step depends out keepOut group state
# <char> <list> <list> <lgcl> <char> <char>
# 1: data.versicolor <data.frame[50x5]> FALSE versicolor Done
# 2: fit.versicolor data.versicolor <lm[12]> FALSE versicolor Done
# 3: coefs.versicolor fit.versicolor 0.8721460,0.3197193 TRUE versicolor Done
#
# [[3]]
# step depends out keepOut group state
# <char> <list> <list> <lgcl> <char> <char>
# 1: data.virginica <data.frame[50x5]> FALSE virginica Done
# 2: fit.virginica data.virginica <lm[12]> FALSE virginica Done
# 3: coefs.virginica fit.virginica 1.4463054,0.2318905 TRUE virginica Done
This function is especially useful if you want to separate parts of the pipeline in order to run them in parallel.
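The idea of running independent parts in parallel can be sketched with base R's parallel package. To keep the example self-contained, a plain function stands in for one pipeline part. The helper run_part is hypothetical, and whether the pipeline objects returned by split can be shipped to worker processes in the same way is an assumption that is not verified here.

```r
library(parallel)

# Hypothetical stand-in for one independent pipeline part:
# fit the model on one subset and extract its coefficients
run_part <- function(data) {
  coefficients(lm(Sepal.Width ~ Sepal.Length, data = data))
}

parts <- split(iris, iris$Species)

# Forking is not available on Windows, so fall back to a single core there
n_cores <- if (.Platform$OS.type == "windows") 1L else 2L

# Each subset is processed independently, so execution order does not matter
par_results <- mclapply(parts, run_part, mc.cores = n_cores)

# Sequential reference run for comparison
seq_results <- lapply(parts, run_part)
all.equal(par_results, seq_results)
```

The comparison with the sequential run is only a sanity check that parallel execution does not change the results.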
In some cases, we may want to (re-)combine the output of the pipeline parts. For example, we may want to combine the coefficients of the linear models into one table.
To this end, let’s extend our pipeline by one step at the end.
pip <- pipe_new(
"my-pipeline"
) |>
pipe_add(
"fit",
function(
data = ~data,
xVar = "x",
yVar = "y"
) {
lm(paste(yVar, "~", xVar), data = data)
}
) |>
pipe_add(
"coefs",
function(
fit = ~fit
) {
coefficients(fit)
}
) |>
pipe_add(
"combine_coefs",
function(
coefs = ~coefs
) {
coefs |> do.call(rbind, args = _) |> as.data.frame()
},
keepOut = TRUE
)
Here is how the pipeline looks so far.
pip
# step depends out keepOut group state
# <char> <list> <list> <lgcl> <char> <char>
# 1: data [NULL] FALSE data New
# 2: fit data [NULL] FALSE fit New
# 3: coefs fit [NULL] FALSE coefs New
# 4: combine_coefs coefs [NULL] TRUE combine_coefs New
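The combine_coefs step expects coefs to be a list of coefficient vectors, one per subset. The following base R sketch shows what its body computes on such a list, using the per-species coefficients reported earlier. The list names are chosen for illustration only; the names the pipeline actually passes to the step may differ.

```r
# Coefficient vectors as reported per species earlier; list names are illustrative
coefs <- list(
  setosa     = c("(Intercept)" = -0.5694327, "Sepal.Length" = 0.7985283),
  versicolor = c("(Intercept)" =  0.8721460, "Sepal.Length" = 0.3197193),
  virginica  = c("(Intercept)" =  1.4463054, "Sepal.Length" = 0.2318905)
)

# Stack the named vectors row-wise into a matrix, then convert to a data frame
combined <- as.data.frame(do.call(rbind, coefs))
dim(combined)
rownames(combined)
```

Note that the `_` placeholder used in the step definition above requires R 4.2 or later. The plain do.call(rbind, coefs) form shown here is equivalent and also runs on older versions.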
Next, we again apply the set_data_split function, but this time we need
to make sure that every step is split except the last one, which combines
everything. We achieve this with the toStep parameter, which tells the
pipeline to replicate the steps only up to and including the given step.
pip$set_data_split(split(iris, iris$Species), toStep = "coefs")
pip
# step depends out keepOut group state
# <char> <list> <list> <lgcl> <char> <char>
# 1: data.setosa [NULL] FALSE setosa New
# 2: fit.setosa data.setosa [NULL] FALSE setosa Outdated
# 3: coefs.setosa fit.setosa [NULL] FALSE setosa Outdated
# 4: data.versicolor [NULL] FALSE versicolor New
# 5: fit.versicolor data.versicolor [NULL] FALSE versicolor Outdated
# 6: coefs.versicolor fit.versicolor [NULL] FALSE versicolor Outdated
# 7: data.virginica [NULL] FALSE virginica New
# 8: fit.virginica data.virginica [NULL] FALSE virginica Outdated
# 9: coefs.virginica fit.virginica [NULL] FALSE virginica Outdated
# 10: combine_coefs <list[1]> [NULL] TRUE combine_coefs New
We see that the last step is not replicated for each data subset and it now contains a list of dependencies, namely:
pip$get_depends()[["combine_coefs"]]
# $coefs
# [1] "coefs.setosa" "coefs.versicolor" "coefs.virginica"
Graphically, this becomes even clearer:
do.call(visNetwork, args = pip$get_graph()) |>
visHierarchicalLayout(direction = "LR", sortMethod = "directed")
Finally, let’s see how this plays out when we run the pipeline.
pip$set_params(list(xVar = "Sepal.Length", yVar = "Sepal.Width"))
pip$run()
# INFO [2024-12-22 21:50:29.084] Start run of 'my-pipeline' pipeline:
# INFO [2024-12-22 21:50:29.085] Step 1/10 data.setosa
# INFO [2024-12-22 21:50:29.087] Step 2/10 fit.setosa
# INFO [2024-12-22 21:50:29.090] Step 3/10 coefs.setosa
# INFO [2024-12-22 21:50:29.092] Step 4/10 data.versicolor
# INFO [2024-12-22 21:50:29.094] Step 5/10 fit.versicolor
# INFO [2024-12-22 21:50:29.097] Step 6/10 coefs.versicolor
# INFO [2024-12-22 21:50:29.099] Step 7/10 data.virginica
# INFO [2024-12-22 21:50:29.101] Step 8/10 fit.virginica
# INFO [2024-12-22 21:50:29.104] Step 9/10 coefs.virginica
# INFO [2024-12-22 21:50:29.106] Step 10/10 combine_coefs
# INFO [2024-12-22 21:50:29.107] Finished execution of steps.
# INFO [2024-12-22 21:50:29.108] Done.