Welcome to Pipeline’s documentation!

Introduction

What is Pipeline?

Pipeline is a PHP library for building reusable functions for manipulating values. Taking ideas from functional programming languages, the library promotes you to build small functions that each do one specific thing, and then combining them together to achieve what you want.

When Should I Use Pipeline?

Most of Pipeline’s functions’ functionality is available in built-in PHP functions. For example, if the only thing you want to do is to map an array, you’ll be better off using PHP’s array_map() instead of Map. Pipeline’s usefulness comes into play when you want to combine functions, e.g. build a function that reads input files, maps their data, and imports the data to a database.

Of course you could just write a normal PHP function, but Pipeline handles all function chaining for you, and makes it easier to understand the flow of data at a glance. Additionally every Pipeline function is lazy, meaning you (usually) don’t have to worry about large amount of data. Compare the following functions:

<?php

// Using normal PHP
function importToDatabase(array $files, int $batchSize = 100) {
    foreach ($files as $file) {
        $rows = $this->readRowsFromFile($file);
        foreach ($rows as $idx => $row) {
            $rows[$idx] = $this->mapFileData($row);
        }

        $chunks = array_chunk($rows, $batchSize);
        foreach ($chunks as $chunk) {
            $this->importRowsToDatabase($chunk);
        }

        foreach ($rows as $row) {
            $logData = $this->formatLogData($data);
            $this->logDataToFile($logData);
        }
    }
}

// Using Pipeline
function importToDatabase(array $files, int $batchSize = 100)
{
    $fun = F::Compose([
        F::Expand([$this, 'readRowsFromFile']),
        F::Map([$this, 'mapFileData']),
        F::Fork([
            [
                F::Chunk($batchSize),
                F::Observe([$this, 'importRowsToDatabase']),
            ],
            [
                F::Map([$this, 'formatLogData']),
                F::Observe([$this, 'logDataToFile']),
            ]
        ])
    ]);

    iterator_to_array($fun($files));
}

Getting Started

Requirements

  • PHP 5.5 or greater.

Installation

You can install Pipeline with Composer by adding "webbhuset/pipeline": "*" to your composer.json or by running composer require webbhuset/pipeline in your terminal.

Building Pipeline Functions

The easiest way to construct Pipeline functions is to use the Constructor class. It has a static functions for constructing every Pipeline function, allowing you to construct all functions in a concise manner with a single use statement.

<?php
use Webbhuset\Pipeline\Constructor as F;

$take = F::Take(2);

$input = [1, 3, 5, 7, 9];

echo json_encode(iterator_to_array($take($input)));

// Output: [1,3]

It is of course also possible to construct the functions with new if that is preferred.

Using the Result

Since all Pipeline functions return a Generator they are actually not executed until the generator is iterated, and you cannot iterate the result more than once. If you need to iterate more than once, consider converting the result to an array using iterator_to_array().

List of Pipeline Functions

Pipeline functions are divided into two types: Value Functions and Flow Functions. Value functions work with the values and modify them, while Flow functions are wrappers for other functions allowing you to combine multiple functions into one.

Value Functions

Chunk

Chunk ( int $size )

Groups all input values into arrays with size values in each array. The last output array may contain less than size values. This is similar to PHP’s array_chunk().

Parameters
size
How many values should be in every chunk.
Examples
Example #1

Basic usage.

<?php
use Webbhuset\Pipeline\Constructor as F;

$chunk = F::Chunk(3);

$input = [1, 2, [3, 4], 'five', 6, null, 8];

echo json_encode(iterator_to_array($chunk($input)));

// Output: [[1,2,[3,4]],['five', 6, null],[8]]
Example #2

Using Chunk to batch database queries.

<?php
use Webbhuset\Pipeline\Constructor as F;

$fun = F::Compose([
    F::Chunk(100),
    F::Map(function ($ids) {
        return $dbConnection->fetchValuesByIds($ids);
    }),
    F::Expand(),
]);
See Also
  • GroupWhile - Group input values based on a callback function.

