Go資深工程師講解(慕課) 004

004

goroutine

package main

import (
    "fmt"
    "time"
)

func main() {
    for i:=0;i<10;i++{
        go func(i int) {
            fmt.Printf("Hello from goroutine %d \n",i) // i/o操作可以交出控制權,手動操作交出控制權runtime.Gosched()
        }(i)
    }
    // main直接退出了,還沒來得急執行協程
  time.Sleep(time.Millisecond)
}
  • 輕量級“線程”
  • 非搶佔式多任務處理,由協程主動交出控制權(線程是搶佔式任務處理)
  • 編譯器/解釋器/虛擬機層面的多任務
  • 多個協程可能在一個或多個線程上運行
  • 手動交出控制權 runtime.Gosched()

子程序是協程的一個特例,協程是雙向的

線程與協程
goroutine 協程
誰和誰放在一個線程,這個不用管由調度器來管理

  • 在函數前加上 go 關鍵字,就能送給調度器運行
  • 不需要 在定義時區分是否是異步函數
  • 調度器在合適的點進行切換
  • 使用 race 來檢測數據訪問衝突

goroutine 可能的切換點

  • I/O,select
  • channel
  • 等待鎖
  • 函數調用
  • runtime.Gosched() 手動

channel

goroutine 和 goroutine 之間可以開出很多個雙向通道,它是 goroutine 和 goroutine 之間的交互。有發,就得有收,要不就會卡在那(deadlock)。

package main

import (
    "fmt"
    "time"
)

func worker(c chan int)  {
    for{
        n:=<-c
        fmt.Println(n)
    }
}

func chanDemo()  {
    c:=make(chan int) //var c chan int 這只是定義了一個c,這是一個chan類型內部值是int的變量,並沒有生成一個chan 要生成,需要用make
    //用go關鍵詞
    /*go func() {
        for{
            n:=<-c //收數據
            fmt.Println(n)
        }
    }()*/
    go worker(c)
    c<-1 //有發就得有收
    c<-2
    time.Sleep(time.Millisecond) //簡單發一個sleep這樣這兩個值都打印出來,要不c還沒打印,main函數已經結束,收的2還來不及打印
}

func main() {
    chanDemo()
}
package main

import (
    "fmt"
    "time"
)

func worker(id int ,c chan int)  {
    for{
        fmt.Printf("Workder %d received %d\n",id,<-c)
    }
}

func chanDemo()  {
    c:=make(chan int)
    go worker(0,c)
    c<-1 //有發就得有收
    c<-2
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}
package main

import (
    "fmt"
    "time"
)

func worker(id int ,c chan int)  {
    for{
        fmt.Printf("Workder %d received %c\n",id,<-c)
    }
}

func chanDemo()  {
    //c:=make(chan int)
    //go worker(0,c)
    var channels [10]chan int
    for i:=0;i<10;i++{
        channels[i]=make(chan int)
        go worker(i,channels[i])
    }
    //c<-1 //有發就得有收
    //c<-2
    for i:=0;i<10;i++{
        channels[i] <- 'a'+i
    }
    for i:=0;i<10;i++{
        channels[i] <- 'A'+i
    }
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}

channel 做爲返回值,返回一個 channel

package main

import (
    "fmt"
    "time"
)

//
func createWorker(id int) chan int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            fmt.Printf("Workder %d received %c\n",id,<-c)
        }
    }()
    return c
}

func chanDemo()  {
    var channels [10]chan int
    for i:=0;i<10;i++{
        channels[i]=createWorker(i)
    }
    //c<-1 //有發就得有收
    //c<-2
    for i:=0;i<10;i++{
        channels[i] <- 'a'+i
    }
    for i:=0;i<10;i++{
        channels[i] <- 'A'+i
    }
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}

