selftests: drv-net: add a way to wait for a local process

We use wait_port_listen() extensively to wait for a process
we spawned to be ready. Not all processes will open listening
sockets. Add a method of explicitly waiting for a child to
be ready. Pass a FD to the spawned process and wait for it
to write a message to us. FD number is passed via KSFT_READY_FD
env variable.

Similarly use KSFT_WAIT_FD to let the child process for a sign
that we are done and child should exit. Sending a signal to
a child with shell=True can get tricky.

Make use of this method in the queues test to make it less flaky.

Acked-by: Stanislav Fomichev <sdf@fomichev.me>
Acked-by: Joe Damato <jdamato@fastly.com>
Tested-by: Joe Damato <jdamato@fastly.com>
Link: https://patch.msgid.link/20250219234956.520599-6-kuba@kernel.org
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
This commit is contained in:
Jakub Kicinski
2025-02-19 15:49:54 -08:00
parent d3726ab45c
commit 7147713799
3 changed files with 133 additions and 32 deletions

View File

@@ -5,13 +5,12 @@ from lib.py import ksft_disruptive, ksft_exit, ksft_run
from lib.py import ksft_eq, ksft_raises, KsftSkipEx, KsftFailEx
from lib.py import EthtoolFamily, NetdevFamily, NlError
from lib.py import NetDrvEnv
from lib.py import cmd, defer, ip
from lib.py import bkg, cmd, defer, ip
import errno
import glob
import os
import socket
import struct
import subprocess
def sys_get_queues(ifname, qtype='rx') -> int:
folders = glob.glob(f'/sys/class/net/{ifname}/queues/{qtype}-*')
@@ -32,32 +31,30 @@ def check_xdp(cfg, nl, xdp_queue_id=0) -> None:
elif xdp.ret > 0:
raise KsftFailEx('unable to create AF_XDP socket')
xdp = subprocess.Popen([cfg.rpath("xdp_helper"), f"{cfg.ifindex}", f"{xdp_queue_id}"],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=1,
text=True)
defer(xdp.kill)
with bkg(f'{cfg.rpath("xdp_helper")} {cfg.ifindex} {xdp_queue_id}',
ksft_wait=3):
stdout, stderr = xdp.communicate(timeout=10)
rx = tx = False
rx = tx = False
queues = nl.queue_get({'ifindex': cfg.ifindex}, dump=True)
if not queues:
raise KsftSkipEx("Netlink reports no queues")
queues = nl.queue_get({'ifindex': cfg.ifindex}, dump=True)
if not queues:
raise KsftSkipEx("Netlink reports no queues")
for q in queues:
if q['id'] == 0:
if q['type'] == 'rx':
rx = True
if q['type'] == 'tx':
tx = True
for q in queues:
if q['id'] == 0:
if q['type'] == 'rx':
rx = True
if q['type'] == 'tx':
tx = True
ksft_eq(q['xsk'], {})
else:
if 'xsk' in q:
_fail("Check failed: xsk attribute set.")
ksft_eq(q['xsk'], {})
else:
if 'xsk' in q:
_fail("Check failed: xsk attribute set.")
ksft_eq(rx, True)
ksft_eq(tx, True)
ksft_eq(rx, True)
ksft_eq(tx, True)
def get_queues(cfg, nl) -> None:
snl = NetdevFamily(recv_size=4096)

View File

