Go资深工程师讲解(慕课) 004

004

goroutine

package main

import (
    "fmt"
    "time"
)

func main() {
    for i:=0;i<10;i++{
        go func(i int) {
            fmt.Printf("Hello from goroutine %d \n",i) // i/o操作可以交出控制权,手动操作交出控制权runtime.Gosched()
        }(i)
    }
    // main直接退出了,还没来得急执行协程
  time.Sleep(time.Millisecond)
}
  • 轻量级“线程”
  • 非抢占式多任务处理,由协程主动交出控制权(线程是抢占式任务处理)
  • 编译器/解释器/虚拟机层面的多任务
  • 多个协程可能在一个或多个线程上运行
  • 手动交出控制权 runtime.Gosched()

子程序是协程的一个特例,协程是双向的

线程与协程
goroutine 协程
谁和谁放在一个线程,这个不用管由调度器来管理

  • 在函数前加上 go 关键字,就能送给调度器运行
  • 不需要 在定义时区分是否是异步函数
  • 调度器在合适的点进行切换
  • 使用 race 来检测数据访问冲突

goroutine 可能的切换点

  • I/O,select
  • channel
  • 等待锁
  • 函数调用
  • runtime.Gosched() 手动

channel

goroutine 和 goroutine 之间可以开出很多个双向通道,它是 goroutine 和 goroutine 之间的交互。有发,就得有收,要不就会卡在那(deadlock)。

package main

import (
    "fmt"
    "time"
)

func worker(c chan int)  {
    for{
        n:=<-c
        fmt.Println(n)
    }
}

func chanDemo()  {
    c:=make(chan int) //var c chan int 这只是定义了一个c,这是一个chan类型内部值是int的变量,并没有生成一个chan 要生成,需要用make
    //用go关键词
    /*go func() {
        for{
            n:=<-c //收数据
            fmt.Println(n)
        }
    }()*/
    go worker(c)
    c<-1 //有发就得有收
    c<-2
    time.Sleep(time.Millisecond) //简单发一个sleep这样这两个值都打印出来,要不c还没打印,main函数已经结束,收的2还来不及打印
}

func main() {
    chanDemo()
}
package main

import (
    "fmt"
    "time"
)

func worker(id int ,c chan int)  {
    for{
        fmt.Printf("Workder %d received %d\n",id,<-c)
    }
}

func chanDemo()  {
    c:=make(chan int)
    go worker(0,c)
    c<-1 //有发就得有收
    c<-2
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}
package main

import (
    "fmt"
    "time"
)

func worker(id int ,c chan int)  {
    for{
        fmt.Printf("Workder %d received %c\n",id,<-c)
    }
}

func chanDemo()  {
    //c:=make(chan int)
    //go worker(0,c)
    var channels [10]chan int
    for i:=0;i<10;i++{
        channels[i]=make(chan int)
        go worker(i,channels[i])
    }
    //c<-1 //有发就得有收
    //c<-2
    for i:=0;i<10;i++{
        channels[i] <- 'a'+i
    }
    for i:=0;i<10;i++{
        channels[i] <- 'A'+i
    }
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}

channel 做为返回值,返回一个 channel

package main

import (
    "fmt"
    "time"
)

//
func createWorker(id int) chan int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            fmt.Printf("Workder %d received %c\n",id,<-c)
        }
    }()
    return c
}

func chanDemo()  {
    var channels [10]chan int
    for i:=0;i<10;i++{
        channels[i]=createWorker(i)
    }
    //c<-1 //有发就得有收
    //c<-2
    for i:=0;i<10;i++{
        channels[i] <- 'a'+i
    }
    for i:=0;i<10;i++{
        channels[i] <- 'A'+i
    }
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}

