Welcome to Pipeline’s documentation!¶
Introduction¶
What is Pipeline?¶
Pipeline is a PHP library for building reusable functions that manipulate values. Taking ideas from functional programming languages, the library encourages you to build small functions that each do one specific thing, and then combine them to achieve what you want.
When Should I Use Pipeline?¶
Most of what Pipeline’s functions do is also available through built-in PHP functions. For example, if all you want to do is map an array, you’re better off using PHP’s array_map() instead of Map. Pipeline becomes useful when you want to combine functions, e.g. to build a function that reads input files, maps their data, and imports the data into a database.
Of course you could just write a normal PHP function, but Pipeline handles all function chaining for you and makes it easier to understand the flow of data at a glance. Additionally, every Pipeline function is lazy, meaning you (usually) don’t have to worry about large amounts of data. Compare the following functions:
<?php
// Using normal PHP
function importToDatabase(array $files, int $batchSize = 100) {
    foreach ($files as $file) {
        $rows = $this->readRowsFromFile($file);
        foreach ($rows as $idx => $row) {
            $rows[$idx] = $this->mapFileData($row);
        }
        $chunks = array_chunk($rows, $batchSize);
        foreach ($chunks as $chunk) {
            $this->importRowsToDatabase($chunk);
        }
        foreach ($rows as $row) {
            $logData = $this->formatLogData($row);
            $this->logDataToFile($logData);
        }
    }
}
// Using Pipeline
function importToDatabase(array $files, int $batchSize = 100)
{
    $fun = F::Compose([
        F::Expand([$this, 'readRowsFromFile']),
        F::Map([$this, 'mapFileData']),
        F::Fork([
            [
                F::Chunk($batchSize),
                F::Observe([$this, 'importRowsToDatabase']),
            ],
            [
                F::Map([$this, 'formatLogData']),
                F::Observe([$this, 'logDataToFile']),
            ]
        ])
    ]);
    iterator_to_array($fun($files));
}
Getting Started¶
Requirements¶
- PHP 5.5 or greater.
Installation¶
You can install Pipeline with Composer by adding "webbhuset/pipeline": "*" to your composer.json or by running composer require webbhuset/pipeline in your terminal.
Building Pipeline Functions¶
The easiest way to construct Pipeline functions is to use the Constructor class. It has a static method for constructing every Pipeline function, allowing you to construct all functions concisely with a single use statement.
<?php
use Webbhuset\Pipeline\Constructor as F;
$take = F::Take(2);
$input = [1, 3, 5, 7, 9];
echo json_encode(iterator_to_array($take($input)));
// Output: [1,3]
It is of course also possible to construct the functions with new if that is preferred.
Using the Result¶
Since all Pipeline functions return a Generator, they are not actually executed until the generator is iterated, and the result cannot be iterated more than once. If you need to iterate more than once, consider converting the result to an array using iterator_to_array().
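The behaviour described above comes from PHP generators themselves. The following is a minimal sketch in plain PHP that does not use Pipeline at all; the closure $double and the $log array are hypothetical names chosen for illustration. It shows that the generator body only runs once iteration starts, and that a second pass over an exhausted generator throws an Exception.

```php
<?php
// Plain PHP, no Pipeline required: a generator body runs only on iteration.
$log = [];
$double = function (array $values) use (&$log) {
    foreach ($values as $value) {
        $log[] = "processing $value";
        yield $value * 2;
    }
};

$gen = $double([1, 2, 3]);
$ranBeforeIteration = count($log);        // nothing has executed yet

$result = iterator_to_array($gen, false); // iterating runs the body
$ranAfterIteration = count($log);

// The generator is now exhausted; a second pass throws an Exception.
$secondPassFailed = false;
try {
    foreach ($gen as $value) {
        // never reached
    }
} catch (Exception $e) {
    $secondPassFailed = true;
}

echo json_encode([$ranBeforeIteration, $ranAfterIteration, $result, $secondPassFailed]);
// Output: [0,3,[2,4,6],true]
```

Storing $result in a variable, as done here, is what allows the values to be reused after the generator has been consumed.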
List of Pipeline Functions¶
Pipeline functions are divided into two types: Value Functions and Flow Functions. Value functions work with the values and modify them, while Flow functions are wrappers for other functions allowing you to combine multiple functions into one.
Value Functions¶
Chunk¶
Chunk ( int $size )
Groups input values into arrays of size values each. The last output array may contain fewer than size values. This is similar to PHP’s array_chunk().
Examples¶
Example #1¶
Basic usage.
<?php
use Webbhuset\Pipeline\Constructor as F;
$chunk = F::Chunk(3);
$input = [1, 2, [3, 4], 'five', 6, null, 8];
echo json_encode(iterator_to_array($chunk($input)));
// Output: [[1,2,[3,4]],["five",6,null],[8]]
Example #2¶
Using Chunk to batch database queries.
<?php
use Webbhuset\Pipeline\Constructor as F;
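// $dbConnection is assumed to be defined elsewhere; the closure must import it with use.
$fun = F::Compose([
    F::Chunk(100),
    F::Map(function ($ids) use ($dbConnection) {
        return $dbConnection->fetchValuesByIds($ids);
    }),
    F::Expand(),
]);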
See Also¶
- GroupWhile - Group input values based on a callback function.
Drop¶
Drop ( int $amount )
Discards the first amount input values, returning the remaining values.
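To make the description concrete, here is a rough sketch of the same behaviour written as a plain PHP generator. dropSketch is a hypothetical helper used only for illustration; it is not Pipeline's implementation.

```php
<?php
// Hypothetical stand-in for Drop, written as a plain generator.
function dropSketch($values, $amount)
{
    $seen = 0;
    foreach ($values as $value) {
        if ($seen++ < $amount) {
            continue; // still within the first $amount values
        }
        yield $value;
    }
}

echo json_encode(iterator_to_array(dropSketch([1, 3, 5, 7, 9], 2), false));
// Output: [5,7,9]
```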
DropWhile¶
DropWhile ( callable $callback )
Discards input values while the callback function returns true, then all remaining values are returned. The first value returned is the one for which the callback function returned false.
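The boundary behaviour described above can be sketched in plain PHP. dropWhileSketch is a hypothetical helper, not Pipeline's implementation, and it assumes the callback receives only the current value. Note that once the callback has returned false, later values are kept even if the callback would return true for them again.

```php
<?php
// Hypothetical stand-in for DropWhile, written as a plain generator.
function dropWhileSketch($values, callable $callback)
{
    $dropping = true;
    foreach ($values as $value) {
        if ($dropping && $callback($value)) {
            continue; // callback still returns true: discard
        }
        $dropping = false; // from here on, every value is passed through
        yield $value;
    }
}

$lessThanThree = function ($value) {
    return $value < 3;
};

echo json_encode(iterator_to_array(dropWhileSketch([1, 2, 3, 1, 2], $lessThanThree), false));
// Output: [3,1,2]
```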
Expand¶
Expand ( [ callable $callback ] )
Passes every input value to the callback function, yielding from the resulting Generator.
Parameters¶
Examples¶
Example #1¶
Basic usage with default callback.
<?php
use Webbhuset\Pipeline\Constructor as F;
$expand = F::Expand();
$input = [[1, 2, 3], [4, 5, 6]];
echo json_encode(iterator_to_array($expand($input)));
// Output: [1,2,3,4,5,6]
Example #2¶
Using Expand to create values of the cartesian product of two arrays.
<?php
use Webbhuset\Pipeline\Constructor as F;
$expand = F::Expand(function ($value) {
    foreach ($value['foos'] as $foo) {
        foreach ($value['bars'] as $bar) {
            yield [
                'foo' => $foo,
                'bar' => $bar,
            ];
        }
    }
});
$input = [
    [
        'foos' => [1, 2, 3],
        'bars' => [4, 5],
    ]
];
echo json_encode(iterator_to_array($expand($input)), JSON_PRETTY_PRINT);
/**
* Output:
* [
* {
* "foo": 1,
* "bar": 4
* },
* {
* "foo": 1,
* "bar": 5
* },
* {
* "foo": 2,
* "bar": 4
* },
* {
* "foo": 2,
* "bar": 5
* },
* {
* "foo": 3,
* "bar": 4
* },
* {
* "foo": 3,
* "bar": 5
* }
* ]
*/
Filter¶
Filter ( [ callable $callback ] )
Passes every input value to the callback function, returning only values for which the callback returns true.
Parameters¶
- callback
bool callback ( mixed $value )
If no callback is supplied, all values that equal false (after being converted to bool) are removed.
- value
- The current value.
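The phrase "equal false after being converted to bool" refers to PHP's own conversion rules, which can be surprising (for example, the string '0' is falsy). The sketch below uses PHP's built-in array_filter() without a callback, which applies the same documented rule; that Pipeline's Filter agrees with it in every edge case is an assumption.

```php
<?php
// Plain PHP: array_filter() without a callback removes every falsy value.
$input = [0, 1, '', 'a', null, [], [0], '0', 0.0, false, true];

// array_values() re-indexes the result so it encodes as a JSON list.
$kept = array_values(array_filter($input));

echo json_encode($kept);
// Output: [1,"a",[0],true]
```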
GroupWhile¶
GroupWhile ( callable $callback )
Groups input values into arrays based on the callback function.
Parameters¶
Examples¶
Example #1¶
Basic usage example, grouping repeated values.
<?php
use Webbhuset\Pipeline\Constructor as F;
$group = F::GroupWhile(function ($value, $batch) {
    return !$batch                  // Add to batch if empty
        || $value == reset($batch); // Add if value is the same as values in batch
});
$input = [1, 1, 1, 2, 3, 3, 1, 2, 2];
echo json_encode(iterator_to_array($group($input)));
// Output: [[1,1,1],[2],[3,3],[1],[2,2]]
Example #2¶
Grouping values so that each group sums to at least 10, and using Filter to discard a trailing group that falls short.
<?php
use Webbhuset\Pipeline\Constructor as F;
$fun = F::Compose([
    F::GroupWhile(function ($value, $batch) {
        return array_sum($batch) < 10;
    }),
    F::Filter(function ($values) {
        return array_sum($values) >= 10;
    })
]);
$input = [1, 2, 3, 4, 5, 6, 7, 8, 9];
echo json_encode(iterator_to_array($fun($input)));
// Output: [[1,2,3,4],[5,6],[7,8]]
Map¶
Map ( callable $callback )
Modify every input value with a callback function.
Observe¶
Observe ( callable $callback )
Passes every input value to the callback function without modifying it.
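As a rough illustration of "without modifying it", the plain PHP generator below calls a callback for its side effect and then yields the original value unchanged. observeSketch is a hypothetical helper, not Pipeline's implementation, and that the callback's return value is ignored is an assumption based on the description above.

```php
<?php
// Hypothetical stand-in for Observe, written as a plain generator.
function observeSketch($values, callable $callback)
{
    foreach ($values as $value) {
        $callback($value); // return value is ignored
        yield $value;      // the original value is passed on unchanged
    }
}

$seen = [];
$gen = observeSketch([1, 2, 3], function ($value) use (&$seen) {
    $seen[] = $value * 10; // side effect only, e.g. logging
    return 'ignored';
});

$output = iterator_to_array($gen, false);
echo json_encode(['output' => $output, 'seen' => $seen]);
// Output: {"output":[1,2,3],"seen":[10,20,30]}
```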
Reduce¶
Reduce ( callable $callback [, mixed $initialValue = [] ] )
Reduces all input values to a single value with the callback function.
Parameters¶
- callback
mixed callback ( mixed $value , mixed $carry )
- value
- The current value that is being reduced.
- carry
- The return value of previous iteration.
- initialValue
- The initial value of $carry.
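Note that the argument order documented here, ( $value , $carry ), is the reverse of PHP's built-in array_reduce(), whose callback is called as ( $carry , $item ). The plain PHP sketch below does not use Pipeline; $append and $forArrayReduce are hypothetical names. It shows how a callback written in the order above can be adapted for array_reduce(), and what happens if the order is mixed up.

```php
<?php
// Callback written in the order documented above: ($value, $carry).
$append = function ($value, $carry) {
    return $carry . $value;
};

// array_reduce() calls its callback as ($carry, $item), so swap the arguments.
$forArrayReduce = function ($carry, $item) use ($append) {
    return $append($item, $carry);
};

$correct = array_reduce(['a', 'b', 'c'], $forArrayReduce, '>');
$swapped = array_reduce(['a', 'b', 'c'], $append, '>'); // arguments mixed up

echo json_encode([$correct, $swapped]);
// Output: [">abc","cba>"]
```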
Examples¶
Example #1¶
Basic usage example, summing all input values.
<?php
use Webbhuset\Pipeline\Constructor as F;
$reduce = F::Reduce(function ($value, $carry) {
    return $carry + $value;
}, 0);
$input = [1, 4, 8, 15];
echo json_encode(iterator_to_array($reduce($input)));
// Output: [28]
Example #2¶
Using Reduce, Filter, and Map to write to a file and return the path. Reduce opens a file pointer, and Map closes it and returns the path.
Since Reduce always returns a value, even with an empty input, we use Filter to avoid trying to close a non-existent pointer and returning a path when the input is empty.
<?php
use Webbhuset\Pipeline\Constructor as F;
$write = F::Compose([
    F::Reduce(function ($value, $carry) {
        if (!$carry) {
            $path = 'file.txt';
            $carry = [
                'path' => $path,
                'file' => fopen($path, 'w'),
            ];
        }
        fwrite($carry['file'], $value . "\n");
        return $carry;
    }),
    F::Filter(),
    F::Map(function ($carry) {
        fclose($carry['file']);
        return $carry['path'];
    }),
]);
iterator_to_array($write(range(1, 10)));
See Also¶
- GroupWhile - Group input values based on a callback function.
- Scan - Reduce input values, returning the intermediate results.
Scan¶
Scan ( callable $callback [, mixed $initialValue = [] ] )
Reduces all input values to a single value with the callback function, while returning the intermediate result of every iteration.
Parameters¶
- callback
mixed callback ( mixed $value , mixed $carry )
- value
- The current value that is being reduced.
- carry
- The return value of previous iteration.
- initialValue
- The initial value of $carry.
Examples¶
Example #1¶
Basic usage example, summing all input values.
<?php
use Webbhuset\Pipeline\Constructor as F;
$scan = F::Scan(function ($value, $carry) {
    return $carry + $value;
}, 0);
$input = [1, 4, 8, 15];
echo json_encode(iterator_to_array($scan($input)));
// Output: [0,1,5,13,28]
Example #2¶
Building a string, and using Drop to skip the initial value.
<?php
use Webbhuset\Pipeline\Constructor as F;
$function = F::Compose([
    F::Scan(function ($value, $carry) {
        return $carry . $value;
    }, ''),
    F::Drop(1),
]);
$input = ['Hello', ' ', 'world', '!'];
echo json_encode(iterator_to_array($function($input)));
// Output: ["Hello","Hello ","Hello world","Hello world!"]
Take¶
Take ( int $amount )
Returns the first amount input values, discarding the remaining values.
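A plain PHP sketch of this behaviour is shown below. takeSketch and naturals are hypothetical helpers, not part of Pipeline. The sketch stops reading from its source as soon as enough values have been returned, which is what makes it safe to use on an endless generator; whether Pipeline's Take also stops reading its input early is not stated here and should be treated as an assumption.

```php
<?php
// An endless source of values: 1, 2, 3, ...
function naturals()
{
    $n = 1;
    while (true) {
        yield $n++;
    }
}

// Hypothetical stand-in for Take, written as a plain generator.
function takeSketch($values, $amount)
{
    if ($amount <= 0) {
        return;
    }
    $taken = 0;
    foreach ($values as $value) {
        yield $value;
        if (++$taken >= $amount) {
            return; // stop pulling from the source
        }
    }
}

echo json_encode(iterator_to_array(takeSketch(naturals(), 3), false));
// Output: [1,2,3]
```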
TakeWhile¶
TakeWhile ( callable $callback )
Returns input values while the callback function returns true, then all remaining values are ignored. The last value returned is the one before the value for which the callback function returned false.
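The boundary behaviour can be sketched in plain PHP as follows. takeWhileSketch is a hypothetical helper, not Pipeline's implementation, and it assumes the callback receives only the current value. Values after the first failing one are ignored even if the callback would return true for them.

```php
<?php
// Hypothetical stand-in for TakeWhile, written as a plain generator.
function takeWhileSketch($values, callable $callback)
{
    foreach ($values as $value) {
        if (!$callback($value)) {
            return; // first false result: ignore this and all later values
        }
        yield $value;
    }
}

$lessThanThree = function ($value) {
    return $value < 3;
};

echo json_encode(iterator_to_array(takeWhileSketch([1, 2, 3, 1, 2], $lessThanThree), false));
// Output: [1,2]
```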
- Chunk - Group input values in groups of a specified size.
- Drop - Discard the first N input values and return the rest.
- DropWhile - Discard input values while callback returns true.
- Expand - Yields one or more values from every input value.
- Filter - Discard input values based on a callback.
- GroupWhile - Group input values based on a callback.
- Map - Modify every input value with a callback.
- Observe - Send input values to a callback without modifying them.
- Reduce - Reduce all input values to a single value.
- Scan - Reduce all input values, returning the intermediate results.
- Take - Return the first N input values and discard the rest.
- TakeWhile - Return input values while callback returns true.
Flow Functions¶
Compose¶
Compose ( array $functions )
Chains functions together, using the output of each function as input to the next. Output is the output of the last function in the chain.
Parameters¶
- functions
- Array of functions that should be chained. If the array is multidimensional and/or contains another Compose it will be flattened.
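The chaining idea can be sketched in plain PHP without Pipeline. composeSketch and the three closures below are hypothetical names used for illustration, and the sketch does not flatten nested arrays the way Compose is described to. Each step is a generator that consumes the previous step's output, so no step runs until the final result is iterated.

```php
<?php
// Hypothetical stand-in for Compose: feed each function's output to the next.
function composeSketch(array $functions)
{
    return function ($input) use ($functions) {
        foreach ($functions as $function) {
            $input = $function($input);
        }
        return $input;
    };
}

$trimAll = function ($values) {
    foreach ($values as $value) {
        yield trim($value);
    }
};
$onlyNumeric = function ($values) {
    foreach ($values as $value) {
        if (is_numeric($value)) {
            yield $value;
        }
    }
};
$toInt = function ($values) {
    foreach ($values as $value) {
        yield intval($value);
    }
};

$fun = composeSketch([$trimAll, $onlyNumeric, $toInt]);
echo json_encode(iterator_to_array($fun(['1', ' 23 ', 'hello']), false));
// Output: [1,23]
```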
Examples¶
Example #1¶
Basic usage.
<?php
use Webbhuset\Pipeline\Constructor as F;
$compose = F::Compose([
    F::Map('trim'),
    F::Filter('is_numeric'),
    F::Map('intval'),
]);
$input = ['1', ' 23 ', 'hello', '4.444', 5.75, '+12e3'];
echo json_encode(iterator_to_array($compose($input)));
// Output: [1,23,4,5,12000]
Example #2¶
Demonstrating how multidimensional arrays and other Composes are flattened.
<?php
use Webbhuset\Pipeline\Constructor as F;
function getMapFunction()
{
    return [
        F::Map('trim'),
        F::Map('ucwords'),
    ];
}
$compose = F::Compose([
    getMapFunction(),
    F::Compose([
        F::Filter('is_numeric')
    ]),
    [],
    [
        [
            F::Map('intval'),
        ],
    ],
]);
/**
* Result function would look like this:
* [
* F::Map('trim'),
* F::Map('ucwords'),
* F::Filter('is_numeric'),
* F::Map('intval'),
* ]
*/
Defer¶
Defer ( callable $callback )
Delays construction of the inner function until execution. Input is sent to the inner function, and output is the output of the inner function. Defer is useful when, for example, constructing the inner function is resource-intensive.
Parameters¶
Examples¶
Example #1¶
Basic usage example.
<?php
use Webbhuset\Pipeline\Constructor as F;
$defer = F::Defer(function () {
    sleep(1); // Sleep to simulate fetching IDs from a database.
    $idMap = [
        'alpha' => 1,
        'beta' => 2,
        'gamma' => 3,
    ];
    return F::Map(function ($value) use ($idMap) {
        // The ?? operator requires PHP 7.0 or later.
        return $idMap[$value] ?? null;
    });
});
$input = ['alpha', 'gamma', 'omega'];
echo json_encode(iterator_to_array($defer($input)));
// Output: [1,3,null]
Fork¶
Fork ( array $functions )
Sends every input value to every inner function. Output is the output of every inner function.
Parameters¶
Examples¶
Example #1¶
Basic usage example.
<?php
use Webbhuset\Pipeline\Constructor as F;
$fork = F::Fork([
    F::Map(function ($value) {
        return $value * 2;
    }),
    F::Map(function ($value) {
        return str_repeat('a', $value);
    }),
]);
$input = [1, 2, 3, 4, 5];
echo json_encode(iterator_to_array($fork($input)));
// Output: [2,"a",4,"aa",6,"aaa",8,"aaaa",10,"aaaaa"]
Multiplex¶
Multiplex ( callable $callback , array $functions )
Sends every input value to one of the inner functions based on the result of the callback function. Output is the output of the inner functions.
Parameters¶
- callback
scalar callback ( mixed $value )
A callback that returns the key of the function to which the value should be passed.
Examples¶
Example #1¶
Basic usage example.
<?php
use Webbhuset\Pipeline\Constructor as F;
$multiplex = F::Multiplex(
    function ($value) {
        return $value % 2 == 0 ? 'even' : 'odd';
    },
    [
        'even' => F::Map(function ($value) {
            return $value / 2;
        }),
        'odd' => F::Map(function ($value) {
            return $value * 2;
        }),
    ]
);
$input = [1, 2, 3, 4, 5, 6];
echo json_encode(iterator_to_array($multiplex($input)));
// Output: [2,1,6,2,10,3]
Example #2¶
By leaving a branch empty we can run functions for only some values.
<?php
use Webbhuset\Pipeline\Constructor as F;
$multiplex = F::Multiplex(
    function ($value) {
        return $value <= 10;
    },
    [
        true => F::Map(function ($value) {
            return $value * 2;
        }),
        false => [],
    ]
);
$input = [1, 22, 3, 44, 5];
echo json_encode(iterator_to_array($multiplex($input)));
// Output: [2,22,6,44,10]
Useful Information¶
Pipeline and States¶
While some of Pipeline’s functions are stateless (e.g. Map and Expand), others (e.g. Take and Reduce) keep a state in some situations. Specifically, if $keepState (the second argument to __invoke() for every function) is true, then these functions will keep their state even after their generator has been fully iterated. As an example, Take will remember how many values it has returned:
<?php
use Webbhuset\Pipeline\Constructor as F;
$take = F::Take(3);
echo json_encode(iterator_to_array($take([1,2,3,4], true)));
echo "\n";
echo json_encode(iterator_to_array($take([5,6,7,8], true)));
/**
* Output:
* [1,2,3]
* []
*/
Since $keepState defaults to false, this is normally not something you have to worry about when using Pipeline; the option is mostly used internally by some of the Flow Functions.
States can also cause issues if multiple generators are created from these functions and iterated simultaneously (even if $keepState is false). This is most easily circumvented by using a Defer to build the functions separately for every input:
<?php
use Webbhuset\Pipeline\Constructor as F;
$take = F::Take(5);
$takeWithDefer = F::Defer(function () {
return F::Take(5);
});
$input1 = range(0, 9);
$input2 = range(10, 19);
function loop($gen1, $gen2) {
    while ($gen1->valid() || $gen2->valid()) {
        if ($gen1->valid()) {
            echo 'Gen1: ' . $gen1->current() . "\n";
            $gen1->next();
        }
        if ($gen2->valid()) {
            echo 'Gen2: ' . $gen2->current() . "\n";
            $gen2->next();
        }
    }
}
$gen1 = $take($input1);
$gen2 = $take($input2);
echo "Without Defer, unexpected results:\n";
loop($gen1, $gen2);
$gen1 = $takeWithDefer($input1);
$gen2 = $takeWithDefer($input2);
echo "With Defer, expected results:\n";
loop($gen1, $gen2);
/**
* Output:
* Without Defer, unexpected results:
* Gen1: 0
* Gen2: 10
* Gen1: 1
* Gen2: 11
* Gen1: 2
* Gen1: 3
* Gen1: 4
* Gen1: 5
* Gen1: 6
* Gen1: 7
* With Defer, expected results:
* Gen1: 0
* Gen2: 10
* Gen1: 1
* Gen2: 11
* Gen1: 2
* Gen2: 12
* Gen1: 3
* Gen2: 13
* Gen1: 4
* Gen2: 14
*/
Separating Flow and Logic¶
Just as when writing normal functions, it is preferable to have multiple functions with descriptive names rather than putting everything into one function. This also encourages separating the functions responsible for the flow of data from the functions responsible for manipulating data. Compare the following:
Using named functions:
<?php
use Webbhuset\Pipeline\Constructor as F;
class myFunctionBuilder
{
    public function buildMyFunction()
    {
        return [
            $this->mapRows(),
            $this->filterInvalid(),
            $this->insertToDatabase(),
        ];
    }
    protected function mapRows()
    {
        return F::Map(function ($value) {
            // ...
        });
    }
    protected function filterInvalid()
    {
        return F::Filter(function ($value) {
            // ...
        });
    }
    protected function insertToDatabase()
    {
        return F::Observe(function ($value) {
            // ...
        });
    }
}
Without named functions:
<?php
use Webbhuset\Pipeline\Constructor as F;
class myFunctionBuilder
{
    public function buildMyFunction()
    {
        return [
            F::Map(function ($value) {
                // ...
            }),
            F::Filter(function ($value) {
                // ...
            }),
            F::Observe(function ($value) {
                // ...
            }),
        ];
    }
}