diff mbox series

[net-next,v2,2/2] tools/net/ynl: add async notification handling

Message ID 20241112111727.91575-3-donald.hunter@gmail.com (mailing list archive)
State Superseded
Delegated to: Netdev Maintainers
Headers show
Series tools/net/ynl: rework async notification handling | expand

Checks

Context Check Description
netdev/series_format success Posting correctly formatted
netdev/tree_selection success Clearly marked for net-next
netdev/ynl success Generated files up to date; no warnings/errors; no diff in generated;
netdev/fixes_present success Fixes tag not required for -next series
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 3 this patch: 3
netdev/build_tools success Errors and warnings before: 0 (+0) this patch: 0 (+0)
netdev/cc_maintainers success CCed 6 of 6 maintainers
netdev/build_clang success Errors and warnings before: 3 this patch: 3
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/deprecated_api success None detected
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 3 this patch: 3
netdev/checkpatch success total: 0 errors, 0 warnings, 0 checks, 96 lines checked
netdev/build_clang_rust success No Rust files in patch. Skipping build
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0

Commit Message

Donald Hunter Nov. 12, 2024, 11:17 a.m. UTC
The notification handling in ynl is currently very simple, using sleep()
to wait a period of time and then handling all the buffered messages in
a single batch.

This patch adds async notification handling so that messages can be
processed as they are received. This makes it possible to use ynl as a
library that supplies notifications in a timely manner.

- Add poll_ntf() to be a generator that yields 1 notification at a
  time and blocks until a notification is available.
- Add a --duration parameter to the CLI, with --sleep as an alias.

./tools/net/ynl/cli.py \
    --spec <SPEC> --subscribe <TOPIC> [ --duration <SECS> ]

The cli will report any notifications for duration seconds and then
exit. If duration is not specified, then it will poll forever, until
interrupted.

Here is an example python snippet that shows how to use ynl as a library
for receiving notifications:

    ynl = YnlFamily(f"{dir}/rt_route.yaml")
    ynl.ntf_subscribe('rtnlgrp-ipv4-route')

    for event in ynl.poll_ntf():
        handle(event)

Signed-off-by: Donald Hunter <donald.hunter@gmail.com>
---
 tools/net/ynl/cli.py     | 16 +++++++++-------
 tools/net/ynl/lib/ynl.py | 28 +++++++++++++++++++++++++---
 2 files changed, 34 insertions(+), 10 deletions(-)

Comments

Jakub Kicinski Nov. 13, 2024, 1:38 a.m. UTC | #1
some comments for your consideration, if you prefer to keep as is:

Acked-by: Jakub Kicinski <kuba@kernel.org>

On Tue, 12 Nov 2024 11:17:27 +0000 Donald Hunter wrote:
> +    def poll_ntf(self, duration=None):
> +        endtime = time.time() + duration if duration is not None else None

we can record starttime here, and avoid the complex logic..

> +        selector = selectors.DefaultSelector()
> +        selector.register(self.sock, selectors.EVENT_READ)
> +
> +        while True:
> +            try:
> +                yield self.async_msg_queue.get_nowait()
> +            except queue.Empty:
> +                if endtime is not None:
> +                    interval = endtime - time.time()

then here:

		if duration is not None:
			timeout = time.time() - starttime + duration

and rest as is (modulo the s/interval/timeout/)

> +                    if interval <= 0:
> +                        return
> +                else:
> +                    interval = None
> +                events = selector.select(interval)
> +                if events:
> +                    self.check_ntf()
Donald Hunter Nov. 13, 2024, 8:48 a.m. UTC | #2
On Wed, 13 Nov 2024 at 01:38, Jakub Kicinski <kuba@kernel.org> wrote:
>
> some comments for your consideration, if you prefer to keep as is:
>
> Acked-by: Jakub Kicinski <kuba@kernel.org>
>
> On Tue, 12 Nov 2024 11:17:27 +0000 Donald Hunter wrote:
> > +    def poll_ntf(self, duration=None):
> > +        endtime = time.time() + duration if duration is not None else None
>
> we can record starttime here, and avoid the complex logic..

Yes, of course. That's a lot clearer.