有时候我们要明确收发功能,所以我们可以在返回类型上加箭头,(可能收,可能发,注意位置)chan<-收数据 <-chan 发数据,所以func createWorker(id int) chan<- int {var channels [10]chan<- int 改成有箭头

bufferedChan

我们建立 channel 有发就得有收,要不程序就会 deadlock,如下

func bufferedChannel(){
    c:=make(chan int)
    c <-1
}
func main(){
    bufferedChannel() //程序挂掉,因为发了没人收
}

加入缓存区

func bufferedChannel(){
    c:=make(chan int,3)
    c <-1
    c <-2
    c <-3
    c <-4 //发4会报错,没人收,缓存大小是3
}
func main(){
    bufferedChannel() //程序挂掉,因为发了没人收
}
package main

import (
    "fmt"
    "time"
)

func createWorker(id int) chan<- int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            fmt.Printf("Workder %d received %c\n",id,<-c)
        }
    }()
    return c
}

func bufferedChannel(){
    c:=createWorker(0)
    c <-97
    c <-98
    c <-99
    c <-100
    time.Sleep(time.Millisecond)
}

func main() {
    bufferedChannel()
}

channel 是可以 close 掉的,可以通知发完了(由发送方来完成 close)

func channelClose()  {
//    数据有明确的结尾的话,发送方可以通知close
    c:=createWorker(0)
    c <-97
    c <-98
    c <-99
    c <-100
    close(c) //关闭 一旦close 接收方就会收到0值 空串或0
    time.Sleep(time.Millisecond) //收一毫秒的空值
}
// 那我们接收方也要做一个判断,如果有值就收没值就退出呗
func createWorker(id int) chan<- int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            n,ok:=<-c
            if !ok {
                break
            }
            fmt.Printf("Workder %d received %c\n",id,n)

        }
    }()
    return c
}
// 还有一种方法,我们可以通过range来判断是否结束了
func createWorker(id int) chan<- int {
    c := make(chan int)
    //真正的worker
    go func() {
        for n := range c {
            fmt.Printf("Workder %d received %c\n", id, n)
        }
    }()
    return c
}

channel buffered channel 发送方 close() 接收方两种判断结束的方式n,ok:=<-crange CSP 模型
不要能过共享内存来通信,通过通信来共享内存

等待任务

package main

import (
    "fmt"
)

//创建结构
type workder struct {
    in   chan int
    done chan bool
}

//
func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        done <- true //输出一个完成true
    }
}
func createWroker(id int) workder {
    w := workder{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        <-workers[i].done //读状态出来
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        <-workers[i].done // 这样写会按顺序执行,读个小写a 输出一个
    }
}

func main() {
    chanDemo()
}

希望是把所有任务都发出去后,再来等 20 done

package main

import (
    "fmt"
)

//创建结构
type workder struct {
    in   chan int
    done chan bool
}

//
func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        go func() {
            done <- true //输出一个完成true
        }()
    }
}
func createWroker(id int) workder {
    w := workder{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        //<-workers[i].done //读状态出来
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        //<-workers[i].done // 这样写会按顺序执行,读个小写a 输出一个
    }
     //等所有发送都完成后再收done的状态
    for _,workder:=range workers { //这样会出错,看小写的打印完,打写的没有打印,这是因为所有的channel发都是block的,小写的完了有10个done,没人收又发10个大写的,就循环等待了,有个简单快速的解决方法就是,将done<-true放到另goroutine中去做 ,或者将大小写分开,小写发完,done完再去搞大写
        <-workder.done
        <-workder.done
    }
}

func main() {
    chanDemo()
}

WaitGroup

我们再改写一下上个例子

package main

import (
    "fmt"
    "sync"
)

//创建结构
type workder struct {
    in chan int
    wg *sync.WaitGroup //共享一个
}

//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        wg.Done()
    }
}
func createWroker(id int, wg *sync.WaitGroup) workder {
    w := workder{
        in: make(chan int),
        wg: wg,
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i, &wg)
    }

    wg.Add(20) //我们有20个任务就开20个
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    chanDemo()
}

再抽象一下 将 wg 封闭到结构的函数中

package main

import (
    "fmt"
    "sync"
)

//创建结构
type workder struct {
    in chan int
    done func()
}