Drop

Drop ( int $amount )

Discards the first amount input values, returning the remaining values.

Parameters
amount
The amount of input values that should be discarded.
Examples
Example #1

Basic usage example.

<?php
use Webbhuset\Pipeline\Constructor as F;

$drop = F::Drop(2);

$input = [1, 3, 5, 7, 9];

echo json_encode(iterator_to_array($drop($input)));

// Output: [5,7,9]
See Also
  • DropWhile - Discard input values while a callback function returns true.
  • Filter - Discard input values based on a callback function.
  • Take - Return a specific amount of input values.

DropWhile

DropWhile ( callable $callback )

Discards input values while the callback function returns true, then all remaining values are returned. The first value returned is the one for which the callback function returned false.

Parameters
callback
bool callback ( mixed $value )
value
The current value.
Examples
Example #1

Basic usage example.

<?php
use Webbhuset\Pipeline\Constructor as F;

$dropWhile = F::DropWhile(function ($value) {
    return $value < 7;
});

$input = [1, 3, 5, 7, 9, 2, 4, 6, 8];

echo json_encode(iterator_to_array($dropWhile($input)));

// Output: [7,9,2,4,6,8]
See Also
  • Drop - Discard a specific amount of input values.
  • Filter - Discard input values based on a callback.
  • TakeWhile - Return input values while callback returns true.

Expand

Expand ( [ callable $callback ] )

Passes every input value to the callback function, yielding from the resulting Generator.

Parameters
callback
Generator callback ( mixed $value )

Callback function that returns a Generator.

If no callback is supplied, every input value is passed to yield from.

value
The current value.
Examples
Example #1

Basic usage with default callback.

<?php

use Webbhuset\Pipeline\Constructor as F;

$expand = F::Expand();

$input = [[1, 2, 3], [4, 5, 6]];

echo json_encode(iterator_to_array($expand($input)));

// Output: [1,2,3,4,5,6]
Example #2

Using Expand to create values of the cartesian product of two arrays.

<?php

use Webbhuset\Pipeline\Constructor as F;

$expand = F::Expand(function ($value) {
    foreach ($value['foos'] as $foo) {
        foreach ($value['bars'] as $bar) {
            yield [
                'foo' => $foo,
                'bar' => $bar,
            ];
        }
    }
});

$input = [
    [
        'foos' => [1, 2, 3],
        'bars' => [4, 5],
    ]
];

echo json_encode(iterator_to_array($expand($input)), JSON_PRETTY_PRINT);

/**
 * Output:
 *  [
 *      {
 *          "foo": 1,
 *          "bar": 4
 *      },
 *      {
 *          "foo": 1,
 *          "bar": 5
 *      },
 *      {
 *          "foo": 2,
 *          "bar": 4
 *      },
 *      {
 *          "foo": 2,
 *          "bar": 5
 *      },
 *      {
 *          "foo": 3,
 *          "bar": 4
 *      },
 *      {
 *          "foo": 3,
 *          "bar": 5
 *      }
 *  ]
 */
See Also
  • Map - Modify every input value with a callback function.

Filter

Filter ( [ callable $callback ] )

Passes every input value to the callback function, returning only values for which the callback returns true.

Parameters
callback
bool callback ( mixed $value )

If no callback is supplied, all values that equal false (after being converted to bool) are removed.

value
The current value.
Examples
Example #1

Basic usage example.

<?php
use Webbhuset\Pipeline\Constructor as F;

$filter = F::Filter(function ($value) {
    return $value % 2 == 0;
});

$input = [1, 2, 3, 4, 5, 6, 7, 8];

echo json_encode(iterator_to_array($filter($input)));

// Output: [2,4,6,8]
See Also
  • Drop - Discard a specific amount of input values.
  • DropWhile - Discard input values while a callback function returns true.

GroupWhile

GroupWhile ( callable $callback )

