Skip to content

Commit dbf6873

Browse files
author
jinlong
committed
新增并发执行单元
1 parent d640478 commit dbf6873

File tree

1 file changed

+77
-0
lines changed

1 file changed

+77
-0
lines changed

coroutine/multi_task.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
负责人员:InBug Team
3+
创建时间:2021/4/16
4+
程序用途:并发执行单元
5+
*/
6+
package coroutine
7+
8+
import (
9+
"fmt"
10+
"math"
11+
"sync"
12+
"time"
13+
)
14+
15+
type Task func(*sync.WaitGroup, chan bool, chan int, chan interface{}, ...interface{})
16+
type Iter func(*sync.WaitGroup, chan bool, chan int, chan interface{}, Task, ...interface{})
17+
type Done func(interface{}) error
18+
type Ing func(float64)
19+
type After func(int)
20+
21+
func MultiTask(
22+
totalTask, workerNumber int,
23+
iter Iter,
24+
task Task,
25+
done Done,
26+
ing Ing,
27+
after After,
28+
msgStart, msgIng, msgEnd string,
29+
data ...interface{},
30+
) {
31+
if totalTask == 0 {
32+
return
33+
}
34+
fmt.Println("****************<-START->****************")
35+
fmt.Println(msgStart)
36+
start := time.Now()
37+
if totalTask <= workerNumber {
38+
workerNumber = totalTask
39+
}
40+
var wg sync.WaitGroup
41+
worker := make(chan bool, workerNumber)
42+
counts := make(chan int)
43+
result := make(chan interface{}, workerNumber)
44+
ingTask, scaleTask, doneTask := 0, 0, 0
45+
size := int(math.Floor(float64(totalTask) / float64(10)))
46+
47+
go func() {
48+
iter(&wg, worker, counts, result, task, data...)
49+
wg.Wait()
50+
close(worker)
51+
}()
52+
53+
for {
54+
select {
55+
case number := <-counts:
56+
ingTask += number
57+
scaleTask = int((float64(ingTask) / float64(totalTask)) * 100)
58+
switch ingTask {
59+
case 1, size * 1, size * 2, size * 3, size * 4, size * 5, size * 6, size * 7, size * 8, size * 9, totalTask:
60+
ing(float64(scaleTask))
61+
}
62+
fmt.Println(fmt.Sprintf(`%s-进度%d%%,%d/%d`, msgIng, scaleTask, ingTask, totalTask))
63+
if ingTask == totalTask {
64+
after(doneTask)
65+
fmt.Println(fmt.Sprintf(`%s,执行总耗时:%f秒`, msgEnd, time.Since(start).Seconds()))
66+
fmt.Println("****************<-END->****************")
67+
return
68+
}
69+
case item := <-result:
70+
if err := done(item); err == nil {
71+
doneTask++
72+
}
73+
default:
74+
time.Sleep(1 * time.Second)
75+
}
76+
}
77+
}

0 commit comments

Comments
 (0)