有時候我們要明確收發功能,所以我們可以在返回類型上加箭頭,(可能收,可能發,注意位置)chan<-收數據 <-chan 發數據,所以func createWorker(id int) chan<- int {var channels [10]chan<- int 改成有箭頭

bufferedChan

我們建立 channel 有發就得有收,要不程序就會 deadlock,如下

func bufferedChannel(){
    c:=make(chan int)
    c <-1
}
func main(){
    bufferedChannel() //程序掛掉,因爲發了沒人收
}

加入緩存區

func bufferedChannel(){
    c:=make(chan int,3)
    c <-1
    c <-2
    c <-3
    c <-4 //發4會報錯,沒人收,緩存大小是3
}
func main(){
    bufferedChannel() //程序掛掉,因爲發了沒人收
}
package main

import (
    "fmt"
    "time"
)

func createWorker(id int) chan<- int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            fmt.Printf("Workder %d received %c\n",id,<-c)
        }
    }()
    return c
}

func bufferedChannel(){
    c:=createWorker(0)
    c <-97
    c <-98
    c <-99
    c <-100
    time.Sleep(time.Millisecond)
}

func main() {
    bufferedChannel()
}

channel 是可以 close 掉的,可以通知發完了(由發送方來完成 close)

func channelClose()  {
//    數據有明確的結尾的話,發送方可以通知close
    c:=createWorker(0)
    c <-97
    c <-98
    c <-99
    c <-100
    close(c) //關閉 一旦close 接收方就會收到0值 空串或0
    time.Sleep(time.Millisecond) //收一毫秒的空值
}
// 那我們接收方也要做一個判斷,如果有值就收沒值就退出唄
func createWorker(id int) chan<- int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            n,ok:=<-c
            if !ok {
                break
            }
            fmt.Printf("Workder %d received %c\n",id,n)

        }
    }()
    return c
}
// 還有一種方法,我們可以通過range來判斷是否結束了
func createWorker(id int) chan<- int {
    c := make(chan int)
    //真正的worker
    go func() {
        for n := range c {
            fmt.Printf("Workder %d received %c\n", id, n)
        }
    }()
    return c
}

channel buffered channel 發送方 close() 接收方兩種判斷結束的方式n,ok:=<-crange CSP 模型
不要能過共享內存來通信,通過通信來共享內存

等待任務

package main

import (
    "fmt"
)

//創建結構
type workder struct {
    in   chan int
    done chan bool
}

//
func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        done <- true //輸出一個完成true
    }
}
func createWroker(id int) workder {
    w := workder{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        <-workers[i].done //讀狀態出來
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        <-workers[i].done // 這樣寫會按順序執行,讀個小寫a 輸出一個
    }
}

func main() {
    chanDemo()
}

希望是把所有任務都發出去後,再來等 20 done

package main

import (
    "fmt"
)

//創建結構
type workder struct {
    in   chan int
    done chan bool
}

//
func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        go func() {
            done <- true //輸出一個完成true
        }()
    }
}
func createWroker(id int) workder {
    w := workder{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        //<-workers[i].done //讀狀態出來
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        //<-workers[i].done // 這樣寫會按順序執行,讀個小寫a 輸出一個
    }
     //等所有發送都完成後再收done的狀態
    for _,workder:=range workers { //這樣會出錯,看小寫的打印完,打寫的沒有打印,這是因爲所有的channel發都是block的,小寫的完了有10個done,沒人收又發10個大寫的,就循環等待了,有個簡單快速的解決方法就是,將done<-true放到另goroutine中去做 ,或者將大小寫分開,小寫發完,done完再去搞大寫
        <-workder.done
        <-workder.done
    }
}

func main() {
    chanDemo()
}

WaitGroup

我們再改寫一下上個例子

package main

import (
    "fmt"
    "sync"
)

//創建結構
type workder struct {
    in chan int
    wg *sync.WaitGroup //共享一個
}

//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        wg.Done()
    }
}
func createWroker(id int, wg *sync.WaitGroup) workder {
    w := workder{
        in: make(chan int),
        wg: wg,
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i, &wg)
    }

    wg.Add(20) //我們有20個任務就開20個
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    chanDemo()
}

再抽象一下 將 wg 封閉到結構的函數中

package main

import (
    "fmt"
    "sync"
)

//創建結構
type workder struct {
    in chan int
    done func()
}

