Advanced: How to use pipeflow with split data sets

Motivation

A common scenario is to split a data set into subsets and then apply the same analysis to each part. In the context of pipelines, this means applying the same pipeline once to each data subset. In addition, we may then want to combine parts of the individual outputs. As we will see, pipeflow provides a built-in function to handle this scenario.

Define pipeline

Let’s first define our pipeline, which, to keep matters simple, just fits a linear model and outputs the model coefficients.

library(pipeflow)

pip <- pipe_new(
        "my-pipeline"
    ) |>

    pipe_add(
        "fit",
        function(
            data = ~data,
            xVar = "x",
            yVar = "y"
        ) {
            lm(paste(yVar, "~", xVar), data = data)
        }
    ) |>

    pipe_add(
        "coefs",
        function(
            fit = ~fit
        ) {
            coefficients(fit)
        },
        keepOut = TRUE
    )

So our pipeline looks like this:

pip
#      step depends    out keepOut  group  state
#    <char>  <list> <list>  <lgcl> <char> <char>
# 1:   data         [NULL]   FALSE   data    New
# 2:    fit    data [NULL]   FALSE    fit    New
# 3:  coefs     fit [NULL]    TRUE  coefs    New

Or graphically:

library(visNetwork)
do.call(visNetwork, args = c(pip$get_graph(), list(height = 100))) |>
    visHierarchicalLayout(direction = "LR")

We use the iris data set as our working example.

head(iris)
#   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
# 1          5.1         3.5          1.4         0.2  setosa
# 2          4.9         3.0          1.4         0.2  setosa
# 3          4.7         3.2          1.3         0.2  setosa
# 4          4.6         3.1          1.5         0.2  setosa
# 5          5.0         3.6          1.4         0.2  setosa
# 6          5.4         3.9          1.7         0.4  setosa

First, we apply the pipeline to the whole data set.

pip$set_data(iris)
pip$set_params(list(xVar = "Sepal.Length", yVar = "Sepal.Width"))
pip$run()
# INFO  [2024-12-22 21:50:28.386] Start run of 'my-pipeline' pipeline:
# INFO  [2024-12-22 21:50:28.387] Step 1/3 data
# INFO  [2024-12-22 21:50:28.389] Step 2/3 fit
# INFO  [2024-12-22 21:50:28.392] Step 3/3 coefs
# INFO  [2024-12-22 21:50:28.393] Finished execution of steps.
# INFO  [2024-12-22 21:50:28.393] Done.
pip$collect_out()
# $coefs
#  (Intercept) Sepal.Length 
#    3.4189468   -0.0618848
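
As a quick sanity check, the same coefficients can be obtained without the pipeline. The following is a minimal sketch using base R only; directFit is a name introduced here for illustration, while xVar and yVar mirror the parameters of the fit step. It also shows that lm accepts the model formula as a character string, which is what the fit step relies on.

```r
# Same model as the "fit" step, built outside the pipeline (base R only).
xVar <- "Sepal.Length"
yVar <- "Sepal.Width"

# paste() yields the string "Sepal.Width ~ Sepal.Length", which lm()
# converts to a formula.
directFit <- lm(paste(yVar, "~", xVar), data = iris)
coefficients(directFit)
```

Up to rounding, the values should agree with the $coefs output shown above.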

Split data set

Next, we want to apply the pipeline to each species separately. One way to do this is with R’s split function: we split the data by the Species column and then run the pipeline on each subset. For example:

run_pipeline <- function(data) {
    pip$set_data(data)
    pip$run()
    pip$collect_out()
}

results <- lapply(split(iris, iris$Species), FUN = run_pipeline)
# INFO  [2024-12-22 21:50:28.463] Start run of 'my-pipeline' pipeline:
# INFO  [2024-12-22 21:50:28.464] Step 1/3 data
# INFO  [2024-12-22 21:50:28.466] Step 2/3 fit
# INFO  [2024-12-22 21:50:28.469] Step 3/3 coefs
# INFO  [2024-12-22 21:50:28.471] Finished execution of steps.
# INFO  [2024-12-22 21:50:28.471] Done.
# INFO  [2024-12-22 21:50:28.489] Start run of 'my-pipeline' pipeline:
# INFO  [2024-12-22 21:50:28.490] Step 1/3 data
# INFO  [2024-12-22 21:50:28.492] Step 2/3 fit
# INFO  [2024-12-22 21:50:28.495] Step 3/3 coefs
# INFO  [2024-12-22 21:50:28.496] Finished execution of steps.
# INFO  [2024-12-22 21:50:28.497] Done.
# INFO  [2024-12-22 21:50:28.499] Start run of 'my-pipeline' pipeline:
# INFO  [2024-12-22 21:50:28.500] Step 1/3 data
# INFO  [2024-12-22 21:50:28.502] Step 2/3 fit
# INFO  [2024-12-22 21:50:28.505] Step 3/3 coefs
# INFO  [2024-12-22 21:50:28.506] Finished execution of steps.
# INFO  [2024-12-22 21:50:28.506] Done.
results
# $setosa
# $setosa$coefs
#  (Intercept) Sepal.Length 
#   -0.5694327    0.7985283 
# 
# 
# $versicolor
# $versicolor$coefs
#  (Intercept) Sepal.Length 
#    0.8721460    0.3197193 
# 
# 
# $virginica
# $virginica$coefs
#  (Intercept) Sepal.Length 
#    1.4463054    0.2318905

Unfortunately, this approach required additional code that runs outside the pipeline framework. Let’s now see how to handle this scenario within the pipeline framework.

As a reminder, our pipeline looks like this (it still holds the output of the last run, which used the virginica subset):

pip
#      step depends                 out keepOut  group  state
#    <char>  <list>              <list>  <lgcl> <char> <char>
# 1:   data          <data.frame[50x5]>   FALSE   data   Done
# 2:    fit    data            <lm[12]>   FALSE    fit   Done
# 3:  coefs     fit 1.4463054,0.2318905    TRUE  coefs   Done

To deal with split data sets, we use the built-in function set_data_split.

splitData <- split(iris, iris$Species)
pip$set_data_split(splitData)

This function actually transforms the pipeline:

pip
#                step         depends                 out keepOut      group    state
#              <char>          <list>              <list>  <lgcl>     <char>   <char>
# 1:      data.setosa                              [NULL]   FALSE     setosa      New
# 2:       fit.setosa     data.setosa            <lm[12]>   FALSE     setosa Outdated
# 3:     coefs.setosa      fit.setosa 1.4463054,0.2318905    TRUE     setosa Outdated
# 4:  data.versicolor                              [NULL]   FALSE versicolor      New
# 5:   fit.versicolor data.versicolor            <lm[12]>   FALSE versicolor Outdated
# 6: coefs.versicolor  fit.versicolor 1.4463054,0.2318905    TRUE versicolor Outdated
# 7:   data.virginica                              [NULL]   FALSE  virginica      New
# 8:    fit.virginica  data.virginica            <lm[12]>   FALSE  virginica Outdated
# 9:  coefs.virginica   fit.virginica 1.4463054,0.2318905    TRUE  virginica Outdated

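As we can see, the pipeline has now been replicated for each data subset. The out column still shows results from the previous run, which is why the corresponding steps are marked as Outdated.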

do.call(visNetwork, args = pip$get_graph()) |>
    visHierarchicalLayout(direction = "LR", sortMethod = "directed")

Note that set_data_split accepts any list of data frames, not just the output of split. Now let’s run this pipeline.

pip$run()
# INFO  [2024-12-22 21:50:28.740] Start run of 'my-pipeline' pipeline:
# INFO  [2024-12-22 21:50:28.741] Step 1/9 data.setosa
# INFO  [2024-12-22 21:50:28.743] Step 2/9 fit.setosa
# INFO  [2024-12-22 21:50:28.746] Step 3/9 coefs.setosa
# INFO  [2024-12-22 21:50:28.748] Step 4/9 data.versicolor
# INFO  [2024-12-22 21:50:28.750] Step 5/9 fit.versicolor
# INFO  [2024-12-22 21:50:28.752] Step 6/9 coefs.versicolor
# INFO  [2024-12-22 21:50:28.754] Step 7/9 data.virginica
# INFO  [2024-12-22 21:50:28.756] Step 8/9 fit.virginica
# INFO  [2024-12-22 21:50:28.759] Step 9/9 coefs.virginica
# INFO  [2024-12-22 21:50:28.760] Finished execution of steps.
# INFO  [2024-12-22 21:50:28.761] Done.
pip$collect_out()
# $setosa
#  (Intercept) Sepal.Length 
#   -0.5694327    0.7985283 
# 
# $versicolor
#  (Intercept) Sepal.Length 
#    0.8721460    0.3197193 
# 
# $virginica
#  (Intercept) Sepal.Length 
#    1.4463054    0.2318905

As we can see, the coefficients are the same as before, but this time they were obtained without writing additional code outside the pipeline framework.
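
Since set_data_split accepts any list of data frames, the subsets need not come from split. As a hedged illustration, the list below is built by hand; the names short and long and the cut-off at the median sepal length are arbitrary choices made here, not part of the original example. The call to set_data_split is left as a comment so that the pipeline state of this walkthrough stays unchanged.

```r
# Build a named list of data frames by hand (base R only).
cutoff <- median(iris$Sepal.Length)
customSplit <- list(
    short = iris[iris$Sepal.Length <= cutoff, ],
    long  = iris[iris$Sepal.Length >  cutoff, ]
)

# Every row ends up in exactly one of the two subsets.
sapply(customSplit, nrow)

# Assumed usage, following the call shown above (not run here):
# pip$set_data_split(customSplit)
```

The list names play the role that the species names played above, so it is worth choosing short, descriptive labels.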

Finally, as a side note, there is another built-in function named split, which splits a pipeline into its independent parts. While this works for any pipeline, in our working example it naturally splits the pipeline into the parts defined by the data split.

pip$split()
# [[1]]
#            step     depends                   out keepOut  group  state
#          <char>      <list>                <list>  <lgcl> <char> <char>
# 1:  data.setosa                <data.frame[50x5]>   FALSE setosa   Done
# 2:   fit.setosa data.setosa              <lm[12]>   FALSE setosa   Done
# 3: coefs.setosa  fit.setosa -0.5694327, 0.7985283    TRUE setosa   Done
# 
# [[2]]
#                step         depends                 out keepOut      group  state
#              <char>          <list>              <list>  <lgcl>     <char> <char>
# 1:  data.versicolor                  <data.frame[50x5]>   FALSE versicolor   Done
# 2:   fit.versicolor data.versicolor            <lm[12]>   FALSE versicolor   Done
# 3: coefs.versicolor  fit.versicolor 0.8721460,0.3197193    TRUE versicolor   Done
# 
# [[3]]
#               step        depends                 out keepOut     group  state
#             <char>         <list>              <list>  <lgcl>    <char> <char>
# 1:  data.virginica                 <data.frame[50x5]>   FALSE virginica   Done
# 2:   fit.virginica data.virginica            <lm[12]>   FALSE virginica   Done
# 3: coefs.virginica  fit.virginica 1.4463054,0.2318905    TRUE virginica   Done

This function is especially useful if you want to separate the independent parts of a pipeline in order to run them in parallel.

Combine output

In some cases, we may want to (re-)combine the output of the pipeline parts. For example, we may want to combine the coefficients of the linear models into one table.

To this end, let’s extend our pipeline by one step at the end.

pip <- pipe_new(
        "my-pipeline"
    ) |>

    pipe_add(
        "fit",
        function(
            data = ~data,
            xVar = "x",
            yVar = "y"
        ) {
            lm(paste(yVar, "~", xVar), data = data)
        }
    ) |>

    pipe_add(
        "coefs",
        function(
            fit = ~fit
        ) {
            coefficients(fit)
        }
    ) |>

    pipe_add(
        "combine_coefs",
        function(
            coefs = ~coefs
        ) {
            coefs |> do.call(rbind, args = _) |> as.data.frame()
        },
        keepOut = TRUE
    )

This is what the pipeline looks like for now:

pip
#             step depends    out keepOut         group  state
#           <char>  <list> <list>  <lgcl>        <char> <char>
# 1:          data         [NULL]   FALSE          data    New
# 2:           fit    data [NULL]   FALSE           fit    New
# 3:         coefs     fit [NULL]   FALSE         coefs    New
# 4: combine_coefs   coefs [NULL]    TRUE combine_coefs    New

Next, we again want to apply the set_data_split function, but this time we need to make sure that every step is split except for the last one, which combines everything. We achieve this with the toStep parameter, which tells the pipeline to split only up to the given step.

pip$set_data_split(split(iris, iris$Species), toStep = "coefs")
pip
#                 step         depends    out keepOut         group    state
#               <char>          <list> <list>  <lgcl>        <char>   <char>
#  1:      data.setosa                 [NULL]   FALSE        setosa      New
#  2:       fit.setosa     data.setosa [NULL]   FALSE        setosa Outdated
#  3:     coefs.setosa      fit.setosa [NULL]   FALSE        setosa Outdated
#  4:  data.versicolor                 [NULL]   FALSE    versicolor      New
#  5:   fit.versicolor data.versicolor [NULL]   FALSE    versicolor Outdated
#  6: coefs.versicolor  fit.versicolor [NULL]   FALSE    versicolor Outdated
#  7:   data.virginica                 [NULL]   FALSE     virginica      New
#  8:    fit.virginica  data.virginica [NULL]   FALSE     virginica Outdated
#  9:  coefs.virginica   fit.virginica [NULL]   FALSE     virginica Outdated
# 10:    combine_coefs       <list[1]> [NULL]    TRUE combine_coefs      New

We see that the last step is not replicated for each data subset and it now contains a list of dependencies, namely:

pip$get_depends()[["combine_coefs"]]
# $coefs
# [1] "coefs.setosa"     "coefs.versicolor" "coefs.virginica"

Graphically, this becomes even clearer:

do.call(visNetwork, args = pip$get_graph()) |>
    visHierarchicalLayout(direction = "LR", sortMethod = "directed")

Finally, let’s see how this plays out when we run the pipeline.

pip$set_params(list(xVar = "Sepal.Length", yVar = "Sepal.Width"))
pip$run()
# INFO  [2024-12-22 21:50:29.084] Start run of 'my-pipeline' pipeline:
# INFO  [2024-12-22 21:50:29.085] Step 1/10 data.setosa
# INFO  [2024-12-22 21:50:29.087] Step 2/10 fit.setosa
# INFO  [2024-12-22 21:50:29.090] Step 3/10 coefs.setosa
# INFO  [2024-12-22 21:50:29.092] Step 4/10 data.versicolor
# INFO  [2024-12-22 21:50:29.094] Step 5/10 fit.versicolor
# INFO  [2024-12-22 21:50:29.097] Step 6/10 coefs.versicolor
# INFO  [2024-12-22 21:50:29.099] Step 7/10 data.virginica
# INFO  [2024-12-22 21:50:29.101] Step 8/10 fit.virginica
# INFO  [2024-12-22 21:50:29.104] Step 9/10 coefs.virginica
# INFO  [2024-12-22 21:50:29.106] Step 10/10 combine_coefs
# INFO  [2024-12-22 21:50:29.107] Finished execution of steps.
# INFO  [2024-12-22 21:50:29.108] Done.
pip$collect_out()
# $combine_coefs
#                  (Intercept) Sepal.Length
# coefs.setosa      -0.5694327    0.7985283
# coefs.versicolor   0.8721460    0.3197193
# coefs.virginica    1.4463054    0.2318905
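
To see where the row names of the combined table come from, the combine step can be reproduced in isolation. The sketch below uses base R only; coefList is a hypothetical stand-in for the list that combine_coefs receives, with values copied from the output above. Stripping the coefs. prefix is an optional extra shown for illustration, not something the pipeline does.

```r
# Stand-in for the input of "combine_coefs": one named vector per subset.
coefList <- list(
    coefs.setosa     = c("(Intercept)" = -0.5694327, Sepal.Length = 0.7985283),
    coefs.versicolor = c("(Intercept)" =  0.8721460, Sepal.Length = 0.3197193),
    coefs.virginica  = c("(Intercept)" =  1.4463054, Sepal.Length = 0.2318905)
)

# rbind() stacks the vectors as rows; the list names become the row names.
combined <- as.data.frame(do.call(rbind, coefList))

# Optional: drop the step-name prefix to keep only the subset label.
rownames(combined) <- sub("^coefs\\.", "", rownames(combined))
combined
```

If the shorter row names are preferred in the pipeline output, the same sub call could be placed inside the combine_coefs function.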