Go并发时如何做到并发的控制?

遇到一个问题,我有一个列表,里面是很多的URL,我现在需要一条一条的访问并下载它们,用for循环搞定,但是如果一条一条的下载,那么速度将会非常非常慢。

于是考虑使用go做并发处理,可是却遇到一个问题,就是在并发的同时,如何来限制它的并发数量?

比如我有100条URL,我需要限制为每次下载20条,那么应该如何来写代码?

func main(){
    chs := make([]chan int, 20)

    for i,v := range urls {
        err := DownLoad(v)
        if err != nil {
            chs <- 0
        }else{
            chs <- 1
        }
    }

    select{
    case <-chs:
    }
}

我这样来写,但是发现数量并不是我想要的20个,而是全部,请问应该怎么写才对呢?

共 22 个回复


stevewang

大致的代码流程是这样的

func main() {
    const (
        GOROUTINE_COUNT = 20
        TASK_COUNT = 100
    )
    chReq := make(chan string, GOROUTINE_COUNT)
    chRes := make(chan error, GOROUTINE_COUNT)
    for i := 0; i < GOROUTINE_COUNT; i++ {
        go func() {
            url := <-chReq
            var err error
            // download
            chRes <- err
        }()
    }
    go func() {
        urls := make([]string, TASK_COUNT)
        // got urls
        for i := 0; i < TASK_COUNT; i++ {
            chReq <- urls[i]
        }
    }()
    for i := 0; i < TASK_COUNT; i++ {
        err := <-chRes
        // check error
        _ = err
    }
}
# 0

snake117

你这不对劲啊,怎么可能全部,明明是死锁啊。而且通道的容量设定不用切片的。看我下面这个。

package main

import "sync"

var (
    urls = []string{
        "01", "02", "03", "04", "05", "06",
        "07", "08", "09", "10", "11", "12",
        "13", "14", "15", "16", "17", "18",
        "19", "20", "21", "22", "23", "24",
        "25", "26", "27", "28", "29", "30",
    }
    wrg = sync.WaitGroup{}
    chs = make(chan int, 20)
    ans = make(chan string)
)

// 每个线程的操作
func work(v string) {
    defer func() {
        <-chs
        wrg.Done()
    }()
    ans <- v
}

func main() {
    // 用于分发的线程
    go func() {
        for _, v := range urls {
            chs <- 0 // 限制线程数
            wrg.Add(1)
            go work(v)
        }
        wrg.Wait() // 等待至所有分发出去的线程结束
        close(ans)
    }()
    // 收集各个线程返回的信息
    for each := range ans {
        println(`"` + each + `"`)
    }
}
# 1

snake117

@stevewang
你这样还是会有大量go程存在只是在一开始就阻塞住了,只有制定数量的go程在真正运作而已。

我觉得将限制数量的语句放在外面更好一点,当然这样这个分发过程就必须和主go程分开成两个go程。

# 2

stevewang

楼主的需求就是指定数量的goroutine工作。
你的代码在任务数比较少的时候可以工作,但是当任务数非常多的时候,就会创建相应数量的goroutine,非常的消耗系统资源(虽然goroutine很便宜但也不免费)。

# 3

ylqjgm

@stevewang 您好,我使用您的代码,却发现有错误,我代码如下:

package main

import (
    "fmt"
)

func main() {
    const (
        GOROUTINE_COUNT = 20
        TASK_COUNT      = 100
    )
    chReq := make(chan string, GOROUTINE_COUNT)
    chRes := make(chan int, GOROUTINE_COUNT)
    for i := 0; i < GOROUTINE_COUNT; i++ {
        go func() {
            url := <-chReq
            fmt.Println(url)
            chRes <- 0
        }()
    }
    go func() {
        urls := make([]string, TASK_COUNT)
        for i := 0; i < TASK_COUNT; i++ {
            urls[i] = fmt.Sprintf("http://www.%d.com", i)
        }
        // got urls
        for i := 0; i < TASK_COUNT; i++ {
            chReq <- urls[i]
        }
    }()
    for i := 0; i < TASK_COUNT; i++ {
        d := <-chRes
        // check error
        _ = d
    }
}

运行后却发现只输出了前20行的信息,而且在 d := <-chRes 时会报错,不知道什么原因?

# 4

ylqjgm

@snake117 您好,使用您的代码之后,在 chs <- 0 会报错

# 5

stevewang

我这个只是一个大致的代码示例,其中工作goroutine的代码没有写好(少了一个for循环),正确的代码应该是这样的

package main

import (
    "fmt"
)

func main() {
    const (
        GOROUTINE_COUNT = 20
        TASK_COUNT      = 100
    )
    chReq := make(chan string, GOROUTINE_COUNT)
    chRes := make(chan int, GOROUTINE_COUNT)
    for i := 0; i < GOROUTINE_COUNT; i++ {
        go func() {
            for {
                url := <-chReq
                fmt.Println(url)
                chRes <- 0
            }
        }()
    }
    go func() {
        urls := make([]string, TASK_COUNT)
        for i := 0; i < TASK_COUNT; i++ {
            urls[i] = fmt.Sprintf("http://www.%d.com", i)
        }
        // got urls
        for i := 0; i < TASK_COUNT; i++ {
            chReq <- urls[i]
        }
    }()
    for i := 0; i < TASK_COUNT; i++ {
        d := <-chRes
        // check error
        _ = d
    }
}
# 6