//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        wg.Done()
    }
}
func createWroker(id int, wg *sync.WaitGroup) workder {
    w := workder{
        in: make(chan int),
        done: func() {
            wg.Done()
        },
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i, &wg)
    }

    wg.Add(20) //我们有20个任务就开20个
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    chanDemo()
}

channel 来实现树的遍历

// main.go
func main(){
    c:=root.TraverseWithChannel()
    maxNode:=0
    for node:=range c{
        if node.Value>maxNode {
            maxNode = node.Value
        }
    }
    fmt.Println("Max node value:", maxNode)
}
// traverse.go
package tree

import "fmt"

//中序遍历 先左在中在右
func (node *Node) Traverse() {
    /*if node==nil {
        return
    }
    node.Left.Traverse()
    //只能作打印,
    node.Print()
    node.Right.Traverse()*/

    node.TraverseFunc(func(n *Node) {
        n.Print()
    })
    fmt.Println()
}

//改造一下
func (node *Node) TraverseFunc(f func(*Node)) {
    if node == nil {
        return
    }
    node.Left.TraverseFunc(f)
    f(node)
    node.Right.TraverseFunc(f)
}
// 返回一个channel
func (node *Node) TraverseWithChannel() chan *Node{
    out:=make(chan *Node)
    go func() {
        node.TraverseFunc(func(node *Node) {
            out <- node
        })
        close(out) //注意关闭
    }()
    return out
}

select 的使用

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func main() {
    //var c1, c2 chan int
    //    从c1或c2里面搜 谁快搜谁 ,实现非阻塞式的获取就用select
    var c1, c2 = generator(), generator()
    for {
        select {
        case n := <-c1:
            fmt.Println("Received from c1:", n)
        case n := <-c2:
            fmt.Println("Received from c2:", n)
        default:
            fmt.Println("No value received")

        }
    }

}
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            //随机sleep1500毫秒 使得c1,c2输出的速度不一样
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func workder(id int, c chan int) {
    for n := range c {
        fmt.Printf("Workder %d received %d \n", id, n)
    }
}
func createWorker(id int) chan<- int {
    c := make(chan int)
    go workder(id, c)
    return c
}

func main() {
    //var c1, c2 chan int // c1 and c2 = nil 走的default
    //    从c1或c2里面搜 谁来的快从谁里取 ,实现非阻塞式的获取就用select
    var c1, c2 = generator(), generator()
    workder := createWorker(0)
    n := 0
    hasValue := false
    for {
        var activeWorker chan<- int
        if hasValue {
            activeWorker = workder
        }
        select {
        case n = <-c1:
            hasValue = true

        case n = <-c2:
            hasValue = true
        case activeWorker <- n:
            hasValue = false
        }

    }

}

生成数据和消耗数据速度不一样,会导致有些数据丢失

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            //随机sleep1500毫秒 使得c1,c2输出的速度不一样
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func workder(id int, c chan int) {
    for n := range c {
        time.Sleep(2 * time.Second) //这样有些数据没打印出来 新收到的数据会把原来的数据冲掉,我们可以做个数据缓存(排队)然后去消耗它
        fmt.Printf("Workder %d received %d \n", id, n)
    }
}
func createWorker(id int) chan<- int {
    c := make(chan int)
    go workder(id, c)
    return c
}

func main() {
    //var c1, c2 chan int // c1 and c2 = nil 走的default
    //    从c1或c2里面搜 谁来的快从谁里取 ,实现非阻塞式的获取就用select
    var c1, c2 = generator(), generator()
    workder := createWorker(0)
    n := 0
    //hasValue := false
    var values []int //缓存数据
    //10秒退出
    tm := time.After(20 * time.Second)
    //每秒看一下队列的长度
    tick := time.Tick(time.Second)
    for {
        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = workder
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            //hasValue = true
            values = append(values, n)
        case n = <-c2:
            //hasValue = true
            values = append(values, n)
        case activeWorker <- activeValue: //activeValue有值才去送数据
            //hasValue = false
            values = values[1:]
        case <-tick:
            fmt.Println("queue len = ", len(values))
            //fmt.Printf("values= %v\n",values)
        case <-time.After(800 * time.Millisecond):
            //数据生的太慢 800毫秒没有数据提示超时
            fmt.Println("timeout")
        case <-tm:
            //10秒后退出,timer
            fmt.Println("bye")
            return

        }

    }

}