Groups input values into arrays based on the callback function.

Parameters
callback
bool callback ( mixed $value , array $batch )

If the callback function returns true the current value is added to the current batch. If it returns false it is added to a new batch.

value
The current value.
batch
The current batch of grouped values.
Examples
Example #1

Basic usage example, grouping repeated values.

<?php

use Webbhuset\Pipeline\Constructor as F;

$group = F::GroupWhile(function ($value, $batch) {
    return !$batch                  // Add to batch if empty
        || $value == reset($batch); // Add if value is the same as values in batch
});

$input = [1, 1, 1, 2, 3, 3, 1, 2, 2];

echo json_encode(iterator_to_array($group($input)));

// Output: [[1,1,1],[2],[3,3],[1],[2,2]]
Example #2

Group values in groups where the sum of their values is >= 10, and uses Filter to filter any trailing group.

<?php

use Webbhuset\Pipeline\Constructor as F;

$fun = F::Compose([
    F::GroupWhile(function ($value, $batch) {
        return array_sum($batch) < 10;
    }),
    F::Filter(function ($values) {
        return array_sum($values) >= 10;
    })
]);

$input = [1, 2, 3, 4, 5, 6, 7, 8, 9];

echo json_encode(iterator_to_array($fun($input)));

// Output: [[1,2,3,4],[5,6],[7,8]]
See Also
  • Chunk - Group input values into arrays of a specified size.

Map

Map ( callable $callback )

Modify every input value with a callback function.

Parameters
callback
mixed callback ( mixed $value )
value
The current value.
Examples
Example #1

Basic usage example.

<?php
use Webbhuset\Pipeline\Constructor as F;

$map = F::Map(function ($value) {
    return $value * 2;
});

$input = [1, 2, 5, 12];

echo json_encode(iterator_to_array($map($input)));

// Output: [2,4,10,24]
See Also
  • Expand - Yield one or more values from every input value.
  • Observe - Send every input value to a callback function without modifying it.

Observe

Observe ( callable $callback )

Passes every input value to the callback function without modifying it.

Parameters
callback
void callback ( mixed $value )
value
The current value.
Examples
Example #1

Basic usage example.

<?php

use Webbhuset\Pipeline\Constructor as F;

$observe = F::Observe(function ($value) {
    echo $value . "\n";
});

$input = [1, 2, 3, 4, 5, 6];

iterator_to_array($observe($input));

/**
 * Output:
 *  1
 *  2
 *  3
 *  4
 *  5
 *  6
 */
See Also
  • Map - Modify every input value with a callback function.

Reduce

Reduce ( callable $callback [, mixed $initialValue = [] ] )

Reduces all input values to a single value with the callback function.

Parameters
callback
mixed callback ( mixed $value , mixed $carry )
value
The current value that is being reduced.
carry
The return value of previous iteration.
initialValue
The initial value of $carry.
Examples
Example #1

Basic usage example, summing all input values.

<?php
use Webbhuset\Pipeline\Constructor as F;

$reduce = F::Reduce(function ($value, $carry) {
    return $carry + $value;
}, 0);

$input = [1, 4, 8, 15];

echo json_encode(iterator_to_array($reduce($input)));

// Output: [28]
Example #2

Using Reduce, Filter, and Map to write to a file and return the path. Reduce opens a file pointer, and Map closes it and returns the path.

Since Reduce always returns a value even with an empty input, we use Filter to prevent an attempt to close a non-existent pointer and returning the path when input is empty.

<?php
use Webbhuset\Pipeline\Constructor as F;

$write = F::Compose([
    F::Reduce(function ($value, $carry) {
        if (!$carry) {
            $path = 'file.txt';
            $carry = [
                'path' => $path,
                'file' => fopen($path, 'w'),
            ];
        }

        fwrite($carry['file'], $value . "\n");

        return $carry;
    }),
    F::Filter(),
    F::Map(function ($carry) {
        fclose($carry['file']);

        return $carry['path'];
    }),
]);