diarbao

循环的时候不是有i么
根据i的值进行time.Sleep()
业务上允许么?

# 7

ylqjgm

@diarbao 不好意思,如果这样的话就太不合理了,time.Sleep()怎么能确定我的业务的确处理好了呢?

# 9

snake117

@stevewang 你说错了吧,我的是先判断通道是否饱和才新增go程的,缺点是执行完的go程有可能不会被立刻销毁,你的是直接开指定个go程每个go程都循环执行函数。

# 10

snake117

@ylqjgm 不对啊,我这里正常执行,win7-386系统下。

除非你修改的时候那里弄错了。

# 11

laoshe

var c =make(chan string)
go func(){
for i := 0; i < 100; i++ {
        c<-url[i]  
}
close(c)
}()
func Work(c chan string)  {

    var w sync.WaitGroup
    w.Add(20)
    for i := 0; i < 20; i++ {
        go func() {
            for item := range c {
                //处理url
                  Do(item)
            }
            w.Done()
        }()
    }

        w.Wait()


}

work(c)
# 14

shanks

package main
import (
    "strconv"
    "runtime"
)


func main() {

    println("-----")

    chs := make([]string, 100)

    for index := range chs {
        chs[index]="abc--"+"**"+strconv.Itoa(index)
        //println(chs[index])
    }
    runtime.GOMAXPROCS(4)


    /////开 20个go程处理
    ch := make(chan string, 20)

    var count int = 20
    final_count := len(chs)
    //=========
    index := 0
    for {
        if (index>=len(chs)&&final_count==0) {////保证所有的GO程消费结束
            println("break")
            break
        }

        if count>0&&index<len(chs) {
            count-=1
            go Task(ch, chs[index])
            index+=1
        }else {
            cc := <-ch
            count+=1
            final_count-=1
            println("++"+strconv.Itoa(count)+"===="+cc)
        }

    }
    println(count)

}


func Task(ch chan string, str string) {

    //time.Sleep(2*time.Second)
    ch <- str

}

思路:

首先建立计数器20,GO程消费到0的时候,等待空余的线程释放计数器,这个时候使用channel来通信,这个时候再次进入,保证池子里面永远是打满20个线程
@ylqjgm

# 15

kzzhr

@stevewang

外层的for感觉是多余的吧,20个goroutine共用的一个channel

# 16

afocus

package main

import (
    "fmt"
    "time"
)

var urls = []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"}

func main() {
    //最多4个同时运行
    const MAX int = 4
    urlChan := make(chan string)
    lens := make(chan bool, MAX)
    go func() {
        for {
            select {
            case url := <-urlChan:
                go func() {
                    fmt.Println(url)
                    //模拟下载...
                    time.Sleep(time.Second * 2)
                    lens <- true
                }()
            }
        }
    }()
    for k, v := range urls {
        if k < MAX {
            lens <- true
        }
        <-lens
        urlChan <- v
    }
}
# 17

joe9i0

package main

import "sync"

type Pool struct {
    queue chan int
    wg    *sync.WaitGroup
}

// 创建并发控制池, 设置并发数量与总数量
func NewPool(cap, total int) *Pool {
    if cap < 1 {
        cap = 1
    }
    p := &Pool{
        queue: make(chan int, cap),
        wg:    new(sync.WaitGroup),
    }
    p.wg.Add(total)
    return p
}

// 向并发队列中添加一个
func (p *Pool) AddOne() {
    p.queue <- 1
}

// 并发队列中释放一个, 并从总数量中减去一个
func (p *Pool) DelOne() {
    <-p.queue
    p.wg.Done()
}

func main() {
    urls := []string{"a", "b", "c"}

    pool := NewPool(20, len(urls)) // 初始化一个容量为20的并发控制池

    for _, v := range urls {
        go func(url string) {
            pool.AddOne() // 向并发控制池中添加一个, 一旦池满则此处阻塞

            err := Download(url)
            if nil != err {
                println(err)
            }

            pool.DelOne() // 从并发控制池中释放一个, 之后其他被阻塞的可以进入池中
        }(v)
    }

    pool.wg.Wait()  // 等待所有下载全部完成
}

func Download(s string) error {
    // do download logic
    println(s)
    return nil
}
# 18

modood

楼上的 @joe9i0 代码可以实现这个功能,但是有一个问题,如果 urls 切片长度超级大,虽然控制了并发为 20 个,但是也会创建大量阻塞状态的 goroutine。

最好的实现方法是用 go 官方提供的这个包:
golang.org/x/sync/semaphore

查看文档和示例:https://godoc.org/golang.org/x/sync/semaphore#example-package--WorkerPool

挖个老坟,希望能帮到有需要的人。

# 19

voson

@modood 弱弱问一句,能加个好友么?

# 20

lllrrrccc

哎,看你们写的这么累,其实就两个要点,第一个:用sync.WaitGroup控制主程等待,第二点:在把全部url放入chan以后立刻把chan给close掉,这样worker协程在获取完url后会获得一个error,让你控制worker退出时机。
第一点我看有几个人都用到了,第二点怎么没人想到?都是教科书里的例子。。。。。。

# 21