@@ -14,7 +14,6 @@
import zipfile
from .asset import Asset
-from .cmd import run_cmd
def tar_extract(archive, dest_dir, member=None):
@@ -52,9 +51,11 @@ def deb_extract(archive, dest_dir, member=None):
cwd = os.getcwd()
os.chdir(dest_dir)
try:
- (stdout, stderr, ret) = run_cmd(['ar', 't', archive])
- file_path = stdout.split()[2]
- run_cmd(['ar', 'x', archive, file_path])
+ proc = run(['ar', 't', archive],
+ check=True, capture_output=True, encoding='utf8')
+ file_path = proc.stdout.split()[2]
+ check_call(['ar', 'x', archive, file_path],
+ stdout=DEVNULL, stderr=DEVNULL)
tar_extract(file_path, dest_dir, member)
finally:
os.chdir(cwd)
@@ -6,18 +6,18 @@
# later. See the COPYING file in the top-level directory.
import logging
+from subprocess import run
-from . import run_cmd
def tesseract_ocr(image_path, tesseract_args=''):
console_logger = logging.getLogger('console')
console_logger.debug(image_path)
- (stdout, stderr, ret) = run_cmd(['tesseract', image_path,
- 'stdout'])
- if ret:
+ proc = run(['tesseract', image_path, 'stdout'],
+ capture_output=True, encoding='utf8')
+ if proc.returncode:
return None
lines = []
- for line in stdout.split('\n'):
+ for line in proc.stdout.split('\n'):
sline = line.strip()
if len(sline):
console_logger.debug(sline)
@@ -16,7 +16,7 @@
from pathlib import Path
import pycotap
import shutil
-import subprocess
+from subprocess import run
import sys
import tempfile
import unittest
@@ -27,7 +27,6 @@
from .archive import archive_extract
from .asset import Asset
-from .cmd import run_cmd
from .config import BUILD_DIR
from .uncompress import uncompress
@@ -251,11 +250,11 @@ def add_ldpath(self, ldpath):
self._ldpath.append(os.path.abspath(ldpath))
def run_cmd(self, bin_path, args=[]):
- return subprocess.run([self.qemu_bin]
- + ["-L %s" % ldpath for ldpath in self._ldpath]
- + [bin_path]
- + args,
- text=True, capture_output=True)
+ return run([self.qemu_bin]
+ + ["-L %s" % ldpath for ldpath in self._ldpath]
+ + [bin_path]
+ + args,
+ text=True, capture_output=True)
class QemuSystemTest(QemuBaseTest):
"""Facilitates system emulation tests."""
@@ -282,7 +281,9 @@ def setUp(self):
def set_machine(self, machinename):
# TODO: We should use QMP to get the list of available machines
if not self._machinehelp:
- self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0];
+ self._machinehelp = run(
+ [self.qemu_bin, '-M', 'help'],
+ capture_output=True, check=True, encoding='utf8').stdout
if self._machinehelp.find(machinename) < 0:
self.skipTest('no support for machine ' + machinename)
self.machine = machinename
@@ -310,15 +311,17 @@ def require_accelerator(self, accelerator):
"available" % accelerator)
def require_netdev(self, netdevname):
- netdevhelp = run_cmd([self.qemu_bin,
- '-M', 'none', '-netdev', 'help'])[0];
- if netdevhelp.find('\n' + netdevname + '\n') < 0:
+ help = run([self.qemu_bin,
+ '-M', 'none', '-netdev', 'help'],
+ capture_output=True, check=True, encoding='utf8').stdout;
+ if help.find('\n' + netdevname + '\n') < 0:
self.skipTest('no support for " + netdevname + " networking')
def require_device(self, devicename):
- devhelp = run_cmd([self.qemu_bin,
- '-M', 'none', '-device', 'help'])[0];
- if devhelp.find(devicename) < 0:
+ help = run([self.qemu_bin,
+ '-M', 'none', '-device', 'help'],
+ capture_output=True, check=True, encoding='utf8').stdout;
+ if help.find(devicename) < 0:
self.skipTest('no support for device ' + devicename)
def _new_vm(self, name, *args):
@@ -11,11 +11,12 @@
import os
import stat
+from subprocess import check_call, DEVNULL
from qemu_test import QemuSystemTest
from qemu_test import exec_command_and_wait_for_pattern
from qemu_test import wait_for_console_pattern
-from qemu_test import which, run_cmd, get_qemu_img
+from qemu_test import which, get_qemu_img
class TuxRunBaselineTest(QemuSystemTest):
@@ -76,8 +77,9 @@ def fetch_tuxrun_assets(self, kernel_asset, rootfs_asset, dtb_asset=None):
disk_image = self.scratch_file("rootfs.ext4")
- run_cmd(['zstd', "-f", "-d", disk_image_zst,
- "-o", disk_image])
+ check_call(['zstd', "-f", "-d", disk_image_zst,
+ "-o", disk_image],
+ stdout=DEVNULL, stderr=DEVNULL)
# zstd copies source archive permissions for the output
# file, so must make this writable for QEMU
os.chmod(disk_image, stat.S_IRUSR | stat.S_IWUSR)
@@ -12,10 +12,11 @@
import time
import logging
+from subprocess import check_call, DEVNULL
from qemu_test import QemuSystemTest, Asset
from qemu_test import exec_command, wait_for_console_pattern
-from qemu_test import get_qemu_img, run_cmd
+from qemu_test import get_qemu_img
class Aarch64VirtMachine(QemuSystemTest):
@@ -96,7 +97,8 @@ def common_aarch64_virt(self, machine):
logger.info('creating scratch qcow2 image')
image_path = self.scratch_file('scratch.qcow2')
qemu_img = get_qemu_img(self)
- run_cmd([qemu_img, 'create', '-f', 'qcow2', image_path, '8M'])
+ check_call([qemu_img, 'create', '-f', 'qcow2', image_path, '8M'],
+ stdout=DEVNULL, stderr=DEVNULL)
# Add the device
self.vm.add_args('-blockdev',
@@ -11,9 +11,10 @@
#
# SPDX-License-Identifier: GPL-2.0-or-later
+from subprocess import check_call, DEVNULL
import tempfile
-from qemu_test import run_cmd, Asset
+from qemu_test import Asset
from qemu_test.tuxruntest import TuxRunBaselineTest
class TuxRunPPC64Test(TuxRunBaselineTest):
@@ -70,7 +71,9 @@ def ppc64_common_tuxrun(self, kernel_asset, rootfs_asset, prefix):
# Create a temporary qcow2 and launch the test-case
with tempfile.NamedTemporaryFile(prefix=prefix,
suffix='.qcow2') as qcow2:
- run_cmd([self.qemu_img, 'create', '-f', 'qcow2', qcow2.name, ' 1G'])
+ check_call([self.qemu_img, 'create', '-f', 'qcow2',
+ qcow2.name, ' 1G'],
+ stdout=DEVNULL, stderr=DEVNULL)
self.vm.add_args('-drive', 'file=' + qcow2.name +
',format=qcow2,if=none,id='