传统同步方式

互斥量的使用

package main

import (
    "fmt"
    "time"
)

type atomicInt int //线程安全的

func (a *atomicInt) increment()  {
    *a++
}

func (a *atomicInt) get() int {
    return int(*a)
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a)
}

我们可以在命令行下去查看运行结果,实际上会提示资源DATA Race

 go run --raise atomic.go
package main

import (
    "fmt"
    "sync"
    "time"
)

type atomicInt struct {
    value int
    lock sync.Mutex
} //线程安全的

func (a *atomicInt) increment()  {
    a.lock.Lock()
    defer a.lock.Unlock()
    a.value++
}

func (a *atomicInt) get() int {
    a.lock.Lock()
    defer a.lock.Unlock()
    return a.value
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get()) // a.value时还会报错data race
}

如果我们想让一块代码区加锁,我们可以如下

func (a *atomicInt) increment() {
    func() {
        a.lock.Lock()
        defer a.lock.Unlock()
        a.value++
    }()
}

传递同步机制都是通过共享内存来实现同步,我们要通过通信来实现共享内存

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://www.walker-learn.xyz/archives/6739

(0)
Walker的头像Walker
上一篇 2026年3月6日 15:30
下一篇 2026年3月6日 14:30

相关推荐

  • Go工程师体系课 004

    需求分析 后台管理系统 商品管理 商品列表 商品分类 品牌管理 品牌分类 订单管理 订单列表 用户信息管理 用户列表 用户地址 用户留言 轮播图管理 电商系统 登录页面 首页 商品搜索 商品分类导航 轮播图展示 推荐商品展示 商品详情页 商品图片展示 商品描述 商品规格选择 加入购物车 购物车 商品列表 数量调整 删除商品 结算功能 用户中心 订单中心 我的…

    2026年3月6日
    7300
  • 编程基础 0009_testing详解

    Go testing 详解 目录 testing 包基础 表格驱动测试 子测试 t.Run 基准测试 Benchmark 测试覆盖率 TestMain httptest 包 Mock 和接口测试技巧 模糊测试 Fuzz 1. testing 包基础 1.1 测试文件和函数命名规则 Go 测试遵循严格的命名约定: 测试文件以 _test.go 结尾(如 use…

    后端开发 2026年3月6日
    8400
  • Go工程师体系课 019

    Go 内存模型与 GC 1. 内存分配基础 1.1 栈(Stack)与堆(Heap) ┌─────────────────────────────┐ │ 堆 (Heap) │ ← 动态分配,GC 管理 │ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │ obj │ │ obj │ │ obj │ │ │ └─────┘ └─────┘ └────…

    后端开发 2026年3月7日
    7900
  • Go工程师体系课 018

    API 网关与持续部署入门(Kong & Jenkins) 对应资料目录《第 2 章 Jenkins 入门》《第 3 章 通过 Jenkins 部署服务》,整理 Kong 与 Jenkins 在企业级持续交付中的实战路径。即便零基础,也能顺着步骤搭建出自己的网关 + 持续部署流水线。 课前导览:什么是 API 网关 API 网关位于客户端与后端微服务…

    后端开发 2026年3月6日
    10100
  • Go工程师体系课 012

    Go 中集成 Elasticsearch 1. 客户端库选择 1.1 主流 Go ES 客户端 olivere/elastic:功能最全面,API 设计优雅,支持 ES 7.x/8.x elastic/go-elasticsearch:官方客户端,轻量级,更接近原生 REST API go-elasticsearch/elasticsearch:社区维护的官…

    后端开发 2026年3月6日
    8900
简体中文 繁体中文 English