前言
在上一篇文章中,我们提到相同协程是阻塞的,不同协程是并发的这一说法。然而,这种理解存在偏差,所以第一个现象并不像我们之前想象的那样。今天,我们将专门梳理协程的概念,深入剖析其本质,以纠正上次关于协程与阻塞的说明。
案例说明
下面我将介绍一种非常典型的现象
for i in range(2): await device_to_host_source.send(frame) for _ in range(2): await RisingEdge(self.dut.clk)
上面的代码实现了一个 AXIS 发包程序,它会连续发送两帧数据,每帧持续 5 个时钟周期。按原本的设计意图,两帧之间应该间隔 2 个时钟周期。但实际运行后你会发现,代码表面上看没有问题,结果却完全没有达到预期效果,帧与帧之间并没有形成间隔。

问题点说明
当我们在RisingEdge后面加上utils.get_sim_time('ns'),这个函数的作用是打印当前的时间
for i in range(2): await device_to_host_source.send(frame) for _ in range(2): await RisingEdge(self.dut.clk) dut._log.info(f"time={cocotb.utils.get_sim_time('ns')}, rising edge {_}")
我们会发现,第一次 RisingEdge 的时间点和 send 的时间点是重合的,似乎它们是同时发生的,这是为什么呢?

之前我们以为 Send 和 RisingEdge 是两个不同的协程,因此它们是并发的,看起来好像是同时发生的。但这种理解完全错误!
事实上,如果我们把 send 函数换成一个简单的 test_send,而 test_send 里仅仅执行 RisingEdge,就会发现情况不一样了。按照之前的理论,test_send 和外部的 RisingEdge 应该是同时发生的。
async def test_send(self): for _ in range(2): await RisingEdge(self.dut.clk) self.dut._log.info(f"============= run test_send to host {_}=============")
for i in range(2): await test_send(self) for _ in range(2): await RisingEdge(self.dut.clk) dut._log.info(f"time={cocotb.utils.get_sim_time('ns')}, rising edge {_}")
但实际上,它们并不是在同一个仿真时间执行的:test_send 会先完全执行完,再执行外层的 RisingEdge。

协程与任务
协程对象、协程函数、任务
这是两个协程函数(Coroutine Function):
async def foo0(dut): dut._log.info("hello")async def foo1(dut): dut._log.info("world")
这是一个协程函数,它内部调用了其他协程:
async def foo2(dut): await foo0(dut) await foo1(dut)
协程要想运行,首先需要被实例化为协程对象(Coroutine Object)。例如下面这样:
async def main(dut): foo0(dut) foo1(dut) foo2(dut)
仅仅创建协程对象是不会执行的。我们必须使用 await 等待它,协程才会真正运行,当然协程函数和协程对象都可以称之为协程:
async def main(dut): await foo0(dut) await foo1(dut) await foo2(dut)
通过这种方式,执行顺序是:先执行 foo0,再执行 foo1,最后执行 foo2。如果想让协程并发运行,需要使用 cocotb.start_soon 来启动任务(Task)。Task 是用于并发调度协程的机制。例如:
async def main(dut): task0 = cocotb.start_soon(foo0(dut)) task1 = cocotb.start_soon(foo1(dut)) task2 = cocotb.start_soon(foo2(dut))
这样,task0、task1 和 task2 就会并发执行,而不再是顺序执行。
相同协程、不同协程、await
所谓相同协程,指的是同一个协程对象内部的协程,或者说同一个任务(Task)中的协程。所谓“不同协程”,则指不同的协程对象,或者不同任务中的协程。
也就是说,不同协程之间会在 await 点进行切换:当一个协程因为 await 被阻塞时,事件循环(Event Loop)会调度执行其他协程,因此不同协程是并发运行的。
换句话说,Task2 内部的 await foo0 和 await foo1 是顺序阻塞执行的(相同协程),而 Task0 中的 await foo0 与 Task2 中的 await foo0 则是并发执行、非阻塞的。同理,Task1 中的 await foo1 与 Task2 中的 await foo1 也是并发、非阻塞执行。
总结一下:await 仅用于阻塞相同协程(即同一个任务内部的协程),不同协程之间则可以并发运行。
回到之前的问题
Send 和 RisingEdge 看起来像是两个相同的协程也就是说是顺序执行的,因为它们都属于同一个协程对象。那么问题来了:为什么我们看到 Send 和 RisingEdge 仿佛在同一个仿真时间内运行呢?
我们不妨打开 AXIStream 中的 Send 函数,在函数内部插入一些打印日志(log)来观察。
async def send(self, frame): cocotb.log.info(f"{get_sim_time('ns')} - send ENTER") while self.full(): cocotb.log.info(f"{get_sim_time('ns')} - queue FULL -> wait dequeue_event") self.dequeue_event.clear() await self.dequeue_event.wait() cocotb.log.info(f"{get_sim_time('ns')} - dequeue_event fired -> continue send") cocotb.log.info(f"{get_sim_time('ns')} - before queue.put") await self.queue.put(frame) cocotb.log.info(f"{get_sim_time('ns')} - after queue.put -> returning from send")
会发现 Send 在 130ns 就执行完了,而数据包真正发出去是在 140ns。这就很奇怪:到底是谁在发送数据呢?

