goroutines使用通道同步的问题
我正在使用以下代码来同步 goroutine。最近在调查一个错误时,我发现下面的代码并不总是有效。大约五分之一的失败。频道quit在我之前收到消息out频道。我能够在我的本地(不是在 go-playground)和 k8s 环境中一致地重现这个问题。作为一种解决方法,我现在使用sync.Map同步。
有没有办法修复下面的代码?
package main
import (
"fmt"
"io/ioutil"
"log"
"os"
"path"
"sync"
"sync/atomic"
"time"
)
func main() {
//test setup
filePaths := []string{
path.Join(os.TempDir(), fmt.Sprint("f1-", time.Now().Nanosecond())),
path.Join(os.TempDir(), fmt.Sprint("f2-", time.Now().Nanosecond())),
path.Join(os.TempDir(), fmt.Sprint("f3-", time.Now().Nanosecond())),
path.Join(os.TempDir(), fmt.Sprint("f4-", time.Now().Nanosecond())),
path.Join(os.TempDir(), fmt.Sprint("f5-", time.Now().Nanosecond())),
path.Join(os.TempDir(), fmt.Sprint("f6-", time.Now().Nanosecond())),
}
for _, filePath := range filePaths {
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
log.Fatal(err)
}
_, err = f.WriteString("There are many variations of passages of Lorem Ipsum available, but the majority have suffered alteration in some form, by injected humour, or randomised words which don't look even slightly believable. If you are going to use a passage of Lorem Ipsum, you need to be sure there isn't anything embarrassing hidden in the middle of text. All the Lorem Ipsum generators on the Internet tend to repeat predefined chunks as necessary, making this the first true generator on the Internet. It uses a dictionary of over 200 Latin words, combined with a handful of model sentence structures, to generate Lorem Ipsum which looks reasonable. The generated Lorem Ipsum is therefore always free from repetition, injected humour, or non-characteristic words etc.")
if err != nil {
log.Fatal(err)
}
err = f.Close()
if err != nil {
log.Fatal(err)
}
}
for {
responses, err := getContents(filePaths)
if err != nil {
log.Fatal(err)
}
if len(responses) != len(filePaths) {
log.Fatalf("Responses Does not Match, need %d Got %d",len(filePaths), len(responses))
}
time.Sleep(1 * time.Second)
}
}
func getContents(fileNames []string) ([][]byte, error) {
wg := sync.WaitGroup{}
var responseBytes [][]byte
out := make(chan []byte, 10)
defer close(out)
quit := make(chan int)
var opsFileRead uint64
var opsChannelGot uint64
defer close(quit)
go func() {
for {
select {
case bts := <-out:
if len(bts) > 0 {
atomic.AddUint64(&opsChannelGot, 1)
responseBytes = append(responseBytes, bts)
}
break
case <-quit:
fmt.Printf("I quit, i read %d, i got %dn", opsFileRead, opsChannelGot)
return
}
}
}()
for _, fileName := range fileNames {
wg.Add(1)
go func(fName string, out chan []byte, wg *sync.WaitGroup) {
defer wg.Done()
data, err := ioutil.ReadFile(fName)
if err != nil {
log.Fatal(err)
}
out <- data
atomic.AddUint64(&opsFileRead, 1)
}(fileName, out, &wg)
}
wg.Wait()
quit <- 1
return responseBytes, nil
}
回答
当接收到退出时,输出通道可以包含值。通过制作无缓冲通道来修复:
out := make(chan []byte)
这确保在退出之前收到从工作人员发送的值:
- 在无缓冲通道上发送/接收发生在调用之前
wg.Done() wg.Done()之前发生的所有调用wg.Wait()返回wg.Wait()在值被发送到之前返回quit
因此,在值被out发送到之前接收值quit。
另一种方法是关闭out通道以通知结果收集器工作人员已完成:
func getContents(fileNames []string) ([][]byte, error) {
wg := sync.WaitGroup{}
var responseBytes [][]byte
out := make(chan []byte)
var opsFileRead uint64
var opsChannelGot uint64
for _, fileName := range fileNames {
wg.Add(1)
go func(fName string, out chan []byte, wg *sync.WaitGroup) {
defer wg.Done()
data, err := ioutil.ReadFile(fName)
if err != nil {
log.Fatal(err)
}
out <- data
atomic.AddUint64(&opsFileRead, 1)
}(fileName, out, &wg)
}
// Close out after workers are done.
go func() {
wg.Wait()
close(out)
}()
// Loop over outputs until done.
for bts := range out {
if len(bts) > 0 {
atomic.AddUint64(&opsChannelGot, 1)
responseBytes = append(responseBytes, bts)
}
}
fmt.Printf("I quit, i read %d, i got %dn", opsFileRead, opsChannelGot)
return responseBytes, nil
}