@@ -14,6 +14,54 @@
#define UMEM_SZ (1U << 16)
#define NUM_DESC (UMEM_SZ / 2048)
/* Move this to a common header when reused! */
static void ksft_ready(void)
{
const char msg[7] = "ready\n";
char *env_str;
int fd;
env_str = getenv("KSFT_READY_FD");
if (env_str) {
fd = atoi(env_str);
if (!fd) {
fprintf(stderr, "invalid KSFT_READY_FD = '%s'\n",
env_str);
return;
}
} else {
fd = STDOUT_FILENO;
}
write(fd, msg, sizeof(msg));
if (fd != STDOUT_FILENO)
close(fd);
}
static void ksft_wait(void)
{
char *env_str;
char byte;
int fd;
env_str = getenv("KSFT_WAIT_FD");
if (env_str) {
fd = atoi(env_str);
if (!fd) {
fprintf(stderr, "invalid KSFT_WAIT_FD = '%s'\n",
env_str);
return;
}
} else {
/* Not running in KSFT env, wait for input from STDIN instead */
fd = STDIN_FILENO;
}
read(fd, &byte, sizeof(byte));
if (fd != STDIN_FILENO)
close(fd);
}
/* this is a simple helper program that creates an XDP socket and does the
* minimum necessary to get bind() to succeed.
*
@@ -32,7 +80,6 @@ int main(int argc, char **argv)
int ifindex;
int sock_fd;
int queue;
char byte;
if (argc != 3) {
fprintf(stderr, "Usage: %s ifindex queue_id\n", argv[0]);
@@ -92,13 +139,12 @@ int main(int argc, char **argv)
return 1;
}
/* give the parent program some data when the socket is ready*/
fprintf(stdout, "%d\n", sock_fd);
ksft_ready();
ksft_wait();
/* parent program will write a byte to stdin when its ready for this
* helper to exit
*/
read(STDIN_FILENO, &byte, 1);
close(sock_fd);
return 0;

View File

@@ -2,8 +2,10 @@
import errno
import json as _json
import os
import random
import re
import select
import socket
import subprocess
import time
@@ -15,21 +17,56 @@ class CmdExitFailure(Exception):
self.cmd = cmd_obj
def fd_read_timeout(fd, timeout):
rlist, _, _ = select.select([fd], [], [], timeout)
if rlist:
return os.read(fd, 1024)
else:
raise TimeoutError("Timeout waiting for fd read")
class cmd:
def __init__(self, comm, shell=True, fail=True, ns=None, background=False, host=None, timeout=5):
"""
Execute a command on local or remote host.
Use bkg() instead to run a command in the background.
"""
def __init__(self, comm, shell=True, fail=True, ns=None, background=False,
host=None, timeout=5, ksft_wait=None):
if ns:
comm = f'ip netns exec {ns} ' + comm
self.stdout = None
self.stderr = None
self.ret = None
self.ksft_term_fd = None
self.comm = comm
if host:
self.proc = host.cmd(comm)
else:
# ksft_wait lets us wait for the background process to fully start,
# we pass an FD to the child process, and wait for it to write back.
# Similarly term_fd tells child it's time to exit.
pass_fds = ()
env = os.environ.copy()
if ksft_wait is not None:
rfd, ready_fd = os.pipe()
wait_fd, self.ksft_term_fd = os.pipe()
pass_fds = (ready_fd, wait_fd, )
env["KSFT_READY_FD"] = str(ready_fd)
env["KSFT_WAIT_FD"] = str(wait_fd)
self.proc = subprocess.Popen(comm, shell=shell, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stderr=subprocess.PIPE, pass_fds=pass_fds,
env=env)
if ksft_wait is not None:
os.close(ready_fd)
os.close(wait_fd)
msg = fd_read_timeout(rfd, ksft_wait)
os.close(rfd)
if not msg:
raise Exception("Did not receive ready message")
if not background:
self.process(terminate=False, fail=fail, timeout=timeout)
@@ -37,6 +74,8 @@ class cmd:
if fail is None:
fail = not terminate
if self.ksft_term_fd:
os.write(self.ksft_term_fd, b"1")
if terminate:
self.proc.terminate()
stdout, stderr = self.proc.communicate(timeout)
@@ -54,11 +93,30 @@ class cmd:
class bkg(cmd):
"""
Run a command in the background.
Examples usage:
Run a command on remote host, and wait for it to finish.
This is usually paired with wait_port_listen() to make sure
the command has initialized:
with bkg("socat ...", exit_wait=True, host=cfg.remote) as nc:
...
Run a command and expect it to let us know that it's ready
by writing to a special file descriptor passed via KSFT_READY_FD.
Command will be terminated when we exit the context manager:
with bkg("my_binary", ksft_wait=5):
"""
def __init__(self, comm, shell=True, fail=None, ns=None, host=None,
exit_wait=False):
exit_wait=False, ksft_wait=None):
super().__init__(comm, background=True,
shell=shell, fail=fail, ns=ns, host=host)
self.terminate = not exit_wait
shell=shell, fail=fail, ns=ns, host=host,
ksft_wait=ksft_wait)
self.terminate = not exit_wait and not ksft_wait
self.check_fail = fail
if shell and self.terminate: