协程(Coroutine)是一种协作式多任务机制,允许函数暂停执行并在稍后恢复。与线程不同,协程是非抢占式的,由程序员控制切换时机。Lua的协程基于"半协程"(semi-coroutine)实现,具有不对称的控制转移:resume和yield不是对等的。协程的状态保存在Lua栈中,包括局部变量、程序计数器等。协程切换的开销很小,通常只有几百字节,使得Lua可以轻松创建成千上万个协程。
协程(Coroutine)是一种协作式多任务机制,允许函数暂停执行并在稍后恢复。与线程不同,协程是非抢占式的,由程序员控制切换时机。
-- 协程的基本概念
-- 协程 = 可暂停和恢复的函数
-- 协程是协作式的,不是抢占式的
-- 协程切换开销小,适合大量并发任务
-- 协程状态:suspended, running, normal, dead
-- 协程 vs 线程
-- 协程:用户态,协作式,轻量级
-- 线程:内核态,抢占式,重量级
-- 协程的应用场景:
-- 1. 迭代器生成
-- 2. 异步I/O
-- 3. 状态机
-- 4. 生产者-消费者模式
-- 5. 协程管道
-- 6. 游戏AI
-- 创建协程
local co = coroutine.create(function()
print("协程开始执行")
coroutine.yield()
print("协程恢复执行")
end)
-- 查看协程状态
print(coroutine.status(co)) -- suspended
-- 启动协程
coroutine.resume(co) -- 输出: 协程开始执行
print(coroutine.status(co)) -- suspended
-- 恢复协程
coroutine.resume(co) -- 输出: 协程恢复执行
print(coroutine.status(co)) -- dead
-- 协程有4种状态:
-- suspended:挂起(初始状态或yield后)
-- running:运行中
-- normal:正常(调用另一个协程时)
-- dead:已结束
local co = coroutine.create(function()
print("状态1:", coroutine.status(co)) -- running
coroutine.yield()
print("状态2:", coroutine.status(co)) -- running
end)
print("状态0:", coroutine.status(co)) -- suspended
coroutine.resume(co)
print("状态3:", coroutine.status(co)) -- suspended
coroutine.resume(co)
print("状态4:", coroutine.status(co)) -- dead
-- resume和yield可以传递数据
local co = coroutine.create(function(a, b)
print("接收参数:", a, b)
local x, y = coroutine.yield(a + b)
print("接收参数:", x, y)
return x * y
end)
-- resume的返回值:状态 + yield的参数
local status, result = coroutine.resume(co, 10, 20)
print("返回值:", result) -- 30
-- 再次resume,传递参数给yield
local status, result = coroutine.resume(co, 5, 6)
print("返回值:", result) -- 30
-- 输出:
-- 接收参数: 10 20
-- 返回值: 30
-- 接收参数: 5 6
-- 返回值: 30
-- 生产者
function producer()
return coroutine.create(function()
for i = 1, 5 do
print("生产:", i)
coroutine.yield(i)
end
end)
end
-- 消费者
function consumer(prod)
while true do
local status, value = coroutine.resume(prod)
if not status then break end
if value then
print("消费:", value)
end
end
end
local prod = producer()
consumer(prod)
-- 输出:
-- 生产: 1
-- 消费: 1
-- 生产: 2
-- 消费: 2
-- ...
-- 使用协程实现迭代器
function range(n)
return coroutine.wrap(function()
for i = 1, n do
coroutine.yield(i)
end
end)
end
for i in range(5) do
print(i)
end
-- 输出: 1 2 3 4 5
-- 文件行迭代器
function lines(filename)
return coroutine.wrap(function()
local file = io.open(filename)
if not file then return end
for line in file:lines() do
coroutine.yield(line)
end
file:close()
end)
end
-- 使用
for line in lines("test.txt") do
print(line)
end
-- 模拟异步任务
local tasks = {}
function async(func)
local co = coroutine.create(func)
table.insert(tasks, co)
return co
end
function await(seconds)
local start = os.time()
while os.time() - start < seconds do
coroutine.yield()
end
end
function runTasks()
while #tasks > 0 do
for i = #tasks, 1, -1 do
local co = tasks[i]
if coroutine.status(co) == "dead" then
table.remove(tasks, i)
else
coroutine.resume(co)
end
end
end
end
-- 使用
async(function()
print("任务1开始")
await(2)
print("任务1完成")
end)
async(function()
print("任务2开始")
await(1)
print("任务2完成")
end)
runTasks()
-- 管道模式
function filter(source, predicate)
return coroutine.wrap(function()
for value in source do
if predicate(value) then
coroutine.yield(value)
end
end
end)
end
function map(source, transform)
return coroutine.wrap(function()
for value in source do
coroutine.yield(transform(value))
end
end)
end
-- 数据源
function numbers(n)
return coroutine.wrap(function()
for i = 1, n do
coroutine.yield(i)
end
end)
end
-- 使用管道
local source = numbers(10)
local evens = filter(source, function(x) return x % 2 == 0 end)
local squares = map(evens, function(x) return x * x end)
for value in squares do
print(value)
end
-- 输出: 4 16 36 64 100
-- 协程中的错误处理
local co = coroutine.create(function()
print("开始")
error("发生错误")
print("不会执行")
end)
local status, err = coroutine.resume(co)
if not status then
print("错误:", err)
end
-- 使用pcall保护协程
local co2 = coroutine.create(function()
local status, result = pcall(function()
error("内部错误")
end)
if not status then
print("捕获错误:", result)
end
coroutine.yield("继续执行")
end)
coroutine.resume(co2)
coroutine.resume(co2)
-- 状态机示例
function stateMachine()
local state = "idle"
return coroutine.wrap(function()
while true do
if state == "idle" then
print("空闲状态")
state = coroutine.yield("idle")
elseif state == "running" then
print("运行状态")
state = coroutine.yield("running")
elseif state == "stopped" then
print("停止状态")
break
end
end
end)
end
local sm = stateMachine()
print(sm()) -- 空闲状态
print(sm("running")) -- 运行状态
print(sm("idle")) -- 空闲状态
print(sm("stopped")) -- 停止状态
-- 1. 协程池
local CoroutinePool = {}
CoroutinePool.__index = CoroutinePool
function CoroutinePool.new(size)
local pool = setmetatable({
coroutines = {},
available = {},
size = size or 10
}, CoroutinePool)
for i = 1, pool.size do
local co = coroutine.create(function()
while true do
local task = coroutine.yield()
if task then
task()
end
end
end)
table.insert(pool.coroutines, co)
table.insert(pool.available, co)
end
return pool
end
function CoroutinePool:execute(task)
if #self.available > 0 then
local co = table.remove(self.available)
local status = coroutine.resume(co, task)
if status then
table.insert(self.available, co)
end
else
error("协程池已满")
end
end
-- 2. 协程实现超时
function timeout(seconds, func)
local co = coroutine.create(func)
local timer = os.time()
while coroutine.status(co) ~= "dead" do
if os.time() - timer > seconds then
error("操作超时")
end
coroutine.resume(co)
end
end
-- 3. 协程实现互斥锁
local Mutex = {}
Mutex.__index = Mutex
function Mutex.new()
return setmetatable({
locked = false,
queue = {}
}, Mutex)
end
function Mutex:lock()
if not self.locked then
self.locked = true
return true
else
local co = coroutine.running()
table.insert(self.queue, co)
coroutine.yield()
return true
end
end
function Mutex:unlock()
self.locked = false
if #self.queue > 0 then
local co = table.remove(self.queue, 1)
coroutine.resume(co)
end
end
-- 4. 协程实现条件变量
local Condition = {}
Condition.__index = Condition
function Condition.new()
return setmetatable({
waiting = {}
}, Condition)
end
function Condition:wait(mutex)
local co = coroutine.running()
table.insert(self.waiting, co)
mutex:unlock()
coroutine.yield()
mutex:lock()
end
function Condition:signal()
if #self.waiting > 0 then
local co = table.remove(self.waiting, 1)
coroutine.resume(co)
end
end
function Condition:broadcast()
while #self.waiting > 0 do
local co = table.remove(self.waiting, 1)
coroutine.resume(co)
end
end
-- 5. 协程实现生成器模式
function Generator(func)
return coroutine.wrap(function()
func(function(...)
coroutine.yield(...)
end)
end)
end
local fib = Generator(function(yield)
local a, b = 0, 1
while true do
yield(a)
a, b = b, a + b
end
end)
for i = 1, 10 do
print(fib())
end
-- 6. 协程实现异步HTTP请求(模拟)
local HttpClient = {}
HttpClient.__index = HttpClient
function HttpClient.new()
return setmetatable({
requests = {},
responses = {}
}, HttpClient)
end
function HttpClient:get(url, callback)
local co = coroutine.running()
self.requests[url] = co
callback(url, function(response)
self.responses[url] = response
if co and coroutine.status(co) ~= "dead" then
coroutine.resume(co)
end
end)
if co then
return coroutine.yield()
else
return self.responses[url]
end
end
-- 7. 协程实现任务调度器
local Scheduler = {}
Scheduler.__index = Scheduler
function Scheduler.new()
return setmetatable({
tasks = {},
current = nil
}, Scheduler)
end
function Scheduler:schedule(func)
local co = coroutine.create(func)
table.insert(self.tasks, co)
return co
end
function Scheduler:run()
while #self.tasks > 0 do
for i = #self.tasks, 1, -1 do
local co = self.tasks[i]
if coroutine.status(co) == "dead" then
table.remove(self.tasks, i)
else
self.current = co
local ok, err = coroutine.resume(co)
if not ok then
print("协程错误:", err)
table.remove(self.tasks, i)
end
end
end
end
end
function Scheduler:yield()
if self.current then
coroutine.yield()
end
end
-- 使用调度器
local scheduler = Scheduler.new()
scheduler:schedule(function()
for i = 1, 3 do
print("任务1:", i)
scheduler:yield()
end
end)
scheduler:schedule(function()
for i = 1, 3 do
print("任务2:", i)
scheduler:yield()
end
end)
scheduler:run()