"""
Helper functions for viewing the block visually.
Each of the functions in visualization take a block and a file descriptor.
The functions provided write the block as a given visual format to the file.
"""
import collections
from .pyrtlexceptions import PyrtlError, PyrtlInternalError
from .core import working_block, LogicNet
from .wire import WireVector, Input, Output, Const, Register
[docs]
def net_graph(block=None, split_state=False):
"""Return a graph representation of the given block.
:param Block block: block to use (defaults to current working block)
:param bool split_state: if True, split connections to/from a register
update net; this means that registers will be appear as source nodes of
the network, and ``r`` nets (i.e. the logic for setting a register's
next value) will be treated as sink nodes of the network.
The graph has the following form::
{
node1: { nodeA: [edge1A_1, edge1A_2], nodeB: [edge1B]},
node2: { nodeB: [edge2B], nodeC: [edge2C_1, edge2C_2]},
...
}
aka: ``edges = graph[source][dest]``
Each node can be either a LogicNet or a WireVector (e.g. an Input, an
Output, a Const or even an undriven WireVector (which acts as a source or
sink in the network). Each edge is a WireVector or derived type (Input,
Output, Register, etc.). Note that inputs, consts, and outputs will be
both "node" and "edge". WireVectors that are not connected to any nets are
not returned as part of the graph.
"""
# FIXME: make it not try to add unused wires (issue #204)
block = working_block(block)
from .wire import Register
# self.sanity_check()
graph = {}
# add all of the nodes
for net in block.logic:
graph[net] = {}
wire_src_dict, wire_dst_dict = block.net_connections()
dest_set = set(wire_src_dict.keys())
arg_set = set(wire_dst_dict.keys())
dangle_set = dest_set.symmetric_difference(arg_set)
for w in dangle_set:
graph[w] = {}
if split_state:
for w in block.wirevector_subset(Register):
graph[w] = {}
# add all of the edges
for w in (dest_set | arg_set):
try:
_from = wire_src_dict[w]
except Exception:
_from = w # e.g. an Input/Const
if split_state and isinstance(w, Register):
_from = w
try:
_to_list = wire_dst_dict[w]
except Exception:
graph[_from][w] = [w] # e.g. an Output
else:
for _to in _to_list:
graph[_from][_to] = list(filter(lambda arg: arg is w, _to.args))
return graph
# -----------------------------------------------------------------
# ___ __ ___
# | / _` |___
# | \__> |
def _trivialgraph_default_namer(thing, is_edge=True):
""" Returns a "good" string for thing in printed graphs. """
if is_edge:
if thing.name is None or thing.name.startswith('tmp'):
return ''
else:
return '/'.join([thing.name, str(len(thing))])
elif isinstance(thing, Const):
return str(thing.val)
elif isinstance(thing, WireVector):
return thing.name or '??'
else:
try:
return thing.op + str(thing.op_param or '')
except AttributeError:
raise PyrtlError('no naming rule for "%s"' % str(thing))
[docs]
def output_to_trivialgraph(file, namer=_trivialgraph_default_namer, block=None, split_state=False):
"""Walk the block and output it in `trivial graph format
<https://en.wikipedia.org/wiki/Trivial_Graph_Format>`_ to the open file.
:param file: Open file to write to
:param namer: A function that takes in an object (a wire or LogicNet) as
the first argument and a boolean `is_edge` as the second that is set
True if the object is a wire, and returns a string representing that
object.
:param Block block: Block to use (defaults to current working block)
:param bool split_state: if True, split connections to/from a register
update net; this means that registers will be appear as source nodes of
the network, and ``r`` nets (i.e. the logic for setting a register's
next value) will be treated as sink nodes of the network.
"""
graph = net_graph(block, split_state)
node_index_map = {} # map node -> index
# print the list of nodes
for index, node in enumerate(graph):
print('%d %s' % (index, namer(node, is_edge=False)), file=file)
node_index_map[node] = index
print('#', file=file)
# print the list of edges
for _from in graph:
for _to in graph[_from]:
from_index = node_index_map[_from]
to_index = node_index_map[_to]
for edge in graph[_from][_to]:
print('%d %d %s' % (from_index, to_index, namer(edge)), file=file)
# -----------------------------------------------------------------
# __ __ __ __
# / _` |__) /\ |__) |__| \ / | /
# \__> | \ /~~\ | | | \/ | /__
def _default_edge_namer(edge, is_to_splitmerge=False, extra_edge_info=None):
"""
A function for naming an edge for use in the graphviz graph.
:param edge: the edge (i.e. WireVector or deriving class)
:param is_to_splitmerge: if the node to which the edge points
is a select or concat operation
:param extra_edge_info: a map from edge to any additional data you want
to print associated with it (e.g. timing data)
:return: a function that can be called by graph namer function you pass
in to block_to_graphviz_string
"""
name = '' if edge.name is None else '/'.join([edge.name, str(len(edge))])
if extra_edge_info and edge in extra_edge_info:
# Always label an edge if present in the extra_edge_info map
name = name + " (" + str(extra_edge_info[edge]) + ")"
elif (edge.name is None
or edge.name.startswith('tmp')
or isinstance(edge, (Input, Output, Const, Register))):
name = ''
penwidth = 2 if len(edge) == 1 else 6
arrowhead = 'none' if is_to_splitmerge else 'normal'
return '[label="%s", penwidth="%d", arrowhead="%s"]' % (name, penwidth, arrowhead)
def _default_node_namer(node, split_state=False, extra_node_info=None):
"""
A function for naming a node for use in the graphviz graph.
:param node: the node (i.e. WireVector or deriving class, or a logic net)
:param split_state: if True, split connections to/from a register update net; this
means that registers will be appear as source nodes of the network, and
'r' nets (i.e. the logic for setting a register's next value) will
be treated as sink nodes of the network.
:param extra_node_info: a map from node to any additional data you want
to print associated with it (e.g. delay data)
:return: a function that can be called by graph namer function you pass
in to block_to_graphviz_string
"""
def label(v):
if extra_node_info and node in extra_node_info:
v = v + " (" + str(extra_node_info[node]) + ")"
return v
if isinstance(node, Const):
name = node.name + ': ' if not node.name.startswith('const_') else ''
return '[label="%s", shape=circle, fillcolor=lightgrey]' % label(name + str(node.val))
elif isinstance(node, Input):
return '[label="%s", shape=invhouse, fillcolor=coral]' % label(node.name)
elif isinstance(node, Output):
return '[label="%s", shape=house, fillcolor=lawngreen]' % label(node.name)
elif isinstance(node, Register):
return '[label="%s", shape=square, fillcolor=gold]' % label(node.name)
elif isinstance(node, WireVector):
return '[label="%s", shape=circle, fillcolor=none]' % label(node.name)
else:
try:
if node.op == '&':
return '[label="%s"]' % label("and")
elif node.op == '|':
return '[label="%s"]' % label("or")
elif node.op == '^':
return '[label="%s"]' % label("xor")
elif node.op == '~':
return '[label="%s", shape=invtriangle]' % label("not")
elif node.op == 'x':
return '[label="%s", shape=invtrapezium]' % label("mux")
elif node.op == 's':
# node.op_param is a tuple of the selected bits to pull from the argument wire,
# so it could look something like (0,0,0,0,0,0,0), meaning dest wire is going
# to be a concatenation of the zero-th bit of the argument wire, 7 times.
selLower = node.op_param[0]
selUpper = node.op_param[-1]
if len(node.op_param) == 1:
bits = "[%d]" % selLower
elif node.op_param == tuple(range(selLower, selUpper + 1)): # consecutive
bits = "[%d:%d]" % (selUpper, selLower)
elif all([ix == node.op_param[0] for ix in node.op_param[1:]]): # all the same
bits = "[%d]*%d" % (node.op_param[0], len(node.op_param))
else:
bits = "bits" + str(tuple(reversed(node.op_param)))
return '[label="%s", fillcolor=azure1, height=.25, width=.25]' % label(bits)
elif node.op in 'c':
return '[label="%s", height=.1, width=.1]' % label("concat")
elif node.op == 'r':
name = node.dests[0].name or ''
name = ("%s.next" % name) if split_state else name
return '[label="%s", shape=square, fillcolor=gold]' % label(name)
elif node.op == 'w':
return '[label="%s", height=.1, width=.1]' % label("")
elif node.op in 'm@':
name = node.op_param[1].name
if name.startswith("tmp"):
name = ""
else:
name = "(" + name + ")"
return '[label="%s"]' % label(node.op + name)
else:
return '[label="%s"]' % label(node.op + str(node.op_param or ''))
except AttributeError:
raise PyrtlError('no naming rule for "%s"' % str(node))
def _graphviz_default_namer(
thing,
is_edge,
is_to_splitmerge,
split_state,
node_namer=_default_node_namer,
edge_namer=_default_edge_namer):
""" Returns a "good" Graphviz label for thing.
:param thing: The edge (wire) or node (logic net or Input/Output/Const) to name
:param is_edge: True if thing is an edge
:param is_to_splitmerge: if the node to which the edge points
is a select or concat operation
:param split_state: If True, visually split the connections to/from a register update net.
:param node_namer: A function mapping a node to a label; one of its arguments
is a dict mapping nodes to nodes to additional user-supplied information.
:param edge_namer: A function mapping an edge to a label; one of its arguments
is a dict mapping nodes to nodes to additional user-supplied information.
:return: A function that knows how to label each element in the graph, which
can be passed to 'output_to_graphviz' or 'block_to_graphviz_string'
"""
if is_edge:
return edge_namer(thing, is_to_splitmerge=is_to_splitmerge)
else:
return node_namer(thing, split_state=split_state)
[docs]
def graphviz_detailed_namer(
extra_node_info=None,
extra_edge_info=None):
""" Returns a detailed Graphviz namer that prints extra information
about nodes/edges in the given maps.
:param extra_node_info: A dict from node to some object about that node
(its string representation will be printed next to the node's label)
:param extra_edge_info: A dict from edge to some object about that edge
(its string representation will be printed next to the edge's label)
:return: A function that knows how to label each element in the graph, which
can be passed to :func:`.output_to_graphviz` or :func:`.block_to_graphviz_string`
If both dict arguments are None, the returned namer behaves identically
to the default Graphviz namer.
"""
def node_namer(node, split_state):
return _default_node_namer(node, split_state, extra_node_info)
def edge_namer(edge, is_to_splitmerge):
return _default_edge_namer(edge, is_to_splitmerge, extra_edge_info)
def namer(thing, is_edge, is_to_splitmerge, split_state):
return _graphviz_default_namer(
thing, is_edge, is_to_splitmerge, split_state,
node_namer=node_namer, edge_namer=edge_namer)
return namer
[docs]
def output_to_graphviz(file, block=None, namer=_graphviz_default_namer,
split_state=True, maintain_arg_order=False):
"""Walk the block and output it in `Graphviz <https://graphviz.org/>`_
format to the open file.
:param file: Open file to write to
:param Block block: Block to use (defaults to current working block)
:param namer: Function used to label each edge and node; see
:func:`block_to_graphviz_string` for more information.
:param bool split_state: If True, visually split the connections to/from a
register update net.
:param bool maintain_arg_order: If True, will add ordering constraints so
that that incoming edges are ordered left-to-right for nets where
argument order matters (e.g. ``<``). Keeping this as False results in a
cleaner, though less visually precise, graphical output.
The file written by the this function should be a directed graph in the
format expected by the `Graphviz package <https://graphviz.org/>`_,
specifically in the :command:`dot` format. Once Graphviz is installed, the
resulting graph file can be rendered to a .png file with::
dot -Tps output.dot > output.ps
"""
print(block_to_graphviz_string(block, namer, split_state, maintain_arg_order), file=file)
[docs]
def block_to_graphviz_string(block=None, namer=_graphviz_default_namer,
split_state=True, maintain_arg_order=False):
"""Return a Graphviz string for the block.
:param namer: A function mapping graph objects (wires/logic nets) to labels.
If you want a more detailed namer, pass in a call to
:func:`.graphviz_detailed_namer` (see below).
:param Block block: Block to use (defaults to current working block)
:param bool split_state: If True, split connections to/from a register
update net; this means that registers will be appear as source nodes of
the network, and ``r`` nets (i.e. the logic for setting a register's
next value) will be treated as sink nodes of the network.
:param bool maintain_arg_order: If True, will add ordering constraints so
that that incoming edges are ordered left-to-right for nets where
argument order matters (e.g. ``<``). Keeping this as False results in a
cleaner, though less visually precise, graphical output.
The normal namer function will label user-named wires with their names and
label the nodes (logic nets or Input/Output/Const terminals) with their
operator symbol or name/value, respectively. If custom information about
each node in the graph is desired, you can pass in a custom namer function
which must have the same signature as the default namer,
:func:`._graphviz_default_namer`. However, we recommend you instead pass in
a call to :func:`.graphviz_detailed_namer`, supplying it with your own
dicts mapping wires and nodes to labels. For any wire/node found in these
maps, that additional information will be printed in parentheses alongside
the node in the graphviz graph.
For example, if you wanted to print the delay of each wire and the fanout
of each gate, you could pass in two maps to the
:func:`.graphviz_detailed_namer` call, which returns a namer function that
can subsequently be passed to :func:`.output_to_graphviz` or
:func:`.block_to_graphviz_string`. ::
node_fanout = {n: "Fanout: %d" % my_fanout_func(n) for n in working_block().logic}
wire_delay = {w: "Delay: %.2f" % my_delay_func(w) for w in working_block().wirevector_set}
with open("out.gv", "w") as f:
output_to_graphviz(f, namer=graphviz_detailed_namer(node_fanout, wire_delay))
"""
graph = net_graph(block, split_state)
node_index_map = {} # map node -> index
rstring = """\
digraph g {
graph [splines="spline", outputorder="edgesfirst"];
node [shape=circle, style=filled, fillcolor=lightblue1,
fontcolor=black, fontname=helvetica, penwidth=0,
fixedsize=shape];
edge [labelfloat=false, penwidth=2, color=deepskyblue, arrowsize=.5];
"""
from .importexport import _natural_sort_key
def _node_sort_key(node):
# If a LogicNet and a wire share the same name, we want the LogicNet
# to sort first, so we arbitrarily 'A' and 'B' suffixes to break ties.
if isinstance(node, LogicNet):
if node.op == '@':
key = str(node.args[2]) + 'A'
else:
key = node.dests[0].name + 'A'
else:
key = node.name + 'B'
return _natural_sort_key(key)
# print the list of nodes
for index, node in enumerate(sorted(graph.keys(), key=_node_sort_key)):
label = namer(node, False, False, split_state)
rstring += ' n%s %s;\n' % (index, label)
node_index_map[node] = index
# print the list of edges
srcs = collections.defaultdict(list)
for _from in sorted(graph.keys(), key=_node_sort_key):
for _to in sorted(graph[_from].keys(), key=_node_sort_key):
from_index = node_index_map[_from]
to_index = node_index_map[_to]
for edge in graph[_from][_to]:
is_to_splitmerge = True if hasattr(_to, 'op') and _to.op in 'cs' else False
label = namer(edge, True, is_to_splitmerge, False)
rstring += ' n%d -> n%d %s;\n' % (from_index, to_index, label)
srcs[_to].append((_from, edge))
# Maintain left-to-right order of incoming wires for nets where order matters.
# This won't be visually perfect sometimes (especially for a wire used twice
# in a net's argument list), but for the majority of cases this will improve
# the visualization.
def index_of(w, args):
# Special helper so we compare id rather than using builtin operators
ix = 0
for arg in args:
if w is arg:
return ix
ix += 1
raise PyrtlInternalError('Expected to find wire in set of args')
if maintain_arg_order:
block = working_block(block)
for net in sorted(block.logic_subset(op='c-<>x@'), key=_node_sort_key):
args = [(node_index_map[n], wire) for (n, wire) in srcs[net]]
args.sort(key=lambda t: index_of(t[1], net.args))
s = ' -> '.join(['n%d' % n for n, _ in args])
rstring += ' {\n'
rstring += ' rank=same;\n'
rstring += ' edge[style=invis];\n'
rstring += ' ' + s + ';\n'
rstring += ' rankdir=LR;\n'
rstring += ' }\n'
rstring += '}\n'
return rstring
# -----------------------------------------------------------------
# __ __
# /__` \ / / _`
# .__/ \/ \__>
[docs]
def output_to_svg(file, block=None, split_state=True):
""" Output the block as an SVG to the open file.
:param file: Open file to write to
:param Block block: Block to use (defaults to current working block)
:param bool split_state: If True, visually split the connections to/from a register update net.
"""
print(block_to_svg(block, split_state), file=file)
[docs]
def block_to_svg(block=None, split_state=True, maintain_arg_order=False):
"""Return an SVG for the block.
:param Block block: Block to use (defaults to current working block)
:param bool split_state: If True, visually split the connections to/from a
register update net.
:param bool maintain_arg_order: If True, will add ordering constraints so
that that incoming edges are ordered left-to-right for nets where
argument order matters (e.g. ``<``). Keeping this as False results in a
cleaner, though less visually precise, graphical output.
:return: The SVG representation of the block
"""
try:
from graphviz import Source
src = Source(block_to_graphviz_string(block, split_state=split_state,
maintain_arg_order=maintain_arg_order))
try:
svg = src._repr_image_svg_xml()
except AttributeError:
# py-graphviz 0.18.3 or earlier
return src._repr_svg_()
else:
# py-graphviz 0.19 or later
return svg
except ImportError:
raise PyrtlError('need graphviz installed (try "pip install graphviz")')
# -----------------------------------------------------------------
# ___
# |__| | |\/| |
# | | | | | |___
[docs]
def trace_to_html(simtrace, trace_list=None, sortkey=None, repr_func=hex, repr_per_name={}):
""" Return a HTML block showing the trace.
:param SimulationTrace simtrace: A SimulationTrace object
:param list[str] trace_list: (optional) A list of wires to display
:param sortkey: (optional) The key with which to sort the trace_list
:param repr_func: function to use for representing the current_val;
examples are ``hex``, ``oct``, ``bin``, ``str`` (for decimal), or even the name
of an IntEnum class you know the value will belong to. Defaults to ``hex``.
:param repr_per_name: Map from signal name to a function that takes in the signal's
value and returns a user-defined representation. If a signal name is
not found in the map, the argument `repr_func` will be used instead.
:return: An HTML block showing the trace
"""
from .simulation import SimulationTrace, _trace_sort_key
if not isinstance(simtrace, SimulationTrace):
raise PyrtlError('first arguement must be of type SimulationTrace')
trace = simtrace.trace
if sortkey is None:
sortkey = _trace_sort_key
if trace_list is None:
trace_list = sorted(trace, key=sortkey)
wave_template = (
"""\
<script type="WaveDrom">
{
signal : [
%s
],
config: { hscale: %d }
}
</script>
"""
)
vallens = [] # For determining longest value length
def extract(w):
wavelist = []
datalist = []
last = None
for i, value in enumerate(trace[w]):
if last == value:
wavelist.append('.')
else:
f = repr_per_name.get(w)
if f is not None:
wavelist.append('=')
datalist.append(str(f(value)))
elif len(simtrace._wires[w]) == 1:
# int() to convert True/False to 0/1
wavelist.append(str(int(value)))
else:
wavelist.append('=')
datalist.append(str(repr_func(value)))
last = value
wavestring = ''.join(wavelist)
datastring = ', '.join(['"%s"' % data for data in datalist])
if repr_per_name.get(w) is None and len(simtrace._wires[w]) == 1:
vallens.append(1) # all are the same length
return bool_signal_template % (w, wavestring)
else:
vallens.extend([len(data) for data in datalist])
return int_signal_template % (w, wavestring, datastring)
bool_signal_template = ' { name: "%s", wave: "%s" },'
int_signal_template = ' { name: "%s", wave: "%s", data: [%s] },'
signals = [extract(w) for w in trace_list]
all_signals = '\n'.join(signals)
maxvallen = max(vallens)
scale = (maxvallen // 5) + 1
wave = wave_template % (all_signals, scale)
# print(wave)
return wave