Golang中国
package main

import (
    "container/list"
    "fmt"
    "reflect"
    "sync"
)

/**
 * 线程池
 */
type Pool struct {
    Number int
    Timeout chan bool
}

/**
 * 任务列表
 */
type Tasks struct {
    List *list.List
    Result chan int
    Mutex sync.Mutex
    NoticeStatus bool
}
func (this *Tasks) Init() {
    this.List = list.New()
    this.Result = make(chan int)
    this.NoticeStatus = false
}
func (this *Tasks) Add(value int) {
    this.List.PushBack(value)

    if this.NoticeStatus == false {
        this.NoticeStatus = true
    }
}
func (this *Tasks) Fetch() (interface{}) {
    val := this.List.Front()
    this.List.Remove(val)

    return val.Value
}

type Work struct {}
func (this Work) process(value interface{}, task *Tasks) {
    val := int(reflect.ValueOf(value).Int())
    task.Result <- val
}

/**
 * 处理
 */
type Process struct { }
func (this Process) Run(task *Tasks, pool *Pool) {
    for {
        len := task.List.Len()
        if len > 0 {
            value := task.Fetch()

            work := &Work{}
            work.process(value, task)
        } else {
            task.Mutex.Lock()
            pool.Timeout <- true
            if task.NoticeStatus == true {
                task.NoticeStatus = false
            }
            task.Mutex.Unlock()
        }
    }
}
/**
 * 调用run
 */
func (this Process) Invoke(task *Tasks, pool *Pool) {
    for i := 0; i < pool.Number; i++ {
        go this.Run(task, pool)
    }
}
/**
 * 监听任务队列
 */
func (this *Process) Listen(task *Tasks, pool *Pool) {
    for {
        if task.NoticeStatus == true {
            this.Invoke(task, pool)
        }
    }
}

func main() {
    task := &Tasks{}
    pool := &Pool{}
    process := &Process{}
    task.Init()
    pool.Number = 5
    pool.Timeout = make(chan bool, 1)

    // 监听任务队列
    go process.Listen(task, pool)
    // 写入测试数据
    for i := 1; i < 20; i++ {
        task.Add(i)
    }

    for {
        select {
        case <-task.Result:
            fmt.Printf("this is: %d\n", <-task.Result)
        case <-pool.Timeout:
            fmt.Println("timeout!")
        }
    }
}

我想要的是,输出所有1-20的数字.

第一次运行结果:
this is: 2
timeout!
this is: 5
this is: 7
timeout!
this is: 9
this is: 8
this is: 12
this is: 14
this is: 16
this is: 18

第二次运行结果:
this is: 2
timeout!
this is: 5
this is: 7
this is: 9
this is: 11
this is: 3
timeout!
this is: 13
this is: 16
this is: 18


cqheshuang 于 2016-12-02 17:09 修改
8 回复
Gkond
#1 Gkond • 2016-12-12 14:01

请问题主,这个问题解决了吗?我想了想也没找出问题在哪。可以的话求解惑。谢谢啦

cqheshuang
#2 cqheshuang • 2016-12-12 15:46

@Gkond 线程池分配的时候,最好为每个线程建立一个单独chan,这样最后输出的时候,分别select这几个chan结果

Gkond
#3 Gkond • 2016-12-12 15:53

@cqheshuang 谢谢!

Gkond
#4 Gkond • 2016-12-12 15:58

@cqheshuang 但是我还是不能理解为什么会输出结果丢失的情况。能大概讲下吗?麻烦了

cqheshuang
#5 cqheshuang • 2016-12-12 17:41

@Gkond 不难发现,我写的执行timeout是有问题,其实给你说的单独chan还不能解决,因为最主要的还是timeout问题,为什么呢?
比如这里开了5个goroutine,但是很有可能每个处理的任务数量是不同的,有些可能多,有些可能少,这样数量少的就会去设置timout,但是其它的并没有执行完,然后也去执行了timeout, 这样可能就会出现问题.
看了网上的实例,使用sync.group和wait去等待处理完成.
具体现在没有去测试,只是我现在发现有可能是这个问题,没时间,最近工作忙

Gkond
#6 Gkond • 2016-12-12 18:59

@cqheshuang 谢谢了。我自己再去好好看看.

huangxianghan
#7 huangxianghan • 2016-12-22 14:25

你确定你run方法里这两句:
len := task.List.Len()
value := task.Fetch()
是线程安全的?
container/list 包源码里并没有任何线程安全的保障哦。
this.List.Remove(val) 的时候多线程肯定出问题
list包源码是这样
func (l List) remove(e Element) *Element {
e.prev.next = e.next
e.next.prev = e.prev
e.next = nil // avoid memory leaks
e.prev = nil // avoid memory leaks
e.list = nil
l.len—
return e
}
next 和 prev 指针多线程竞态肯定出问题的。

1934C
#8 1934C • 2017-02-06 17:48

你的程序有两个问题哦
问题1、如楼上所说,List是线程不安全的,这个也可以解决,把Run函数
改成

func (this Process) Run(task *Tasks, pool *Pool) {
    for {
        task.Mutex.Lock()
        len := task.List.Len()
        if len > 0 {
            value := task.Fetch()
            work := &Work{}
            work.process(value, task)
        } else {
            pool.Timeout <- true
            if task.NoticeStatus == true {
                task.NoticeStatus = false
            } 
        }
        task.Mutex.Unlock()
    }
}

就是锁的范围扩大一点,保证List操作的原子性。

问题2、

fmt.Printf("this is: %d\n", <-task.Result)//是错的!

你在输出的时候,对task.Result chan又读取了一次,(case里已经读取过一次了)所以你输出的里面只有偶数,或者只有奇数的。

需要 登录 后方可回复, 如果你还没有账号你可以 注册 一个帐号。