Context是Go语言中用于控制goroutine生命周期、传递请求范围数据和取消信号的标准方式。
package main
import (
"context"
"fmt"
"time"
)
func main() {
// 1. 背景context(永不取消)
ctx := context.Background()
// 2. 带取消的context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 3. 带超时的context
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 4. 带截止时间的context
deadline := time.Now().Add(10 * time.Second)
ctx, cancel = context.WithDeadline(context.Background(), deadline)
defer cancel()
// 5. 带值的context
ctx = context.WithValue(context.Background(), "userID", 12345)
}
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d 收到取消信号: %v\n", id, ctx.Err())
return
default:
fmt.Printf("Worker %d 工作中...\n", id)
time.Sleep(time.Second)
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
for i := 1; i <= 3; i++ {
go worker(ctx, i)
}
<-ctx.Done()
fmt.Println("主程序退出")
time.Sleep(time.Second) // 等待worker打印退出消息
}
errgroup提供了goroutine组的同步、错误传播和context取消功能。
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"time"
)
func fetchURL(ctx context.Context, url string) error {
select {
case <-time.After(time.Second):
fmt.Printf("获取 %s 成功\n", url)
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func main() {
g, ctx := errgroup.WithContext(context.Background())
urls := []string{
"https://example.com/1",
"https://example.com/2",
"https://example.com/3",
}
for _, url := range urls {
url := url // 捕获循环变量
g.Go(func() error {
return fetchURL(ctx, url)
})
}
// 等待所有goroutine完成
if err := g.Wait(); err != nil {
fmt.Printf("错误: %v\n", err)
}
}
sync.Once确保某个操作只执行一次,常用于单例模式和初始化。
package main
import (
"fmt"
"sync"
)
type Config struct {
data map[string]string
}
var (
instance *Config
once sync.Once
)
func GetConfig() *Config {
once.Do(func() {
fmt.Println("初始化配置...")
instance = &Config{
data: make(map[string]string),
}
instance.data["host"] = "localhost"
instance.data["port"] = "8080"
})
return instance
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
config := GetConfig()
fmt.Printf("Goroutine %d: %v\n", id, config.data)
}(i)
}
wg.Wait()
}
sync.Pool用于复用对象,减少内存分配和GC压力。
package main
import (
"fmt"
"sync"
)
type Buffer struct {
data []byte
}
var bufferPool = sync.Pool{
New: func() interface{} {
fmt.Println("创建新Buffer")
return &Buffer{
data: make([]byte, 1024),
}
},
}
func processData(data []byte) {
// 从池中获取Buffer
buf := bufferPool.Get().(*Buffer)
defer bufferPool.Put(buf) // 使用完后放回池中
// 使用buffer处理数据
copy(buf.data, data)
fmt.Printf("处理了 %d 字节\n", len(data))
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
processData([]byte("test data"))
}()
}
wg.Wait()
}
sync.Map是并发安全的map,适用于读多写少的场景。
package main
import (
"fmt"
"sync"
)
func main() {
var m sync.Map
// 存储
m.Store("key1", "value1")
m.Store("key2", "value2")
// 读取
if value, ok := m.Load("key1"); ok {
fmt.Println("key1:", value)
}
// 读取或存储
actual, loaded := m.LoadOrStore("key3", "value3")
fmt.Printf("key3: %v, 已存在: %v\n", actual, loaded)
// 删除
m.Delete("key2")
// 遍历
m.Range(func(key, value interface{}) bool {
fmt.Printf("%v: %v\n", key, value)
return true // 返回false停止遍历
})
}
package main
import (
"context"
"fmt"
"time"
)
type RateLimiter struct {
tokens chan struct{}
}
func NewRateLimiter(rate int) *RateLimiter {
rl := &RateLimiter{
tokens: make(chan struct{}, rate),
}
// 定期补充令牌
go func() {
ticker := time.NewTicker(time.Second / time.Duration(rate))
defer ticker.Stop()
for range ticker.C {
select {
case rl.tokens <- struct{}{}:
default:
}
}
}()
return rl
}
func (rl *RateLimiter) Wait(ctx context.Context) error {
select {
case <-rl.tokens:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func main() {
limiter := NewRateLimiter(2) // 每秒2个请求
for i := 1; i <= 5; i++ {
if err := limiter.Wait(context.Background()); err != nil {
fmt.Println("限流错误:", err)
continue
}
fmt.Printf("请求 %d 在 %v 执行\n", i, time.Now())
}
}
package main
import (
"context"
"fmt"
"golang.org/x/sync/semaphore"
"time"
)
func main() {
// 创建信号量,限制并发数为3
sem := semaphore.NewWeighted(3)
ctx := context.Background()
for i := 1; i <= 10; i++ {
if err := sem.Acquire(ctx, 1); err != nil {
fmt.Println("获取信号量失败:", err)
break
}
go func(id int) {
defer sem.Release(1)
fmt.Printf("任务 %d 开始\n", id)
time.Sleep(time.Second)
fmt.Printf("任务 %d 完成\n", id)
}(i)
}
// 等待所有任务完成
if err := sem.Acquire(ctx, 3); err != nil {
fmt.Println("等待失败:", err)
}
}
package main
import (
"context"
"fmt"
"time"
)
func doWork(ctx context.Context) error {
select {
case <-time.After(2 * time.Second):
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := doWork(ctx); err != nil {
fmt.Println("操作超时:", err)
} else {
fmt.Println("操作成功")
}
}
# 使用-race标志检测数据竞争
go run -race main.go
go test -race ./...
go build -race
package main
import (
"fmt"
"sync"
)
// 错误示例:数据竞争
func badExample() {
counter := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // 数据竞争!
}()
}
wg.Wait()
fmt.Println("Counter:", counter) // 结果不确定
}
// 正确示例:使用Mutex
func goodExample() {
counter := 0
var mu sync.Mutex
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}()
}
wg.Wait()
fmt.Println("Counter:", counter) // 1000
}