//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        wg.Done()
    }
}
func createWroker(id int, wg *sync.WaitGroup) workder {
    w := workder{
        in: make(chan int),
        done: func() {
            wg.Done()
        },
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i, &wg)
    }

    wg.Add(20) //我們有20個任務就開20個
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    chanDemo()
}

channel 來實現樹的遍歷

// main.go
func main(){
    c:=root.TraverseWithChannel()
    maxNode:=0
    for node:=range c{
        if node.Value>maxNode {
            maxNode = node.Value
        }
    }
    fmt.Println("Max node value:", maxNode)
}
// traverse.go
package tree

import "fmt"

//中序遍歷 先左在中在右
func (node *Node) Traverse() {
    /*if node==nil {
        return
    }
    node.Left.Traverse()
    //只能作打印,
    node.Print()
    node.Right.Traverse()*/

    node.TraverseFunc(func(n *Node) {
        n.Print()
    })
    fmt.Println()
}

//改造一下
func (node *Node) TraverseFunc(f func(*Node)) {
    if node == nil {
        return
    }
    node.Left.TraverseFunc(f)
    f(node)
    node.Right.TraverseFunc(f)
}
// 返回一個channel
func (node *Node) TraverseWithChannel() chan *Node{
    out:=make(chan *Node)
    go func() {
        node.TraverseFunc(func(node *Node) {
            out <- node
        })
        close(out) //注意關閉
    }()
    return out
}

select 的使用

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func main() {
    //var c1, c2 chan int
    //    從c1或c2裏面搜 誰快搜誰 ,實現非阻塞式的獲取就用select
    var c1, c2 = generator(), generator()
    for {
        select {
        case n := <-c1:
            fmt.Println("Received from c1:", n)
        case n := <-c2:
            fmt.Println("Received from c2:", n)
        default:
            fmt.Println("No value received")

        }
    }

}
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            //隨機sleep1500毫秒 使得c1,c2輸出的速度不一樣
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func workder(id int, c chan int) {
    for n := range c {
        fmt.Printf("Workder %d received %d \n", id, n)
    }
}
func createWorker(id int) chan<- int {
    c := make(chan int)
    go workder(id, c)
    return c
}

func main() {
    //var c1, c2 chan int // c1 and c2 = nil 走的default
    //    從c1或c2裏面搜 誰來的快從誰裏取 ,實現非阻塞式的獲取就用select
    var c1, c2 = generator(), generator()
    workder := createWorker(0)
    n := 0
    hasValue := false
    for {
        var activeWorker chan<- int
        if hasValue {
            activeWorker = workder
        }
        select {
        case n = <-c1:
            hasValue = true

        case n = <-c2:
            hasValue = true
        case activeWorker <- n:
            hasValue = false
        }

    }

}

生成數據和消耗數據速度不一樣,會導致有些數據丟失

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            //隨機sleep1500毫秒 使得c1,c2輸出的速度不一樣
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func workder(id int, c chan int) {
    for n := range c {
        time.Sleep(2 * time.Second) //這樣有些數據沒打印出來 新收到的數據會把原來的數據沖掉,我們可以做個數據緩存(排隊)然後去消耗它
        fmt.Printf("Workder %d received %d \n", id, n)
    }
}
func createWorker(id int) chan<- int {
    c := make(chan int)
    go workder(id, c)
    return c
}

func main() {
    //var c1, c2 chan int // c1 and c2 = nil 走的default
    //    從c1或c2裏面搜 誰來的快從誰裏取 ,實現非阻塞式的獲取就用select
    var c1, c2 = generator(), generator()
    workder := createWorker(0)
    n := 0
    //hasValue := false
    var values []int //緩存數據
    //10秒退出
    tm := time.After(20 * time.Second)
    //每秒看一下隊列的長度
    tick := time.Tick(time.Second)
    for {
        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = workder
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            //hasValue = true
            values = append(values, n)
        case n = <-c2:
            //hasValue = true
            values = append(values, n)
        case activeWorker <- activeValue: //activeValue有值纔去送數據
            //hasValue = false
            values = values[1:]
        case <-tick:
            fmt.Println("queue len = ", len(values))
            //fmt.Printf("values= %v\n",values)
        case <-time.After(800 * time.Millisecond):
            //數據生的太慢 800毫秒沒有數據提示超時
            fmt.Println("timeout")
        case <-tm:
            //10秒後退出,timer
            fmt.Println("bye")
            return

        }

    }

}

