367 lines
15 KiB
Python
367 lines
15 KiB
Python
|
#!/usr/bin/env python2
|
||
|
# Copyright 2015 The Chromium OS Authors. All rights reserved.
|
||
|
# Use of this source code is governed by a BSD-style license that can be
|
||
|
# found in the LICENSE file.
|
||
|
|
||
|
"""Unit tests for the EC-3PO interpreter."""
|
||
|
|
||
|
from __future__ import print_function
|
||
|
|
||
|
# pylint: disable=cros-logging-import
|
||
|
import logging
|
||
|
import mock
|
||
|
import tempfile
|
||
|
import unittest
|
||
|
|
||
|
import interpreter
|
||
|
import threadproc_shim
|
||
|
|
||
|
class TestEnhancedECBehaviour(unittest.TestCase):
|
||
|
"""Test case to verify all enhanced EC interpretation tasks."""
|
||
|
def setUp(self):
|
||
|
"""Setup the test harness."""
|
||
|
# Setup logging with a timestamp, the module, and the log level.
|
||
|
logging.basicConfig(level=logging.DEBUG,
|
||
|
format=('%(asctime)s - %(module)s -'
|
||
|
' %(levelname)s - %(message)s'))
|
||
|
|
||
|
# Create a tempfile that would represent the EC UART PTY.
|
||
|
self.tempfile = tempfile.NamedTemporaryFile()
|
||
|
|
||
|
# Create the pipes that the interpreter will use.
|
||
|
self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe()
|
||
|
self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False)
|
||
|
|
||
|
# Mock the open() function so we can inspect reads/writes to the EC.
|
||
|
self.ec_uart_pty = mock.mock_open()
|
||
|
with mock.patch('__builtin__.open', self.ec_uart_pty):
|
||
|
# Create an interpreter.
|
||
|
self.itpr = interpreter.Interpreter(self.tempfile.name,
|
||
|
self.cmd_pipe_itpr,
|
||
|
self.dbg_pipe_itpr,
|
||
|
log_level=logging.DEBUG,
|
||
|
name="EC")
|
||
|
|
||
|
@mock.patch('interpreter.os')
|
||
|
def test_HandlingCommandsThatProduceNoOutput(self, mock_os):
|
||
|
"""Verify that the Interpreter correctly handles non-output commands.
|
||
|
|
||
|
Args:
|
||
|
mock_os: MagicMock object replacing the 'os' module for this test
|
||
|
case.
|
||
|
"""
|
||
|
# The interpreter init should open the EC UART PTY.
|
||
|
expected_ec_calls = [mock.call(self.tempfile.name, 'a+')]
|
||
|
# Have a command come in the command pipe. The first command will be an
|
||
|
# interrogation to determine if the EC is enhanced or not.
|
||
|
self.cmd_pipe_user.send(interpreter.EC_SYN)
|
||
|
self.itpr.HandleUserData()
|
||
|
# At this point, the command should be queued up waiting to be sent, so
|
||
|
# let's actually send it to the EC.
|
||
|
self.itpr.SendCmdToEC()
|
||
|
expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN),
|
||
|
mock.call().flush()])
|
||
|
# Now, assume that the EC sends only 1 response back of EC_ACK.
|
||
|
mock_os.read.side_effect = [interpreter.EC_ACK]
|
||
|
# When reading the EC, the interpreter will call file.fileno() to pass to
|
||
|
# os.read().
|
||
|
expected_ec_calls.append(mock.call().fileno())
|
||
|
# Simulate the response.
|
||
|
self.itpr.HandleECData()
|
||
|
|
||
|
# Now that the interrogation was complete, it's time to send down the real
|
||
|
# command.
|
||
|
test_cmd = 'chan save'
|
||
|
# Send the test command down the pipe.
|
||
|
self.cmd_pipe_user.send(test_cmd)
|
||
|
self.itpr.HandleUserData()
|
||
|
self.itpr.SendCmdToEC()
|
||
|
# Since the EC image is enhanced, we should have sent a packed command.
|
||
|
expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd)))
|
||
|
expected_ec_calls.append(mock.call().flush())
|
||
|
|
||
|
# Now that the first command was sent, we should send another command which
|
||
|
# produces no output. The console would send another interrogation.
|
||
|
self.cmd_pipe_user.send(interpreter.EC_SYN)
|
||
|
self.itpr.HandleUserData()
|
||
|
self.itpr.SendCmdToEC()
|
||
|
expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN),
|
||
|
mock.call().flush()])
|
||
|
# Again, assume that the EC sends only 1 response back of EC_ACK.
|
||
|
mock_os.read.side_effect = [interpreter.EC_ACK]
|
||
|
# When reading the EC, the interpreter will call file.fileno() to pass to
|
||
|
# os.read().
|
||
|
expected_ec_calls.append(mock.call().fileno())
|
||
|
# Simulate the response.
|
||
|
self.itpr.HandleECData()
|
||
|
|
||
|
# Now send the second test command.
|
||
|
test_cmd = 'chan 0'
|
||
|
self.cmd_pipe_user.send(test_cmd)
|
||
|
self.itpr.HandleUserData()
|
||
|
self.itpr.SendCmdToEC()
|
||
|
# Since the EC image is enhanced, we should have sent a packed command.
|
||
|
expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd)))
|
||
|
expected_ec_calls.append(mock.call().flush())
|
||
|
|
||
|
# Finally, verify that the appropriate writes were actually sent to the EC.
|
||
|
self.ec_uart_pty.assert_has_calls(expected_ec_calls)
|
||
|
|
||
|
@mock.patch('interpreter.os')
|
||
|
def test_CommandRetryingOnError(self, mock_os):
|
||
|
"""Verify that commands are retried if an error is encountered.
|
||
|
|
||
|
Args:
|
||
|
mock_os: MagicMock object replacing the 'os' module for this test
|
||
|
case.
|
||
|
"""
|
||
|
# The interpreter init should open the EC UART PTY.
|
||
|
expected_ec_calls = [mock.call(self.tempfile.name, 'a+')]
|
||
|
# Have a command come in the command pipe. The first command will be an
|
||
|
# interrogation to determine if the EC is enhanced or not.
|
||
|
self.cmd_pipe_user.send(interpreter.EC_SYN)
|
||
|
self.itpr.HandleUserData()
|
||
|
# At this point, the command should be queued up waiting to be sent, so
|
||
|
# let's actually send it to the EC.
|
||
|
self.itpr.SendCmdToEC()
|
||
|
expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN),
|
||
|
mock.call().flush()])
|
||
|
# Now, assume that the EC sends only 1 response back of EC_ACK.
|
||
|
mock_os.read.side_effect = [interpreter.EC_ACK]
|
||
|
# When reading the EC, the interpreter will call file.fileno() to pass to
|
||
|
# os.read().
|
||
|
expected_ec_calls.append(mock.call().fileno())
|
||
|
# Simulate the response.
|
||
|
self.itpr.HandleECData()
|
||
|
|
||
|
# Let's send a command that is received on the EC-side with an error.
|
||
|
test_cmd = 'accelinfo'
|
||
|
self.cmd_pipe_user.send(test_cmd)
|
||
|
self.itpr.HandleUserData()
|
||
|
self.itpr.SendCmdToEC()
|
||
|
packed_cmd = self.itpr.PackCommand(test_cmd)
|
||
|
expected_ec_calls.extend([mock.call().write(packed_cmd),
|
||
|
mock.call().flush()])
|
||
|
# Have the EC return the error string twice.
|
||
|
mock_os.read.side_effect = ['&&EE', '&&EE']
|
||
|
for i in range(2):
|
||
|
# When reading the EC, the interpreter will call file.fileno() to pass to
|
||
|
# os.read().
|
||
|
expected_ec_calls.append(mock.call().fileno())
|
||
|
# Simulate the response.
|
||
|
self.itpr.HandleECData()
|
||
|
|
||
|
# Since an error was received, the EC should attempt to retry the command.
|
||
|
expected_ec_calls.extend([mock.call().write(packed_cmd),
|
||
|
mock.call().flush()])
|
||
|
# Verify that the retry count was decremented.
|
||
|
self.assertEqual(interpreter.COMMAND_RETRIES-i-1, self.itpr.cmd_retries,
|
||
|
'Unexpected cmd_remaining count.')
|
||
|
# Actually retry the command.
|
||
|
self.itpr.SendCmdToEC()
|
||
|
|
||
|
# Now assume that the last one goes through with no trouble.
|
||
|
expected_ec_calls.extend([mock.call().write(packed_cmd),
|
||
|
mock.call().flush()])
|
||
|
self.itpr.SendCmdToEC()
|
||
|
|
||
|
# Verify all the calls.
|
||
|
self.ec_uart_pty.assert_has_calls(expected_ec_calls)
|
||
|
|
||
|
def test_PackCommandsForEnhancedEC(self):
|
||
|
"""Verify that the interpreter packs commands for enhanced EC images."""
|
||
|
# Assume current EC image is enhanced.
|
||
|
self.itpr.enhanced_ec = True
|
||
|
# Receive a command from the user.
|
||
|
test_cmd = 'gettime'
|
||
|
self.cmd_pipe_user.send(test_cmd)
|
||
|
# Mock out PackCommand to see if it was called.
|
||
|
self.itpr.PackCommand = mock.MagicMock()
|
||
|
# Have the interpreter handle the command.
|
||
|
self.itpr.HandleUserData()
|
||
|
# Verify that PackCommand() was called.
|
||
|
self.itpr.PackCommand.assert_called_once_with(test_cmd)
|
||
|
|
||
|
def test_DontPackCommandsForNonEnhancedEC(self):
|
||
|
"""Verify the interpreter doesn't pack commands for non-enhanced images."""
|
||
|
# Assume current EC image is not enhanced.
|
||
|
self.itpr.enhanced_ec = False
|
||
|
# Receive a command from the user.
|
||
|
test_cmd = 'gettime'
|
||
|
self.cmd_pipe_user.send(test_cmd)
|
||
|
# Mock out PackCommand to see if it was called.
|
||
|
self.itpr.PackCommand = mock.MagicMock()
|
||
|
# Have the interpreter handle the command.
|
||
|
self.itpr.HandleUserData()
|
||
|
# Verify that PackCommand() was called.
|
||
|
self.itpr.PackCommand.assert_not_called()
|
||
|
|
||
|
@mock.patch('interpreter.os')
|
||
|
def test_KeepingTrackOfInterrogation(self, mock_os):
|
||
|
"""Verify that the interpreter can track the state of the interrogation.
|
||
|
|
||
|
Args:
|
||
|
mock_os: MagicMock object replacing the 'os' module. for this test
|
||
|
case.
|
||
|
"""
|
||
|
# Upon init, the interpreter should assume that the current EC image is not
|
||
|
# enhanced.
|
||
|
self.assertFalse(self.itpr.enhanced_ec, msg=('State of enhanced_ec upon'
|
||
|
' init is not False.'))
|
||
|
|
||
|
# Assume an interrogation request comes in from the user.
|
||
|
self.cmd_pipe_user.send(interpreter.EC_SYN)
|
||
|
self.itpr.HandleUserData()
|
||
|
|
||
|
# Verify the state is now within an interrogation.
|
||
|
self.assertTrue(self.itpr.interrogating, 'interrogating should be True')
|
||
|
# The state of enhanced_ec should not be changed yet because we haven't
|
||
|
# received a valid response yet.
|
||
|
self.assertFalse(self.itpr.enhanced_ec, msg=('State of enhanced_ec is '
|
||
|
'not False.'))
|
||
|
|
||
|
# Assume that the EC responds with an EC_ACK.
|
||
|
mock_os.read.side_effect = [interpreter.EC_ACK]
|
||
|
self.itpr.HandleECData()
|
||
|
|
||
|
# Now, the interrogation should be complete and we should know that the
|
||
|
# current EC image is enhanced.
|
||
|
self.assertFalse(self.itpr.interrogating, msg=('interrogating should be '
|
||
|
'False'))
|
||
|
self.assertTrue(self.itpr.enhanced_ec, msg='enhanced_ec sholud be True')
|
||
|
|
||
|
# Now let's perform another interrogation, but pretend that the EC ignores
|
||
|
# it.
|
||
|
self.cmd_pipe_user.send(interpreter.EC_SYN)
|
||
|
self.itpr.HandleUserData()
|
||
|
|
||
|
# Verify interrogating state.
|
||
|
self.assertTrue(self.itpr.interrogating, 'interrogating sholud be True')
|
||
|
# We should assume that the image is not enhanced until we get the valid
|
||
|
# response.
|
||
|
self.assertFalse(self.itpr.enhanced_ec, 'enhanced_ec should be False now.')
|
||
|
|
||
|
# Let's pretend that we get a random debug print. This should clear the
|
||
|
# interrogating flag.
|
||
|
mock_os.read.side_effect = '[1660.593076 HC 0x103]'
|
||
|
self.itpr.HandleECData()
|
||
|
|
||
|
# Verify that interrogating flag is cleared and enhanced_ec is still False.
|
||
|
self.assertFalse(self.itpr.interrogating, 'interrogating should be False.')
|
||
|
self.assertFalse(self.itpr.enhanced_ec,
|
||
|
'enhanced_ec should still be False.')
|
||
|
|
||
|
|
||
|
class TestUARTDisconnection(unittest.TestCase):
|
||
|
"""Test case to verify interpreter disconnection/reconnection."""
|
||
|
def setUp(self):
|
||
|
"""Setup the test harness."""
|
||
|
# Setup logging with a timestamp, the module, and the log level.
|
||
|
logging.basicConfig(level=logging.DEBUG,
|
||
|
format=('%(asctime)s - %(module)s -'
|
||
|
' %(levelname)s - %(message)s'))
|
||
|
|
||
|
# Create a tempfile that would represent the EC UART PTY.
|
||
|
self.tempfile = tempfile.NamedTemporaryFile()
|
||
|
|
||
|
# Create the pipes that the interpreter will use.
|
||
|
self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe()
|
||
|
self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False)
|
||
|
|
||
|
# Mock the open() function so we can inspect reads/writes to the EC.
|
||
|
self.ec_uart_pty = mock.mock_open()
|
||
|
with mock.patch('__builtin__.open', self.ec_uart_pty):
|
||
|
# Create an interpreter.
|
||
|
self.itpr = interpreter.Interpreter(self.tempfile.name,
|
||
|
self.cmd_pipe_itpr,
|
||
|
self.dbg_pipe_itpr,
|
||
|
log_level=logging.DEBUG,
|
||
|
name="EC")
|
||
|
|
||
|
# First, check that interpreter is initialized to connected.
|
||
|
self.assertTrue(self.itpr.connected, ('The interpreter should be'
|
||
|
' initialized in a connected state'))
|
||
|
|
||
|
def test_DisconnectStopsECTraffic(self):
|
||
|
"""Verify that when in disconnected state, no debug prints are sent."""
|
||
|
# Let's send a disconnect command through the command pipe.
|
||
|
self.cmd_pipe_user.send('disconnect')
|
||
|
self.itpr.HandleUserData()
|
||
|
|
||
|
# Verify interpreter is disconnected from EC.
|
||
|
self.assertFalse(self.itpr.connected, ('The interpreter should be'
|
||
|
'disconnected.'))
|
||
|
# Verify that the EC UART is no longer a member of the inputs. The
|
||
|
# interpreter will never pull data from the EC if it's not a member of the
|
||
|
# inputs list.
|
||
|
self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs)
|
||
|
|
||
|
def test_CommandsDroppedWhenDisconnected(self):
|
||
|
"""Verify that when in disconnected state, commands are dropped."""
|
||
|
# Send a command, followed by 'disconnect'.
|
||
|
self.cmd_pipe_user.send('taskinfo')
|
||
|
self.itpr.HandleUserData()
|
||
|
self.cmd_pipe_user.send('disconnect')
|
||
|
self.itpr.HandleUserData()
|
||
|
|
||
|
# Verify interpreter is disconnected from EC.
|
||
|
self.assertFalse(self.itpr.connected, ('The interpreter should be'
|
||
|
'disconnected.'))
|
||
|
# Verify that the EC UART is no longer a member of the inputs nor outputs.
|
||
|
self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs)
|
||
|
self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
|
||
|
|
||
|
# Have the user send a few more commands in the disconnected state.
|
||
|
command = 'help\n'
|
||
|
for char in command:
|
||
|
self.cmd_pipe_user.send(char)
|
||
|
self.itpr.HandleUserData()
|
||
|
|
||
|
# The command queue should be empty.
|
||
|
self.assertEqual(0, self.itpr.ec_cmd_queue.qsize())
|
||
|
|
||
|
# Now send the reconnect command.
|
||
|
self.cmd_pipe_user.send('reconnect')
|
||
|
with mock.patch('__builtin__.open', mock.mock_open()):
|
||
|
self.itpr.HandleUserData()
|
||
|
|
||
|
# Verify interpreter is connected.
|
||
|
self.assertTrue(self.itpr.connected)
|
||
|
# Verify that EC UART is a member of the inputs.
|
||
|
self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs)
|
||
|
# Since no command was sent after reconnection, verify that the EC UART is
|
||
|
# not a member of the outputs.
|
||
|
self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
|
||
|
|
||
|
def test_ReconnectAllowsECTraffic(self):
|
||
|
"""Verify that when connected, EC UART traffic is allowed."""
|
||
|
# Let's send a disconnect command through the command pipe.
|
||
|
self.cmd_pipe_user.send('disconnect')
|
||
|
self.itpr.HandleUserData()
|
||
|
|
||
|
# Verify interpreter is disconnected.
|
||
|
self.assertFalse(self.itpr.connected, ('The interpreter should be'
|
||
|
'disconnected.'))
|
||
|
# Verify that the EC UART is no longer a member of the inputs nor outputs.
|
||
|
self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs)
|
||
|
self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
|
||
|
|
||
|
# Issue reconnect command through the command pipe.
|
||
|
self.cmd_pipe_user.send('reconnect')
|
||
|
with mock.patch('__builtin__.open', mock.mock_open()):
|
||
|
self.itpr.HandleUserData()
|
||
|
|
||
|
# Verify interpreter is connected.
|
||
|
self.assertTrue(self.itpr.connected, ('The interpreter should be'
|
||
|
'connected.'))
|
||
|
# Verify that the EC UART is now a member of the inputs.
|
||
|
self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs)
|
||
|
# Since we have issued no commands during the disconnected state, no
|
||
|
# commands are pending and therefore the PTY should not be added to the
|
||
|
# outputs.
|
||
|
self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
unittest.main()
|