iterator_to_array($write(range(1, 10)));
See Also
  • GroupWhile - Group input values based on a callback function.
  • Scan - Reduce input values, returning the intermediate results.

Scan

Scan ( callable $callback [, mixed $initialValue = [] ] )

Reduces all input values to a single value with the callback function, while returning the intermediate result of every iteration.

Parameters
callback
mixed callback ( mixed $value , mixed $carry )
value
The current value that is being reduced.
carry
The return value of previous iteration.
initialValue
The initial value of $carry.
Examples
Example #1

Basic usage example, summing all input values.

<?php
use Webbhuset\Pipeline\Constructor as F;

$scan = F::Scan(function ($value, $carry) {
    return $carry + $value;
}, 0);

$input = [1, 4, 8, 15];

echo json_encode(iterator_to_array($scan($input)));

// Output: [0,1,5,13,28]
Example #2

Building a string, and using Drop to skip the initial value.

<?php
use Webbhuset\Pipeline\Constructor as F;

$function = F::Compose([
    F::Scan(function ($value, $carry) {
        return $carry . $value;
    }, ''),
    F::Drop(1),
]);

$input = ['Hello', ' ', 'world', '!'];

echo json_encode(iterator_to_array($function($input)));

// Output: ["Hello","Hello ","Hello world","Hello world!"]
See Also
  • Reduce - Reduce input values, returning only the final value.

Take

Take ( int $amount )

Returs the first amount input values, discarding the remaining values.

Parameters
amount
The amount of input values to take.
Examples
Example #1

Basic usage example.

<?php
use Webbhuset\Pipeline\Constructor as F;

$take = F::Take(2);

$input = [1, 3, 5, 7, 9];

echo json_encode(iterator_to_array($take($input)));

// Output: [1,3]
See Also
  • Drop - Discard a specific amount of input values.
  • TakeWhile - Return input values while a callback function returns true.

TakeWhile

TakeWhile ( callable $callback )

Returns input values while the callback function returns true, then all remaining values are ignored. The last value returned is the the one previous to the value for which the callback function returned false.

Parameters
callback
bool callback ( mixed $value )
value
The current value.
Examples
Example #1

Basic usage example.

<?php
use Webbhuset\Pipeline\Constructor as F;

$takeWhile = F::TakeWhile(function ($value) {
    return $value < 7;
});

$input = [1, 3, 5, 7, 9, 2, 4, 6, 8];

echo json_encode(iterator_to_array($takeWhile($input)));

// Output: [1,3,5]
See Also
  • Take - Take a specific amount of input values.
  • DropWhile - Discard input values while a callback function returns true.
  • Chunk - Group input values in groups of a specified size.
  • Drop - Discard the first N input values and return the rest.
  • DropWhile - Discard input values while callback returns true.
  • Expand - Yields one or more values from every input value.
  • Filter - Discard input values based on a callback.
  • GroupWhile - Group input values based on a callback.
  • Map - Modify every input value with a callback.
  • Observe - Send input values to a callback without modifying them.
  • Reduce - Reduce all input values to a single value.
  • Scan - Reduce all input values, returning the intermediate results.
  • Take - Return the first N input values and discard the rest.
  • TakeWhile - Return input values while callback returns true.

Flow Functions

Compose

Compose ( array $functions )

Chains functions together, using the output of each function as input to the next. Output is the output of the last function in the chain.

Parameters
functions
Array of functions that should be chained. If the array is multidimensional and/or contains another Compose it will be flattened.
Examples
Example #1

Basic usage.

<?php
use Webbhuset\Pipeline\Constructor as F;

$compose = F::Compose([
    F::Map('trim'),
    F::Filter('is_numeric'),
    F::Map('intval'),
]);

$input = ['1', '  23 ', 'hello', '4.444', 5.75, '+12e3'];

echo json_encode(iterator_to_array($compose($input)));

