Source code for cocotb_bus.monitors.avalon

# Copyright cocotb contributors
# Copyright (c) 2013 Potential Ventures Ltd
# Copyright (c) 2013 SolarFlare Communications Inc
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause

"""Monitors for Intel Avalon interfaces.

See https://www.intel.com/content/dam/www/programmable/us/en/pdfs/literature/manual/mnl_avalon_spec_1_3.pdf

NB Currently we only support a very small subset of functionality.
"""

import warnings
from typing import Optional

from cocotb.triggers import RisingEdge
from scapy.utils import hexdump

from cocotb_bus._compat import convert_binary_to_bytes, create_binary
from cocotb_bus.monitors import BusMonitor


class AvalonProtocolError(Exception):
    pass


[docs] class AvalonST(BusMonitor): """Avalon-ST bus. Non-packetized so each valid word is a separate transaction. """ _signals = ["valid", "data"] _optional_signals = ["ready"] _default_config = {"firstSymbolInHighOrderBits": True} def __init__(self, entity, name: Optional[str], clock, *, config={}, **kwargs): BusMonitor.__init__(self, entity, name, clock, **kwargs) self.config = self._default_config.copy() for configoption, value in config.items(): self.config[configoption] = value self.log.debug("Setting config option %s to %s", configoption, str(value)) async def _monitor_recv(self): """Watch the pins and reconstruct transactions.""" # Avoid spurious object creation by recycling clkedge = RisingEdge(self.clock) def valid(): if hasattr(self.bus, "ready"): return ( str(self.bus.valid.value) == "1" and str(self.bus.ready.value) == "1" ) return str(self.bus.valid.value) == "1" # NB could await on valid here more efficiently? while True: await clkedge if valid(): self._recv( convert_binary_to_bytes( self.bus.data.value, big_endian=self.config["firstSymbolInHighOrderBits"], ) )
[docs] class AvalonSTPkts(BusMonitor): """Packetized Avalon-ST bus. Args: entity, name, clock: see :class:`BusMonitor` config (dict): bus configuration options report_channel (bool): report channel with data, default is False Setting to True on bus without channel signal will give an error """ _signals = ["valid", "data", "startofpacket", "endofpacket"] _optional_signals = ["error", "channel", "ready", "empty"] _default_config = { "dataBitsPerSymbol": 8, "firstSymbolInHighOrderBits": True, "maxChannel": 0, "readyLatency": 0, "invalidTimeout": 0, } def __init__( self, entity, name: Optional[str], clock, *, config={}, report_channel=False, **kwargs, ): BusMonitor.__init__(self, entity, name, clock, **kwargs) self.config = self._default_config.copy() self.report_channel = report_channel # Set default config maxChannel to max value on channel bus if hasattr(self.bus, "channel"): self.config["maxChannel"] = (2 ** len(self.bus.channel)) - 1 else: if report_channel: raise ValueError( "Channel reporting asked on bus without channel signal" ) for configoption, value in config.items(): self.config[configoption] = value self.log.debug("Setting config option %s to %s", configoption, str(value)) num_data_symbols = len(self.bus.data) / self.config["dataBitsPerSymbol"] if num_data_symbols > 1 and not hasattr(self.bus, "empty"): raise AttributeError( "%s has %i data symbols, but contains no object named empty" % (self.name, num_data_symbols) ) self.config["useEmpty"] = num_data_symbols > 1 if hasattr(self.bus, "channel"): if len(self.bus.channel) > 128: raise AttributeError( "AvalonST interface specification defines channel width as 1-128. " "%d channel width is %d" % (self.name, len(self.bus.channel)) ) maxChannel = (2 ** len(self.bus.channel)) - 1 if self.config["maxChannel"] > maxChannel: raise AttributeError( "%s has maxChannel=%d, but can only support a maximum channel of " "(2**channel_width)-1=%d, channel_width=%d" % ( self.name, self.config["maxChannel"], maxChannel, len(self.bus.channel), ) ) async def _monitor_recv(self): """Watch the pins and reconstruct transactions.""" # Avoid spurious object creation by recycling clkedge = RisingEdge(self.clock) pkt = b"" in_pkt = False invalid_cyclecount = 0 channel = None def valid(): if hasattr(self.bus, "ready"): return ( str(self.bus.valid.value) == "1" and str(self.bus.ready.value) == "1" ) return str(self.bus.valid.value) == "1" while True: await clkedge if self.in_reset: continue if valid(): invalid_cyclecount = 0 if str(self.bus.startofpacket.value) == "1": if pkt: raise AvalonProtocolError( "Duplicate start-of-packet received on %s" % str(self.bus.startofpacket) ) pkt = b"" in_pkt = True if not in_pkt: raise AvalonProtocolError("Data transfer outside of packet") # Handle empty and X's in empty / data if str(self.bus.endofpacket.value) != "1": pkt += convert_binary_to_bytes( self.bus.data.value, big_endian=self.config["firstSymbolInHighOrderBits"], ) else: value = str(self.bus.data.value) if self.config["useEmpty"] and int(self.bus.empty.value): empty = ( int(self.bus.empty.value) * self.config["dataBitsPerSymbol"] ) if self.config["firstSymbolInHighOrderBits"]: value = value[:-empty] else: value = value[empty:] vec = create_binary( value, len(value), big_endian=self.config["firstSymbolInHighOrderBits"], ) if not vec.is_resolvable: raise AvalonProtocolError( "After empty masking value is still bad? " "Had empty {:d}, got value {:s}".format(empty, value) ) pkt += convert_binary_to_bytes( vec, big_endian=self.config["firstSymbolInHighOrderBits"] ) if hasattr(self.bus, "channel"): if channel is None: channel = int(self.bus.channel.value) if channel > self.config["maxChannel"]: raise AvalonProtocolError( "Channel value (%d) is greater than maxChannel (%d)" % (channel, self.config["maxChannel"]) ) elif int(self.bus.channel.value) != channel: raise AvalonProtocolError("Channel value changed during packet") if str(self.bus.endofpacket.value) == "1": self.log.info("Received a packet of %d bytes", len(pkt)) self.log.debug(f"Received Packet:\n{hexdump(pkt, dump=True)}") self.channel = channel if self.report_channel: self._recv({"data": pkt, "channel": channel}) else: self._recv(pkt) pkt = b"" in_pkt = False channel = None else: if in_pkt: invalid_cyclecount += 1 if self.config["invalidTimeout"]: if invalid_cyclecount >= self.config["invalidTimeout"]: raise AvalonProtocolError( "In-Packet Timeout. Didn't receive any valid data for %d cycles!" % invalid_cyclecount )
class AvalonSTPktsWithChannel(AvalonSTPkts): """Packetized AvalonST bus using channel. This class is deprecated. Use AvalonSTPkts(..., report_channel=True, ...) """ def __init__(self, entity, name: Optional[str], clock, **kwargs): warnings.warn( "Use of AvalonSTPktsWithChannel is deprecated\n" "\tUse AvalonSTPkts(..., report_channel=True, ...)", DeprecationWarning, stacklevel=2, ) AvalonSTPkts.__init__(self, entity, name, clock, report_channel=True, **kwargs)