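A pipeline is another common pattern in concurrent programming. The work is split into a series of stages. Each stage receives data from the previous stage, does its part, and passes the result on to the next.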
For example, the stages of a data processing pipeline could be:
- Read the file
- Process the data
- Transform the data
- Write the data to another file
Let's look at an example of such a pipeline in Golang.
```go
package pipeline

import (
	"fmt"
	"time"
)

// FileData carries a file's path and its (simulated) contents between stages.
type FileData struct {
	src  string
	data string
}

// readData is the first stage: it reads each file and sends its contents on
// the returned channel, which it closes once every path has been read.
func readData(filePaths []string) <-chan FileData {
	out := make(chan FileData)
	go func() {
		defer close(out) // tell the next stage that no more data is coming
		for _, path := range filePaths {
			time.Sleep(1 * time.Second) // simulate a file read
			out <- FileData{src: path, data: "data"}
		}
	}()
	return out
}

// processData receives FileData from the previous stage, processes it and
// sends the result on the returned channel for the next stage.
func processData(in <-chan FileData) <-chan FileData {
	out := make(chan FileData)
	go func() {
		defer close(out)
		for data := range in { // loop ends when the upstream channel is closed
			time.Sleep(1 * time.Second) // simulate data processing
			out <- FileData{src: data.src, data: data.data + " processed"}
		}
	}()
	return out
}

// transformData receives processed FileData, transforms it and sends the
// result on the returned channel for the next stage.
func transformData(in <-chan FileData) <-chan FileData {
	out := make(chan FileData)
	go func() {
		defer close(out)
		for data := range in {
			time.Sleep(1 * time.Second) // simulate data transformation
			out <- FileData{src: data.src, data: data.data + " transformed"}
		}
	}()
	return out
}

// writeData is the final stage. It runs in the caller's goroutine, writes
// every item it receives and returns true once the input channel is closed.
func writeData(in <-chan FileData) bool {
	for data := range in {
		time.Sleep(1 * time.Second) // simulate a file write
		fmt.Printf("Writing data to %s\n", data.src)
	}
	return true
}

// RunPipeline connects the four stages with channels. The first three stages
// run in their own goroutines; writeData runs in the calling goroutine, so
// RunPipeline returns only after the last item has been written.
func RunPipeline(filePaths []string) {
	readDataCh := readData(filePaths)
	processDataCh := processData(readDataCh)
	transformDataCh := transformData(processDataCh)
	writeData(transformDataCh)
}
```
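In the example above, we have implemented a pipeline with four stages connected by channels. readData, processData and transformData each run in their own goroutine and close their output channel once their input is exhausted, which ends the range loop in the next stage. writeData runs in the calling goroutine, so RunPipeline returns only after the last item has been written.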
To run the pipeline, call the RunPipeline function with a slice of file paths.