Files
linux/tools/net/ynl/pyynl/cli.py
Gal Pressman 2a2d5a3392 tools: ynl: cli: Add --list-attrs option to show operation attributes
Add a --list-attrs option to the YNL CLI that displays information about
netlink operations, including request and reply attributes.
This eliminates the need to manually inspect YAML spec files to
determine the JSON structure required for operations, or understand the
structure of the reply.

Example usage:
  # ./cli.py --family netdev --list-attrs dev-get
  Operation: dev-get
  Get / dump information about a netdev.

  Do request attributes:
    - ifindex: u32
      netdev ifindex

  Do reply attributes:
    - ifindex: u32
      netdev ifindex
    - xdp-features: u64 (enum: xdp-act)
      Bitmask of enabled xdp-features.
    - xdp-zc-max-segs: u32
      max fragment count supported by ZC driver
    - xdp-rx-metadata-features: u64 (enum: xdp-rx-metadata)
      Bitmask of supported XDP receive metadata features. See Documentation/networking/xdp-rx-metadata.rst for more details.
    - xsk-features: u64 (enum: xsk-flags)
      Bitmask of enabled AF_XDP features.

  Dump reply attributes:
    - ifindex: u32
      netdev ifindex
    - xdp-features: u64 (enum: xdp-act)
      Bitmask of enabled xdp-features.
    - xdp-zc-max-segs: u32
      max fragment count supported by ZC driver
    - xdp-rx-metadata-features: u64 (enum: xdp-rx-metadata)
      Bitmask of supported XDP receive metadata features. See Documentation/networking/xdp-rx-metadata.rst for more details.
    - xsk-features: u64 (enum: xsk-flags)
      Bitmask of enabled AF_XDP features.

Reviewed-by: Nimrod Oren <noren@nvidia.com>
Signed-off-by: Gal Pressman <gal@nvidia.com>
Link: https://patch.msgid.link/20251118143208.2380814-2-gal@nvidia.com
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
2025-11-20 15:43:04 +01:00

220 lines
7.8 KiB
Python
Executable File

