<返回目录     Powered by claud/xia兄

第11课: 并发模式与最佳实践

Context包

Context是Go语言中用于控制goroutine生命周期、传递请求范围数据和取消信号的标准方式。

Context的作用

创建Context

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // 1. 背景context(永不取消)
    ctx := context.Background()

    // 2. 带取消的context
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 3. 带超时的context
    ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // 4. 带截止时间的context
    deadline := time.Now().Add(10 * time.Second)
    ctx, cancel = context.WithDeadline(context.Background(), deadline)
    defer cancel()

    // 5. 带值的context
    ctx = context.WithValue(context.Background(), "userID", 12345)
}

使用Context控制goroutine

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d 收到取消信号: %v\n", id, ctx.Err())
            return
        default:
            fmt.Printf("Worker %d 工作中...\n", id)
            time.Sleep(time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    for i := 1; i <= 3; i++ {
        go worker(ctx, i)
    }

    <-ctx.Done()
    fmt.Println("主程序退出")
    time.Sleep(time.Second) // 等待worker打印退出消息
}

errgroup包

errgroup提供了goroutine组的同步、错误传播和context取消功能。

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/errgroup"
    "time"
)

func fetchURL(ctx context.Context, url string) error {
    select {
    case <-time.After(time.Second):
        fmt.Printf("获取 %s 成功\n", url)
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    urls := []string{
        "https://example.com/1",
        "https://example.com/2",
        "https://example.com/3",
    }

    for _, url := range urls {
        url := url // 捕获循环变量
        g.Go(func() error {
            return fetchURL(ctx, url)
        })
    }

    // 等待所有goroutine完成
    if err := g.Wait(); err != nil {
        fmt.Printf("错误: %v\n", err)
    }
}

sync.Once

sync.Once确保某个操作只执行一次,常用于单例模式和初始化。

package main

import (
    "fmt"
    "sync"
)

type Config struct {
    data map[string]string
}

var (
    instance *Config
    once     sync.Once
)

func GetConfig() *Config {
    once.Do(func() {
        fmt.Println("初始化配置...")
        instance = &Config{
            data: make(map[string]string),
        }
        instance.data["host"] = "localhost"
        instance.data["port"] = "8080"
    })
    return instance
}

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            config := GetConfig()
            fmt.Printf("Goroutine %d: %v\n", id, config.data)
        }(i)
    }

    wg.Wait()
}

sync.Pool

sync.Pool用于复用对象,减少内存分配和GC压力。

package main

import (
    "fmt"
    "sync"
)

type Buffer struct {
    data []byte
}

var bufferPool = sync.Pool{
    New: func() interface{} {
        fmt.Println("创建新Buffer")
        return &Buffer{
            data: make([]byte, 1024),
        }
    },
}

func processData(data []byte) {
    // 从池中获取Buffer
    buf := bufferPool.Get().(*Buffer)
    defer bufferPool.Put(buf) // 使用完后放回池中

    // 使用buffer处理数据
    copy(buf.data, data)
    fmt.Printf("处理了 %d 字节\n", len(data))
}

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            processData([]byte("test data"))
        }()
    }

    wg.Wait()
}

sync.Map

sync.Map是并发安全的map,适用于读多写少的场景。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var m sync.Map

    // 存储
    m.Store("key1", "value1")
    m.Store("key2", "value2")

    // 读取
    if value, ok := m.Load("key1"); ok {
        fmt.Println("key1:", value)
    }

    // 读取或存储
    actual, loaded := m.LoadOrStore("key3", "value3")
    fmt.Printf("key3: %v, 已存在: %v\n", actual, loaded)

    // 删除
    m.Delete("key2")

    // 遍历
    m.Range(func(key, value interface{}) bool {
        fmt.Printf("%v: %v\n", key, value)
        return true // 返回false停止遍历
    })
}

并发模式

1. 限流器模式

package main

import (
    "context"
    "fmt"
    "time"
)

type RateLimiter struct {
    tokens chan struct{}
}

func NewRateLimiter(rate int) *RateLimiter {
    rl := &RateLimiter{
        tokens: make(chan struct{}, rate),
    }

    // 定期补充令牌
    go func() {
        ticker := time.NewTicker(time.Second / time.Duration(rate))
        defer ticker.Stop()

        for range ticker.C {
            select {
            case rl.tokens <- struct{}{}:
            default:
            }
        }
    }()

    return rl
}

func (rl *RateLimiter) Wait(ctx context.Context) error {
    select {
    case <-rl.tokens:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    limiter := NewRateLimiter(2) // 每秒2个请求

    for i := 1; i <= 5; i++ {
        if err := limiter.Wait(context.Background()); err != nil {
            fmt.Println("限流错误:", err)
            continue
        }
        fmt.Printf("请求 %d 在 %v 执行\n", i, time.Now())
    }
}

2. 信号量模式

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/semaphore"
    "time"
)

func main() {
    // 创建信号量,限制并发数为3
    sem := semaphore.NewWeighted(3)
    ctx := context.Background()

    for i := 1; i <= 10; i++ {
        if err := sem.Acquire(ctx, 1); err != nil {
            fmt.Println("获取信号量失败:", err)
            break
        }

        go func(id int) {
            defer sem.Release(1)
            fmt.Printf("任务 %d 开始\n", id)
            time.Sleep(time.Second)
            fmt.Printf("任务 %d 完成\n", id)
        }(i)
    }

    // 等待所有任务完成
    if err := sem.Acquire(ctx, 3); err != nil {
        fmt.Println("等待失败:", err)
    }
}

3. 超时模式

package main

import (
    "context"
    "fmt"
    "time"
)

func doWork(ctx context.Context) error {
    select {
    case <-time.After(2 * time.Second):
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    if err := doWork(ctx); err != nil {
        fmt.Println("操作超时:", err)
    } else {
        fmt.Println("操作成功")
    }
}

并发安全

数据竞争检测

# 使用-race标志检测数据竞争
go run -race main.go
go test -race ./...
go build -race

避免数据竞争

package main

import (
    "fmt"
    "sync"
)

// 错误示例:数据竞争
func badExample() {
    counter := 0
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // 数据竞争!
        }()
    }

    wg.Wait()
    fmt.Println("Counter:", counter) // 结果不确定
}

// 正确示例:使用Mutex
func goodExample() {
    counter := 0
    var mu sync.Mutex
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            counter++
            mu.Unlock()
        }()
    }

    wg.Wait()
    fmt.Println("Counter:", counter) // 1000
}

最佳实践

常见错误:

实践练习

  1. 使用context实现可取消的HTTP请求
  2. 使用errgroup并发处理多个任务
  3. 实现一个线程安全的缓存(使用sync.Map)
  4. 使用sync.Pool优化高频对象分配
  5. 实现一个限流器(令牌桶算法)
  6. 使用信号量限制并发数
  7. 编写并发安全的单例模式
  8. 使用-race检测并修复数据竞争