541 lines
17 KiB
Python
541 lines
17 KiB
Python
|
#!/usr/bin/env python2
|
||
|
# -*- coding: utf-8 -*-
|
||
|
# Copyright 2019 The Chromium OS Authors. All rights reserved.
|
||
|
# Use of this source code is governed by a BSD-style license that can be
|
||
|
# found in the LICENSE file.
|
||
|
|
||
|
"""ChromeOS Uart Stress Test
|
||
|
|
||
|
This tester runs the command 'chargen' on EC and/or AP, captures the
|
||
|
output, and compares it against the expected output to check any characters
|
||
|
lost.
|
||
|
|
||
|
Prerequisite:
|
||
|
(1) This test needs PySerial. Please check if it is available before test.
|
||
|
Can be installed by 'pip install pyserial'
|
||
|
(2) If servod is running, turn uart_timestamp off before running this test.
|
||
|
e.g. dut-control cr50_uart_timestamp:off
|
||
|
"""
|
||
|
|
||
|
from __future__ import print_function
|
||
|
from chromite.lib import cros_logging as logging
|
||
|
|
||
|
import argparse
|
||
|
import atexit
|
||
|
import serial
|
||
|
import os
|
||
|
import stat
|
||
|
import sys
|
||
|
import threading
|
||
|
import time
|
||
|
|
||
|
|
||
|
BAUDRATE = 115200 # Default baudrate setting for UART port
|
||
|
CROS_USERNAME = 'root' # Account name to login to ChromeOS
|
||
|
CROS_PASSWORD = 'test0000' # Password to login to ChromeOS
|
||
|
CHARGEN_TXT = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
|
||
|
# The result of 'chargen 62 62'
|
||
|
CHARGEN_TXT_LEN = len(CHARGEN_TXT)
|
||
|
CR = '\r' # Carriage Return
|
||
|
LF = '\n' # Line Feed
|
||
|
CRLF = CR + LF
|
||
|
FLAG_FILENAME = '/tmp/chargen_testing'
|
||
|
TPM_CMD = ('trunks_client --key_create --rsa=2048 --usage=sign'
|
||
|
' --key_blob=/tmp/blob &> /dev/null')
|
||
|
# A ChromeOS TPM command for the cr50 stress
|
||
|
# purpose.
|
||
|
CR50_LOAD_GEN_CMD = ('while [[ -f %s ]]; do %s; done &'
|
||
|
% (FLAG_FILENAME, TPM_CMD))
|
||
|
# A command line to run TPM_CMD in background
|
||
|
# infinitely.
|
||
|
|
||
|
|
||
|
class ChargenTestError(Exception):
|
||
|
"""Exception for Uart Stress Test Error"""
|
||
|
pass
|
||
|
|
||
|
|
||
|
class UartSerial(object):
|
||
|
"""Test Object for a single UART serial device
|
||
|
|
||
|
Attributes
|
||
|
UART_DEV_PROFILES
|
||
|
"""
|
||
|
UART_DEV_PROFILES = (
|
||
|
# Kernel
|
||
|
{
|
||
|
'prompt':'localhost login:',
|
||
|
'device_type':'AP',
|
||
|
'prepare_cmd':[
|
||
|
CROS_USERNAME, # Login
|
||
|
CROS_PASSWORD, # Password
|
||
|
'dmesg -D', # Disable console message
|
||
|
'touch ' + FLAG_FILENAME, # Create a temp file
|
||
|
],
|
||
|
'cleanup_cmd':[
|
||
|
'rm -f ' + FLAG_FILENAME, # Remove the temp file
|
||
|
'dmesg -E', # Enable console message
|
||
|
'logout', # Logout
|
||
|
],
|
||
|
'end_of_input':LF,
|
||
|
},
|
||
|
# EC
|
||
|
{
|
||
|
'prompt':'> ',
|
||
|
'device_type':'EC',
|
||
|
'prepare_cmd':[
|
||
|
'chan save',
|
||
|
'chan 0' # Disable console message
|
||
|
],
|
||
|
'cleanup_cmd':['', 'chan restore'],
|
||
|
'end_of_input':CRLF,
|
||
|
},
|
||
|
)
|
||
|
|
||
|
def __init__(self, port, duration, timeout=1,
|
||
|
baudrate=BAUDRATE, cr50_workload=False):
|
||
|
"""Initialize UartSerial
|
||
|
|
||
|
Args:
|
||
|
port: UART device path. e.g. /dev/ttyUSB0
|
||
|
duration: Time to test, in seconds
|
||
|
timeout: Read timeout value.
|
||
|
baudrate: Baud rate such as 9600 or 115200.
|
||
|
cr50_workload: True if a workload should be generated on cr50
|
||
|
|
||
|
Attributes:
|
||
|
char_loss_occurrences: Number that character loss happens
|
||
|
cleanup_cli: Command list to perform before the test exits
|
||
|
cr50_workload: True if cr50 should be stressed, or False otherwise
|
||
|
dev_prof: Dictionary of device profile
|
||
|
duration: Time to keep chargen running
|
||
|
eol: Characters to add at the end of input
|
||
|
logger: object that store the log
|
||
|
num_ch_exp: Expected number of characters in output
|
||
|
num_ch_cap: Number of captured characters in output
|
||
|
test_cli: Command list to run for chargen test
|
||
|
test_thread: Thread object that captures the UART output
|
||
|
serial: serial.Serial object
|
||
|
"""
|
||
|
|
||
|
# Initialize serial object
|
||
|
self.serial = serial.Serial()
|
||
|
self.serial.port = port
|
||
|
self.serial.timeout = timeout
|
||
|
self.serial.baudrate = baudrate
|
||
|
|
||
|
self.duration = duration
|
||
|
self.cr50_workload = cr50_workload
|
||
|
|
||
|
self.logger = logging.getLogger(type(self).__name__ + '| ' + port)
|
||
|
self.test_thread = threading.Thread(target=self.stress_test_thread)
|
||
|
|
||
|
self.dev_prof = {}
|
||
|
self.cleanup_cli = []
|
||
|
self.test_cli = []
|
||
|
self.eol = CRLF
|
||
|
self.num_ch_exp = 0
|
||
|
self.num_ch_cap = 0
|
||
|
self.char_loss_occurrences = 0
|
||
|
atexit.register(self.cleanup)
|
||
|
|
||
|
def run_command(self, command_lines, delay=0):
|
||
|
"""Run command(s) at UART prompt
|
||
|
|
||
|
Args:
|
||
|
command_lines: list of commands to run.
|
||
|
delay: delay after a command in second
|
||
|
"""
|
||
|
for cli in command_lines:
|
||
|
self.logger.debug('run %r', cli)
|
||
|
|
||
|
self.serial.write(cli + self.eol)
|
||
|
self.serial.flush()
|
||
|
if delay:
|
||
|
time.sleep(delay)
|
||
|
|
||
|
def cleanup(self):
|
||
|
"""Before termination, clean up the UART device."""
|
||
|
self.logger.debug('Closing...')
|
||
|
|
||
|
self.serial.open()
|
||
|
self.run_command(self.cleanup_cli) # Run cleanup commands
|
||
|
self.serial.close()
|
||
|
|
||
|
self.logger.debug('Cleanup done')
|
||
|
|
||
|
def get_output(self):
|
||
|
"""Capture the UART output
|
||
|
|
||
|
Args:
|
||
|
stop_char: Read output buffer until it reads stop_char.
|
||
|
|
||
|
Returns:
|
||
|
text from UART output.
|
||
|
"""
|
||
|
if self.serial.inWaiting() == 0:
|
||
|
time.sleep(1)
|
||
|
|
||
|
return self.serial.read(self.serial.inWaiting())
|
||
|
|
||
|
def prepare(self):
|
||
|
"""Prepare the test:
|
||
|
|
||
|
Identify the type of UART device (EC or Kernel?), then
|
||
|
decide what kind of commands to use to generate stress loads.
|
||
|
|
||
|
Raises:
|
||
|
ChargenTestError if UART source can't be identified.
|
||
|
"""
|
||
|
try:
|
||
|
self.logger.info('Preparing...')
|
||
|
|
||
|
self.serial.open()
|
||
|
|
||
|
# Prepare the device for test
|
||
|
self.serial.flushInput()
|
||
|
self.serial.flushOutput()
|
||
|
|
||
|
self.get_output() # drain data
|
||
|
|
||
|
# Give a couple of line feeds, and capture the prompt text
|
||
|
self.run_command(['', ''])
|
||
|
prompt_txt = self.get_output()
|
||
|
|
||
|
# Detect the device source: EC or AP?
|
||
|
# Detect if the device is AP or EC console based on the captured.
|
||
|
for dev_prof in self.UART_DEV_PROFILES:
|
||
|
if dev_prof['prompt'] in prompt_txt:
|
||
|
self.dev_prof = dev_prof
|
||
|
break
|
||
|
else:
|
||
|
# No prompt patterns were found. UART seems not responding or in
|
||
|
# an undesirable status.
|
||
|
if prompt_txt:
|
||
|
raise ChargenTestError('%s: Got an unknown prompt text: %s\n'
|
||
|
'Check manually whether %s is available.' %
|
||
|
(self.serial.port, prompt_txt,
|
||
|
self.serial.port))
|
||
|
else:
|
||
|
raise ChargenTestError('%s: Got no input. Close any other connections'
|
||
|
' to this port, and try it again.' %
|
||
|
self.serial.port)
|
||
|
|
||
|
self.logger.info('Detected as %s UART', self.dev_prof['device_type'])
|
||
|
# Log displays the UART type (AP|EC) instead of device filename.
|
||
|
self.logger = logging.getLogger(type(self).__name__ + '| ' +
|
||
|
self.dev_prof['device_type'])
|
||
|
|
||
|
# Either login to AP or run some commands to prepare the device
|
||
|
# for test
|
||
|
self.eol = self.dev_prof['end_of_input']
|
||
|
self.run_command(self.dev_prof['prepare_cmd'], delay=2)
|
||
|
self.cleanup_cli += self.dev_prof['cleanup_cmd']
|
||
|
|
||
|
# Check whether the command 'chargen' is available in the device.
|
||
|
# 'chargen 1 4' is supposed to print '0000'
|
||
|
self.get_output() # drain data
|
||
|
self.run_command(['chargen 1 4'])
|
||
|
tmp_txt = self.get_output()
|
||
|
|
||
|
# Check whether chargen command is available.
|
||
|
if '0000' not in tmp_txt:
|
||
|
raise ChargenTestError('%s: Chargen got an unexpected result: %s' %
|
||
|
(self.dev_prof['device_type'], tmp_txt))
|
||
|
|
||
|
self.num_ch_exp = int(self.serial.baudrate * self.duration / 10)
|
||
|
self.test_cli = ['chargen %d %d' % (CHARGEN_TXT_LEN, self.num_ch_exp)]
|
||
|
|
||
|
self.logger.info('Ready to test')
|
||
|
finally:
|
||
|
self.serial.close()
|
||
|
|
||
|
def stress_test_thread(self):
|
||
|
"""Test thread
|
||
|
|
||
|
Raises:
|
||
|
ChargenTestError: if broken character is found.
|
||
|
"""
|
||
|
try:
|
||
|
self.serial.open()
|
||
|
self.serial.flushInput()
|
||
|
self.serial.flushOutput()
|
||
|
|
||
|
# Run TPM command in background to burden cr50.
|
||
|
if self.dev_prof['device_type'] == 'AP' and self.cr50_workload:
|
||
|
self.run_command([CR50_LOAD_GEN_CMD])
|
||
|
self.logger.debug('run TPM job while %s exists', FLAG_FILENAME)
|
||
|
|
||
|
# Run the command 'chargen', one time
|
||
|
self.get_output() # Drain the output
|
||
|
self.run_command(self.test_cli)
|
||
|
self.serial.readline() # Drain the echoed command line.
|
||
|
|
||
|
err_msg = '%s: Expected %r but got %s after %d char received'
|
||
|
|
||
|
# Keep capturing the output until the test timer is expired.
|
||
|
self.num_ch_cap = 0
|
||
|
self.char_loss_occurrences = 0
|
||
|
data_starve_count = 0
|
||
|
|
||
|
total_num_ch = self.num_ch_exp # Expected number of characters in total
|
||
|
ch_exp = CHARGEN_TXT[0]
|
||
|
ch_cap = 'z' # any character value is ok for loop initial condition.
|
||
|
while self.num_ch_cap < total_num_ch:
|
||
|
captured = self.get_output()
|
||
|
|
||
|
if captured:
|
||
|
# There is some output data. Reset the data starvation count.
|
||
|
data_starve_count = 0
|
||
|
else:
|
||
|
data_starve_count += 1
|
||
|
if data_starve_count > 1:
|
||
|
# If nothing was captured more than once, then terminate the test.
|
||
|
self.logger.debug('No more output')
|
||
|
break
|
||
|
|
||
|
for ch_cap in captured:
|
||
|
if ch_cap not in CHARGEN_TXT:
|
||
|
# If it is not alpha-numeric, terminate the test.
|
||
|
if ch_cap not in CRLF:
|
||
|
# If it is neither a CR nor LF, then it is an error case.
|
||
|
self.logger.error('Whole captured characters: %r', captured)
|
||
|
raise ChargenTestError(err_msg % ('Broken char captured', ch_exp,
|
||
|
hex(ord(ch_cap)),
|
||
|
self.num_ch_cap))
|
||
|
|
||
|
# Set the loop termination condition true.
|
||
|
total_num_ch = self.num_ch_cap
|
||
|
|
||
|
if self.num_ch_cap >= total_num_ch:
|
||
|
break
|
||
|
|
||
|
if ch_exp != ch_cap:
|
||
|
# If it is alpha-numeric but not continuous, then some characters
|
||
|
# are lost.
|
||
|
self.logger.error(err_msg, 'Char loss detected',
|
||
|
ch_exp, repr(ch_cap), self.num_ch_cap)
|
||
|
self.char_loss_occurrences += 1
|
||
|
|
||
|
# Recalculate the expected number of characters to adjust
|
||
|
# termination condition. The loss might be bigger than this
|
||
|
# adjustment, but it is okay since it will terminates by either
|
||
|
# CR/LF detection or by data starvation.
|
||
|
idx_ch_exp = CHARGEN_TXT.find(ch_exp)
|
||
|
idx_ch_cap = CHARGEN_TXT.find(ch_cap)
|
||
|
if idx_ch_cap < idx_ch_exp:
|
||
|
idx_ch_cap += len(CHARGEN_TXT)
|
||
|
total_num_ch -= (idx_ch_cap - idx_ch_exp)
|
||
|
|
||
|
self.num_ch_cap += 1
|
||
|
|
||
|
# Determine What character is expected next?
|
||
|
ch_exp = CHARGEN_TXT[(CHARGEN_TXT.find(ch_cap) + 1) % CHARGEN_TXT_LEN]
|
||
|
|
||
|
finally:
|
||
|
self.serial.close()
|
||
|
|
||
|
def start_test(self):
|
||
|
"""Start the test thread"""
|
||
|
self.logger.info('Test thread starts')
|
||
|
self.test_thread.start()
|
||
|
|
||
|
def wait_test_done(self):
|
||
|
"""Wait until the test thread get done and join"""
|
||
|
self.test_thread.join()
|
||
|
self.logger.info('Test thread is done')
|
||
|
|
||
|
def get_result(self):
|
||
|
"""Display the result
|
||
|
|
||
|
Returns:
|
||
|
Integer = the number of lost character
|
||
|
|
||
|
Raises:
|
||
|
ChargenTestError: if the capture is corrupted.
|
||
|
"""
|
||
|
# If more characters than expected are captured, it means some messages
|
||
|
# from other than chargen are mixed. Stop processing further.
|
||
|
if self.num_ch_exp < self.num_ch_cap:
|
||
|
raise ChargenTestError('%s: UART output is corrupted.' %
|
||
|
self.dev_prof['device_type'])
|
||
|
|
||
|
# Get the count difference between the expected to the captured
|
||
|
# as the number of lost character.
|
||
|
char_lost = self.num_ch_exp - self.num_ch_cap
|
||
|
self.logger.info('%8d char lost / %10d (%.1f %%)',
|
||
|
char_lost, self.num_ch_exp,
|
||
|
char_lost * 100.0 / self.num_ch_exp)
|
||
|
|
||
|
return char_lost, self.num_ch_exp, self.char_loss_occurrences
|
||
|
|
||
|
|
||
|
class ChargenTest(object):
|
||
|
"""UART stress tester
|
||
|
|
||
|
Attributes:
|
||
|
cr50_workload: True if cr50 should be stressed, or False otherwise
|
||
|
duration: Time to keep testing in seconds
|
||
|
logger: logging object
|
||
|
ports: List of Uart device filename
|
||
|
serials: Dictionary where key is filename of UART device, and the value is
|
||
|
UartSerial object
|
||
|
"""
|
||
|
|
||
|
def __init__(self, ports, duration, cr50_workload=False):
|
||
|
"""Initialize UART stress tester
|
||
|
|
||
|
Args:
|
||
|
ports: List of UART ports to test.
|
||
|
duration: Time to keep testing in seconds.
|
||
|
cr50_workload: True if a workload should be generated on cr50
|
||
|
|
||
|
Raises:
|
||
|
ChargenTestError: if any of ports is not a valid character device.
|
||
|
"""
|
||
|
|
||
|
# Save the arguments
|
||
|
self.ports = ports
|
||
|
for port in ports:
|
||
|
try:
|
||
|
mode = os.stat(port).st_mode
|
||
|
except OSError as e:
|
||
|
raise ChargenTestError(e)
|
||
|
if not stat.S_ISCHR(mode):
|
||
|
raise ChargenTestError('%s is not a character device.' % port)
|
||
|
|
||
|
if duration <= 0:
|
||
|
raise ChargenTestError('Input error: duration is not positive.')
|
||
|
self.duration = duration
|
||
|
|
||
|
self.cr50_workload = cr50_workload
|
||
|
|
||
|
# Initialize logging object
|
||
|
self.logger = logging.getLogger(type(self).__name__)
|
||
|
|
||
|
# Create an UartSerial object per UART port
|
||
|
self.serials = {} # UartSerial objects
|
||
|
for port in self.ports:
|
||
|
self.serials[port] = UartSerial(port=port, duration=self.duration,
|
||
|
cr50_workload=self.cr50_workload)
|
||
|
|
||
|
def prepare(self):
|
||
|
"""Prepare the test for each UART port"""
|
||
|
self.logger.info('Prepare ports for test')
|
||
|
for _, ser in self.serials.items():
|
||
|
ser.prepare()
|
||
|
self.logger.info('Ports are ready to test')
|
||
|
|
||
|
def print_result(self):
|
||
|
"""Display the test result for each UART port
|
||
|
|
||
|
Returns:
|
||
|
char_lost: Total number of characters lost
|
||
|
"""
|
||
|
char_lost = 0
|
||
|
for _, ser in self.serials.items():
|
||
|
(tmp_lost, _, _) = ser.get_result()
|
||
|
char_lost += tmp_lost
|
||
|
|
||
|
# If any characters are lost, then test fails.
|
||
|
msg = 'lost %d character(s) from the test' % char_lost
|
||
|
if char_lost > 0:
|
||
|
self.logger.error('FAIL: %s', msg)
|
||
|
else:
|
||
|
self.logger.info('PASS: %s', msg)
|
||
|
|
||
|
return char_lost
|
||
|
|
||
|
def run(self):
|
||
|
"""Run the stress test on UART port(s)
|
||
|
|
||
|
Raises:
|
||
|
ChargenTestError: If any characters are lost.
|
||
|
"""
|
||
|
|
||
|
# Detect UART source type, and decide which command to test.
|
||
|
self.prepare()
|
||
|
|
||
|
# Run the test on each UART port in thread.
|
||
|
self.logger.info('Test starts')
|
||
|
for _, ser in self.serials.items():
|
||
|
ser.start_test()
|
||
|
|
||
|
# Wait all tests to finish.
|
||
|
for _, ser in self.serials.items():
|
||
|
ser.wait_test_done()
|
||
|
|
||
|
# Print the result.
|
||
|
char_lost = self.print_result()
|
||
|
if char_lost:
|
||
|
raise ChargenTestError('Test failed: lost %d character(s)' %
|
||
|
char_lost)
|
||
|
|
||
|
self.logger.info('Test is done')
|
||
|
|
||
|
def parse_args(cmdline):
|
||
|
"""Parse command line arguments.
|
||
|
|
||
|
Args:
|
||
|
cmdline: list to be parsed
|
||
|
|
||
|
Returns:
|
||
|
tuple (options, args) where args is a list of cmdline arguments that the
|
||
|
parser was unable to match i.e. they're servod controls, not options.
|
||
|
"""
|
||
|
description = """%(prog)s repeats sending a uart console command
|
||
|
to each UART device for a given time, and check if output
|
||
|
has any missing characters.
|
||
|
|
||
|
Examples:
|
||
|
%(prog)s /dev/ttyUSB2 --time 3600
|
||
|
%(prog)s /dev/ttyUSB1 /dev/ttyUSB2 --debug
|
||
|
%(prog)s /dev/ttyUSB1 /dev/ttyUSB2 --cr50
|
||
|
"""
|
||
|
|
||
|
parser = argparse.ArgumentParser(description=description,
|
||
|
formatter_class=argparse.RawTextHelpFormatter
|
||
|
)
|
||
|
parser.add_argument('port', type=str, nargs="*",
|
||
|
help='UART device path to test')
|
||
|
parser.add_argument('-c', '--cr50', action='store_true', default=False,
|
||
|
help='generate TPM workload on cr50')
|
||
|
parser.add_argument('-d', '--debug', action='store_true', default=False,
|
||
|
help='enable debug messages')
|
||
|
parser.add_argument('-t', '--time', type=int,
|
||
|
help='Test duration in second', default=300)
|
||
|
return parser.parse_known_args(cmdline)
|
||
|
|
||
|
|
||
|
def main():
|
||
|
"""Main function wrapper"""
|
||
|
try:
|
||
|
(options, _) = parse_args(sys.argv[1:])
|
||
|
|
||
|
# Set Log format
|
||
|
log_format = '%(asctime)s %(levelname)-6s | %(name)-25s'
|
||
|
date_format = '%Y-%m-%d %H:%M:%S'
|
||
|
if options.debug:
|
||
|
log_format += ' | %(filename)s:%(lineno)4d:%(funcName)-18s'
|
||
|
loglevel = logging.DEBUG
|
||
|
else:
|
||
|
loglevel = logging.INFO
|
||
|
log_format += ' | %(message)s'
|
||
|
|
||
|
logging.basicConfig(level=loglevel, format=log_format,
|
||
|
datefmt=date_format)
|
||
|
|
||
|
# Create a ChargenTest object
|
||
|
utest = ChargenTest(options.port, options.time, options.cr50)
|
||
|
utest.run() # Run
|
||
|
|
||
|
except KeyboardInterrupt:
|
||
|
sys.exit(0)
|
||
|
|
||
|
except ChargenTestError as e:
|
||
|
logging.error(str(e))
|
||
|
sys.exit(1)
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
main()
|