// Output: [1,23,4,5,12000]
Example #2

Demonstrating how multidimensional arrays and other Composes are flattened.

<?php
use Webbhuset\Pipeline\Constructor as F;

function getMapFunction()
{
    return [
        F::Map('trim'),
        F::Map('ucwords'),
    ];
}

$compose = F::Compose([
    getMapFunction(),
    F::Compose([
        F::Filter('is_numeric')
    ]),
    [],
    [
        [
            F::Map('intval'),
        ],
    ],
]);

/**
 * Result function would look like this:
 *  [
 *       F::Map('trim'),
 *       F::Map('ucwords'),
 *       F::Filter('is_numeric'),
 *       F::Map('intval'),
 *  ]
 */
See Also
  • Fork - Send every input value to multiple functions.

Defer

Defer ( callable $callback )

Delays construction of the inner function until execution. Input is sent to inner function, and output is the output of the inner function. Defer is useful if for example constructing the inner function is resource-intensive.

Parameters
callback
mixed callback ( void )

A callback that returns either a FunctionInterface or an array. If the callback returns an array it will be passed as argument to Compose, which is then used as the inner function.

Examples
Example #1

Basic usage example.

<?php
use Webbhuset\Pipeline\Constructor as F;

$defer = F::Defer(function () {
    sleep(1); // Sleep to simulate fetching IDs from a database.
    $idMap = [
        'alpha' => 1,
        'beta'  => 2,
        'gamma' => 3,
    ];

    return F::Map(function ($value) use ($idMap) {
        return $idMap[$value] ?? null;
    });
});

$input = ['alpha', 'gamma', 'omega'];

echo json_encode(iterator_to_array($defer($input)));

// Output: [1,3,null]

Fork

Fork ( array $functions )

Sends every input value to every inner function. Output is the output of every inner function.

Parameters
functions
An array of Pipeline functions. If any of the elements in the array is an array it will be passed as argument to Compose, which is then used as the function.
Examples
Example #1

Basic usage example.

<?php
use Webbhuset\Pipeline\Constructor as F;

$fork = F::Fork([
    F::Map(function ($value) {
        return $value * 2;
    }),
    F::Map(function ($value) {
        return str_repeat('a', $value);
    }),
]);

$input = [1, 2, 3, 4, 5];

echo json_encode(iterator_to_array($fork($input)));

// Output: [2,"a",4,"aa",6,"aaa",8,"aaaa",10,"aaaaa"]
See Also
  • Compose - Chain functions together sequentially.
  • Multiplex - Send every input value to one function based on a callback function.

Multiplex

Multiplex ( callable $callback , array $functions )

Sends every input value to one of the inner functions based on the result of the callback function. Output is the output of the inner functions.

Parameters
callback
scalar callback ( mixed $value )

A callback that returns the key of the function to which the value should be passed.

functions
An array of Pipeline functions. If any of the elements in the array is an array it will be passed as argument to Compose, which is then used as the function.
Examples
Example #1

Basic usage example.

<?php

use Webbhuset\Pipeline\Constructor as F;

$multiplex = F::Multiplex(
    function ($value) {
        return $value % 2 == 0 ? 'even' : 'odd';
    },
    [
        'even' => F::Map(function ($value) {
            return $value / 2;
        }),
        'odd' => F::Map(function ($value) {
            return $value * 2;
        }),
    ]
);

$input = [1, 2, 3, 4, 5, 6];

echo json_encode(iterator_to_array($multiplex($input)));

// Output: [2,1,6,2,10,3]
Example #2

By leaving a branch empty we can run functions for only some values.

<?php

use Webbhuset\Pipeline\Constructor as F;

$multiplex = F::Multiplex(
    function ($value) {
        return $value <= 10;
    },
    [
        true => F::Map(function ($value) {
            return $value * 2;
        }),
        false => [],
    ]
);

$input = [1, 22, 3, 44, 5];

