Source code for cocotb_bus.drivers.opb

# Copyright cocotb contributors
# Copyright (c) 2015 Potential Ventures Ltd
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause

"""
Drivers for On-chip Peripheral Bus.

NOTE: Currently we only support a very small subset of functionality.
"""

from cocotb.triggers import ReadOnly, RisingEdge

from cocotb_bus._compat import BinaryType
from cocotb_bus.drivers import BusDriver


class OPBException(Exception):
    pass


[docs] class OPBMaster(BusDriver): """On-chip peripheral bus master.""" _signals = [ "xferAck", "errAck", "toutSup", "retry", "DBus_out", "select", "RNW", "BE", "ABus", "DBus_in", ] _optional_signals = ["seqAddr"] _max_cycles = 16 def __init__(self, entity, name, clock, **kwargs): BusDriver.__init__(self, entity, name, clock, **kwargs) self.bus.select.value = 0 self.log.debug("OPBMaster created")
[docs] async def read(self, address: int, sync: bool = True) -> BinaryType: """Issue a request to the bus and block until this comes back. Simulation time still progresses but syntactically it blocks. Args: address: The address to read from. sync: Wait for rising edge on clock initially. Defaults to True. Returns: The read data value. Raises: OPBException: If read took longer than 16 cycles. """ await self._acquire_lock() # Apply values for next clock edge if sync: await RisingEdge(self.clock) self.bus.ABus.value = address self.bus.select.value = 1 self.bus.RNW.value = 1 self.bus.BE.value = 0xF count = 0 while not int(self.bus.xferAck.value): await RisingEdge(self.clock) await ReadOnly() if int(self.bus.toutSup.value): count = 0 else: count += 1 if count >= self._max_cycles: raise OPBException("Read took longer than 16 cycles") data = int(self.bus.DBus_out.value) # Deassert read self.bus.select.value = 0 self._release_lock() self.log.info("Read of address 0x%x returned 0x%08x" % (address, data)) return data
[docs] async def write(self, address: int, value: int, sync: bool = True) -> None: """Issue a write to the given address with the specified value. Args: address: The address to read from. value: The data value to write. sync: Wait for rising edge on clock initially. Defaults to True. Raises: OPBException: If write took longer than 16 cycles. """ await self._acquire_lock() if sync: await RisingEdge(self.clock) self.bus.ABus.value = address self.bus.select.value = 1 self.bus.RNW.value = 0 self.bus.BE.value = 0xF self.bus.DBus_out.value = value count = 0 while not int(self.bus.xferAck.value): await RisingEdge(self.clock) await ReadOnly() if int(self.bus.toutSup.value): count = 0 else: count += 1 if count >= self._max_cycles: raise OPBException("Write took longer than 16 cycles") self.bus.select.value = 0 self._release_lock()