> > +        selector = selectors.DefaultSelector()
> > +        selector.register(self.sock, selectors.EVENT_READ)
> > +
> > +        while True:
> > +            try:
> > +                yield self.async_msg_queue.get_nowait()
> > +            except queue.Empty:
> > +                if endtime is not None:
> > +                    interval = endtime - time.time()
>
> then here:
>
>                 if duration is not None:
>                         timeout = time.time() - starttime + duration

That'd be starttime + duration - time.time().

I'll respin with these changes, thanks!
diff mbox series

Patch

diff --git a/tools/net/ynl/cli.py b/tools/net/ynl/cli.py
index b8481f401376..0601fcc53601 100755
--- a/tools/net/ynl/cli.py
+++ b/tools/net/ynl/cli.py
@@ -4,7 +4,6 @@ 
 import argparse
 import json
 import pprint
-import time
 
 from lib import YnlFamily, Netlink, NlError
 
@@ -43,7 +42,10 @@  def main():
     group.add_argument('--list-ops', action='store_true')
     group.add_argument('--list-msgs', action='store_true')
 
-    parser.add_argument('--sleep', dest='sleep', type=int)
+    parser.add_argument('--duration', dest='duration', type=int,
+                        help='when subscribed, watch for DURATION seconds')
+    parser.add_argument('--sleep', dest='duration', type=int,
+                        help='alias for duration')
     parser.add_argument('--subscribe', dest='ntf', type=str)
     parser.add_argument('--replace', dest='flags', action='append_const',
                         const=Netlink.NLM_F_REPLACE)
@@ -80,9 +82,6 @@  def main():
     if args.ntf:
         ynl.ntf_subscribe(args.ntf)
 
-    if args.sleep:
-        time.sleep(args.sleep)
-
     if args.list_ops:
         for op_name, op in ynl.ops.items():
             print(op_name, " [", ", ".join(op.modes), "]")
@@ -106,8 +105,11 @@  def main():
         exit(1)
 
     if args.ntf:
-        ynl.check_ntf()
-        output(ynl.async_msg_queue)
+        try:
+            for msg in ynl.poll_ntf(duration=args.duration):
+                output(msg)
+        except KeyboardInterrupt:
+            pass
 
 
 if __name__ == "__main__":
diff --git a/tools/net/ynl/lib/ynl.py b/tools/net/ynl/lib/ynl.py
index c22c22bf2cb7..ffb1c4263d09 100644
--- a/tools/net/ynl/lib/ynl.py
+++ b/tools/net/ynl/lib/ynl.py
@@ -12,6 +12,9 @@  import sys
 import yaml
 import ipaddress
 import uuid
+import queue
+import selectors
+import time
 
 from .nlspec import SpecFamily
 
@@ -489,7 +492,7 @@  class YnlFamily(SpecFamily):
         self.sock.setsockopt(Netlink.SOL_NETLINK, Netlink.NETLINK_GET_STRICT_CHK, 1)
 
         self.async_msg_ids = set()
-        self.async_msg_queue = []
+        self.async_msg_queue = queue.Queue()
 
         for msg in self.msgs.values():
             if msg.is_async:
@@ -903,7 +906,7 @@  class YnlFamily(SpecFamily):
 
         msg['name'] = op['name']
         msg['msg'] = attrs
-        self.async_msg_queue.append(msg)
+        self.async_msg_queue.put(msg)
 
     def check_ntf(self):
         while True:
@@ -925,11 +928,30 @@  class YnlFamily(SpecFamily):
 
                 decoded = self.nlproto.decode(self, nl_msg, None)
                 if decoded.cmd() not in self.async_msg_ids:
-                    print("Unexpected msg id done while checking for ntf", decoded)
+                    print("Unexpected msg id while checking for ntf", decoded)
                     continue
 
                 self.handle_ntf(decoded)
 
+    def poll_ntf(self, duration=None):
+        endtime = time.time() + duration if duration is not None else None
+        selector = selectors.DefaultSelector()
+        selector.register(self.sock, selectors.EVENT_READ)
+
+        while True:
+            try:
+                yield self.async_msg_queue.get_nowait()
+            except queue.Empty:
+                if endtime is not None:
+                    interval = endtime - time.time()
+                    if interval <= 0:
+                        return
+                else:
+                    interval = None
+                events = selector.select(interval)
+                if events:
+                    self.check_ntf()
+
     def operation_do_attributes(self, name):
       """
       For a given operation name, find and return a supported