#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
import argparse
import json
import os
import pathlib
import pprint
import sys
import textwrap
sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
from lib import YnlFamily, Netlink, NlError
sys_schema_dir='/usr/share/ynl'
relative_schema_dir='../../../../Documentation/netlink'
def schema_dir():
script_dir = os.path.dirname(os.path.abspath(__file__))
schema_dir = os.path.abspath(f"{script_dir}/{relative_schema_dir}")
if not os.path.isdir(schema_dir):
schema_dir = sys_schema_dir
if not os.path.isdir(schema_dir):
raise Exception(f"Schema directory {schema_dir} does not exist")
return schema_dir
def spec_dir():
spec_dir = schema_dir() + '/specs'
if not os.path.isdir(spec_dir):
raise Exception(f"Spec directory {spec_dir} does not exist")
return spec_dir
class YnlEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, bytes):
return bytes.hex(obj)
if isinstance(obj, set):
return list(obj)
return json.JSONEncoder.default(self, obj)
def print_attr_list(attr_names, attr_set):
"""Print a list of attributes with their types and documentation."""
for attr_name in attr_names:
if attr_name in attr_set.attrs:
attr = attr_set.attrs[attr_name]
attr_info = f' - {attr_name}: {attr.type}'
if 'enum' in attr.yaml:
attr_info += f" (enum: {attr.yaml['enum']})"
if attr.yaml.get('doc'):
doc_text = textwrap.indent(attr.yaml['doc'], ' ')
attr_info += f"\n{doc_text}"
print(attr_info)
def print_mode_attrs(mode, mode_spec, attr_set, print_request=True):
"""Print a given mode (do/dump/event/notify)."""
mode_title = mode.capitalize()
if print_request and 'request' in mode_spec and 'attributes' in mode_spec['request']:
print(f'\n{mode_title} request attributes:')
print_attr_list(mode_spec['request']['attributes'], attr_set)
if 'reply' in mode_spec and 'attributes' in mode_spec['reply']:
print(f'\n{mode_title} reply attributes:')
print_attr_list(mode_spec['reply']['attributes'], attr_set)
if 'attributes' in mode_spec:
print(f'\n{mode_title} attributes:')
print_attr_list(mode_spec['attributes'], attr_set)
def main():
description = """
YNL CLI utility - a general purpose netlink utility that uses YAML
specs to drive protocol encoding and decoding.
"""
epilog = """
The --multi option can be repeated to include several do operations
in the same netlink payload.
"""
parser = argparse.ArgumentParser(description=description,
epilog=epilog)
spec_group = parser.add_mutually_exclusive_group(required=True)
spec_group.add_argument('--family', dest='family', type=str,
help='name of the netlink FAMILY')
spec_group.add_argument('--list-families', action='store_true',
help='list all netlink families supported by YNL (has spec)')
spec_group.add_argument('--spec', dest='spec', type=str,
help='choose the family by SPEC file path')
parser.add_argument('--schema', dest='schema', type=str)
parser.add_argument('--no-schema', action='store_true')
parser.add_argument('--json', dest='json_text', type=str)
group = parser.add_mutually_exclusive_group()
group.add_argument('--do', dest='do', metavar='DO-OPERATION', type=str)
group.add_argument('--multi', dest='multi', nargs=2, action='append',
metavar=('DO-OPERATION', 'JSON_TEXT'), type=str)
group.add_argument('--dump', dest='dump', metavar='DUMP-OPERATION', type=str)
group.add_argument('--list-ops', action='store_true')
group.add_argument('--list-msgs', action='store_true')
group.add_argument('--list-attrs', dest='list_attrs', metavar='OPERATION', type=str,
help='List attributes for an operation')
parser.add_argument('--duration', dest='duration', type=int,
help='when subscribed, watch for DURATION seconds')
parser.add_argument('--sleep', dest='duration', type=int,
help='alias for duration')
parser.add_argument('--subscribe', dest='ntf', type=str)
parser.add_argument('--replace', dest='flags', action='append_const',
const=Netlink.NLM_F_REPLACE)
parser.add_argument('--excl', dest='flags', action='append_const',
const=Netlink.NLM_F_EXCL)
parser.add_argument('--create', dest='flags', action='append_const',
const=Netlink.NLM_F_CREATE)
parser.add_argument('--append', dest='flags', action='append_const',
const=Netlink.NLM_F_APPEND)
parser.add_argument('--process-unknown', action=argparse.BooleanOptionalAction)
parser.add_argument('--output-json', action='store_true')
parser.add_argument('--dbg-small-recv', default=0, const=4000,
action='store', nargs='?', type=int)
args = parser.parse_args()
def output(msg):
if args.output_json:
print(json.dumps(msg, cls=YnlEncoder))
else:
pprint.PrettyPrinter().pprint(msg)
if args.list_families:
for filename in sorted(os.listdir(spec_dir())):
if filename.endswith('.yaml'):
print(filename.removesuffix('.yaml'))
return
if args.no_schema:
args.schema = ''
attrs = {}
if args.json_text:
attrs = json.loads(args.json_text)
if args.family:
spec = f"{spec_dir()}/{args.family}.yaml"
if args.schema is None and spec.startswith(sys_schema_dir):
args.schema = '' # disable schema validation when installed
if args.process_unknown is None:
args.process_unknown = True
else:
spec = args.spec
if not os.path.isfile(spec):
raise Exception(f"Spec file {spec} does not exist")
ynl = YnlFamily(spec, args.schema, args.process_unknown,
recv_size=args.dbg_small_recv)
if args.dbg_small_recv:
ynl.set_recv_dbg(True)
if args.ntf:
ynl.ntf_subscribe(args.ntf)
if args.list_ops:
for op_name, op in ynl.ops.items():
print(op_name, " [", ", ".join(op.modes), "]")
if args.list_msgs:
for op_name, op in ynl.msgs.items():
print(op_name, " [", ", ".join(op.modes), "]")
if args.list_attrs:
op = ynl.msgs.get(args.list_attrs)
if not op:
print(f'Operation {args.list_attrs} not found')
exit(1)
print(f'Operation: {op.name}')
print(op.yaml['doc'])
for mode in ['do', 'dump', 'event']:
if mode in op.yaml:
print_mode_attrs(mode, op.yaml[mode], op.attr_set, True)
if 'notify' in op.yaml:
mode_spec = op.yaml['notify']
ref_spec = ynl.msgs.get(mode_spec).yaml.get('do')
if ref_spec:
print_mode_attrs('notify', ref_spec, op.attr_set, False)
if 'mcgrp' in op.yaml:
print(f"\nMulticast group: {op.yaml['mcgrp']}")
try:
if args.do:
reply = ynl.do(args.do, attrs, args.flags)
output(reply)
if args.dump:
reply = ynl.dump(args.dump, attrs)
output(reply)
if args.multi:
ops = [ (item[0], json.loads(item[1]), args.flags or []) for item in args.multi ]
reply = ynl.do_multi(ops)
output(reply)
if args.ntf:
for msg in ynl.poll_ntf(duration=args.duration):
output(msg)
except NlError as e:
print(e)
exit(1)
except KeyboardInterrupt:
pass
except BrokenPipeError:
pass
if __name__ == "__main__":
main()