author     Dimitry Andric <dim@FreeBSD.org>  2019-08-20 18:01:57 +0000
committer  Dimitry Andric <dim@FreeBSD.org>  2019-08-20 18:01:57 +0000
commit     88c643b6fec27eec436c8d138fee6346e92337d6 (patch)
tree       82cd13b2f3cde1c9e5f79689ba4e6ba67694843f /packages/Python/lldbsuite/test_event
parent     94994d372d014ce4c8758b9605d63fae651bd8aa (diff)
Diffstat (limited to 'packages/Python/lldbsuite/test_event')
-rw-r--r--  packages/Python/lldbsuite/test_event/__init__.py                                              0
-rw-r--r--  packages/Python/lldbsuite/test_event/build_exception.py                                      16
-rw-r--r--  packages/Python/lldbsuite/test_event/dotest_channels.py                                     209
-rw-r--r--  packages/Python/lldbsuite/test_event/event_builder.py                                       482
-rw-r--r--  packages/Python/lldbsuite/test_event/formatter/__init__.py                                  163
-rw-r--r--  packages/Python/lldbsuite/test_event/formatter/curses.py                                    342
-rw-r--r--  packages/Python/lldbsuite/test_event/formatter/dump_formatter.py                             23
-rw-r--r--  packages/Python/lldbsuite/test_event/formatter/pickled.py                                    80
-rw-r--r--  packages/Python/lldbsuite/test_event/formatter/results_formatter.py                         766
-rw-r--r--  packages/Python/lldbsuite/test_event/formatter/xunit.py                                     596
-rw-r--r--  packages/Python/lldbsuite/test_event/test/resources/invalid_decorator/TestInvalidDecorator.py  13
-rw-r--r--  packages/Python/lldbsuite/test_event/test/src/TestCatchInvalidDecorator.py                   70
-rw-r--r--  packages/Python/lldbsuite/test_event/test/src/event_collector.py                             85
13 files changed, 0 insertions, 2845 deletions
diff --git a/packages/Python/lldbsuite/test_event/__init__.py b/packages/Python/lldbsuite/test_event/__init__.py
deleted file mode 100644
index e69de29bb2d1..000000000000
--- a/packages/Python/lldbsuite/test_event/__init__.py
+++ /dev/null
diff --git a/packages/Python/lldbsuite/test_event/build_exception.py b/packages/Python/lldbsuite/test_event/build_exception.py
deleted file mode 100644
index 5b00b92d4738..000000000000
--- a/packages/Python/lldbsuite/test_event/build_exception.py
+++ /dev/null
@@ -1,16 +0,0 @@
-class BuildError(Exception):
-
- def __init__(self, called_process_error):
- super(BuildError, self).__init__("Error when building test subject")
- self.command = called_process_error.lldb_extensions.get(
- "command", "<command unavailable>")
- self.build_error = called_process_error.lldb_extensions.get(
- "stderr_content", "<error output unavailable>")
-
- def __str__(self):
- return self.format_build_error(self.command, self.build_error)
-
- @staticmethod
- def format_build_error(command, command_output):
- return "Error when building test subject.\n\nBuild Command:\n{}\n\nBuild Command Output:\n{}".format(
- command, command_output)
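
Editor's note on the file above: BuildError expects the wrapped exception to carry an lldb_extensions dictionary, which the LLDB test runner attaches to its subprocess errors. A hedged usage sketch (not from the commit; run_build and the manual attribute attachment are illustrative, and the import path is the module location prior to this deletion):

    import subprocess
    from lldbsuite.test_event.build_exception import BuildError

    def run_build(cmd):
        try:
            subprocess.check_output(cmd, stderr=subprocess.PIPE)
        except subprocess.CalledProcessError as cpe:
            # Emulate the metadata the LLDB test runner would attach.
            cpe.lldb_extensions = {
                "command": " ".join(cmd),
                "stderr_content": cpe.stderr,
            }
            raise BuildError(cpe)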
diff --git a/packages/Python/lldbsuite/test_event/dotest_channels.py b/packages/Python/lldbsuite/test_event/dotest_channels.py
deleted file mode 100644
index 4f79193514bb..000000000000
--- a/packages/Python/lldbsuite/test_event/dotest_channels.py
+++ /dev/null
@@ -1,209 +0,0 @@
-"""
- The LLVM Compiler Infrastructure
-
-This file is distributed under the University of Illinois Open Source
-License. See LICENSE.TXT for details.
-
-Sync lldb and related source from a local machine to a remote machine.
-
-This facilitates working on the lldb sourcecode on multiple machines
-and multiple OS types, verifying changes across all.
-
-
-This module provides asyncore channels used within the LLDB test
-framework.
-"""
-
-from __future__ import print_function
-from __future__ import absolute_import
-
-
-# System modules
-import asyncore
-import socket
-
-# Third-party modules
-from six.moves import cPickle
-
-# LLDB modules
-
-
-class UnpicklingForwardingReaderChannel(asyncore.dispatcher):
- """Provides an unpickling, forwarding asyncore dispatch channel reader.
-
- Inferior dotest.py processes with side-channel-based test results will
- send test result event data in a pickled format, one event at a time.
- This class supports reconstructing the pickled data and forwarding it
- on to its final destination.
-
- The channel data is written in the form:
- {4-byte big-endian payload length}{payload_bytes}
-
- The bulk of this class is devoted to reading and parsing out
- the payload bytes.
- """
-
- def __init__(self, file_object, async_map, forwarding_func):
- asyncore.dispatcher.__init__(self, sock=file_object, map=async_map)
-
- self.header_contents = b""
- self.packet_bytes_remaining = 0
- self.reading_header = True
- self.ibuffer = b''
- self.forwarding_func = forwarding_func
- if forwarding_func is None:
- # This whole class is useless if we do nothing with the
- # unpickled results.
- raise Exception("forwarding function must be set")
-
- # Initiate all connections by sending an ack. This allows
- # the initiators of the socket to await this to ensure
- # that this end is up and running (and therefore already
- # into the async map).
- ack_bytes = b'*'
- file_object.send(ack_bytes)
-
- def deserialize_payload(self):
- """Unpickles the collected input buffer bytes and forwards."""
- if len(self.ibuffer) > 0:
- self.forwarding_func(cPickle.loads(self.ibuffer))
- self.ibuffer = b''
-
- def consume_header_bytes(self, data):
- """Consumes header bytes from the front of data.
- @param data the incoming data stream bytes
- @return any data leftover after consuming header bytes.
- """
- # We're done if there is no content.
- if not data or (len(data) == 0):
- return None
-
- full_header_len = 4
-
- assert len(self.header_contents) < full_header_len
-
- bytes_avail = len(data)
- bytes_needed = full_header_len - len(self.header_contents)
- header_bytes_avail = min(bytes_needed, bytes_avail)
- self.header_contents += data[:header_bytes_avail]
- if len(self.header_contents) == full_header_len:
- import struct
- # End of header.
- self.packet_bytes_remaining = struct.unpack(
- "!I", self.header_contents)[0]
- self.header_contents = b""
- self.reading_header = False
- return data[header_bytes_avail:]
-
- # If we made it here, we've exhausted the data and
- # we're still parsing header content.
- return None
-
- def consume_payload_bytes(self, data):
- """Consumes payload bytes from the front of data.
- @param data the incoming data stream bytes
- @return any data leftover after consuming remaining payload bytes.
- """
- if not data or (len(data) == 0):
- # We're done and there's nothing to do.
- return None
-
- data_len = len(data)
- if data_len <= self.packet_bytes_remaining:
- # We're consuming all the data provided.
- self.ibuffer += data
- self.packet_bytes_remaining -= data_len
-
- # If we're no longer waiting for payload bytes,
- # we flip back to parsing header bytes and we
- # unpickle the payload contents.
- if self.packet_bytes_remaining < 1:
- self.reading_header = True
- self.deserialize_payload()
-
- # We're done, no more data left.
- return None
- else:
- # We're only consuming a portion of the data since
- # the data contains more than the payload amount.
- self.ibuffer += data[:self.packet_bytes_remaining]
- data = data[self.packet_bytes_remaining:]
-
- # We now move on to reading the header.
- self.reading_header = True
- self.packet_bytes_remaining = 0
-
- # And we can deserialize the payload.
- self.deserialize_payload()
-
- # Return the remaining data.
- return data
-
- def handle_read(self):
- # Read some data from the socket.
- try:
- data = self.recv(8192)
- # print('driver socket READ: %d bytes' % len(data))
- except socket.error as socket_error:
- print(
- "\nINFO: received socket error when reading data "
- "from test inferior:\n{}".format(socket_error))
- raise
- except Exception as general_exception:
- print(
- "\nERROR: received non-socket error when reading data "
- "from the test inferior:\n{}".format(general_exception))
- raise
-
- # Consume the message content.
- while data and (len(data) > 0):
- # If we're reading the header, gather header bytes.
- if self.reading_header:
- data = self.consume_header_bytes(data)
- else:
- data = self.consume_payload_bytes(data)
-
- def handle_close(self):
- # print("socket reader: closing port")
- self.close()
-
-
-class UnpicklingForwardingListenerChannel(asyncore.dispatcher):
- """Provides a socket listener asyncore channel for unpickling/forwarding.
-
- This channel will listen on a socket port (use 0 for host-selected). Any
- client that connects will have an UnpicklingForwardingReaderChannel handle
- communication over the connection.
-
- The dotest parallel test runners, when collecting test results, open the
- test results side channel over a socket. This channel handles connections
- from inferiors back to the test runner. Each worker fires up a listener
- for each inferior invocation. This simplifies the asyncore.loop() usage,
- one of the reasons for implementing with asyncore. This listener shuts
- down once a single connection is made to it.
- """
-
- def __init__(self, async_map, host, port, backlog_count, forwarding_func):
- asyncore.dispatcher.__init__(self, map=async_map)
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.set_reuse_addr()
- self.bind((host, port))
- self.address = self.socket.getsockname()
- self.listen(backlog_count)
- self.handler = None
- self.async_map = async_map
- self.forwarding_func = forwarding_func
- if forwarding_func is None:
- # This whole class is useless if we do nothing with the
- # unpickled results.
- raise Exception("forwarding function must be set")
-
- def handle_accept(self):
- (sock, addr) = self.socket.accept()
- if sock and addr:
- # print('Incoming connection from %s' % repr(addr))
- self.handler = UnpicklingForwardingReaderChannel(
- sock, self.async_map, self.forwarding_func)
-
- def handle_close(self):
- self.close()
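
Editor's note on the wire format parsed above: consume_header_bytes() reads a fixed 4-byte big-endian length prefix (struct.unpack("!I", ...)), then consume_payload_bytes() gathers that many pickled bytes. A minimal sender-side sketch of the same framing (plain pickle here for brevity; the module itself uses six.moves.cPickle):

    import pickle
    import struct

    def frame_event(event):
        # 4-byte big-endian length prefix, then the pickled payload --
        # exactly what the reader channel above splits back apart, even
        # when recv() returns partial or concatenated packets.
        payload = pickle.dumps(event)
        return struct.pack("!I", len(payload)) + payload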
diff --git a/packages/Python/lldbsuite/test_event/event_builder.py b/packages/Python/lldbsuite/test_event/event_builder.py
deleted file mode 100644
index a7cb57c7f9c0..000000000000
--- a/packages/Python/lldbsuite/test_event/event_builder.py
+++ /dev/null
@@ -1,482 +0,0 @@
-"""
- The LLVM Compiler Infrastructure
-
-This file is distributed under the University of Illinois Open Source
-License. See LICENSE.TXT for details.
-
-Provides a class to build Python test event data structures.
-"""
-
-from __future__ import print_function
-from __future__ import absolute_import
-
-# System modules
-import inspect
-import time
-import traceback
-
-# Third-party modules
-
-# LLDB modules
-from . import build_exception
-
-
-class EventBuilder(object):
- """Helper class to build test result event dictionaries."""
-
- BASE_DICTIONARY = None
-
- # Test Event Types
- TYPE_JOB_RESULT = "job_result"
- TYPE_TEST_RESULT = "test_result"
- TYPE_TEST_START = "test_start"
- TYPE_MARK_TEST_RERUN_ELIGIBLE = "test_eligible_for_rerun"
- TYPE_MARK_TEST_EXPECTED_FAILURE = "test_expected_failure"
- TYPE_SESSION_TERMINATE = "terminate"
-
- RESULT_TYPES = {TYPE_JOB_RESULT, TYPE_TEST_RESULT}
-
- # Test/Job Status Tags
- STATUS_EXCEPTIONAL_EXIT = "exceptional_exit"
- STATUS_SUCCESS = "success"
- STATUS_FAILURE = "failure"
- STATUS_EXPECTED_FAILURE = "expected_failure"
- STATUS_EXPECTED_TIMEOUT = "expected_timeout"
- STATUS_UNEXPECTED_SUCCESS = "unexpected_success"
- STATUS_SKIP = "skip"
- STATUS_ERROR = "error"
- STATUS_TIMEOUT = "timeout"
-
- """Test methods or jobs with a status matching any of these
- status values will cause a testrun failure, unless
- the test methods are rerun and do not trigger an issue when rerun."""
- TESTRUN_ERROR_STATUS_VALUES = {
- STATUS_ERROR,
- STATUS_EXCEPTIONAL_EXIT,
- STATUS_FAILURE,
- STATUS_TIMEOUT}
-
- @staticmethod
- def _get_test_name_info(test):
- """Returns (test-class-name, test-method-name) from a test case instance.
-
- @param test a unittest.TestCase instance.
-
- @return tuple containing (test class name, test method name)
- """
- test_class_components = test.id().split(".")
- test_class_name = ".".join(test_class_components[:-1])
- test_name = test_class_components[-1]
- return test_class_name, test_name
-
- @staticmethod
- def bare_event(event_type):
- """Creates an event with default additions, event type and timestamp.
-
- @param event_type the value set for the "event" key, used
- to distinguish events.
-
- @returns an event dictionary with all default additions, the "event"
- key set to the passed in event_type, and the event_time value set to
- time.time().
- """
- if EventBuilder.BASE_DICTIONARY is not None:
- # Start with a copy of the "always include" entries.
- event = dict(EventBuilder.BASE_DICTIONARY)
- else:
- event = {}
-
- event.update({
- "event": event_type,
- "event_time": time.time()
- })
- return event
-
- @staticmethod
- def _assert_is_python_sourcefile(test_filename):
- if test_filename is not None:
- if not test_filename.endswith(".py"):
- raise Exception(
- "source python filename has unexpected extension: {}".format(test_filename))
- return test_filename
-
- @staticmethod
- def _event_dictionary_common(test, event_type):
- """Returns an event dictionary setup with values for the given event type.
-
- @param test the unittest.TestCase instance
-
- @param event_type the name of the event type (string).
-
- @return event dictionary with common event fields set.
- """
- test_class_name, test_name = EventBuilder._get_test_name_info(test)
-
- # Determine the filename for the test case. If there is an attribute
- # for it, use it. Otherwise, determine from the TestCase class path.
- if hasattr(test, "test_filename"):
- test_filename = EventBuilder._assert_is_python_sourcefile(
- test.test_filename)
- else:
- test_filename = EventBuilder._assert_is_python_sourcefile(
- inspect.getsourcefile(test.__class__))
-
- event = EventBuilder.bare_event(event_type)
- event.update({
- "test_class": test_class_name,
- "test_name": test_name,
- "test_filename": test_filename
- })
-
- return event
-
- @staticmethod
- def _error_tuple_class(error_tuple):
- """Returns the unittest error tuple's error class as a string.
-
- @param error_tuple the error tuple provided by the test framework.
-
- @return the error type (typically an exception) raised by the
- test framework.
- """
- type_var = error_tuple[0]
- module = inspect.getmodule(type_var)
- if module:
- return "{}.{}".format(module.__name__, type_var.__name__)
- else:
- return type_var.__name__
-
- @staticmethod
- def _error_tuple_message(error_tuple):
- """Returns the unittest error tuple's error message.
-
- @param error_tuple the error tuple provided by the test framework.
-
- @return the error message provided by the test framework.
- """
- return str(error_tuple[1])
-
- @staticmethod
- def _error_tuple_traceback(error_tuple):
- """Returns the unittest error tuple's error message.
-
- @param error_tuple the error tuple provided by the test framework.
-
- @return the error message provided by the test framework.
- """
- return error_tuple[2]
-
- @staticmethod
- def _event_dictionary_test_result(test, status):
- """Returns an event dictionary with common test result fields set.
-
- @param test a unittest.TestCase instance.
-
- @param status the status/result of the test
- (e.g. "success", "failure", etc.)
-
- @return the event dictionary
- """
- event = EventBuilder._event_dictionary_common(
- test, EventBuilder.TYPE_TEST_RESULT)
- event["status"] = status
- return event
-
- @staticmethod
- def _event_dictionary_issue(test, status, error_tuple):
- """Returns an event dictionary with common issue-containing test result
- fields set.
-
- @param test a unittest.TestCase instance.
-
- @param status the status/result of the test
- (e.g. "success", "failure", etc.)
-
- @param error_tuple the error tuple as reported by the test runner.
- This is of the form (type<error>, error).
-
- @return the event dictionary
- """
- event = EventBuilder._event_dictionary_test_result(test, status)
- event["issue_class"] = EventBuilder._error_tuple_class(error_tuple)
- event["issue_message"] = EventBuilder._error_tuple_message(error_tuple)
- backtrace = EventBuilder._error_tuple_traceback(error_tuple)
- if backtrace is not None:
- event["issue_backtrace"] = traceback.format_tb(backtrace)
- return event
-
- @staticmethod
- def event_for_start(test):
- """Returns an event dictionary for the test start event.
-
- @param test a unittest.TestCase instance.
-
- @return the event dictionary
- """
- return EventBuilder._event_dictionary_common(
- test, EventBuilder.TYPE_TEST_START)
-
- @staticmethod
- def event_for_success(test):
- """Returns an event dictionary for a successful test.
-
- @param test a unittest.TestCase instance.
-
- @return the event dictionary
- """
- return EventBuilder._event_dictionary_test_result(
- test, EventBuilder.STATUS_SUCCESS)
-
- @staticmethod
- def event_for_unexpected_success(test, bugnumber):
- """Returns an event dictionary for a test that succeeded but was
- expected to fail.
-
- @param test a unittest.TestCase instance.
-
- @param bugnumber the issue identifier for the bug tracking the
- fix request for the test expected to fail (but is in fact
- passing here).
-
- @return the event dictionary
-
- """
- event = EventBuilder._event_dictionary_test_result(
- test, EventBuilder.STATUS_UNEXPECTED_SUCCESS)
- if bugnumber:
- event["bugnumber"] = str(bugnumber)
- return event
-
- @staticmethod
- def event_for_failure(test, error_tuple):
- """Returns an event dictionary for a test that failed.
-
- @param test a unittest.TestCase instance.
-
- @param error_tuple the error tuple as reported by the test runner.
- This is of the form (type<error>, error).
-
- @return the event dictionary
- """
- return EventBuilder._event_dictionary_issue(
- test, EventBuilder.STATUS_FAILURE, error_tuple)
-
- @staticmethod
- def event_for_expected_failure(test, error_tuple, bugnumber):
- """Returns an event dictionary for a test that failed as expected.
-
- @param test a unittest.TestCase instance.
-
- @param error_tuple the error tuple as reported by the test runner.
- This is of the form (type<error>, error).
-
- @param bugnumber the issue identifier for the bug tracking the
- fix request for the test expected to fail.
-
- @return the event dictionary
-
- """
- event = EventBuilder._event_dictionary_issue(
- test, EventBuilder.STATUS_EXPECTED_FAILURE, error_tuple)
- if bugnumber:
- event["bugnumber"] = str(bugnumber)
- return event
-
- @staticmethod
- def event_for_skip(test, reason):
- """Returns an event dictionary for a test that was skipped.
-
- @param test a unittest.TestCase instance.
-
- @param reason the reason why the test is being skipped.
-
- @return the event dictionary
- """
- event = EventBuilder._event_dictionary_test_result(
- test, EventBuilder.STATUS_SKIP)
- event["skip_reason"] = reason
- return event
-
- @staticmethod
- def event_for_error(test, error_tuple):
- """Returns an event dictionary for a test that hit a test execution error.
-
- @param test a unittest.TestCase instance.
-
- @param error_tuple the error tuple as reported by the test runner.
- This is of the form (type<error>, error).
-
- @return the event dictionary
- """
- event = EventBuilder._event_dictionary_issue(
- test, EventBuilder.STATUS_ERROR, error_tuple)
- event["issue_phase"] = "test"
- return event
-
- @staticmethod
- def event_for_build_error(test, error_tuple):
- """Returns an event dictionary for a test that hit a test execution error
- during the test cleanup phase.
-
- @param test a unittest.TestCase instance.
-
- @param error_tuple the error tuple as reported by the test runner.
- This is of the form (type<error>, error).
-
- @return the event dictionary
- """
- event = EventBuilder._event_dictionary_issue(
- test, EventBuilder.STATUS_ERROR, error_tuple)
- event["issue_phase"] = "build"
-
- build_error = error_tuple[1]
- event["build_command"] = build_error.command
- event["build_error"] = build_error.build_error
- return event
-
- @staticmethod
- def event_for_cleanup_error(test, error_tuple):
- """Returns an event dictionary for a test that hit a test execution error
- during the test cleanup phase.
-
- @param test a unittest.TestCase instance.
-
- @param error_tuple the error tuple as reported by the test runner.
- This is of the form (type<error>, error).
-
- @return the event dictionary
- """
- event = EventBuilder._event_dictionary_issue(
- test, EventBuilder.STATUS_ERROR, error_tuple)
- event["issue_phase"] = "cleanup"
- return event
-
- @staticmethod
- def event_for_job_test_add_error(test_filename, exception, backtrace):
- event = EventBuilder.bare_event(EventBuilder.TYPE_JOB_RESULT)
- event["status"] = EventBuilder.STATUS_ERROR
- if test_filename is not None:
- event["test_filename"] = EventBuilder._assert_is_python_sourcefile(
- test_filename)
- if exception is not None and "__class__" in dir(exception):
- event["issue_class"] = exception.__class__
- event["issue_message"] = exception
- if backtrace is not None:
- event["issue_backtrace"] = backtrace
- return event
-
- @staticmethod
- def event_for_job_exceptional_exit(
- pid, worker_index, exception_code, exception_description,
- test_filename, command_line):
- """Creates an event for a job (i.e. process) exit due to signal.
-
- @param pid the process id for the job that failed
- @param worker_index optional id for the job queue running the process
- @param exception_code optional code
- (e.g. SIGTERM integer signal number)
- @param exception_description optional string containing symbolic
- representation of the issue (e.g. "SIGTERM")
- @param test_filename the path to the test filename that exited
- in some exceptional way.
- @param command_line the Popen()-style list provided as the command line
- for the process that timed out.
-
- @return an event dictionary coding the job completion description.
- """
- event = EventBuilder.bare_event(EventBuilder.TYPE_JOB_RESULT)
- event["status"] = EventBuilder.STATUS_EXCEPTIONAL_EXIT
- if pid is not None:
- event["pid"] = pid
- if worker_index is not None:
- event["worker_index"] = int(worker_index)
- if exception_code is not None:
- event["exception_code"] = exception_code
- if exception_description is not None:
- event["exception_description"] = exception_description
- if test_filename is not None:
- event["test_filename"] = EventBuilder._assert_is_python_sourcefile(
- test_filename)
- if command_line is not None:
- event["command_line"] = command_line
- return event
-
- @staticmethod
- def event_for_job_timeout(pid, worker_index, test_filename, command_line):
- """Creates an event for a job (i.e. process) timeout.
-
- @param pid the process id for the job that timed out
- @param worker_index optional id for the job queue running the process
- @param test_filename the path to the test filename that timed out.
- @param command_line the Popen-style list provided as the command line
- for the process that timed out.
-
- @return an event dictionary coding the job completion description.
- """
- event = EventBuilder.bare_event(EventBuilder.TYPE_JOB_RESULT)
- event["status"] = "timeout"
- if pid is not None:
- event["pid"] = pid
- if worker_index is not None:
- event["worker_index"] = int(worker_index)
- if test_filename is not None:
- event["test_filename"] = EventBuilder._assert_is_python_sourcefile(
- test_filename)
- if command_line is not None:
- event["command_line"] = command_line
- return event
-
- @staticmethod
- def event_for_mark_test_rerun_eligible(test):
- """Creates an event that indicates the specified test is explicitly
- eligible for rerun.
-
- Note there is a mode that will enable test rerun eligibility at the
- global level. These markings for explicit rerun eligibility are
- intended for the mode of running where only explicitly re-runnable
- tests are rerun upon hitting an issue.
-
- @param test the TestCase instance to which this pertains.
-
- @return an event that specifies the given test as being eligible to
- be rerun.
- """
- event = EventBuilder._event_dictionary_common(
- test,
- EventBuilder.TYPE_MARK_TEST_RERUN_ELIGIBLE)
- return event
-
- @staticmethod
- def event_for_mark_test_expected_failure(test):
- """Creates an event that indicates the specified test is expected
- to fail.
-
- @param test the TestCase instance to which this pertains.
-
- @return an event that specifies the given test is expected to fail.
- """
- event = EventBuilder._event_dictionary_common(
- test,
- EventBuilder.TYPE_MARK_TEST_EXPECTED_FAILURE)
- return event
-
- @staticmethod
- def add_entries_to_all_events(entries_dict):
- """Specifies a dictionary of entries to add to all test events.
-
- This provides a mechanism for, say, a parallel test runner to
- indicate to each inferior dotest.py that it should add a
- worker index to each.
-
- Calling this method replaces all previous entries added
- by a prior call to this.
-
- Event build methods will overwrite any entries that collide.
- Thus, the passed in dictionary is the base, which gets merged
- over by event building when keys collide.
-
- @param entries_dict a dictionary containing key and value
- pairs that should be merged into all events created by the
- event generator. May be None to clear out any extra entries.
- """
- EventBuilder.BASE_DICTIONARY = dict(entries_dict)
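
Editor's note on the base-dictionary behavior documented in add_entries_to_all_events() above: registered entries are merged into every subsequently built event, and the event builders win on key collisions. A short sketch using the deleted class:

    EventBuilder.add_entries_to_all_events({"worker_index": 3})
    event = EventBuilder.bare_event(EventBuilder.TYPE_TEST_START)
    # event == {"worker_index": 3, "event": "test_start",
    #           "event_time": <time.time() at the call>}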
diff --git a/packages/Python/lldbsuite/test_event/formatter/__init__.py b/packages/Python/lldbsuite/test_event/formatter/__init__.py
deleted file mode 100644
index 2481e326e946..000000000000
--- a/packages/Python/lldbsuite/test_event/formatter/__init__.py
+++ /dev/null
@@ -1,163 +0,0 @@
-"""
- The LLVM Compiler Infrastructure
-
-This file is distributed under the University of Illinois Open Source
-License. See LICENSE.TXT for details.
-"""
-
-from __future__ import print_function
-from __future__ import absolute_import
-
-# System modules
-import importlib
-import socket
-import sys
-
-# Third-party modules
-
-# LLDB modules
-
-
-# Ignore method count on DTOs.
-# pylint: disable=too-few-public-methods
-class FormatterConfig(object):
- """Provides formatter configuration info to create_results_formatter()."""
-
- def __init__(self):
- self.filename = None
- self.port = None
- self.formatter_name = None
- self.formatter_options = None
-
-
-# Ignore method count on DTOs.
-# pylint: disable=too-few-public-methods
-class CreatedFormatter(object):
- """Provides transfer object for returns from create_results_formatter()."""
-
- def __init__(self, formatter, cleanup_func):
- self.formatter = formatter
- self.cleanup_func = cleanup_func
-
-
-def create_results_formatter(config):
- """Sets up a test results formatter.
-
- @param config an instance of FormatterConfig
- that indicates how to setup the ResultsFormatter.
-
- @return an instance of CreatedFormatter.
- """
-
- def create_socket(port):
- """Creates a socket to the localhost on the given port.
-
- @param port the port number of the listening port on
- the localhost.
-
- @return (socket object, socket closing function)
- """
-
- def socket_closer(open_sock):
- """Close down an opened socket properly."""
- open_sock.shutdown(socket.SHUT_RDWR)
- open_sock.close()
-
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(("localhost", port))
-
- # Wait for the ack from the listener side.
- # This is needed to prevent a race condition
- # in the main dosep.py processing loop: we
- # can't allow a worker queue thread to die
- # that has outstanding messages to a listener
- # socket before the asyncore listener
- # socket gets spun up; otherwise,
- # we lose the test result info.
- read_bytes = sock.recv(1)
- if read_bytes is None or (len(read_bytes) < 1) or (read_bytes != b'*'):
- raise Exception(
- "listening socket did not respond with ack byte: response={}".format(read_bytes))
-
- return sock, lambda: socket_closer(sock)
-
- default_formatter_name = None
- results_file_object = None
- cleanup_func = None
-
- file_is_stream = False
- if config.filename:
- # Open the results file for writing.
- if config.filename == 'stdout':
- results_file_object = sys.stdout
- cleanup_func = None
- elif config.filename == 'stderr':
- results_file_object = sys.stderr
- cleanup_func = None
- else:
- results_file_object = open(config.filename, "w")
- cleanup_func = results_file_object.close
- default_formatter_name = (
- "lldbsuite.test_event.formatter.xunit.XunitFormatter")
- elif config.port:
- # Connect to the specified localhost port.
- results_file_object, cleanup_func = create_socket(config.port)
- default_formatter_name = (
- "lldbsuite.test_event.formatter.pickled.RawPickledFormatter")
- file_is_stream = True
-
- # If we have a results formatter name specified and we didn't specify
- # a results file, we should use stdout.
- if config.formatter_name is not None and results_file_object is None:
- # Use stdout.
- results_file_object = sys.stdout
- cleanup_func = None
-
- if results_file_object:
- # We care about the formatter. Choose user-specified or, if
- # none specified, use the default for the output type.
- if config.formatter_name:
- formatter_name = config.formatter_name
- else:
- formatter_name = default_formatter_name
-
- # Create an instance of the class.
- # First figure out the package/module.
- components = formatter_name.split(".")
- module = importlib.import_module(".".join(components[:-1]))
-
- # Create the class name we need to load.
- cls = getattr(module, components[-1])
-
- # Handle formatter options for the results formatter class.
- formatter_arg_parser = cls.arg_parser()
- if config.formatter_options and len(config.formatter_options) > 0:
- command_line_options = config.formatter_options
- else:
- command_line_options = []
-
- formatter_options = formatter_arg_parser.parse_args(
- command_line_options)
-
- # Create the TestResultsFormatter given the processed options.
- results_formatter_object = cls(
- results_file_object,
- formatter_options,
- file_is_stream)
-
- def shutdown_formatter():
- """Shuts down the formatter when it is no longer needed."""
- # Tell the formatter to write out anything it may have
- # been saving until the very end (e.g. xUnit results
- # can't complete its output until this point).
- results_formatter_object.send_terminate_as_needed()
-
- # And now close out the output file-like object.
- if cleanup_func is not None:
- cleanup_func()
-
- return CreatedFormatter(
- results_formatter_object,
- shutdown_formatter)
- else:
- return None
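
Editor's note: the factory above was driven roughly as follows. A hedged sketch; "results.xml" and the events iterable are stand-ins supplied by the caller:

    config = FormatterConfig()
    config.filename = "results.xml"      # or "stdout"/"stderr", or set config.port
    created = create_results_formatter(config)
    if created is not None:
        try:
            for event in events:         # caller-supplied event stream
                created.formatter.handle_event(event)
        finally:
            created.cleanup_func()       # sends "terminate", closes the file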
diff --git a/packages/Python/lldbsuite/test_event/formatter/curses.py b/packages/Python/lldbsuite/test_event/formatter/curses.py
deleted file mode 100644
index f415575ded8a..000000000000
--- a/packages/Python/lldbsuite/test_event/formatter/curses.py
+++ /dev/null
@@ -1,342 +0,0 @@
-"""
- The LLVM Compiler Infrastructure
-
- This file is distributed under the University of Illinois Open Source
- License. See LICENSE.TXT for details.
-"""
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-# System modules
-import curses
-import datetime
-import math
-import sys
-import time
-
-# Third-party modules
-
-# LLDB modules
-from lldbsuite.test import lldbcurses
-
-from . import results_formatter
-from ..event_builder import EventBuilder
-
-
-class Curses(results_formatter.ResultsFormatter):
- """Receives live results from tests that are running and reports them to the terminal in a curses GUI"""
-
- def __init__(self, out_file, options, file_is_stream):
- # Initialize the parent
- super(Curses, self).__init__(out_file, options, file_is_stream)
- self.using_terminal = True
- self.have_curses = True
- self.initialize_event = None
- self.jobs = [None] * 64
- self.job_tests = [None] * 64
- self.results = list()
- try:
- self.main_window = lldbcurses.intialize_curses()
- self.main_window.add_key_action(
- '\t',
- self.main_window.select_next_first_responder,
- "Switch between views that can respond to keyboard input")
- self.main_window.refresh()
- self.job_panel = None
- self.results_panel = None
- self.status_panel = None
- self.info_panel = None
- self.hide_status_list = list()
- self.start_time = time.time()
- except:
- self.have_curses = False
- lldbcurses.terminate_curses()
- self.using_terminal = False
- print("Unexpected error:", sys.exc_info()[0])
- raise
-
- self.line_dict = dict()
- # self.events_file = open("/tmp/events.txt", "w")
- # self.formatters = list()
- # if tee_results_formatter:
- # self.formatters.append(tee_results_formatter)
-
- def status_to_short_str(self, status, test_event):
- if status == EventBuilder.STATUS_SUCCESS:
- return '.'
- elif status == EventBuilder.STATUS_FAILURE:
- return 'F'
- elif status == EventBuilder.STATUS_UNEXPECTED_SUCCESS:
- return '?'
- elif status == EventBuilder.STATUS_EXPECTED_FAILURE:
- return 'X'
- elif status == EventBuilder.STATUS_SKIP:
- return 'S'
- elif status == EventBuilder.STATUS_ERROR:
- if test_event.get("issue_phase", None) == "build":
- # Build failure
- return 'B'
- else:
- return 'E'
- elif status == EventBuilder.STATUS_TIMEOUT:
- return 'T'
- elif status == EventBuilder.STATUS_EXPECTED_TIMEOUT:
- return 't'
- else:
- return status
-
- def show_info_panel(self):
- selected_idx = self.results_panel.get_selected_idx()
- if selected_idx >= 0 and selected_idx < len(self.results):
- if self.info_panel is None:
- info_frame = self.results_panel.get_contained_rect(
- top_inset=10, left_inset=10, right_inset=10, height=30)
- self.info_panel = lldbcurses.BoxedPanel(
- info_frame, "Result Details")
- # Add a key action that hides this panel
- # when any key is pressed
- self.info_panel.add_key_action(-1,
- self.hide_info_panel,
- 'Hide the info panel')
- self.info_panel.top()
- else:
- self.info_panel.show()
-
- self.main_window.push_first_responder(self.info_panel)
- test_start = self.results[selected_idx][0]
- test_result = self.results[selected_idx][1]
- self.info_panel.set_line(
- 0, "File: %s" %
- (test_start['test_filename']))
- self.info_panel.set_line(
- 1, "Test: %s.%s" %
- (test_start['test_class'], test_start['test_name']))
- self.info_panel.set_line(
- 2, "Time: %s" %
- (test_result['elapsed_time']))
- self.info_panel.set_line(3, "Status: %s" % (test_result['status']))
-
- def hide_info_panel(self):
- self.main_window.pop_first_responder(self.info_panel)
- self.info_panel.hide()
- self.main_window.refresh()
-
- def toggle_status(self, status):
- if status:
- # Toggle showing and hiding results whose status matches "status"
- # in "Results" window
- if status in self.hide_status_list:
- self.hide_status_list.remove(status)
- else:
- self.hide_status_list.append(status)
- self.update_results()
-
- def update_results(self, update=True):
- '''Called after a category of tests has been shown/hidden to update
- the results list with what the user desires to see.'''
- self.results_panel.clear(update=False)
- for result in self.results:
- test_result = result[1]
- status = test_result['status']
- if status in self.hide_status_list:
- continue
- name = test_result['test_class'] + '.' + test_result['test_name']
- self.results_panel.append_line(
- '%s (%6.2f sec) %s' %
- (self.status_to_short_str(
- status,
- test_result),
- test_result['elapsed_time'],
- name))
- if update:
- self.main_window.refresh()
-
- def handle_event(self, test_event):
- with self.lock:
- super(Curses, self).handle_event(test_event)
- # for formatter in self.formatters:
- # formatter.process_event(test_event)
- if self.have_curses:
- worker_index = -1
- if 'worker_index' in test_event:
- worker_index = test_event['worker_index']
- if 'event' in test_event:
- check_for_one_key = True
- #print(str(test_event), file=self.events_file)
- event = test_event['event']
- if self.status_panel:
- self.status_panel.update_status(
- 'time', str(
- datetime.timedelta(
- seconds=math.floor(
- time.time() - self.start_time))))
- if event == 'test_start':
- name = test_event['test_class'] + \
- '.' + test_event['test_name']
- self.job_tests[worker_index] = test_event
- if 'pid' in test_event:
- line = 'pid: %5d ' % (test_event['pid']) + name
- else:
- line = name
- self.job_panel.set_line(worker_index, line)
- self.main_window.refresh()
- elif event == 'test_result':
- status = test_event['status']
- self.status_panel.increment_status(status)
- if 'pid' in test_event:
- line = 'pid: %5d ' % (test_event['pid'])
- else:
- line = ''
- self.job_panel.set_line(worker_index, line)
- name = test_event['test_class'] + \
- '.' + test_event['test_name']
- elapsed_time = test_event[
- 'event_time'] - self.job_tests[worker_index]['event_time']
- if status not in self.hide_status_list:
- self.results_panel.append_line(
- '%s (%6.2f sec) %s' %
- (self.status_to_short_str(
- status, test_event), elapsed_time, name))
- self.main_window.refresh()
- # Append the result pairs
- test_event['elapsed_time'] = elapsed_time
- self.results.append(
- [self.job_tests[worker_index], test_event])
- self.job_tests[worker_index] = ''
- elif event == 'job_begin':
- self.jobs[worker_index] = test_event
- if 'pid' in test_event:
- line = 'pid: %5d ' % (test_event['pid'])
- else:
- line = ''
- self.job_panel.set_line(worker_index, line)
- elif event == 'job_end':
- self.jobs[worker_index] = ''
- self.job_panel.set_line(worker_index, '')
- elif event == 'initialize':
- self.initialize_event = test_event
- num_jobs = test_event['worker_count']
- job_frame = self.main_window.get_contained_rect(
- height=num_jobs + 2)
- results_frame = self.main_window.get_contained_rect(
- top_inset=num_jobs + 2, bottom_inset=1)
- status_frame = self.main_window.get_contained_rect(
- height=1, top_inset=self.main_window.get_size().h - 1)
- self.job_panel = lldbcurses.BoxedPanel(
- frame=job_frame, title="Jobs")
- self.results_panel = lldbcurses.BoxedPanel(
- frame=results_frame, title="Results")
-
- self.results_panel.add_key_action(
- curses.KEY_UP,
- self.results_panel.select_prev,
- "Select the previous list entry")
- self.results_panel.add_key_action(
- curses.KEY_DOWN, self.results_panel.select_next, "Select the next list entry")
- self.results_panel.add_key_action(
- curses.KEY_HOME,
- self.results_panel.scroll_begin,
- "Scroll to the start of the list")
- self.results_panel.add_key_action(
- curses.KEY_END, self.results_panel.scroll_end, "Scroll to the end of the list")
- self.results_panel.add_key_action(
- curses.KEY_ENTER,
- self.show_info_panel,
- "Display info for the selected result item")
- self.results_panel.add_key_action(
- '.',
- lambda: self.toggle_status(
- EventBuilder.STATUS_SUCCESS),
- "Toggle showing/hiding tests whose status is 'success'")
- self.results_panel.add_key_action(
- 'e',
- lambda: self.toggle_status(
- EventBuilder.STATUS_ERROR),
- "Toggle showing/hiding tests whose status is 'error'")
- self.results_panel.add_key_action(
- 'f',
- lambda: self.toggle_status(
- EventBuilder.STATUS_FAILURE),
- "Toggle showing/hiding tests whose status is 'failure'")
- self.results_panel.add_key_action('s', lambda: self.toggle_status(
- EventBuilder.STATUS_SKIP), "Toggle showing/hiding tests whose status is 'skip'")
- self.results_panel.add_key_action(
- 'x',
- lambda: self.toggle_status(
- EventBuilder.STATUS_EXPECTED_FAILURE),
- "Toggle showing/hiding tests whose status is 'expected_failure'")
- self.results_panel.add_key_action(
- '?',
- lambda: self.toggle_status(
- EventBuilder.STATUS_UNEXPECTED_SUCCESS),
- "Toggle showing/hiding tests whose status is 'unexpected_success'")
- self.status_panel = lldbcurses.StatusPanel(
- frame=status_frame)
-
- self.main_window.add_child(self.job_panel)
- self.main_window.add_child(self.results_panel)
- self.main_window.add_child(self.status_panel)
- self.main_window.set_first_responder(
- self.results_panel)
-
- self.status_panel.add_status_item(
- name="time",
- title="Elapsed",
- format="%s",
- width=20,
- value="0:00:00",
- update=False)
- self.status_panel.add_status_item(
- name=EventBuilder.STATUS_SUCCESS,
- title="Success",
- format="%u",
- width=20,
- value=0,
- update=False)
- self.status_panel.add_status_item(
- name=EventBuilder.STATUS_FAILURE,
- title="Failure",
- format="%u",
- width=20,
- value=0,
- update=False)
- self.status_panel.add_status_item(
- name=EventBuilder.STATUS_ERROR,
- title="Error",
- format="%u",
- width=20,
- value=0,
- update=False)
- self.status_panel.add_status_item(
- name=EventBuilder.STATUS_SKIP,
- title="Skipped",
- format="%u",
- width=20,
- value=0,
- update=True)
- self.status_panel.add_status_item(
- name=EventBuilder.STATUS_EXPECTED_FAILURE,
- title="Expected Failure",
- format="%u",
- width=30,
- value=0,
- update=False)
- self.status_panel.add_status_item(
- name=EventBuilder.STATUS_UNEXPECTED_SUCCESS,
- title="Unexpected Success",
- format="%u",
- width=30,
- value=0,
- update=False)
- self.main_window.refresh()
- elif event == 'terminate':
- # self.main_window.key_event_loop()
- lldbcurses.terminate_curses()
- check_for_one_key = False
- self.using_terminal = False
-
- # Check for 1 keypress with no delay
- if check_for_one_key:
- self.main_window.key_event_loop(0, 1)
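
Editor's note: to make the event flow above concrete, here is a minimal synthetic sequence of the kind the Curses formatter consumes (field names taken from the handlers above; values are illustrative):

    events = [
        {"event": "initialize", "worker_count": 4},
        {"event": "test_start", "worker_index": 0, "event_time": 100.0,
         "test_filename": "TestFoo.py", "test_class": "TestFoo",
         "test_name": "test_bar"},
        {"event": "test_result", "worker_index": 0, "event_time": 101.5,
         "test_filename": "TestFoo.py", "test_class": "TestFoo",
         "test_name": "test_bar", "status": "success"},
        {"event": "terminate"},
    ]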
diff --git a/packages/Python/lldbsuite/test_event/formatter/dump_formatter.py b/packages/Python/lldbsuite/test_event/formatter/dump_formatter.py
deleted file mode 100644
index d42dcb18bb4f..000000000000
--- a/packages/Python/lldbsuite/test_event/formatter/dump_formatter.py
+++ /dev/null
@@ -1,23 +0,0 @@
-"""
- The LLVM Compiler Infrastructure
-
-This file is distributed under the University of Illinois Open Source
-License. See LICENSE.TXT for details.
-"""
-
-from __future__ import print_function
-from __future__ import absolute_import
-
-# System modules
-import pprint
-
-# Our modules
-from .results_formatter import ResultsFormatter
-
-
-class DumpFormatter(ResultsFormatter):
- """Formats events to the file as their raw python dictionary format."""
-
- def handle_event(self, test_event):
- super(DumpFormatter, self).handle_event(test_event)
- self.out_file.write("\n" + pprint.pformat(test_event) + "\n")
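
Editor's note: a tiny usage sketch of the formatter above (hypothetical values; the inherited ResultsFormatter.handle_event needs the class/name/time fields for a test_start event):

    import sys
    fmt = DumpFormatter(sys.stdout, None, False)
    fmt.handle_event({"event": "test_start", "event_time": 0.0,
                      "test_class": "TestFoo", "test_name": "test_bar"})
    # pretty-prints the raw event dictionary to stdout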
diff --git a/packages/Python/lldbsuite/test_event/formatter/pickled.py b/packages/Python/lldbsuite/test_event/formatter/pickled.py
deleted file mode 100644
index 588614e2f7b4..000000000000
--- a/packages/Python/lldbsuite/test_event/formatter/pickled.py
+++ /dev/null
@@ -1,80 +0,0 @@
-"""
- The LLVM Compiler Infrastructure
-
-This file is distributed under the University of Illinois Open Source
-License. See LICENSE.TXT for details.
-"""
-
-from __future__ import print_function
-from __future__ import absolute_import
-
-# System modules
-import os
-
-# Our modules
-from .results_formatter import ResultsFormatter
-from six.moves import cPickle
-
-
-class RawPickledFormatter(ResultsFormatter):
- """Formats events as a pickled stream.
-
- The parallel test runner has inferiors pickle their results and send them
- over a socket back to the parallel test. The parallel test runner then
- aggregates them into the final results formatter (e.g. xUnit).
- """
-
- @classmethod
- def arg_parser(cls):
- """@return arg parser used to parse formatter-specific options."""
- parser = super(RawPickledFormatter, cls).arg_parser()
- return parser
-
- class StreamSerializer(object):
-
- @staticmethod
- def serialize(test_event, out_file):
- # Send it as
- # {4-byte big-endian length}{pickled_bytes}
- import struct
- msg = cPickle.dumps(test_event)
- packet = struct.pack("!I%ds" % len(msg), len(msg), msg)
- out_file.send(packet)
-
- class BlockSerializer(object):
-
- @staticmethod
- def serialize(test_event, out_file):
- cPickle.dump(test_event, out_file)
-
- def __init__(self, out_file, options, file_is_stream):
- super(
- RawPickledFormatter,
- self).__init__(
- out_file,
- options,
- file_is_stream)
- self.pid = os.getpid()
- if file_is_stream:
- self.serializer = self.StreamSerializer()
- else:
- self.serializer = self.BlockSerializer()
-
- def handle_event(self, test_event):
- super(RawPickledFormatter, self).handle_event(test_event)
-
- # Convert initialize/terminate events into job_begin/job_end events.
- event_type = test_event["event"]
- if event_type is None:
- return
-
- if event_type == "initialize":
- test_event["event"] = "job_begin"
- elif event_type == "terminate":
- test_event["event"] = "job_end"
-
- # Tack on the pid.
- test_event["pid"] = self.pid
-
- # Serialize the test event.
- self.serializer.serialize(test_event, self.out_file)
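
Editor's note on the remapping above: an inferior's own lifecycle events reach the parallel runner as job events, tagged with the inferior's pid. Sketched (pid value illustrative):

    event = {"event": "initialize", "worker_count": 1}
    # After RawPickledFormatter.handle_event(event), the serialized
    # dictionary reads:
    # {"event": "job_begin", "worker_count": 1, "pid": 12345}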
diff --git a/packages/Python/lldbsuite/test_event/formatter/results_formatter.py b/packages/Python/lldbsuite/test_event/formatter/results_formatter.py
deleted file mode 100644
index 8e341cb2ce84..000000000000
--- a/packages/Python/lldbsuite/test_event/formatter/results_formatter.py
+++ /dev/null
@@ -1,766 +0,0 @@
-"""
- The LLVM Compiler Infrastructure
-
-This file is distributed under the University of Illinois Open Source
-License. See LICENSE.TXT for details.
-
-Provides classes used by the test results reporting infrastructure
-within the LLDB test suite.
-"""
-
-from __future__ import print_function
-from __future__ import absolute_import
-
-# System modules
-import argparse
-import os
-import re
-import sys
-import threading
-
-# Third-party modules
-
-
-# LLDB modules
-from lldbsuite.test import configuration
-from ..event_builder import EventBuilder
-
-import lldbsuite
-
-
-FILE_LEVEL_KEY_RE = re.compile(r"^(.+\.py)[^.:]*$")
-
-
-class ResultsFormatter(object):
- """Provides interface to formatting test results out to a file-like object.
-
- This class allows the LLDB test framework's raw test-related
- events to be processed and formatted in any manner desired.
- Test events are represented by python dictionaries, formatted
- as built by the EventBuilder class in event_builder.py.
-
- ResultsFormatter instances are given a file-like object in which
- to write their results.
-
- ResultsFormatter lifetime looks like the following:
-
- # The result formatter is created.
- # The argparse options dictionary is generated from calling
- # the SomeResultFormatter.arg_parser() with the options data
- # passed to dotest.py via the "--results-formatter-options"
- # argument. See the help on that for syntactic requirements
- # on getting that parsed correctly.
- formatter = SomeResultFormatter(file_like_object, argparse_options_dict)
-
- # Single call to session start, before parsing any events.
- formatter.begin_session()
-
- formatter.handle_event({"event":"initialize",...})
-
- # Zero or more calls specified for events recorded during the test session.
- # The parallel test runner manages getting results from all the inferior
- # dotest processes, so from a new format perspective, don't worry about
- # that. The formatter will be presented with a single stream of events
- # sandwiched between a single begin_session()/end_session() pair in the
- # parallel test runner process/thread.
- for event in zero_or_more_test_events():
- formatter.handle_event(event)
-
- # Single call to terminate/wrap-up. For formatters that need all the
- # data before they can print a correct result (e.g. xUnit/JUnit),
- # this is where the final report can be generated.
- formatter.handle_event({"event":"terminate",...})
-
- It is not the formatter's responsibility to close the file_like_object.
- (i.e. do not close it).
-
- The lldb test framework passes these test events in real time, so they
- arrive as they come in.
-
- In the case of the parallel test runner, the dotest inferiors
- add a 'pid' field to the dictionary that indicates which inferior
- pid generated the event.
-
- Note more events may be added in the future to support richer test
- reporting functionality. One example: creating a true flaky test
- result category so that unexpected successes really mean the test
- is marked incorrectly (either should be marked flaky, or is indeed
- passing consistently now and should have the xfail marker
- removed). In this case, a flaky_success and flaky_fail event
- likely will be added to capture these and support reporting things
- like percentages of flaky test passing so we can see if we're
- making some things worse/better with regards to failure rates.
-
- Another example: announcing all the test methods that are planned
- to be run, so we can better support redo operations of various kinds
- (redo all non-run tests, redo non-run tests except the one that
- was running [perhaps crashed], etc.)
-
- Implementers are expected to override all the public methods
- provided in this class. See each method's docstring to see
- expectations about when the call should be chained.
-
- """
- @classmethod
- def arg_parser(cls):
- """@return arg parser used to parse formatter-specific options."""
- parser = argparse.ArgumentParser(
- description='{} options'.format(cls.__name__),
- usage=('dotest.py --results-formatter-options='
- '"--option1 value1 [--option2 value2 [...]]"'))
- parser.add_argument(
- "--dump-results",
- action="store_true",
- help=('dump the raw results data after printing '
- 'the summary output.'))
- return parser
-
- def __init__(self, out_file, options, file_is_stream):
- super(ResultsFormatter, self).__init__()
- self.out_file = out_file
- self.options = options
- self.using_terminal = False
- if not self.out_file:
- raise Exception("ResultsFormatter created with no file object")
- self.start_time_by_test = {}
- self.terminate_called = False
- self.file_is_stream = file_is_stream
-
- # Track the most recent test start event by worker index.
- # We'll use this to assign TIMEOUT and exceptional
- # exits to the most recent test started on a given
- # worker index.
- self.started_tests_by_worker = {}
-
- # Store the most recent test_method/job status.
- self.result_events = {}
-
- # Track the number of test method reruns.
- self.test_method_rerun_count = 0
-
- # Lock that we use while mutating inner state, like the
- # total test count and the elements. We minimize how
- # long we hold the lock just to keep inner state safe, not
- # entirely consistent from the outside.
- self.lock = threading.RLock()
-
- # Keeps track of the test base filenames for tests that
- # are expected to timeout. If a timeout occurs in any test
- # basename that matches this list, that result should be
- # converted into a non-issue. We'll create an expected
- # timeout test status for this.
- self.expected_timeouts_by_basename = set()
-
- # Tests which have reported that they are expecting to fail. These will
- # be marked as expected failures even if they return a failing status,
- # probably because they crashed or deadlocked.
- self.expected_failures = set()
-
- # Keep track of rerun-eligible tests.
- # This is a set that contains tests saved as:
- # {test_filename}:{test_class}:{test_name}
- self.rerun_eligible_tests = set()
-
- # A dictionary of test files that had a failing
- # test, in the format of:
- # key = test path, value = array of test methods that need rerun
- self.tests_for_rerun = {}
-
- @classmethod
- def _make_key(cls, result_event):
- """Creates a key from a test or job result event.
-
- This key attempts to be as unique as possible. For
- test result events, it will be unique per test method.
- For job events (ones not promoted to a test result event),
- it will be unique per test case file.
-
- @return a string-based key of the form
- {test_filename}:{test_class}.{test_name}
- """
- if result_event is None:
- return None
- component_count = 0
- if "test_filename" in result_event:
- key = result_event["test_filename"]
- component_count += 1
- else:
- key = "<no_filename>"
- if "test_class" in result_event:
- if component_count > 0:
- key += ":"
- key += result_event["test_class"]
- component_count += 1
- if "test_name" in result_event:
- if component_count > 0:
- key += "."
- key += result_event["test_name"]
- component_count += 1
- return key
-
- @classmethod
- def _is_file_level_issue(cls, key, event):
- """Returns whether a given key represents a file-level event.
-
- @param cls this class. Unused, but following PEP8 for
- preferring @classmethod over @staticmethod.
-
- @param key the key for the issue being tested.
-
- @param event the event for the issue being tested.
-
- @return True when the given key (as made by _make_key())
- represents an event that is at the test file level (i.e.
- it isn't scoped to a test class or method).
- """
- if key is None:
- return False
- else:
- return FILE_LEVEL_KEY_RE.match(key) is not None
-
- def _mark_test_as_expected_failure(self, test_result_event):
- key = self._make_key(test_result_event)
- if key is not None:
- self.expected_failures.add(key)
- else:
- sys.stderr.write(
- "\nerror: test marked as expected failure but "
- "failed to create key.\n")
-
- def _mark_test_for_rerun_eligibility(self, test_result_event):
- key = self._make_key(test_result_event)
- if key is not None:
- self.rerun_eligible_tests.add(key)
- else:
- sys.stderr.write(
- "\nerror: test marked for re-run eligibility but "
- "failed to create key.\n")
-
- def _maybe_add_test_to_rerun_list(self, result_event):
- key = self._make_key(result_event)
- if key is not None:
- if (key in self.rerun_eligible_tests or
- configuration.rerun_all_issues):
- test_filename = result_event.get("test_filename", None)
- if test_filename is not None:
- test_name = result_event.get("test_name", None)
- if test_filename not in self.tests_for_rerun:
- self.tests_for_rerun[test_filename] = []
- if test_name is not None:
- self.tests_for_rerun[test_filename].append(test_name)
- else:
- sys.stderr.write(
- "\nerror: couldn't add testrun-failing test to rerun "
- "list because no eligibility key could be created.\n")
-
- def _maybe_remap_job_result_event(self, test_event):
- """Remaps timeout/exceptional exit job results to last test method running.
-
- @param test_event the job_result test event. This is an in/out
- parameter. It will be modified if it can be mapped to a test_result
- of the same status, using details from the last-running test method
- known to be most recently started on the same worker index.
- """
- test_start = None
-
- job_status = test_event["status"]
- if job_status in [
- EventBuilder.STATUS_TIMEOUT,
- EventBuilder.STATUS_EXCEPTIONAL_EXIT]:
- worker_index = test_event.get("worker_index", None)
- if worker_index is not None:
- test_start = self.started_tests_by_worker.get(
- worker_index, None)
-
- # If we have a test start to remap, do it here.
- if test_start is not None:
- test_event["event"] = EventBuilder.TYPE_TEST_RESULT
-
- # Fill in all fields from test start not present in
- # job status message.
- for (start_key, start_value) in test_start.items():
- if start_key not in test_event:
- test_event[start_key] = start_value
-
- def _maybe_remap_expected_timeout(self, event):
- if event is None:
- return
-
- status = event.get("status", None)
- if status is None or status != EventBuilder.STATUS_TIMEOUT:
- return
-
- # Check if the timeout test's basename is in the expected timeout
- # list. If so, convert to an expected timeout.
- basename = os.path.basename(event.get("test_filename", ""))
- if basename in self.expected_timeouts_by_basename:
- # Convert to an expected timeout.
- event["status"] = EventBuilder.STATUS_EXPECTED_TIMEOUT
-
- def _maybe_remap_expected_failure(self, event):
- if event is None:
- return
-
- key = self._make_key(event)
- if key not in self.expected_failures:
- return
-
- status = event.get("status", None)
- if status in EventBuilder.TESTRUN_ERROR_STATUS_VALUES:
- event["status"] = EventBuilder.STATUS_EXPECTED_FAILURE
- elif status == EventBuilder.STATUS_SUCCESS:
- event["status"] = EventBuilder.STATUS_UNEXPECTED_SUCCESS
-
- def handle_event(self, test_event):
- """Handles the test event for collection into the formatter output.
-
- Derived classes may override this but should call down to this
- implementation first.
-
- @param test_event the test event as formatted by one of the
- event_for_* calls.
- """
- with self.lock:
- # Keep track of whether terminate was received. We do this so
- # that a process can call the 'terminate' event on its own, to
- # close down a formatter at the appropriate time. Then the
- # atexit() cleanup can call the "terminate if it hasn't been
- # called yet".
- if test_event is not None:
- event_type = test_event.get("event", "")
- # We intentionally allow event_type to be checked anew
- # after this check below since this check may rewrite
- # the event type
- if event_type == EventBuilder.TYPE_JOB_RESULT:
- # Possibly convert the job status (timeout,
- # exceptional exit) to an appropriate test_result event.
- self._maybe_remap_job_result_event(test_event)
- event_type = test_event.get("event", "")
-
- # Remap timeouts to expected timeouts.
- if event_type in EventBuilder.RESULT_TYPES:
- self._maybe_remap_expected_timeout(test_event)
- self._maybe_remap_expected_failure(test_event)
- event_type = test_event.get("event", "")
-
- if event_type == "terminate":
- self.terminate_called = True
- elif event_type in EventBuilder.RESULT_TYPES:
- # Clear the most recently started test for the related
- # worker.
- worker_index = test_event.get("worker_index", None)
- if worker_index is not None:
- self.started_tests_by_worker.pop(worker_index, None)
- status = test_event["status"]
- if status in EventBuilder.TESTRUN_ERROR_STATUS_VALUES:
- # Any of these status values marks a testrun
- # failure. If such a test fails, check whether it
- # can be rerun; if so, add it to the rerun job.
- self._maybe_add_test_to_rerun_list(test_event)
-
- # Build the test key.
- test_key = self._make_key(test_event)
- if test_key is None:
- raise Exception(
- "failed to find test filename for "
- "test event {}".format(test_event))
-
- # Save the most recent test event for the test key. This
- # allows a second test phase to overwrite the most recent
- # result for the test key (unique per method). We do final
- # reporting at the end, so we'll report based on final
- results. We do this so that a rerun (for example, a
- low-load, single-worker rerun pass) always has its final
- results used for reporting.
- if test_key in self.result_events:
- self.test_method_rerun_count += 1
- self.result_events[test_key] = test_event
- elif event_type == EventBuilder.TYPE_TEST_START:
- # Track the start time for the test method.
- self.track_start_time(
- test_event["test_class"],
- test_event["test_name"],
- test_event["event_time"])
- # Track the most recent test method start event
- # for the related worker. This allows us to figure
- # out whether a process timeout or exceptional exit
- # can be charged (i.e. assigned) to a test method.
- worker_index = test_event.get("worker_index", None)
- if worker_index is not None:
- self.started_tests_by_worker[worker_index] = test_event
-
- elif event_type == EventBuilder.TYPE_MARK_TEST_RERUN_ELIGIBLE:
- self._mark_test_for_rerun_eligibility(test_event)
- elif (event_type ==
- EventBuilder.TYPE_MARK_TEST_EXPECTED_FAILURE):
- self._mark_test_as_expected_failure(test_event)
-
- def set_expected_timeouts_by_basename(self, basenames):
- """Specifies a list of test file basenames that are allowed to timeout
- without being called out as a timeout issue.
-
- These fall into a new status category called STATUS_EXPECTED_TIMEOUT.
- """
- if basenames is not None:
- for basename in basenames:
- self.expected_timeouts_by_basename.add(basename)
-
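- # Hypothetical usage sketch of the expected-timeout remap above; the
- # string literals stand in for the EventBuilder.STATUS_* constants.
- import os
-
- expected_timeouts = {"TestSlowThing.py"}
- event = {"status": "timeout", "test_filename": "/suite/TestSlowThing.py"}
- if os.path.basename(event.get("test_filename", "")) in expected_timeouts:
-     event["status"] = "expected_timeout"
- assert event["status"] == "expected_timeout"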
- def track_start_time(self, test_class, test_name, start_time):
- """tracks the start time of a test so elapsed time can be computed.
-
- this alleviates the need for test results to be processed serially
- by test. it will save the start time for the test so that
- elapsed_time_for_test() can compute the elapsed time properly.
- """
- if test_class is None or test_name is None:
- return
-
- test_key = "{}.{}".format(test_class, test_name)
- self.start_time_by_test[test_key] = start_time
-
- def elapsed_time_for_test(self, test_class, test_name, end_time):
- """returns the elapsed time for a test.
-
- this function can only be called once per test and requires that
- the track_start_time() method be called sometime prior to calling
- this method.
- """
- if test_class is None or test_name is None:
- return -2.0
-
- test_key = "{}.{}".format(test_class, test_name)
- if test_key not in self.start_time_by_test:
- return -1.0
- else:
- start_time = self.start_time_by_test[test_key]
- del self.start_time_by_test[test_key]
- return end_time - start_time
-
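- # Minimal stand-alone sketch of the start-time bookkeeping above,
- # assuming the same "{class}.{name}" keying; times are plain floats.
- start_time_by_test = {}
-
- def track(test_class, test_name, start_time):
-     start_time_by_test["{}.{}".format(test_class, test_name)] = start_time
-
- def elapsed(test_class, test_name, end_time):
-     key = "{}.{}".format(test_class, test_name)
-     if key not in start_time_by_test:
-         return -1.0
-     return end_time - start_time_by_test.pop(key)
-
- track("FooTest", "test_bar", 10.0)
- assert elapsed("FooTest", "test_bar", 12.5) == 2.5
- assert elapsed("FooTest", "test_bar", 13.0) == -1.0  # key consumed above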
- def is_using_terminal(self):
- """returns true if this results formatter is using the terminal and
- output should be avoided."""
- return self.using_terminal
-
- def send_terminate_as_needed(self):
- """sends the terminate event if it hasn't been received yet."""
- if not self.terminate_called:
- terminate_event = EventBuilder.bare_event("terminate")
- self.handle_event(terminate_event)
-
- # Derived classes may require self access
- # pylint: disable=no-self-use
- # noinspection PyMethodMayBeStatic
- def replaces_summary(self):
- """Returns whether the results formatter includes a summary
- suitable to replace the old lldb test run results.
-
- @return True if the lldb test runner can skip its summary
- generation when using this results formatter; False otherwise.
- """
- return False
-
- def counts_by_test_result_status(self, status):
- """Returns number of test method results for the given status.
-
- @status_result a test result status (e.g. success, fail, skip)
- as defined by the EventBuilder.STATUS_* class members.
-
- @return an integer returning the number of test methods matching
- the given test result status.
- """
- return sum(
- 1 for event in self.result_events.values()
- if event.get("status", "") == status)
-
- @classmethod
- def _event_sort_key(cls, event):
- """Returns the sort key to be used for a test event.
-
- This method papers over the differences between a test method
- result and a job (i.e. inferior process) result.
-
- @param event a test result or job result event.
- @return a key useful for sorting events by name (test name preferably,
- then by test filename).
- """
- if "test_name" in event:
- return event["test_name"]
- else:
- return event.get("test_filename", None)
-
- def _partition_results_by_status(self, categories):
- """Partitions the captured test results by event status.
-
- This permits processing test results by the category ids.
-
- @param categories the list of categories on which to partition.
- Follows the format described in _report_category_details().
-
- @return a dictionary where each key is the test result status,
- and each entry is a list containing all the test result events
- that matched that test result status. Result status IDs with
- no matching entries will have a zero-length list.
- """
- partitioned_events = {}
- for category in categories:
- result_status_id = category[0]
- matching_events = [
- [key, event] for (key, event) in self.result_events.items()
- if event.get("status", "") == result_status_id]
- partitioned_events[result_status_id] = sorted(
- matching_events,
- key=lambda x: self._event_sort_key(x[1]))
- return partitioned_events
-
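- # Sketch of the partition-and-sort above over hypothetical events.
- events = {
-     "a.py:test_one": {"status": "success", "test_name": "test_one"},
-     "b.py:test_two": {"status": "failure", "test_name": "test_two"},
- }
- partitioned = {}
- for status_id in ("success", "failure"):
-     partitioned[status_id] = sorted(
-         ([key, event] for key, event in events.items()
-          if event.get("status", "") == status_id),
-         key=lambda pair: pair[1].get("test_name"))
- assert [pair[1]["test_name"] for pair in partitioned["failure"]] == ["test_two"]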
- @staticmethod
- def _print_banner(out_file, banner_text):
- """Prints an ASCII banner around given text.
-
- Output goes to the out file for the results formatter.
-
- @param out_file a file-like object where output will be written.
- @param banner_text the text to display, with a banner
- of '=' around the line above and line below.
- """
- banner_separator = "".ljust(len(banner_text), "=")
-
- out_file.write("\n{}\n{}\n{}\n".format(
- banner_separator,
- banner_text,
- banner_separator))
-
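- # Free-standing equivalent of the banner helper, writing to stdout;
- # the separator is sized to the banner text, as above.
- import sys
-
- def print_banner(out_file, banner_text):
-     separator = "=" * len(banner_text)
-     out_file.write("\n{}\n{}\n{}\n".format(separator, banner_text, separator))
-
- print_banner(sys.stdout, "Test Result Summary")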
- def _print_summary_counts(
- self, out_file, categories, result_events_by_status, extra_rows):
- """Prints summary counts for all categories.
-
- @param out_file a file-like object used to print output.
-
- @param categories the list of categories on which to partition.
- Follows the format described in _report_category_details().
-
- @param result_events_by_status the partitioned list of test
- result events in a dictionary, with the key set to the test
- result status id and the value set to the list of test method
- results that match the status id.
-
- @param extra_rows an optional list of [row_name, count] pairs
- that are printed before the per-category counts.
- """
-
- # Get max length for category printed name
- category_with_max_printed_name = max(
- categories, key=lambda x: len(x[1]))
- max_category_name_length = len(category_with_max_printed_name[1])
-
- # If we are provided with extra rows, consider these row name lengths.
- if extra_rows is not None:
- for row in extra_rows:
- name_length = len(row[0])
- if name_length > max_category_name_length:
- max_category_name_length = name_length
-
- self._print_banner(out_file, "Test Result Summary")
-
- # Prepend extra rows
- if extra_rows is not None:
- for row in extra_rows:
- extra_label = "{}:".format(row[0]).ljust(
- max_category_name_length + 1)
- out_file.write("{} {:4}\n".format(extra_label, row[1]))
-
- for category in categories:
- result_status_id = category[0]
- result_label = "{}:".format(category[1]).ljust(
- max_category_name_length + 1)
- count = len(result_events_by_status[result_status_id])
- out_file.write("{} {:4}\n".format(
- result_label,
- count))
-
- @classmethod
- def _has_printable_details(cls, categories, result_events_by_status):
- """Returns whether there are any test result details that need to be printed.
-
- Scans the results to check whether any category that is marked
- printable has at least one matching result.
-
- @param categories the list of categories on which to partition.
- Follows the format described in _report_category_details().
-
- @param result_events_by_status the partitioned list of test
- result events in a dictionary, with the key set to the test
- result status id and the value set to the list of test method
- results that match the status id.
-
- @return True if there are any details (i.e. test results
- for failures, errors, unexpected successes); False otherwise.
- """
- for category in categories:
- result_status_id = category[0]
- print_matching_tests = category[2]
- if print_matching_tests:
- if len(result_events_by_status[result_status_id]) > 0:
- # We found a printable details test result status
- # that has details to print.
- return True
- # We didn't find any test result category with printable
- # details.
- return False
-
- @staticmethod
- def _report_category_details(out_file, category, result_events_by_status):
- """Reports all test results matching the given category spec.
-
- @param out_file a file-like object used to print output.
-
- @param category a category spec of the format [test_event_name,
- printed_category_name, print_matching_entries?]
-
- @param result_events_by_status the partitioned list of test
- result events in a dictionary, with the key set to the test
- result status id and the value set to the list of test method
- results that match the status id.
- """
- result_status_id = category[0]
- print_matching_tests = category[2]
- detail_label = category[3]
-
- if print_matching_tests:
- # Sort by test name
- for (_, event) in result_events_by_status[result_status_id]:
- # Convert full test path into test-root-relative.
- test_relative_path = os.path.relpath(
- os.path.realpath(event["test_filename"]),
- lldbsuite.lldb_test_root)
-
- # Create extra info component (used for exceptional exit info)
- if result_status_id == EventBuilder.STATUS_EXCEPTIONAL_EXIT:
- extra_info = "[EXCEPTIONAL EXIT {} ({})] ".format(
- event["exception_code"],
- event["exception_description"])
- else:
- extra_info = ""
-
- # Figure out the identity we will use for this test.
- if configuration.verbose and ("test_class" in event):
- test_id = "{}.{}".format(
- event["test_class"], event["test_name"])
- elif "test_name" in event:
- test_id = event["test_name"]
- else:
- test_id = "<no_running_test_method>"
-
- # Display the info.
- out_file.write("{}: {}{} ({})\n".format(
- detail_label,
- extra_info,
- test_id,
- test_relative_path))
-
- def print_results(self, out_file):
- """Writes the test result report to the output file.
-
- @param out_file a file-like object used for printing summary
- results. This is different than self.out_file, which might
- be something else for non-summary data.
- """
- extra_results = [
- # Total test methods processed, excluding reruns.
- ["Test Methods", len(self.result_events)],
- ["Reruns", self.test_method_rerun_count]]
-
- # Output each of the test result entries.
- categories = [
- # result id, printed name, print matching tests?, detail label
- [EventBuilder.STATUS_SUCCESS,
- "Success", False, None],
- [EventBuilder.STATUS_EXPECTED_FAILURE,
- "Expected Failure", False, None],
- [EventBuilder.STATUS_FAILURE,
- "Failure", True, "FAIL"],
- [EventBuilder.STATUS_ERROR,
- "Error", True, "ERROR"],
- [EventBuilder.STATUS_EXCEPTIONAL_EXIT,
- "Exceptional Exit", True, "ERROR"],
- [EventBuilder.STATUS_UNEXPECTED_SUCCESS,
- "Unexpected Success", True, "UNEXPECTED SUCCESS"],
- [EventBuilder.STATUS_SKIP, "Skip", False, None],
- [EventBuilder.STATUS_TIMEOUT,
- "Timeout", True, "TIMEOUT"],
- [EventBuilder.STATUS_EXPECTED_TIMEOUT,
- # Intentionally using the unusual hyphenation in TIME-OUT to
- # prevent buildbots from thinking it is an issue when scanning
- # for TIMEOUT.
- "Expected Timeout", True, "EXPECTED TIME-OUT"]
- ]
-
- # Partition all the events by test result status
- result_events_by_status = self._partition_results_by_status(
- categories)
-
- # Print the details
- have_details = self._has_printable_details(
- categories, result_events_by_status)
- if have_details:
- self._print_banner(out_file, "Issue Details")
- for category in categories:
- self._report_category_details(
- out_file, category, result_events_by_status)
-
- # Print the summary
- self._print_summary_counts(
- out_file, categories, result_events_by_status, extra_results)
-
- if self.options.dump_results:
- # Debug dump of the key/result info for all categories.
- self._print_banner(out_file, "Results Dump")
- for status, events_by_key in result_events_by_status.items():
- out_file.write("\nSTATUS: {}\n".format(status))
- for key, event in events_by_key:
- out_file.write("key: {}\n".format(key))
- out_file.write("event: {}\n".format(event))
-
- def clear_file_level_issues(self, tests_for_rerun, out_file):
- """Clear file-charged issues in any of the test rerun files.
-
- @param tests_for_rerun the list of test-dir-relative paths that have
- functions that require rerunning. This is the test list
- returned by the results_formatter at the end of the previous run.
-
- @param out_file an optional file-like object used to log each
- file-level issue as it is cleared.
-
- @return the number of file-level issues that were cleared.
- """
- if tests_for_rerun is None:
- return 0
-
- cleared_file_level_issues = 0
- # Find the unique set of files that are covered by the given tests
- # that are to be rerun. We derive the files that are eligible for
- # having their markers cleared, because we support running in a mode
- # where only flaky tests are eligible for rerun. If the file-level
- # issue occurred in a file that was not marked as flaky, then we
- # shouldn't be clearing the event here.
- basename_set = set()
- for test_file_relpath in tests_for_rerun:
- basename_set.add(os.path.basename(test_file_relpath))
-
- # Find all the keys for file-level events that are considered
- # test issues.
- file_level_issues = [(key, event)
- for key, event in self.result_events.items()
- if ResultsFormatter._is_file_level_issue(
- key, event)
- and event.get("status", "") in
- EventBuilder.TESTRUN_ERROR_STATUS_VALUES]
-
- # Now remove any file-level error for the given test base name.
- for key, event in file_level_issues:
- # If the given file base name is in the rerun set, then we
- # clear that entry from the result set.
- if os.path.basename(key) in basename_set:
- self.result_events.pop(key, None)
- cleared_file_level_issues += 1
- if out_file is not None:
- out_file.write(
- "clearing file-level issue for file {} "
- "(issue type: {})\n"
- .format(key, event.get("status", "<unset-status>")))
-
- return cleared_file_level_issues
diff --git a/packages/Python/lldbsuite/test_event/formatter/xunit.py b/packages/Python/lldbsuite/test_event/formatter/xunit.py
deleted file mode 100644
index 4c53ff8062d7..000000000000
--- a/packages/Python/lldbsuite/test_event/formatter/xunit.py
+++ /dev/null
@@ -1,596 +0,0 @@
-"""
- The LLVM Compiler Infrastructure
-
-This file is distributed under the University of Illinois Open Source
-License. See LICENSE.TXT for details.
-
-Provides an xUnit ResultsFormatter for integrating the LLDB
-test suite with the Jenkins xUnit aggregator and other xUnit-compliant
-test output processors.
-"""
-from __future__ import absolute_import
-from __future__ import print_function
-
-# System modules
-import re
-import sys
-import xml.sax.saxutils
-
-# Third-party modules
-import six
-
-# Local modules
-from ..event_builder import EventBuilder
-from ..build_exception import BuildError
-from .results_formatter import ResultsFormatter
-
-
-class XunitFormatter(ResultsFormatter):
- """Provides xUnit-style formatted output.
- """
-
- # Result mapping arguments
- RM_IGNORE = 'ignore'
- RM_SUCCESS = 'success'
- RM_FAILURE = 'failure'
- RM_PASSTHRU = 'passthru'
-
- @staticmethod
- def _build_illegal_xml_regex():
- """Constructs a regex to match all illegal xml characters.
-
- Expects to be used against a unicode string."""
- # Construct the range pairs of invalid unicode characters.
- illegal_chars_u = [
- (0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), (0x7F, 0x84),
- (0x86, 0x9F), (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF)]
-
- # For wide builds, we have more.
- if sys.maxunicode >= 0x10000:
- illegal_chars_u.extend(
- [(0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF), (0x3FFFE, 0x3FFFF),
- (0x4FFFE, 0x4FFFF), (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF),
- (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF), (0x9FFFE, 0x9FFFF),
- (0xAFFFE, 0xAFFFF), (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF),
- (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF), (0xFFFFE, 0xFFFFF),
- (0x10FFFE, 0x10FFFF)])
-
- # Build up an array of range expressions.
- illegal_ranges = [
- "%s-%s" % (six.unichr(low), six.unichr(high))
- for (low, high) in illegal_chars_u]
-
- # Compile the regex
- return re.compile(six.u('[%s]') % six.u('').join(illegal_ranges))
-
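- # Quick check of the illegal-character stripping idea, narrowed to the
- # BMP control ranges so it behaves the same on narrow and wide builds.
- import re
-
- illegal_re = re.compile(u"[\u0000-\u0008\u000B-\u000C\u000E-\u001F]")
- assert illegal_re.sub(u"?", u"ok\u0001ok") == u"ok?ok"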
- @staticmethod
- def _quote_attribute(text):
- """Returns the given text in a manner safe for usage in an XML attribute.
-
- @param text the text that should appear within an XML attribute.
- @return the attribute-escaped version of the input text.
- """
- return xml.sax.saxutils.quoteattr(text)
-
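- # quoteattr() supplies the surrounding quotes itself and escapes any
- # markup characters, which is why the XML templates below splice its
- # result in unquoted:
- import xml.sax.saxutils
-
- assert xml.sax.saxutils.quoteattr("plain") == '"plain"'
- assert xml.sax.saxutils.quoteattr("a & b") == '"a &amp; b"'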
- def _replace_invalid_xml(self, str_or_unicode):
- """Replaces invalid XML characters with a '?'.
-
- @param str_or_unicode a string to replace invalid XML
- characters within. Can be unicode or not. If not unicode,
- assumes it is a byte string in utf-8 encoding.
-
- @return a utf-8-encoded byte string with invalid
- XML replaced with '?'.
- """
- # Get the content into unicode
- if isinstance(str_or_unicode, bytes):
- # If we hit decoding errors due to data corruption, replace the
- # invalid characters with U+FFFD REPLACEMENT CHARACTER.
- unicode_content = str_or_unicode.decode('utf-8', 'replace')
- else:
- unicode_content = str_or_unicode
- return self.invalid_xml_re.sub(
- six.u('?'), unicode_content).encode('utf-8')
-
- @classmethod
- def arg_parser(cls):
- """@return arg parser used to parse formatter-specific options."""
- parser = super(XunitFormatter, cls).arg_parser()
-
- # These are valid choices for results mapping.
- results_mapping_choices = [
- XunitFormatter.RM_IGNORE,
- XunitFormatter.RM_SUCCESS,
- XunitFormatter.RM_FAILURE,
- XunitFormatter.RM_PASSTHRU]
- parser.add_argument(
- "--assert-on-unknown-events",
- action="store_true",
- help=('cause unknown test events to generate '
- 'a Python assert. Default is to ignore.'))
- parser.add_argument(
- "--ignore-skip-name",
- "-n",
- metavar='PATTERN',
- action="append",
- dest='ignore_skip_name_patterns',
- help=('a Python regex pattern; any skipped test whose '
- 'test method name matches the pattern (via re.search) '
- 'will be ignored for xUnit test result '
- 'purposes. May be specified multiple times.'))
- parser.add_argument(
- "--ignore-skip-reason",
- "-r",
- metavar='PATTERN',
- action="append",
- dest='ignore_skip_reason_patterns',
- help=('a Python regex pattern; any skipped test whose '
- 'skip reason matches the pattern (via re.search) '
- 'will be ignored for xUnit test result '
- 'purposes. May be specified multiple times.'))
- parser.add_argument(
- "--xpass", action="store", choices=results_mapping_choices,
- default=XunitFormatter.RM_FAILURE,
- help=('specify mapping from unexpected success to jUnit/xUnit '
- 'result type'))
- parser.add_argument(
- "--xfail", action="store", choices=results_mapping_choices,
- default=XunitFormatter.RM_IGNORE,
- help=('specify mapping from expected failure to jUnit/xUnit '
- 'result type'))
- return parser
-
- @staticmethod
- def _build_regex_list_from_patterns(patterns):
- """Builds a list of compiled regular expressions from option value.
-
- @param patterns contains a list of regular expression
- patterns.
-
- @return list of compiled regular expressions, empty if no
- patterns provided.
- """
- regex_list = []
- if patterns is not None:
- for pattern in patterns:
- regex_list.append(re.compile(pattern))
- return regex_list
-
- def __init__(self, out_file, options, file_is_stream):
- """Initializes the XunitFormatter instance.
- @param out_file file-like object where formatted output is written.
- @param options specifies a dictionary of options for the
- formatter.
- """
- # Initialize the parent
- super(XunitFormatter, self).__init__(out_file, options, file_is_stream)
- self.text_encoding = "UTF-8"
- self.invalid_xml_re = XunitFormatter._build_illegal_xml_regex()
- self.total_test_count = 0
- self.ignore_skip_name_regexes = (
- XunitFormatter._build_regex_list_from_patterns(
- options.ignore_skip_name_patterns))
- self.ignore_skip_reason_regexes = (
- XunitFormatter._build_regex_list_from_patterns(
- options.ignore_skip_reason_patterns))
-
- self.elements = {
- "successes": [],
- "errors": [],
- "failures": [],
- "skips": [],
- "unexpected_successes": [],
- "expected_failures": [],
- "all": []
- }
-
- self.status_handlers = {
- EventBuilder.STATUS_SUCCESS: self._handle_success,
- EventBuilder.STATUS_FAILURE: self._handle_failure,
- EventBuilder.STATUS_ERROR: self._handle_error,
- EventBuilder.STATUS_SKIP: self._handle_skip,
- EventBuilder.STATUS_EXPECTED_FAILURE:
- self._handle_expected_failure,
- EventBuilder.STATUS_EXPECTED_TIMEOUT:
- self._handle_expected_timeout,
- EventBuilder.STATUS_UNEXPECTED_SUCCESS:
- self._handle_unexpected_success,
- EventBuilder.STATUS_EXCEPTIONAL_EXIT:
- self._handle_exceptional_exit,
- EventBuilder.STATUS_TIMEOUT:
- self._handle_timeout
- }
-
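- # Reduced sketch of the status_handlers dispatch pattern above, with
- # stub handlers and plain status strings standing in for the
- # EventBuilder.STATUS_* constants:
- handlers = {"success": lambda ev: "successes",
-             "failure": lambda ev: "failures"}
- event = {"status": "success"}
- assert handlers[event["status"]](event) == "successes"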
- RESULT_TYPES = {
- EventBuilder.TYPE_TEST_RESULT,
- EventBuilder.TYPE_JOB_RESULT}
-
- def handle_event(self, test_event):
- super(XunitFormatter, self).handle_event(test_event)
-
- event_type = test_event["event"]
- if event_type is None:
- return
-
- if event_type == "terminate":
- # Process all the final result events into their
- # XML counterparts.
- for result_event in self.result_events.values():
- self._process_test_result(result_event)
- self._finish_output()
- else:
- # This is an unknown event.
- if self.options.assert_on_unknown_events:
- raise Exception("unknown event type {} from {}\n".format(
- event_type, test_event))
-
- def _handle_success(self, test_event):
- """Handles a test success.
- @param test_event the test event to handle.
- """
- result = self._common_add_testcase_entry(test_event)
- with self.lock:
- self.elements["successes"].append(result)
-
- def _handle_failure(self, test_event):
- """Handles a test failure.
- @param test_event the test event to handle.
- """
- message = self._replace_invalid_xml(test_event["issue_message"])
- backtrace = self._replace_invalid_xml(
- "".join(test_event.get("issue_backtrace", [])))
-
- result = self._common_add_testcase_entry(
- test_event,
- inner_content=(
- '<failure type={} message={}><![CDATA[{}]]></failure>'.format(
- XunitFormatter._quote_attribute(test_event["issue_class"]),
- XunitFormatter._quote_attribute(message),
- backtrace)
- ))
- with self.lock:
- self.elements["failures"].append(result)
-
- def _handle_error_build(self, test_event):
- """Handles a test error.
- @param test_event the test event to handle.
- """
- message = self._replace_invalid_xml(test_event["issue_message"])
- build_issue_description = self._replace_invalid_xml(
- BuildError.format_build_error(
- test_event.get("build_command", "<None>"),
- test_event.get("build_error", "<None>")))
-
- result = self._common_add_testcase_entry(
- test_event,
- inner_content=(
- '<error type={} message={}><![CDATA[{}]]></error>'.format(
- XunitFormatter._quote_attribute(test_event["issue_class"]),
- XunitFormatter._quote_attribute(message),
- build_issue_description)
- ))
- with self.lock:
- self.elements["errors"].append(result)
-
- def _handle_error_standard(self, test_event):
- """Handles a test error.
- @param test_event the test event to handle.
- """
- message = self._replace_invalid_xml(test_event["issue_message"])
- backtrace = self._replace_invalid_xml(
- "".join(test_event.get("issue_backtrace", [])))
-
- result = self._common_add_testcase_entry(
- test_event,
- inner_content=(
- '<error type={} message={}><![CDATA[{}]]></error>'.format(
- XunitFormatter._quote_attribute(test_event["issue_class"]),
- XunitFormatter._quote_attribute(message),
- backtrace)
- ))
- with self.lock:
- self.elements["errors"].append(result)
-
- def _handle_error(self, test_event):
- if test_event.get("issue_phase", None) == "build":
- self._handle_error_build(test_event)
- else:
- self._handle_error_standard(test_event)
-
- def _handle_exceptional_exit(self, test_event):
- """Handles an exceptional exit.
- @param test_event the test method or job result event to handle.
- """
- if "test_name" in test_event:
- name = test_event["test_name"]
- else:
- name = test_event.get("test_filename", "<unknown test/filename>")
-
- message_text = "ERROR: {} ({}): {}".format(
- test_event.get("exception_code", 0),
- test_event.get("exception_description", ""),
- name)
- message = self._replace_invalid_xml(message_text)
-
- result = self._common_add_testcase_entry(
- test_event,
- inner_content=(
- '<error type={} message={}></error>'.format(
- "exceptional_exit",
- XunitFormatter._quote_attribute(message))
- ))
- with self.lock:
- self.elements["errors"].append(result)
-
- def _handle_timeout(self, test_event):
- """Handles a test method or job timeout.
- @param test_event the test method or job result event to handle.
- """
- if "test_name" in test_event:
- name = test_event["test_name"]
- else:
- name = test_event.get("test_filename", "<unknown test/filename>")
-
- message_text = "TIMEOUT: {}".format(name)
- message = self._replace_invalid_xml(message_text)
-
- result = self._common_add_testcase_entry(
- test_event,
- inner_content=(
- '<error type={} message={}></error>'.format(
- XunitFormatter._quote_attribute("timeout"),
- XunitFormatter._quote_attribute(message))
- ))
- with self.lock:
- self.elements["errors"].append(result)
-
- @staticmethod
- def _ignore_based_on_regex_list(test_event, test_key, regex_list):
- """Returns whether to ignore a test event based on patterns.
-
- @param test_event the test event dictionary to check.
- @param test_key the key within the dictionary to check.
- @param regex_list a list of zero or more compiled regexes.
-
- @return True if any regex in the list matches (via the
- re.search() method); False otherwise.
- """
- for regex in regex_list:
- match = regex.search(test_event.get(test_key, ''))
- if match:
- return True
- return False
-
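- # Behavior sketch for the regex-based ignore check above: compiled
- # patterns are tried with re.search against one key of the event dict
- # (the pattern strings here are hypothetical).
- import re
-
- regex_list = [re.compile(p) for p in (r"^test_slow", r"benchmark")]
- event = {"test_name": "test_slow_startup"}
- assert any(rx.search(event.get("test_name", "")) for rx in regex_list)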
- def _handle_skip(self, test_event):
- """Handles a skipped test.
- @param test_event the test event to handle.
- """
-
- # Are we ignoring this test based on test name?
- if XunitFormatter._ignore_based_on_regex_list(
- test_event, 'test_name', self.ignore_skip_name_regexes):
- return
-
- # Are we ignoring this test based on skip reason?
- if XunitFormatter._ignore_based_on_regex_list(
- test_event, 'skip_reason', self.ignore_skip_reason_regexes):
- return
-
- # We're not ignoring this test. Process the skip.
- reason = self._replace_invalid_xml(test_event.get("skip_reason", ""))
- result = self._common_add_testcase_entry(
- test_event,
- inner_content='<skipped message={} />'.format(
- XunitFormatter._quote_attribute(reason)))
- with self.lock:
- self.elements["skips"].append(result)
-
- def _handle_expected_failure(self, test_event):
- """Handles a test that failed as expected.
- @param test_event the test event to handle.
- """
- if self.options.xfail == XunitFormatter.RM_PASSTHRU:
- # This is not a natively-supported junit/xunit
- # testcase mode, so it might fail a validating
- # test results viewer.
- if "bugnumber" in test_event:
- bug_id_attribute = 'bug-id={} '.format(
- XunitFormatter._quote_attribute(test_event["bugnumber"]))
- else:
- bug_id_attribute = ''
-
- result = self._common_add_testcase_entry(
- test_event,
- inner_content=(
- '<expected-failure {}type={} message={} />'.format(
- bug_id_attribute,
- XunitFormatter._quote_attribute(
- test_event["issue_class"]),
- XunitFormatter._quote_attribute(
- test_event["issue_message"]))
- ))
- with self.lock:
- self.elements["expected_failures"].append(result)
- elif self.options.xfail == XunitFormatter.RM_SUCCESS:
- result = self._common_add_testcase_entry(test_event)
- with self.lock:
- self.elements["successes"].append(result)
- elif self.options.xfail == XunitFormatter.RM_FAILURE:
- result = self._common_add_testcase_entry(
- test_event,
- inner_content='<failure type={} message={} />'.format(
- XunitFormatter._quote_attribute(test_event["issue_class"]),
- XunitFormatter._quote_attribute(
- test_event["issue_message"])))
- with self.lock:
- self.elements["failures"].append(result)
- elif self.options.xfail == XunitFormatter.RM_IGNORE:
- pass
- else:
- raise Exception(
- "unknown xfail option: {}".format(self.options.xfail))
-
- @staticmethod
- def _handle_expected_timeout(test_event):
- """Handles expected_timeout.
- @param test_event the test event to handle.
- """
- # We don't do anything with expected timeouts, not even report.
- pass
-
- def _handle_unexpected_success(self, test_event):
- """Handles a test that passed but was expected to fail.
- @param test_event the test event to handle.
- """
- if self.options.xpass == XunitFormatter.RM_PASSTHRU:
- # This is not a natively-supported junit/xunit
- # testcase mode, so it might fail a validating
- # test results viewer.
- result = self._common_add_testcase_entry(
- test_event,
- inner_content="<unexpected-success />")
- with self.lock:
- self.elements["unexpected_successes"].append(result)
- elif self.options.xpass == XunitFormatter.RM_SUCCESS:
- # Treat the xpass as a success.
- result = self._common_add_testcase_entry(test_event)
- with self.lock:
- self.elements["successes"].append(result)
- elif self.options.xpass == XunitFormatter.RM_FAILURE:
- # Treat the xpass as a failure.
- if "bugnumber" in test_event:
- message = "unexpected success (bug_id:{})".format(
- test_event["bugnumber"])
- else:
- message = "unexpected success (bug_id:none)"
- result = self._common_add_testcase_entry(
- test_event,
- inner_content='<failure type={} message={} />'.format(
- XunitFormatter._quote_attribute("unexpected_success"),
- XunitFormatter._quote_attribute(message)))
- with self.lock:
- self.elements["failures"].append(result)
- elif self.options.xpass == XunitFormatter.RM_IGNORE:
- # Ignore the xpass result as far as xUnit reporting goes.
- pass
- else:
- raise Exception("unknown xpass option: {}".format(
- self.options.xpass))
-
- def _process_test_result(self, test_event):
- """Processes the test_event known to be a test result.
-
- This categorizes the event appropriately and stores the data needed
- to generate the final xUnit report. Events whose status has no
- registered handler raise an Exception.
- """
- if "status" not in test_event:
- raise Exception("test event dictionary missing 'status' key")
-
- status = test_event["status"]
- if status not in self.status_handlers:
- raise Exception("test event status '{}' unsupported".format(
- status))
-
- # Call the status handler for the test result.
- self.status_handlers[status](test_event)
-
- def _common_add_testcase_entry(self, test_event, inner_content=None):
- """Registers a testcase result, and returns the text created.
-
- The caller is expected to manage failure/skip/success counts
- in some kind of appropriate way. This call simply constructs
- the XML and appends the returned result to the
- self.elements["all"] list.
-
- @param test_event the test event dictionary.
-
- @param inner_content if specified, gets included in the <testcase>
- inner section, at the point before stdout and stderr would be
- included. This is where a <failure/>, <skipped/>, <error/>, etc.
- could go.
-
- @return the text of the xml testcase element.
- """
-
- # Get elapsed time.
- test_class = test_event.get("test_class", "<no_class>")
- test_name = test_event.get("test_name", "<no_test_method>")
- event_time = test_event["event_time"]
- time_taken = self.elapsed_time_for_test(
- test_class, test_name, event_time)
-
- # TODO: Plumb in stdout/stderr once we shift over to only test results.
- test_stdout = ''
- test_stderr = ''
-
- # Formulate the output xml.
- if not inner_content:
- inner_content = ""
- result = (
- '<testcase classname="{}" name="{}" time="{:.3f}">'
- '{}{}{}</testcase>'.format(
- test_class,
- test_name,
- time_taken,
- inner_content,
- test_stdout,
- test_stderr))
-
- # Save the result, update total test count.
- with self.lock:
- self.total_test_count += 1
- self.elements["all"].append(result)
-
- return result
-
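- # Free-standing rendition of the element assembly above, with made-up
- # values, showing the shape of one emitted <testcase> entry.
- inner = ('<failure type="AssertionError" message="boom">'
-          '<![CDATA[traceback here]]></failure>')
- testcase = (
-     '<testcase classname="{}" name="{}" time="{:.3f}">{}</testcase>'.format(
-         "FooTest", "test_bar", 0.125, inner))
- assert testcase.startswith('<testcase classname="FooTest"')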
- def _finish_output_no_lock(self):
- """Flushes out the report of test executions to form valid xml output.
-
- xUnit output is in XML. The reporting system cannot complete the
- formatting of the output without knowing when there is no more input.
- This call addresses notification of the completed test run and thus is
- when we can finish off the report output.
- """
-
- # Figure out the counts line for the testsuite. If we have
- # been counting either unexpected successes or expected
- # failures, we'll output those in the counts, at the risk of
- # being invalidated by a validating test results viewer.
- # These aren't counted by default so they won't show up unless
- # the user specified a formatter option to include them.
- xfail_count = len(self.elements["expected_failures"])
- xpass_count = len(self.elements["unexpected_successes"])
- if xfail_count > 0 or xpass_count > 0:
- extra_testsuite_attributes = (
- ' expected-failures="{}"'
- ' unexpected-successes="{}"'.format(xfail_count, xpass_count))
- else:
- extra_testsuite_attributes = ""
-
- # Output the header.
- self.out_file.write(
- '<?xml version="1.0" encoding="{}"?>\n'
- '<testsuites>'
- '<testsuite name="{}" tests="{}" errors="{}" failures="{}" '
- 'skip="{}"{}>\n'.format(
- self.text_encoding,
- "LLDB test suite",
- self.total_test_count,
- len(self.elements["errors"]),
- len(self.elements["failures"]),
- len(self.elements["skips"]),
- extra_testsuite_attributes))
-
- # Output each of the test result entries.
- for result in self.elements["all"]:
- self.out_file.write(result + '\n')
-
- # Close off the test suite.
- self.out_file.write('</testsuite></testsuites>\n')
-
- def _finish_output(self):
- """Finish writing output as all incoming events have arrived."""
- with self.lock:
- self._finish_output_no_lock()
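- # The finished report nests a single <testsuite> inside <testsuites>;
- # a minimal empty run would serialize roughly as below (the attribute
- # set mirrors the header written in _finish_output_no_lock).
- report = (
-     '<?xml version="1.0" encoding="UTF-8"?>\n'
-     '<testsuites>'
-     '<testsuite name="LLDB test suite" tests="0" errors="0" '
-     'failures="0" skip="0">\n'
-     '</testsuite></testsuites>\n')
- assert "<testsuites>" in report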
diff --git a/packages/Python/lldbsuite/test_event/test/resources/invalid_decorator/TestInvalidDecorator.py b/packages/Python/lldbsuite/test_event/test/resources/invalid_decorator/TestInvalidDecorator.py
deleted file mode 100644
index 7f5c4cb79cf5..000000000000
--- a/packages/Python/lldbsuite/test_event/test/resources/invalid_decorator/TestInvalidDecorator.py
+++ /dev/null
@@ -1,13 +0,0 @@
-from __future__ import print_function
-from lldbsuite.test import lldbtest
-from lldbsuite.test import decorators
-
-
-class NonExistentDecoratorTestCase(lldbtest.TestBase):
-
- mydir = lldbtest.TestBase.compute_mydir(__file__)
-
- @decorators.nonExistentDecorator(bugnumber="yt/1300")
- def test(self):
- """Verify non-existent decorators are picked up by test runner."""
- pass
diff --git a/packages/Python/lldbsuite/test_event/test/src/TestCatchInvalidDecorator.py b/packages/Python/lldbsuite/test_event/test/src/TestCatchInvalidDecorator.py
deleted file mode 100644
index 5b199defc5df..000000000000
--- a/packages/Python/lldbsuite/test_event/test/src/TestCatchInvalidDecorator.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#!/usr/bin/env python
-"""
-Tests that the event system reports issues during decorator
-handling as errors.
-"""
-# System-provided imports
-import os
-import unittest
-
-# Local-provided imports
-import event_collector
-
-
-class TestCatchInvalidDecorator(unittest.TestCase):
-
- TEST_DIR = os.path.join(
- os.path.dirname(__file__),
- os.path.pardir,
- "resources",
- "invalid_decorator")
-
- def test_with_whole_file(self):
- """
- Test that a non-existent decorator generates a test-event error
- when running all tests in the file.
- """
- # Determine the test case file we're using.
- test_file = os.path.join(self.TEST_DIR, "TestInvalidDecorator.py")
-
- # Collect all test events generated for this file.
- error_results = _filter_error_results(
- event_collector.collect_events_whole_file(test_file))
-
- self.assertGreater(
- len(error_results),
- 0,
- "At least one job or test error result should have been returned")
-
- def test_with_function_filter(self):
- """
- Test that a non-existent decorator generates a test-event error
- when running a filtered test.
- """
- # Collect all test events generated during running of tests
- # in a given directory using a test name filter. Internally,
- # this runs through a different code path that needs to be
- # set up to catch exceptions.
- error_results = _filter_error_results(
- event_collector.collect_events_for_directory_with_filter(
- self.TEST_DIR,
- "NonExistentDecoratorTestCase.test"))
-
- self.assertGreater(
- len(error_results),
- 0,
- "At least one job or test error result should have been returned")
-
-
-def _filter_error_results(events):
- # Keep only job/test result events whose status is "error".
- return [
- event
- for event in events
- if event.get("event", None) in ["job_result", "test_result"] and
- event.get("status", None) == "error"
- ]
-
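- # Hypothetical events demonstrating the filter above: only the two
- # error-status results survive.
- sample_events = [
-     {"event": "test_result", "status": "error"},
-     {"event": "test_result", "status": "success"},
-     {"event": "job_result", "status": "error"},
-     {"event": "build_info"},
- ]
- assert len(_filter_error_results(sample_events)) == 2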
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/packages/Python/lldbsuite/test_event/test/src/event_collector.py b/packages/Python/lldbsuite/test_event/test/src/event_collector.py
deleted file mode 100644
index 6b64cc71ac67..000000000000
--- a/packages/Python/lldbsuite/test_event/test/src/event_collector.py
+++ /dev/null
@@ -1,85 +0,0 @@
-from __future__ import absolute_import
-from __future__ import print_function
-
-import os
-import subprocess
-import sys
-import tempfile
-
-# noinspection PyUnresolvedReferences
-from six.moves import cPickle
-
-
-def path_to_dotest_py():
- return os.path.join(
- os.path.dirname(__file__),
- os.path.pardir,
- os.path.pardir,
- os.path.pardir,
- os.path.pardir,
- os.path.pardir,
- os.path.pardir,
- "test",
- "dotest.py")
-
-
-def _make_pickled_events_filename():
- with tempfile.NamedTemporaryFile(
- prefix="lldb_test_event_pickled_event_output",
- delete=False) as temp_file:
- return temp_file.name
-
-
-def _collect_events_with_command(command, events_filename):
- # Run the single test with dotest.py, outputting
- # the raw pickled events to a temp file.
- with open(os.devnull, 'w') as dev_null_file:
- subprocess.call(
- command,
- stdout=dev_null_file,
- stderr=dev_null_file)
-
- # Unpickle the events
- events = []
- if os.path.exists(events_filename):
- with open(events_filename, "rb") as events_file:
- while True:
- try:
- # print("reading event")
- event = cPickle.load(events_file)
- # print("read event: {}".format(event))
- if event:
- events.append(event)
- except EOFError:
- # This is okay.
- break
- os.remove(events_filename)
- return events
-
-
-def collect_events_whole_file(test_filename):
- events_filename = _make_pickled_events_filename()
- command = [
- sys.executable,
- path_to_dotest_py(),
- "--inferior",
- "--results-formatter=lldbsuite.test_event.formatter.pickled.RawPickledFormatter",
- "--results-file={}".format(events_filename),
- "-p",
- os.path.basename(test_filename),
- os.path.dirname(test_filename)]
- return _collect_events_with_command(command, events_filename)
-
-
-def collect_events_for_directory_with_filter(test_filename, filter_desc):
- events_filename = _make_pickled_events_filename()
- command = [
- sys.executable,
- path_to_dotest_py(),
- "--inferior",
- "--results-formatter=lldbsuite.test_event.formatter.pickled.RawPickledFormatter",
- "--results-file={}".format(events_filename),
- "-f",
- filter_desc,
- os.path.dirname(test_filename)]
- return _collect_events_with_command(command, events_filename)