傳統同步方式

互斥量的使用

package main

import (
    "fmt"
    "time"
)

type atomicInt int //線程安全的

func (a *atomicInt) increment()  {
    *a++
}

func (a *atomicInt) get() int {
    return int(*a)
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a)
}

我們可以在命令行下去查看運行結果,實際上會提示資源DATA Race

 go run --raise atomic.go
package main

import (
    "fmt"
    "sync"
    "time"
)

type atomicInt struct {
    value int
    lock sync.Mutex
} //線程安全的

func (a *atomicInt) increment()  {
    a.lock.Lock()
    defer a.lock.Unlock()
    a.value++
}

func (a *atomicInt) get() int {
    a.lock.Lock()
    defer a.lock.Unlock()
    return a.value
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get()) // a.value時還會報錯data race
}

如果我們想讓一塊代碼區加鎖,我們可以如下

func (a *atomicInt) increment() {
    func() {
        a.lock.Lock()
        defer a.lock.Unlock()
        a.value++
    }()
}

傳遞同步機制都是通過共享內存來實現同步,我們要通過通信來實現共享內存

主題測試文章,只做測試使用。發佈者:Walker,轉轉請注明出處:https://www.walker-learn.xyz/archives/6739

(0)
Walker的頭像Walker
上一篇 2026年3月6日 15:30
下一篇 2026年3月6日 14:30

相關推薦

  • Go工程師體系課 004

    需求分析 後臺管理系統 商品管理 商品列表 商品分類 品牌管理 品牌分類 訂單管理 訂單列表 用戶信息管理 用戶列表 用戶地址 用戶留言 輪播圖管理 電商系統 登錄頁面 首頁 商品搜索 商品分類導航 輪播圖展示 推薦商品展示 商品詳情頁 商品圖片展示 商品描述 商品規格選擇 加入購物車 購物車 商品列表 數量調整 刪除商品 結算功能 用戶中心 訂單中心 我的…

    2026年3月6日
    7300
  • 編程基礎 0009_testing詳解

    Go testing 詳解 目錄 testing 包基礎 表格驅動測試 子測試 t.Run 基準測試 Benchmark 測試覆蓋率 TestMain httptest 包 Mock 和接口測試技巧 模糊測試 Fuzz 1. testing 包基礎 1.1 測試文件和函數命名規則 Go 測試遵循嚴格的命名約定: 測試文件以 _test.go 結尾(如 use…

    後端開發 2026年3月6日
    8300
  • Go工程師體系課 019

    Go 內存模型與 GC 1. 內存分配基礎 1.1 棧(Stack)與堆(Heap) ┌─────────────────────────────┐ │ 堆 (Heap) │ ← 動態分配,GC 管理 │ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │ obj │ │ obj │ │ obj │ │ │ └─────┘ └─────┘ └────…

    後端開發 2026年3月7日
    7900
  • Go工程師體系課 018

    API 網關與持續部署入門(Kong & Jenkins) 對應資料目錄《第 2 章 Jenkins 入門》《第 3 章 通過 Jenkins 部署服務》,整理 Kong 與 Jenkins 在企業級持續交付中的實戰路徑。即便零基礎,也能順著步驟搭建出自己的網關 + 持續部署流水線。 課前導覽:什麼是 API 網關 API 網關位於客戶端與後端微服務…

    後端開發 2026年3月6日
    10000
  • Go工程師體系課 012

    Go 中集成 Elasticsearch 1. 客戶端庫選擇 1.1 主流 Go ES 客戶端 olivere/elastic:功能最全面,API 設計優雅,支持 ES 7.x/8.x elastic/go-elasticsearch:官方客戶端,輕量級,更接近原生 REST API go-elasticsearch/elasticsearch:社區維護的官…

    後端開發 2026年3月6日
    8800
簡體中文 繁體中文 English