The pipeline is another common pattern in concurrent programming: work is split into a series of stages, and each stage passes its output to the next.
The stages of a data processing pipeline can be, for example:
- Read the file
- Process the data
- Transform the data
- Write the data to another file
Let’s look at an example of such a pipeline in Go.
package pipeline

import (
	"fmt"
	"time"
)

// FileData carries a file's path and its contents between stages.
type FileData struct {
	src  string
	data string
}

// readData stage where we read the file
// it returns a channel of FileData to use in the next stage
func readData(filePaths []string) <-chan FileData {
	out := make(chan FileData)
	go func() {
		for _, path := range filePaths {
			time.Sleep(1 * time.Second) // Simulate file read
			out <- FileData{src: path, data: "data"}
		}
		close(out)
	}()
	return out
}

// processData stage where we process the data
// it returns a channel of processed FileData to use in the next stage
func processData(in <-chan FileData) <-chan FileData {
	out := make(chan FileData)
	go func() {
		for data := range in {
			time.Sleep(1 * time.Second) // Simulate data processing
			out <- FileData{src: data.src, data: data.data + " processed"}
		}
		close(out)
	}()
	return out
}

// transformData stage where we transform the data
// it returns a channel of transformed FileData to use in the next stage
func transformData(in <-chan FileData) <-chan FileData {
	out := make(chan FileData)
	go func() {
		for data := range in {
			time.Sleep(1 * time.Second) // Simulate data transformation
			out <- FileData{src: data.src, data: data.data + " transformed"}
		}
		close(out)
	}()
	return out
}

// writeData stage where we write the data to a file
// it runs in the caller's goroutine and blocks until in is closed
func writeData(in <-chan FileData) bool {
	for data := range in {
		time.Sleep(1 * time.Second) // Simulate file write
		fmt.Printf("Writing data to %s\n", data.src)
	}
	return true
}

// RunPipeline runs the pipeline
// the first three stages each run in their own goroutine,
// the stages are connected via channels,
// and writeData blocks here until all data has been written
func RunPipeline(filePaths []string) {
	readDataCh := readData(filePaths)
	processDataCh := processData(readDataCh)
	transformDataCh := transformData(processDataCh)
	writeData(transformDataCh)
}
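In the example above, we implemented a pipeline with four stages connected via channels. The first three stages each run in their own goroutine. The final stage, writeData, runs in the caller's goroutine and blocks until the upstream channel is closed.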
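The read, process, and transform stages all share one shape: create an output channel, start a goroutine that loops over the input, and close the output when the input is drained. As a minimal sketch, assuming Go 1.18 or later for generics, that shape could be factored into a helper. The names stage, source, and runStages are hypothetical and not part of the example above.

```go
package main

import (
	"fmt"
	"strings"
)

// stage is a hypothetical helper, not part of the original example.
// It starts one goroutine that applies fn to every value received
// from in, sends each result on the returned channel, and closes
// that channel once in has been drained.
func stage[T, U any](in <-chan T, fn func(T) U) <-chan U {
	out := make(chan U)
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

// source emits every item on a channel and then closes it.
func source[T any](items []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, item := range items {
			out <- item
		}
	}()
	return out
}

// runStages chains two stages and collects the results in order.
func runStages(words []string) []string {
	upper := stage(source(words), strings.ToUpper)
	marked := stage(upper, func(s string) string { return s + "!" })

	var results []string
	for s := range marked {
		results = append(results, s)
	}
	return results
}

func main() {
	fmt.Println(runStages([]string{"a", "b"})) // prints [A! B!]
}
```

Because each stage is a single goroutine reading from a single channel, items leave the pipeline in the same order they entered it.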
We can run the pipeline by calling the RunPipeline function with a slice of file paths. The call returns once every file has passed through all four stages.
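The stages overlap in time, so with three files and four one-second stages, RunPipeline should take roughly six seconds rather than the twelve a strictly sequential version would need. The sketch below reproduces that four-stage shape with a shortened delay to make the overlap measurable. The names stepDelay, emit, work, and runTimed are assumptions made for this illustration, not part of the original code.

```go
package main

import (
	"fmt"
	"time"
)

// stepDelay is an assumed stand-in for the one-second sleeps in the
// original example, shortened so the sketch finishes quickly.
const stepDelay = 50 * time.Millisecond

// emit plays the role of the read stage: it produces n items,
// sleeping for stepDelay before each one.
func emit(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			time.Sleep(stepDelay)
			out <- i
		}
	}()
	return out
}

// work plays the role of a middle stage: it spends stepDelay on
// each item and forwards it unchanged.
func work(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			time.Sleep(stepDelay)
			out <- v
		}
	}()
	return out
}

// runTimed wires up four stages (emit, two work stages, and a final
// consuming loop in the caller's goroutine) and reports how many
// items arrived and how long the whole run took.
func runTimed(n int) (int, time.Duration) {
	start := time.Now()
	count := 0
	for range work(work(emit(n))) {
		time.Sleep(stepDelay) // simulate the write stage
		count++
	}
	return count, time.Since(start)
}

func main() {
	n := 3
	count, elapsed := runTimed(n)
	sequential := time.Duration(4*n) * stepDelay
	fmt.Printf("items: %d, pipelined: %v, sequential estimate: %v\n",
		count, elapsed, sequential)
}
```

With n items and four equally slow stages, the pipelined run needs about n + 3 delay steps, while the sequential estimate is 4n steps. The gap widens as n grows.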