仔细查看打印 TX Frame 日志的地方,会发现这里有一个死循环 while True,它一直在等待队列中有数据,然后取出数据并控制信号发出。也就是说,Send 的作用只是把要发送的数据放到队列中,而真正发送的是这个死循环。这个死循环必须依赖协程来不断挂起和调度。
async def _run(self): frame = None frame_offset = 0 self.active = False has_tready = hasattr(self.bus, "tready") has_tvalid = hasattr(self.bus, "tvalid") has_tlast = hasattr(self.bus, "tlast") has_tkeep = hasattr(self.bus, "tkeep") has_tid = hasattr(self.bus, "tid") has_tdest = hasattr(self.bus, "tdest") has_tuser = hasattr(self.bus, "tuser") clock_edge_event = RisingEdge(self.clock) while True: await clock_edge_event # read handshake signals tready_sample = (not has_tready) or self.bus.tready.value tvalid_sample = (not has_tvalid) or self.bus.tvalid.value if (tready_sample and tvalid_sample) or not tvalid_sample: if not frame and not self.queue.empty(): frame = self.queue.get_nowait() self.dequeue_event.set() self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_frames -= 1 self.current_frame = frame frame.sim_time_start = get_sim_time() frame.sim_time_end = None self.log.info("TX frame: %s", frame)
再翻看 AXIStream 的 __init__ 方法,我们会恍然大悟:在初始化中,cocotb.start_soon 已经启动了一个任务,也就是对象在创建时就初始化了一个用于取队列数据的协程。这才是关键所在。
class AxiStreamMonitor(AxiStreamBase): _type = "monitor" _init_x = False _valid_init = None _ready_init = None def __init__(self, bus, clock, reset=None, reset_active_level=True, byte_size=None, byte_lanes=None, *args, **kwargs): super().__init__(bus, clock, reset, reset_active_level, byte_size, byte_lanes, *args, **kwargs) self.read_queue = [] if hasattr(self.bus, "tvalid"): cocotb.start_soon(self._run_tvalid_monitor()) if hasattr(self.bus, "tready"): cocotb.start_soon(self._run_tready_monitor())
所以,真正的协程是 AXIStream 对象初始化的协程 与 我们自己创建的协程 并发运行的结果。在 130ns,Send 完成了把数据放入队列,然后继续执行 await RisingEdge;而在 140ns,当 AXIStream 对象的协程监测到队列中有数据时,它会去取出数据并驱动 AXIS 总线产生信号。与此同时,我们自己的协程仍在执行 await RisingEdge。
因此,Send 和 RisingEdge 看起来像是并发执行的,其实这只是一个视觉上的“错觉”
写在最后
通过本文的分析,我们可以清楚地看到协程的本质以及它们的执行机制:
- 1. 相同协程是顺序执行的:同一个协程对象内部的
await会阻塞自身,执行顺序严格按照代码书写顺序。 - 2. 不同协程是并发的:不同协程对象(或通过
cocotb.start_soon启动的 Task)在事件循环中是并发调度的,可以在同一仿真时间或不同时间切换执行。 - 3. AXIStream 的“并发错觉” :在 AXIS 发包例子中,
send看似和RisingEdge同时发生,其实send只是将数据放入队列,而真正的信号驱动是在对象内部已经启动的协程中进行的。我们看到的并发效果是多个协程在事件循环中交替调度的结果,而非单一协程同时执行。
总而言之,理解协程对象与任务的区别、await 的阻塞特性以及事件循环的调度机制,是正确设计 cocotb 测试、排查时序现象的关键。希望本文能够帮助大家理清协程的行为逻辑,避免对阻塞与并发的误解。
评论区
登录后即可参与讨论
立即登录