go 读取本地大文本数据,跑多协程同时写入数据库,随机发生重复写入的情况。

package main

import (
	"bufio"
	"fmt"
	"github.com/jmoiron/sqlx"
	"io"
	"os"
	"strings"

	//执行mysql包的sql驱动
	_ "github.com/go-sql-driver/mysql"
)

func init() {
	//建立数据可链接
	db, e = sqlx.Connect("mysql", "root:123456@tcp(127.0.0.1:3306)/gowk")
	HandleError(e, "sqlx.Connect")
	defer db.Close()

	//必要时建表
	_, e = db.Exec("create table if not exists t_bigData_kf(id int primary key auto_increment,name varchar(30),idcard char(18));")
	HandleError(e, "db.Exec create table")
	fmt.Println("数据表已创建")

	//初始存取数据管道
	chkp = make(chan *kfperson, 10000000)
	chbd = make(chan *kfperson, 100)
	//开启1000条协程写入数据,
	for i := 0; i < 100; i++ {
		go insertKftable()
	}
	//准备一个处理脏数据的协程
	go writeBadTxt()

	//创建一个失败数据Txt
	A_bad, _ = os.OpenFile("E:/假数据/清洗数据/A_bad.txt", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0754)
	defer A_bad.Close()

	//读取大数据文本
	file, e := os.Open("E:/假数据/清洗数据/A.txt")
	HandleError(e, "os.Open")
	defer file.Close()
	//以缓存的方式读出,区别于ioutil.ReadFile()一次性读入文件到内存,不适合大文本的读取
	reader := bufio.NewReader(file)
	//kfps := new(kfperson)
	for {
		linStr, e := reader.ReadString('\n')
		if e == io.EOF {
			close(chbd)
			close(chkp)
			break
		}
		HandleError(e, "reader.ReadString")
		linSplic := strings.Split(linStr, ",")
		//kfps.Name, kfps.Idcard = linSplic[0], linSplic[1]

		name,idcard:=linSplic[0], linSplic[1]
		kfps := kfperson{Name: name, Idcard: idcard}

		chkp <- &kfps
	}

}

func writeBadTxt() {
	writer := bufio.NewWriter(A_bad)
	for ps := range chbd {
		writer.WriteString(ps.Name + "," + ps.Idcard + "\n")
		fmt.Println("***************************************", "Gid:", GetGID())
	}
	writer.Flush()
}

func insertKftable() {

	for ps := range chkp {
		_, e := db.Exec("insert into t_bigData_kf(name,idcard) values(?,?);", ps.Name, ps.Idcard)
		if e == nil {
			fmt.Println("chkp:", len(chkp), "\t", "Gid:", GetGID())
		} else {
			chbd <- ps
		}
	}

}

func main() {
	fmt.Println("game over!")
}

这些代码我看了一个晚上,我找不出问题在哪里?请求高手指点迷津!

共 1 个回复


fising

同看不出问题。你这是单协程读取文件,应该不存在重复读取的问题啊。

大胆猜一句:是不是A.txt文件本身就有重复行?

楼主check一下?

# 0