summaryrefslogtreecommitdiff
path: root/tools/net/ynl/pyynl/cli.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/net/ynl/pyynl/cli.py')
-rwxr-xr-xtools/net/ynl/pyynl/cli.py285
1 files changed, 193 insertions, 92 deletions
diff --git a/tools/net/ynl/pyynl/cli.py b/tools/net/ynl/pyynl/cli.py
index af02a5b7e5a2..94a5ba348b69 100755
--- a/tools/net/ynl/pyynl/cli.py
+++ b/tools/net/ynl/pyynl/cli.py
@@ -1,43 +1,85 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
+"""
+YNL cli tool
+"""
+
import argparse
import json
import os
import pathlib
import pprint
+import shutil
import sys
import textwrap
+# pylint: disable=no-name-in-module,wrong-import-position
sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
-from lib import YnlFamily, Netlink, NlError, SpecFamily
+from lib import YnlFamily, Netlink, NlError, SpecFamily, SpecException, YnlException
+
+SYS_SCHEMA_DIR='/usr/share/ynl'
+RELATIVE_SCHEMA_DIR='../../../../Documentation/netlink'
+
+# pylint: disable=too-few-public-methods,too-many-locals
+class Colors:
+ """ANSI color and font modifier codes"""
+ RESET = '\033[0m'
+
+ BOLD = '\033[1m'
+ ITALICS = '\033[3m'
+ UNDERLINE = '\033[4m'
+ INVERT = '\033[7m'
+
+
+def color(text, modifiers):
+ """Add color to text if output is a TTY
-sys_schema_dir='/usr/share/ynl'
-relative_schema_dir='../../../../Documentation/netlink'
+ Returns:
+ Colored text if stdout is a TTY, otherwise plain text
+ """
+ if sys.stdout.isatty():
+ # Join the colors if they are a list, if it's a string this a noop
+ modifiers = "".join(modifiers)
+ return f"{modifiers}{text}{Colors.RESET}"
+ return text
+
+def term_width():
+ """ Get terminal width in columns (80 if stdout is not a terminal) """
+ return shutil.get_terminal_size().columns
def schema_dir():
+ """
+ Return the effective schema directory, preferring in-tree before
+ system schema directory.
+ """
script_dir = os.path.dirname(os.path.abspath(__file__))
- schema_dir = os.path.abspath(f"{script_dir}/{relative_schema_dir}")
- if not os.path.isdir(schema_dir):
- schema_dir = sys_schema_dir
- if not os.path.isdir(schema_dir):
- raise Exception(f"Schema directory {schema_dir} does not exist")
- return schema_dir
+ schema_dir_ = os.path.abspath(f"{script_dir}/{RELATIVE_SCHEMA_DIR}")
+ if not os.path.isdir(schema_dir_):
+ schema_dir_ = SYS_SCHEMA_DIR
+ if not os.path.isdir(schema_dir_):
+ raise YnlException(f"Schema directory {schema_dir_} does not exist")
+ return schema_dir_
def spec_dir():
- spec_dir = schema_dir() + '/specs'
- if not os.path.isdir(spec_dir):
- raise Exception(f"Spec directory {spec_dir} does not exist")
- return spec_dir
+ """
+ Return the effective spec directory, relative to the effective
+ schema directory.
+ """
+ spec_dir_ = schema_dir() + '/specs'
+ if not os.path.isdir(spec_dir_):
+ raise YnlException(f"Spec directory {spec_dir_} does not exist")
+ return spec_dir_
class YnlEncoder(json.JSONEncoder):
- def default(self, obj):
- if isinstance(obj, bytes):
- return bytes.hex(obj)
- if isinstance(obj, set):
- return list(obj)
- return json.JSONEncoder.default(self, obj)
+ """A custom encoder for emitting JSON with ynl-specific instance types"""
+ def default(self, o):
+ if isinstance(o, bytes):
+ return bytes.hex(o)
+ if isinstance(o, set):
+ return list(o)
+ return json.JSONEncoder.default(self, o)
def print_attr_list(ynl, attr_names, attr_set, indent=2):
@@ -46,7 +88,7 @@ def print_attr_list(ynl, attr_names, attr_set, indent=2):
for attr_name in attr_names:
if attr_name in attr_set.attrs:
attr = attr_set.attrs[attr_name]
- attr_info = f'{prefix}- {attr_name}: {attr.type}'
+ attr_info = f'{prefix}- {color(attr_name, Colors.BOLD)}: {attr.type}'
if 'enum' in attr.yaml:
enum_name = attr.yaml['enum']
attr_info += f" (enum: {enum_name})"
@@ -54,7 +96,8 @@ def print_attr_list(ynl, attr_names, attr_set, indent=2):
if enum_name in ynl.consts:
const = ynl.consts[enum_name]
enum_values = list(const.entries.keys())
- attr_info += f"\n{prefix} {const.type.capitalize()}: {', '.join(enum_values)}"
+ type_fmted = color(const.type.capitalize(), Colors.ITALICS)
+ attr_info += f"\n{prefix} {type_fmted}: {', '.join(enum_values)}"
# Show nested attributes reference and recursively display them
nested_set_name = None
@@ -63,7 +106,10 @@ def print_attr_list(ynl, attr_names, attr_set, indent=2):
attr_info += f" -> {nested_set_name}"
if attr.yaml.get('doc'):
- doc_text = textwrap.indent(attr.yaml['doc'], prefix + ' ')
+ doc_prefix = prefix + ' ' * 4
+ doc_text = textwrap.fill(attr.yaml['doc'], width=term_width(),
+ initial_indent=doc_prefix,
+ subsequent_indent=doc_prefix)
attr_info += f"\n{doc_text}"
print(attr_info)
@@ -77,24 +123,62 @@ def print_attr_list(ynl, attr_names, attr_set, indent=2):
print_attr_list(ynl, nested_names, nested_set, indent + 4)
-def print_mode_attrs(ynl, mode, mode_spec, attr_set, print_request=True):
+def print_mode_attrs(ynl, mode, mode_spec, attr_set, consistent_dd_reply=None):
"""Print a given mode (do/dump/event/notify)."""
mode_title = mode.capitalize()
- if print_request and 'request' in mode_spec and 'attributes' in mode_spec['request']:
+ if 'request' in mode_spec and 'attributes' in mode_spec['request']:
print(f'\n{mode_title} request attributes:')
print_attr_list(ynl, mode_spec['request']['attributes'], attr_set)
if 'reply' in mode_spec and 'attributes' in mode_spec['reply']:
- print(f'\n{mode_title} reply attributes:')
- print_attr_list(ynl, mode_spec['reply']['attributes'], attr_set)
+ if consistent_dd_reply and mode == "do":
+ title = None # Dump handling will print in combined format
+ elif consistent_dd_reply and mode == "dump":
+ title = 'Do and Dump'
+ else:
+ title = f'{mode_title}'
+ if title:
+ print(f'\n{title} reply attributes:')
+ print_attr_list(ynl, mode_spec['reply']['attributes'], attr_set)
+
+
+def do_doc(ynl, op):
+ """Handle --list-attrs $op, print the attr information to stdout"""
+ print(f'Operation: {color(op.name, Colors.BOLD)}')
+ print(op.yaml['doc'])
+
+ consistent_dd_reply = False
+ if 'do' in op.yaml and 'dump' in op.yaml and 'reply' in op.yaml['do'] and \
+ op.yaml['do']['reply'] == op.yaml['dump'].get('reply'):
+ consistent_dd_reply = True
+
+ for mode in ['do', 'dump']:
+ if mode in op.yaml:
+ print_mode_attrs(ynl, mode, op.yaml[mode], op.attr_set,
+ consistent_dd_reply=consistent_dd_reply)
+
+ if 'attributes' in op.yaml.get('event', {}):
+ print('\nEvent attributes:')
+ print_attr_list(ynl, op.yaml['event']['attributes'], op.attr_set)
- if 'attributes' in mode_spec:
- print(f'\n{mode_title} attributes:')
- print_attr_list(ynl, mode_spec['attributes'], attr_set)
+ if 'notify' in op.yaml:
+ mode_spec = op.yaml['notify']
+ ref_spec = ynl.msgs.get(mode_spec).yaml.get('do')
+ if not ref_spec:
+ ref_spec = ynl.msgs.get(mode_spec).yaml.get('dump')
+ if ref_spec:
+ print('\nNotification attributes:')
+ print_attr_list(ynl, ref_spec['reply']['attributes'], op.attr_set)
+ if 'mcgrp' in op.yaml:
+ print(f"\nMulticast group: {op.yaml['mcgrp']}")
+
+# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def main():
+ """YNL cli tool"""
+
description = """
YNL CLI utility - a general purpose netlink utility that uses YAML
specs to drive protocol encoding and decoding.
@@ -105,54 +189,85 @@ def main():
"""
parser = argparse.ArgumentParser(description=description,
- epilog=epilog)
- spec_group = parser.add_mutually_exclusive_group(required=True)
- spec_group.add_argument('--family', dest='family', type=str,
- help='name of the netlink FAMILY')
- spec_group.add_argument('--list-families', action='store_true',
- help='list all netlink families supported by YNL (has spec)')
- spec_group.add_argument('--spec', dest='spec', type=str,
- help='choose the family by SPEC file path')
-
- parser.add_argument('--schema', dest='schema', type=str)
- parser.add_argument('--no-schema', action='store_true')
- parser.add_argument('--json', dest='json_text', type=str)
-
- group = parser.add_mutually_exclusive_group()
- group.add_argument('--do', dest='do', metavar='DO-OPERATION', type=str)
- group.add_argument('--multi', dest='multi', nargs=2, action='append',
- metavar=('DO-OPERATION', 'JSON_TEXT'), type=str)
- group.add_argument('--dump', dest='dump', metavar='DUMP-OPERATION', type=str)
- group.add_argument('--list-ops', action='store_true')
- group.add_argument('--list-msgs', action='store_true')
- group.add_argument('--list-attrs', dest='list_attrs', metavar='OPERATION', type=str,
- help='List attributes for an operation')
- group.add_argument('--validate', action='store_true')
-
- parser.add_argument('--duration', dest='duration', type=int,
- help='when subscribed, watch for DURATION seconds')
- parser.add_argument('--sleep', dest='duration', type=int,
- help='alias for duration')
- parser.add_argument('--subscribe', dest='ntf', type=str)
- parser.add_argument('--replace', dest='flags', action='append_const',
- const=Netlink.NLM_F_REPLACE)
- parser.add_argument('--excl', dest='flags', action='append_const',
- const=Netlink.NLM_F_EXCL)
- parser.add_argument('--create', dest='flags', action='append_const',
- const=Netlink.NLM_F_CREATE)
- parser.add_argument('--append', dest='flags', action='append_const',
- const=Netlink.NLM_F_APPEND)
- parser.add_argument('--process-unknown', action=argparse.BooleanOptionalAction)
- parser.add_argument('--output-json', action='store_true')
- parser.add_argument('--dbg-small-recv', default=0, const=4000,
- action='store', nargs='?', type=int)
+ epilog=epilog, add_help=False)
+
+ gen_group = parser.add_argument_group('General options')
+ gen_group.add_argument('-h', '--help', action='help',
+ help='show this help message and exit')
+
+ spec_group = parser.add_argument_group('Netlink family selection')
+ spec_sel = spec_group.add_mutually_exclusive_group(required=True)
+ spec_sel.add_argument('--list-families', action='store_true',
+ help=('list Netlink families supported by YNL '
+ '(which have a spec available in the standard '
+ 'system path)'))
+ spec_sel.add_argument('--family', dest='family', type=str,
+ help='name of the Netlink FAMILY to use')
+ spec_sel.add_argument('--spec', dest='spec', type=str,
+ help='full file path to the YAML spec file')
+
+ ops_group = parser.add_argument_group('Operations')
+ ops = ops_group.add_mutually_exclusive_group()
+ ops.add_argument('--do', dest='do', metavar='DO-OPERATION', type=str)
+ ops.add_argument('--dump', dest='dump', metavar='DUMP-OPERATION', type=str)
+ ops.add_argument('--multi', dest='multi', nargs=2, action='append',
+ metavar=('DO-OPERATION', 'JSON_TEXT'), type=str,
+ help="Multi-message operation sequence (for nftables)")
+ ops.add_argument('--list-ops', action='store_true',
+ help="List available --do and --dump operations")
+ ops.add_argument('--list-msgs', action='store_true',
+ help="List all messages of the family (incl. notifications)")
+ ops.add_argument('--list-attrs', '--doc', dest='list_attrs', metavar='MSG',
+ type=str, help='List attributes for a message / operation')
+ ops.add_argument('--validate', action='store_true',
+ help="Validate the spec against schema and exit")
+
+ io_group = parser.add_argument_group('Input / Output')
+ io_group.add_argument('--json', dest='json_text', type=str,
+ help=('Specify attributes of the message to send '
+ 'to the kernel in JSON format. Can be left out '
+ 'if the message is expected to be empty.'))
+ io_group.add_argument('--output-json', action='store_true',
+ help='Format output as JSON')
+
+ ntf_group = parser.add_argument_group('Notifications')
+ ntf_group.add_argument('--subscribe', dest='ntf', type=str)
+ ntf_group.add_argument('--duration', dest='duration', type=int,
+ help='when subscribed, watch for DURATION seconds')
+ ntf_group.add_argument('--sleep', dest='duration', type=int,
+ help='alias for duration')
+
+ nlflags = parser.add_argument_group('Netlink message flags (NLM_F_*)',
+ ('Extra flags to set in nlmsg_flags of '
+ 'the request, used mostly by older '
+ 'Classic Netlink families.'))
+ nlflags.add_argument('--replace', dest='flags', action='append_const',
+ const=Netlink.NLM_F_REPLACE)
+ nlflags.add_argument('--excl', dest='flags', action='append_const',
+ const=Netlink.NLM_F_EXCL)
+ nlflags.add_argument('--create', dest='flags', action='append_const',
+ const=Netlink.NLM_F_CREATE)
+ nlflags.add_argument('--append', dest='flags', action='append_const',
+ const=Netlink.NLM_F_APPEND)
+
+ schema_group = parser.add_argument_group('Development options')
+ schema_group.add_argument('--schema', dest='schema', type=str,
+ help="JSON schema to validate the spec")
+ schema_group.add_argument('--no-schema', action='store_true')
+
+ dbg_group = parser.add_argument_group('Debug options')
+ dbg_group.add_argument('--dbg-small-recv', default=0, const=4000,
+ action='store', nargs='?', type=int, metavar='INT',
+ help="Length of buffers used for recv()")
+ dbg_group.add_argument('--process-unknown', action=argparse.BooleanOptionalAction)
+
args = parser.parse_args()
def output(msg):
if args.output_json:
print(json.dumps(msg, cls=YnlEncoder))
else:
- pprint.PrettyPrinter().pprint(msg)
+ pprint.pprint(msg, width=term_width(), compact=True)
if args.list_families:
for filename in sorted(os.listdir(spec_dir())):
@@ -172,18 +287,18 @@ def main():
else:
spec = args.spec
if not os.path.isfile(spec):
- raise Exception(f"Spec file {spec} does not exist")
+ raise YnlException(f"Spec file {spec} does not exist")
if args.validate:
try:
SpecFamily(spec, args.schema)
- except Exception as error:
+ except SpecException as error:
print(error)
- exit(1)
+ sys.exit(1)
return
if args.family: # set behaviour when using installed specs
- if args.schema is None and spec.startswith(sys_schema_dir):
+ if args.schema is None and spec.startswith(SYS_SCHEMA_DIR):
args.schema = '' # disable schema validation when installed
if args.process_unknown is None:
args.process_unknown = True
@@ -207,23 +322,9 @@ def main():
op = ynl.msgs.get(args.list_attrs)
if not op:
print(f'Operation {args.list_attrs} not found')
- exit(1)
-
- print(f'Operation: {op.name}')
- print(op.yaml['doc'])
-
- for mode in ['do', 'dump', 'event']:
- if mode in op.yaml:
- print_mode_attrs(ynl, mode, op.yaml[mode], op.attr_set, True)
-
- if 'notify' in op.yaml:
- mode_spec = op.yaml['notify']
- ref_spec = ynl.msgs.get(mode_spec).yaml.get('do')
- if ref_spec:
- print_mode_attrs(ynl, 'notify', ref_spec, op.attr_set, False)
+ sys.exit(1)
- if 'mcgrp' in op.yaml:
- print(f"\nMulticast group: {op.yaml['mcgrp']}")
+ do_doc(ynl, op)
try:
if args.do:
@@ -242,7 +343,7 @@ def main():
output(msg)
except NlError as e:
print(e)
- exit(1)
+ sys.exit(1)
except KeyboardInterrupt:
pass
except BrokenPipeError: