Go - Concurrency Pipeline
Pipeline is yet another common pattern we see in concurrent programming. Stages for a data processing pipline can be for example, Read the file Process the data Transform the data Write the data to another file Let’s check an example of such a pipeline in Golang. package pipeline import ( "fmt" "time" ) type FileData struct { src string data string } // readData stage where we read the file // it returns a channel of FileData to use in the next stage func readData(filePaths []string) <-chan FileData { out := make(chan FileData) go func() { for _, path := range filePaths { time.Sleep(1 * time.Second) // Simulate file read out <- FileData{src: path, data: "data"} } close(out) }() return out } // processData stage where we process the data // it returns a channel of processed FileData to use in the next stage func processData(in <-chan FileData) <-chan FileData { out := make(chan FileData) go func() { for data := range in { time.Sleep(1 * time.Second) // Simulate data processing out <- FileData{src: data.src, data: data.data + " processed"} } close(out) }() return out } // transformData stage where we transform the data // it returns a channel of transformed FileData to use in the next stage func transformData(in <-chan FileData) <-chan FileData { out := make(chan FileData) go func() { for data := range in { time.Sleep(1 * time.Second) // Simulate data transformation out <- FileData{src: data.src, data: data.data + " transformed"} } close(out) }() return out } // writeData stage where we write the data to a file func writeData(in <-chan FileData) bool { for data := range in { time.Sleep(1 * time.Second) // Simulate file write fmt.Printf("Writing data to %s\n", data.src) } return true } // RunPipeline runs the pipeline // each stage is a go routine // and they are connected via channels func RunPipeline(filePaths []string) { readDataCh := readData(filePaths) processDataCh := processData(readDataCh) transformDataCh := transformData(processDataCh) writeData(transformDataCh) } In the above example, we have implemented a pipeline with 4 stages. Each stage is a go routine and they are connected via channels. ...