echo json_encode(iterator_to_array($multiplex($input)));

// Output: [2,22,6,44,10]
See Also
  • Fork - Send every input value to multiple functions.
  • Compose - Chain functions together.
  • Defer - Delay construction of a function.
  • Fork - Send every input value to multiple functions.
  • Multiplex - Send every input value to one function based on a callback.

Useful Information

Pipeline and States

While some of Pipeline’s functions are stateless (e.g. Map and Expand), others (e.g. Take and Reduce) keep a state in some situations. Specifically, if $keepState (the second argument to __invoke() for every function) is true then these function will keep a state even after their generator has been fully iterated. As an example, Take will remember how many values it has returned:

<?php
use Webbhuset\Pipeline\Constructor as F;

$take = F::Take(3);

echo json_encode(iterator_to_array($take([1,2,3,4], true)));
echo "\n";
echo json_encode(iterator_to_array($take([5,6,7,8], true)));

/**
 * Output:
 *  [1,2,3]
 *  []
 */

Since $keepState defaults to false this is normally not something that you have to worry about when using Pipeline, and is mostly used internally by some of the Flow Functions.

States can also cause issues if multiple generators are created from these functions and iterated simultaneously (even if $keepState is false). This is most easily circumvented by using a Defer to build the functions separately for every input:

<?php
use Webbhuset\Pipeline\Constructor as F;

$take = F::Take(5);

$takeWithDefer = F::Defer(function () {
    return F::Take(5);
});

$input1 = range(0, 9);
$input2 = range(10, 19);

function loop($gen1, $gen2) {
    while ($gen1->valid() || $gen2->valid()) {
        if ($gen1->valid()) {
            echo 'Gen1: ' . $gen1->current() . "\n";
            $gen1->next();
        }
        if ($gen2->valid()) {
            echo 'Gen2: ' . $gen2->current() . "\n";
            $gen2->next();
        }
    }
}

$gen1 = $take($input1);
$gen2 = $take($input2);

echo "Without Defer, unexpected results:\n";
loop($gen1, $gen2);

$gen1 = $takeWithDefer($input1);
$gen2 = $takeWithDefer($input2);

echo "With Defer, expected results:\n";
loop($gen1, $gen2);

/**
 * Output:
 *  Without Defer, unexpected results:
 *  Gen1: 0
 *  Gen2: 10
 *  Gen1: 1
 *  Gen2: 11
 *  Gen1: 2
 *  Gen1: 3
 *  Gen1: 4
 *  Gen1: 5
 *  Gen1: 6
 *  Gen1: 7
 *  With Defer, expected results:
 *  Gen1: 0
 *  Gen2: 10
 *  Gen1: 1
 *  Gen2: 11
 *  Gen1: 2
 *  Gen2: 12
 *  Gen1: 3
 *  Gen2: 13
 *  Gen1: 4
 *  Gen2: 14
 */

Separating Flow and Logic

Just like when writing normal functions it is preferable to have multiple functions with descriptive names instead of adding everything into one function that does everything. Additionally this promotes separating functions responsible for the flow of data and functions responsible for manipulating data. Compare the following:

Using named functions:

<?php
class myFunctionBuilder
{
    public function buildMyFunction()
    {
        return [
            $this->mapRows(),
            $this->filterInvalid(),
            $this->insertToDatabase(),
        ];
    }

    protected function mapRows()
    {
        return F::Map(function ($value) {
            // ...
        });
    }

    protected function filterInvalid()
    {
        return F::Filter(function ($value) {
            // ...
        });
    }

    protected function insertToDatabase()
    {
        return F::Observe(function ($value) {
            // ...
        });
    }
}

Without named functions:

<?php
class myFunctionBuilder
{
    public function buildMyFunction()
    {
        return [
            F::Map(function ($value) {
                // ...
            }),
            F::Filter(function ($value) {
                // ...
            }),
            F::Observe(function ($value) {
                // ...
            }),
        ];
    }
}