diff options
Diffstat (limited to 'packages/Python/lldbsuite/test_event')
13 files changed, 0 insertions, 2845 deletions
| diff --git a/packages/Python/lldbsuite/test_event/__init__.py b/packages/Python/lldbsuite/test_event/__init__.py deleted file mode 100644 index e69de29bb2d1..000000000000 --- a/packages/Python/lldbsuite/test_event/__init__.py +++ /dev/null diff --git a/packages/Python/lldbsuite/test_event/build_exception.py b/packages/Python/lldbsuite/test_event/build_exception.py deleted file mode 100644 index 5b00b92d4738..000000000000 --- a/packages/Python/lldbsuite/test_event/build_exception.py +++ /dev/null @@ -1,16 +0,0 @@ -class BuildError(Exception): - -    def __init__(self, called_process_error): -        super(BuildError, self).__init__("Error when building test subject") -        self.command = called_process_error.lldb_extensions.get( -            "command", "<command unavailable>") -        self.build_error = called_process_error.lldb_extensions.get( -            "stderr_content", "<error output unavailable>") - -    def __str__(self): -        return self.format_build_error(self.command, self.build_error) - -    @staticmethod -    def format_build_error(command, command_output): -        return "Error when building test subject.\n\nBuild Command:\n{}\n\nBuild Command Output:\n{}".format( -            command, command_output) diff --git a/packages/Python/lldbsuite/test_event/dotest_channels.py b/packages/Python/lldbsuite/test_event/dotest_channels.py deleted file mode 100644 index 4f79193514bb..000000000000 --- a/packages/Python/lldbsuite/test_event/dotest_channels.py +++ /dev/null @@ -1,209 +0,0 @@ -""" -                     The LLVM Compiler Infrastructure - -This file is distributed under the University of Illinois Open Source -License. See LICENSE.TXT for details. - -Sync lldb and related source from a local machine to a remote machine. - -This facilitates working on the lldb sourcecode on multiple machines -and multiple OS types, verifying changes across all. - - -This module provides asyncore channels used within the LLDB test -framework. -""" - -from __future__ import print_function -from __future__ import absolute_import - - -# System modules -import asyncore -import socket - -# Third-party modules -from six.moves import cPickle - -# LLDB modules - - -class UnpicklingForwardingReaderChannel(asyncore.dispatcher): -    """Provides an unpickling, forwarding asyncore dispatch channel reader. - -    Inferior dotest.py processes with side-channel-based test results will -    send test result event data in a pickled format, one event at a time. -    This class supports reconstructing the pickled data and forwarding it -    on to its final destination. - -    The channel data is written in the form: -    {num_payload_bytes}#{payload_bytes} - -    The bulk of this class is devoted to reading and parsing out -    the payload bytes. -    """ - -    def __init__(self, file_object, async_map, forwarding_func): -        asyncore.dispatcher.__init__(self, sock=file_object, map=async_map) - -        self.header_contents = b"" -        self.packet_bytes_remaining = 0 -        self.reading_header = True -        self.ibuffer = b'' -        self.forwarding_func = forwarding_func -        if forwarding_func is None: -            # This whole class is useless if we do nothing with the -            # unpickled results. -            raise Exception("forwarding function must be set") - -        # Initiate all connections by sending an ack.  This allows -        # the initiators of the socket to await this to ensure -        # that this end is up and running (and therefore already -        # into the async map). -        ack_bytes = b'*' -        file_object.send(ack_bytes) - -    def deserialize_payload(self): -        """Unpickles the collected input buffer bytes and forwards.""" -        if len(self.ibuffer) > 0: -            self.forwarding_func(cPickle.loads(self.ibuffer)) -            self.ibuffer = b'' - -    def consume_header_bytes(self, data): -        """Consumes header bytes from the front of data. -        @param data the incoming data stream bytes -        @return any data leftover after consuming header bytes. -        """ -        # We're done if there is no content. -        if not data or (len(data) == 0): -            return None - -        full_header_len = 4 - -        assert len(self.header_contents) < full_header_len - -        bytes_avail = len(data) -        bytes_needed = full_header_len - len(self.header_contents) -        header_bytes_avail = min(bytes_needed, bytes_avail) -        self.header_contents += data[:header_bytes_avail] -        if len(self.header_contents) == full_header_len: -            import struct -            # End of header. -            self.packet_bytes_remaining = struct.unpack( -                "!I", self.header_contents)[0] -            self.header_contents = b"" -            self.reading_header = False -            return data[header_bytes_avail:] - -        # If we made it here, we've exhausted the data and -        # we're still parsing header content. -        return None - -    def consume_payload_bytes(self, data): -        """Consumes payload bytes from the front of data. -        @param data the incoming data stream bytes -        @return any data leftover after consuming remaining payload bytes. -        """ -        if not data or (len(data) == 0): -            # We're done and there's nothing to do. -            return None - -        data_len = len(data) -        if data_len <= self.packet_bytes_remaining: -            # We're consuming all the data provided. -            self.ibuffer += data -            self.packet_bytes_remaining -= data_len - -            # If we're no longer waiting for payload bytes, -            # we flip back to parsing header bytes and we -            # unpickle the payload contents. -            if self.packet_bytes_remaining < 1: -                self.reading_header = True -                self.deserialize_payload() - -            # We're done, no more data left. -            return None -        else: -            # We're only consuming a portion of the data since -            # the data contains more than the payload amount. -            self.ibuffer += data[:self.packet_bytes_remaining] -            data = data[self.packet_bytes_remaining:] - -            # We now move on to reading the header. -            self.reading_header = True -            self.packet_bytes_remaining = 0 - -            # And we can deserialize the payload. -            self.deserialize_payload() - -            # Return the remaining data. -            return data - -    def handle_read(self): -        # Read some data from the socket. -        try: -            data = self.recv(8192) -            # print('driver socket READ: %d bytes' % len(data)) -        except socket.error as socket_error: -            print( -                "\nINFO: received socket error when reading data " -                "from test inferior:\n{}".format(socket_error)) -            raise -        except Exception as general_exception: -            print( -                "\nERROR: received non-socket error when reading data " -                "from the test inferior:\n{}".format(general_exception)) -            raise - -        # Consume the message content. -        while data and (len(data) > 0): -            # If we're reading the header, gather header bytes. -            if self.reading_header: -                data = self.consume_header_bytes(data) -            else: -                data = self.consume_payload_bytes(data) - -    def handle_close(self): -        # print("socket reader: closing port") -        self.close() - - -class UnpicklingForwardingListenerChannel(asyncore.dispatcher): -    """Provides a socket listener asyncore channel for unpickling/forwarding. - -    This channel will listen on a socket port (use 0 for host-selected).  Any -    client that connects will have an UnpicklingForwardingReaderChannel handle -    communication over the connection. - -    The dotest parallel test runners, when collecting test results, open the -    test results side channel over a socket.  This channel handles connections -    from inferiors back to the test runner.  Each worker fires up a listener -    for each inferior invocation.  This simplifies the asyncore.loop() usage, -    one of the reasons for implementing with asyncore.  This listener shuts -    down once a single connection is made to it. -    """ - -    def __init__(self, async_map, host, port, backlog_count, forwarding_func): -        asyncore.dispatcher.__init__(self, map=async_map) -        self.create_socket(socket.AF_INET, socket.SOCK_STREAM) -        self.set_reuse_addr() -        self.bind((host, port)) -        self.address = self.socket.getsockname() -        self.listen(backlog_count) -        self.handler = None -        self.async_map = async_map -        self.forwarding_func = forwarding_func -        if forwarding_func is None: -            # This whole class is useless if we do nothing with the -            # unpickled results. -            raise Exception("forwarding function must be set") - -    def handle_accept(self): -        (sock, addr) = self.socket.accept() -        if sock and addr: -            # print('Incoming connection from %s' % repr(addr)) -            self.handler = UnpicklingForwardingReaderChannel( -                sock, self.async_map, self.forwarding_func) - -    def handle_close(self): -        self.close() diff --git a/packages/Python/lldbsuite/test_event/event_builder.py b/packages/Python/lldbsuite/test_event/event_builder.py deleted file mode 100644 index a7cb57c7f9c0..000000000000 --- a/packages/Python/lldbsuite/test_event/event_builder.py +++ /dev/null @@ -1,482 +0,0 @@ -""" -    The LLVM Compiler Infrastructure - -This file is distributed under the University of Illinois Open Source -License. See LICENSE.TXT for details. - -Provides a class to build Python test event data structures. -""" - -from __future__ import print_function -from __future__ import absolute_import - -# System modules -import inspect -import time -import traceback - -# Third-party modules - -# LLDB modules -from . import build_exception - - -class EventBuilder(object): -    """Helper class to build test result event dictionaries.""" - -    BASE_DICTIONARY = None - -    # Test Event Types -    TYPE_JOB_RESULT = "job_result" -    TYPE_TEST_RESULT = "test_result" -    TYPE_TEST_START = "test_start" -    TYPE_MARK_TEST_RERUN_ELIGIBLE = "test_eligible_for_rerun" -    TYPE_MARK_TEST_EXPECTED_FAILURE = "test_expected_failure" -    TYPE_SESSION_TERMINATE = "terminate" - -    RESULT_TYPES = {TYPE_JOB_RESULT, TYPE_TEST_RESULT} - -    # Test/Job Status Tags -    STATUS_EXCEPTIONAL_EXIT = "exceptional_exit" -    STATUS_SUCCESS = "success" -    STATUS_FAILURE = "failure" -    STATUS_EXPECTED_FAILURE = "expected_failure" -    STATUS_EXPECTED_TIMEOUT = "expected_timeout" -    STATUS_UNEXPECTED_SUCCESS = "unexpected_success" -    STATUS_SKIP = "skip" -    STATUS_ERROR = "error" -    STATUS_TIMEOUT = "timeout" - -    """Test methods or jobs with a status matching any of these -    status values will cause a testrun failure, unless -    the test methods rerun and do not trigger an issue when rerun.""" -    TESTRUN_ERROR_STATUS_VALUES = { -        STATUS_ERROR, -        STATUS_EXCEPTIONAL_EXIT, -        STATUS_FAILURE, -        STATUS_TIMEOUT} - -    @staticmethod -    def _get_test_name_info(test): -        """Returns (test-class-name, test-method-name) from a test case instance. - -        @param test a unittest.TestCase instance. - -        @return tuple containing (test class name, test method name) -        """ -        test_class_components = test.id().split(".") -        test_class_name = ".".join(test_class_components[:-1]) -        test_name = test_class_components[-1] -        return test_class_name, test_name - -    @staticmethod -    def bare_event(event_type): -        """Creates an event with default additions, event type and timestamp. - -        @param event_type the value set for the "event" key, used -        to distinguish events. - -        @returns an event dictionary with all default additions, the "event" -        key set to the passed in event_type, and the event_time value set to -        time.time(). -        """ -        if EventBuilder.BASE_DICTIONARY is not None: -            # Start with a copy of the "always include" entries. -            event = dict(EventBuilder.BASE_DICTIONARY) -        else: -            event = {} - -        event.update({ -            "event": event_type, -            "event_time": time.time() -        }) -        return event - -    @staticmethod -    def _assert_is_python_sourcefile(test_filename): -        if test_filename is not None: -            if not test_filename.endswith(".py"): -                raise Exception( -                    "source python filename has unexpected extension: {}".format(test_filename)) -        return test_filename - -    @staticmethod -    def _event_dictionary_common(test, event_type): -        """Returns an event dictionary setup with values for the given event type. - -        @param test the unittest.TestCase instance - -        @param event_type the name of the event type (string). - -        @return event dictionary with common event fields set. -        """ -        test_class_name, test_name = EventBuilder._get_test_name_info(test) - -        # Determine the filename for the test case.  If there is an attribute -        # for it, use it.  Otherwise, determine from the TestCase class path. -        if hasattr(test, "test_filename"): -            test_filename = EventBuilder._assert_is_python_sourcefile( -                test.test_filename) -        else: -            test_filename = EventBuilder._assert_is_python_sourcefile( -                inspect.getsourcefile(test.__class__)) - -        event = EventBuilder.bare_event(event_type) -        event.update({ -            "test_class": test_class_name, -            "test_name": test_name, -            "test_filename": test_filename -        }) - -        return event - -    @staticmethod -    def _error_tuple_class(error_tuple): -        """Returns the unittest error tuple's error class as a string. - -        @param error_tuple the error tuple provided by the test framework. - -        @return the error type (typically an exception) raised by the -        test framework. -        """ -        type_var = error_tuple[0] -        module = inspect.getmodule(type_var) -        if module: -            return "{}.{}".format(module.__name__, type_var.__name__) -        else: -            return type_var.__name__ - -    @staticmethod -    def _error_tuple_message(error_tuple): -        """Returns the unittest error tuple's error message. - -        @param error_tuple the error tuple provided by the test framework. - -        @return the error message provided by the test framework. -        """ -        return str(error_tuple[1]) - -    @staticmethod -    def _error_tuple_traceback(error_tuple): -        """Returns the unittest error tuple's error message. - -        @param error_tuple the error tuple provided by the test framework. - -        @return the error message provided by the test framework. -        """ -        return error_tuple[2] - -    @staticmethod -    def _event_dictionary_test_result(test, status): -        """Returns an event dictionary with common test result fields set. - -        @param test a unittest.TestCase instance. - -        @param status the status/result of the test -        (e.g. "success", "failure", etc.) - -        @return the event dictionary -        """ -        event = EventBuilder._event_dictionary_common( -            test, EventBuilder.TYPE_TEST_RESULT) -        event["status"] = status -        return event - -    @staticmethod -    def _event_dictionary_issue(test, status, error_tuple): -        """Returns an event dictionary with common issue-containing test result -        fields set. - -        @param test a unittest.TestCase instance. - -        @param status the status/result of the test -        (e.g. "success", "failure", etc.) - -        @param error_tuple the error tuple as reported by the test runner. -        This is of the form (type<error>, error). - -        @return the event dictionary -        """ -        event = EventBuilder._event_dictionary_test_result(test, status) -        event["issue_class"] = EventBuilder._error_tuple_class(error_tuple) -        event["issue_message"] = EventBuilder._error_tuple_message(error_tuple) -        backtrace = EventBuilder._error_tuple_traceback(error_tuple) -        if backtrace is not None: -            event["issue_backtrace"] = traceback.format_tb(backtrace) -        return event - -    @staticmethod -    def event_for_start(test): -        """Returns an event dictionary for the test start event. - -        @param test a unittest.TestCase instance. - -        @return the event dictionary -        """ -        return EventBuilder._event_dictionary_common( -            test, EventBuilder.TYPE_TEST_START) - -    @staticmethod -    def event_for_success(test): -        """Returns an event dictionary for a successful test. - -        @param test a unittest.TestCase instance. - -        @return the event dictionary -        """ -        return EventBuilder._event_dictionary_test_result( -            test, EventBuilder.STATUS_SUCCESS) - -    @staticmethod -    def event_for_unexpected_success(test, bugnumber): -        """Returns an event dictionary for a test that succeeded but was -        expected to fail. - -        @param test a unittest.TestCase instance. - -        @param bugnumber the issue identifier for the bug tracking the -        fix request for the test expected to fail (but is in fact -        passing here). - -        @return the event dictionary - -        """ -        event = EventBuilder._event_dictionary_test_result( -            test, EventBuilder.STATUS_UNEXPECTED_SUCCESS) -        if bugnumber: -            event["bugnumber"] = str(bugnumber) -        return event - -    @staticmethod -    def event_for_failure(test, error_tuple): -        """Returns an event dictionary for a test that failed. - -        @param test a unittest.TestCase instance. - -        @param error_tuple the error tuple as reported by the test runner. -        This is of the form (type<error>, error). - -        @return the event dictionary -        """ -        return EventBuilder._event_dictionary_issue( -            test, EventBuilder.STATUS_FAILURE, error_tuple) - -    @staticmethod -    def event_for_expected_failure(test, error_tuple, bugnumber): -        """Returns an event dictionary for a test that failed as expected. - -        @param test a unittest.TestCase instance. - -        @param error_tuple the error tuple as reported by the test runner. -        This is of the form (type<error>, error). - -        @param bugnumber the issue identifier for the bug tracking the -        fix request for the test expected to fail. - -        @return the event dictionary - -        """ -        event = EventBuilder._event_dictionary_issue( -            test, EventBuilder.STATUS_EXPECTED_FAILURE, error_tuple) -        if bugnumber: -            event["bugnumber"] = str(bugnumber) -        return event - -    @staticmethod -    def event_for_skip(test, reason): -        """Returns an event dictionary for a test that was skipped. - -        @param test a unittest.TestCase instance. - -        @param reason the reason why the test is being skipped. - -        @return the event dictionary -        """ -        event = EventBuilder._event_dictionary_test_result( -            test, EventBuilder.STATUS_SKIP) -        event["skip_reason"] = reason -        return event - -    @staticmethod -    def event_for_error(test, error_tuple): -        """Returns an event dictionary for a test that hit a test execution error. - -        @param test a unittest.TestCase instance. - -        @param error_tuple the error tuple as reported by the test runner. -        This is of the form (type<error>, error). - -        @return the event dictionary -        """ -        event = EventBuilder._event_dictionary_issue( -            test, EventBuilder.STATUS_ERROR, error_tuple) -        event["issue_phase"] = "test" -        return event - -    @staticmethod -    def event_for_build_error(test, error_tuple): -        """Returns an event dictionary for a test that hit a test execution error -        during the test cleanup phase. - -        @param test a unittest.TestCase instance. - -        @param error_tuple the error tuple as reported by the test runner. -        This is of the form (type<error>, error). - -        @return the event dictionary -        """ -        event = EventBuilder._event_dictionary_issue( -            test, EventBuilder.STATUS_ERROR, error_tuple) -        event["issue_phase"] = "build" - -        build_error = error_tuple[1] -        event["build_command"] = build_error.command -        event["build_error"] = build_error.build_error -        return event - -    @staticmethod -    def event_for_cleanup_error(test, error_tuple): -        """Returns an event dictionary for a test that hit a test execution error -        during the test cleanup phase. - -        @param test a unittest.TestCase instance. - -        @param error_tuple the error tuple as reported by the test runner. -        This is of the form (type<error>, error). - -        @return the event dictionary -        """ -        event = EventBuilder._event_dictionary_issue( -            test, EventBuilder.STATUS_ERROR, error_tuple) -        event["issue_phase"] = "cleanup" -        return event - -    @staticmethod -    def event_for_job_test_add_error(test_filename, exception, backtrace): -        event = EventBuilder.bare_event(EventBuilder.TYPE_JOB_RESULT) -        event["status"] = EventBuilder.STATUS_ERROR -        if test_filename is not None: -            event["test_filename"] = EventBuilder._assert_is_python_sourcefile( -                test_filename) -        if exception is not None and "__class__" in dir(exception): -            event["issue_class"] = exception.__class__ -        event["issue_message"] = exception -        if backtrace is not None: -            event["issue_backtrace"] = backtrace -        return event - -    @staticmethod -    def event_for_job_exceptional_exit( -            pid, worker_index, exception_code, exception_description, -            test_filename, command_line): -        """Creates an event for a job (i.e. process) exit due to signal. - -        @param pid the process id for the job that failed -        @param worker_index optional id for the job queue running the process -        @param exception_code optional code -        (e.g. SIGTERM integer signal number) -        @param exception_description optional string containing symbolic -        representation of the issue (e.g. "SIGTERM") -        @param test_filename the path to the test filename that exited -        in some exceptional way. -        @param command_line the Popen()-style list provided as the command line -        for the process that timed out. - -        @return an event dictionary coding the job completion description. -        """ -        event = EventBuilder.bare_event(EventBuilder.TYPE_JOB_RESULT) -        event["status"] = EventBuilder.STATUS_EXCEPTIONAL_EXIT -        if pid is not None: -            event["pid"] = pid -        if worker_index is not None: -            event["worker_index"] = int(worker_index) -        if exception_code is not None: -            event["exception_code"] = exception_code -        if exception_description is not None: -            event["exception_description"] = exception_description -        if test_filename is not None: -            event["test_filename"] = EventBuilder._assert_is_python_sourcefile( -                test_filename) -        if command_line is not None: -            event["command_line"] = command_line -        return event - -    @staticmethod -    def event_for_job_timeout(pid, worker_index, test_filename, command_line): -        """Creates an event for a job (i.e. process) timeout. - -        @param pid the process id for the job that timed out -        @param worker_index optional id for the job queue running the process -        @param test_filename the path to the test filename that timed out. -        @param command_line the Popen-style list provided as the command line -        for the process that timed out. - -        @return an event dictionary coding the job completion description. -        """ -        event = EventBuilder.bare_event(EventBuilder.TYPE_JOB_RESULT) -        event["status"] = "timeout" -        if pid is not None: -            event["pid"] = pid -        if worker_index is not None: -            event["worker_index"] = int(worker_index) -        if test_filename is not None: -            event["test_filename"] = EventBuilder._assert_is_python_sourcefile( -                test_filename) -        if command_line is not None: -            event["command_line"] = command_line -        return event - -    @staticmethod -    def event_for_mark_test_rerun_eligible(test): -        """Creates an event that indicates the specified test is explicitly -        eligible for rerun. - -        Note there is a mode that will enable test rerun eligibility at the -        global level.  These markings for explicit rerun eligibility are -        intended for the mode of running where only explicitly re-runnable -        tests are rerun upon hitting an issue. - -        @param test the TestCase instance to which this pertains. - -        @return an event that specifies the given test as being eligible to -        be rerun. -        """ -        event = EventBuilder._event_dictionary_common( -            test, -            EventBuilder.TYPE_MARK_TEST_RERUN_ELIGIBLE) -        return event - -    @staticmethod -    def event_for_mark_test_expected_failure(test): -        """Creates an event that indicates the specified test is expected -        to fail. - -        @param test the TestCase instance to which this pertains. - -        @return an event that specifies the given test is expected to fail. -        """ -        event = EventBuilder._event_dictionary_common( -            test, -            EventBuilder.TYPE_MARK_TEST_EXPECTED_FAILURE) -        return event - -    @staticmethod -    def add_entries_to_all_events(entries_dict): -        """Specifies a dictionary of entries to add to all test events. - -        This provides a mechanism for, say, a parallel test runner to -        indicate to each inferior dotest.py that it should add a -        worker index to each. - -        Calling this method replaces all previous entries added -        by a prior call to this. - -        Event build methods will overwrite any entries that collide. -        Thus, the passed in dictionary is the base, which gets merged -        over by event building when keys collide. - -        @param entries_dict a dictionary containing key and value -        pairs that should be merged into all events created by the -        event generator.  May be None to clear out any extra entries. -        """ -        EventBuilder.BASE_DICTIONARY = dict(entries_dict) diff --git a/packages/Python/lldbsuite/test_event/formatter/__init__.py b/packages/Python/lldbsuite/test_event/formatter/__init__.py deleted file mode 100644 index 2481e326e946..000000000000 --- a/packages/Python/lldbsuite/test_event/formatter/__init__.py +++ /dev/null @@ -1,163 +0,0 @@ -""" -    The LLVM Compiler Infrastructure - -This file is distributed under the University of Illinois Open Source -License. See LICENSE.TXT for details. -""" - -from __future__ import print_function -from __future__ import absolute_import - -# System modules -import importlib -import socket -import sys - -# Third-party modules - -# LLDB modules - - -# Ignore method count on DTOs. -# pylint: disable=too-few-public-methods -class FormatterConfig(object): -    """Provides formatter configuration info to create_results_formatter().""" - -    def __init__(self): -        self.filename = None -        self.port = None -        self.formatter_name = None -        self.formatter_options = None - - -# Ignore method count on DTOs. -# pylint: disable=too-few-public-methods -class CreatedFormatter(object): -    """Provides transfer object for returns from create_results_formatter().""" - -    def __init__(self, formatter, cleanup_func): -        self.formatter = formatter -        self.cleanup_func = cleanup_func - - -def create_results_formatter(config): -    """Sets up a test results formatter. - -    @param config an instance of FormatterConfig -    that indicates how to setup the ResultsFormatter. - -    @return an instance of CreatedFormatter. -    """ - -    def create_socket(port): -        """Creates a socket to the localhost on the given port. - -        @param port the port number of the listening port on -        the localhost. - -        @return (socket object, socket closing function) -        """ - -        def socket_closer(open_sock): -            """Close down an opened socket properly.""" -            open_sock.shutdown(socket.SHUT_RDWR) -            open_sock.close() - -        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -        sock.connect(("localhost", port)) - -        # Wait for the ack from the listener side. -        # This is needed to prevent a race condition -        # in the main dosep.py processing loop: we -        # can't allow a worker queue thread to die -        # that has outstanding messages to a listener -        # socket before the listener socket asyncore -        # listener socket gets spun up; otherwise, -        # we lose the test result info. -        read_bytes = sock.recv(1) -        if read_bytes is None or (len(read_bytes) < 1) or (read_bytes != b'*'): -            raise Exception( -                "listening socket did not respond with ack byte: response={}".format(read_bytes)) - -        return sock, lambda: socket_closer(sock) - -    default_formatter_name = None -    results_file_object = None -    cleanup_func = None - -    file_is_stream = False -    if config.filename: -        # Open the results file for writing. -        if config.filename == 'stdout': -            results_file_object = sys.stdout -            cleanup_func = None -        elif config.filename == 'stderr': -            results_file_object = sys.stderr -            cleanup_func = None -        else: -            results_file_object = open(config.filename, "w") -            cleanup_func = results_file_object.close -        default_formatter_name = ( -            "lldbsuite.test_event.formatter.xunit.XunitFormatter") -    elif config.port: -        # Connect to the specified localhost port. -        results_file_object, cleanup_func = create_socket(config.port) -        default_formatter_name = ( -            "lldbsuite.test_event.formatter.pickled.RawPickledFormatter") -        file_is_stream = True - -    # If we have a results formatter name specified and we didn't specify -    # a results file, we should use stdout. -    if config.formatter_name is not None and results_file_object is None: -        # Use stdout. -        results_file_object = sys.stdout -        cleanup_func = None - -    if results_file_object: -        # We care about the formatter.  Choose user-specified or, if -        # none specified, use the default for the output type. -        if config.formatter_name: -            formatter_name = config.formatter_name -        else: -            formatter_name = default_formatter_name - -        # Create an instance of the class. -        # First figure out the package/module. -        components = formatter_name.split(".") -        module = importlib.import_module(".".join(components[:-1])) - -        # Create the class name we need to load. -        cls = getattr(module, components[-1]) - -        # Handle formatter options for the results formatter class. -        formatter_arg_parser = cls.arg_parser() -        if config.formatter_options and len(config.formatter_options) > 0: -            command_line_options = config.formatter_options -        else: -            command_line_options = [] - -        formatter_options = formatter_arg_parser.parse_args( -            command_line_options) - -        # Create the TestResultsFormatter given the processed options. -        results_formatter_object = cls( -            results_file_object, -            formatter_options, -            file_is_stream) - -        def shutdown_formatter(): -            """Shuts down the formatter when it is no longer needed.""" -            # Tell the formatter to write out anything it may have -            # been saving until the very end (e.g. xUnit results -            # can't complete its output until this point). -            results_formatter_object.send_terminate_as_needed() - -            # And now close out the output file-like object. -            if cleanup_func is not None: -                cleanup_func() - -        return CreatedFormatter( -            results_formatter_object, -            shutdown_formatter) -    else: -        return None diff --git a/packages/Python/lldbsuite/test_event/formatter/curses.py b/packages/Python/lldbsuite/test_event/formatter/curses.py deleted file mode 100644 index f415575ded8a..000000000000 --- a/packages/Python/lldbsuite/test_event/formatter/curses.py +++ /dev/null @@ -1,342 +0,0 @@ -""" -                     The LLVM Compiler Infrastructure - - This file is distributed under the University of Illinois Open Source - License. See LICENSE.TXT for details. -""" - -from __future__ import absolute_import -from __future__ import print_function - -# System modules -import curses -import datetime -import math -import sys -import time - -# Third-party modules - -# LLDB modules -from lldbsuite.test import lldbcurses - -from . import results_formatter -from ..event_builder import EventBuilder - - -class Curses(results_formatter.ResultsFormatter): -    """Receives live results from tests that are running and reports them to the terminal in a curses GUI""" - -    def __init__(self, out_file, options, file_is_stream): -        # Initialize the parent -        super(Curses, self).__init__(out_file, options, file_is_stream) -        self.using_terminal = True -        self.have_curses = True -        self.initialize_event = None -        self.jobs = [None] * 64 -        self.job_tests = [None] * 64 -        self.results = list() -        try: -            self.main_window = lldbcurses.intialize_curses() -            self.main_window.add_key_action( -                '\t', -                self.main_window.select_next_first_responder, -                "Switch between views that can respond to keyboard input") -            self.main_window.refresh() -            self.job_panel = None -            self.results_panel = None -            self.status_panel = None -            self.info_panel = None -            self.hide_status_list = list() -            self.start_time = time.time() -        except: -            self.have_curses = False -            lldbcurses.terminate_curses() -            self.using_terminal = False -            print("Unexpected error:", sys.exc_info()[0]) -            raise - -        self.line_dict = dict() -        # self.events_file = open("/tmp/events.txt", "w") -        # self.formatters = list() -        # if tee_results_formatter: -        #     self.formatters.append(tee_results_formatter) - -    def status_to_short_str(self, status, test_event): -        if status == EventBuilder.STATUS_SUCCESS: -            return '.' -        elif status == EventBuilder.STATUS_FAILURE: -            return 'F' -        elif status == EventBuilder.STATUS_UNEXPECTED_SUCCESS: -            return '?' -        elif status == EventBuilder.STATUS_EXPECTED_FAILURE: -            return 'X' -        elif status == EventBuilder.STATUS_SKIP: -            return 'S' -        elif status == EventBuilder.STATUS_ERROR: -            if test_event.get("issue_phase", None) == "build": -                # Build failure -                return 'B' -            else: -                return 'E' -        elif status == EventBuilder.STATUS_TIMEOUT: -            return 'T' -        elif status == EventBuilder.STATUS_EXPECTED_TIMEOUT: -            return 't' -        else: -            return status - -    def show_info_panel(self): -        selected_idx = self.results_panel.get_selected_idx() -        if selected_idx >= 0 and selected_idx < len(self.results): -            if self.info_panel is None: -                info_frame = self.results_panel.get_contained_rect( -                    top_inset=10, left_inset=10, right_inset=10, height=30) -                self.info_panel = lldbcurses.BoxedPanel( -                    info_frame, "Result Details") -                # Add a key action for any key that will hide this panel when -                # any key is pressed -                self.info_panel.add_key_action(-1, -                                               self.hide_info_panel, -                                               'Hide the info panel') -                self.info_panel.top() -            else: -                self.info_panel.show() - -            self.main_window.push_first_responder(self.info_panel) -            test_start = self.results[selected_idx][0] -            test_result = self.results[selected_idx][1] -            self.info_panel.set_line( -                0, "File: %s" % -                (test_start['test_filename'])) -            self.info_panel.set_line( -                1, "Test: %s.%s" % -                (test_start['test_class'], test_start['test_name'])) -            self.info_panel.set_line( -                2, "Time: %s" % -                (test_result['elapsed_time'])) -            self.info_panel.set_line(3, "Status: %s" % (test_result['status'])) - -    def hide_info_panel(self): -        self.main_window.pop_first_responder(self.info_panel) -        self.info_panel.hide() -        self.main_window.refresh() - -    def toggle_status(self, status): -        if status: -            # Toggle showing and hiding results whose status matches "status" -            # in "Results" window -            if status in self.hide_status_list: -                self.hide_status_list.remove(status) -            else: -                self.hide_status_list.append(status) -            self.update_results() - -    def update_results(self, update=True): -        '''Called after a category of test have been show/hidden to update the results list with -           what the user desires to see.''' -        self.results_panel.clear(update=False) -        for result in self.results: -            test_result = result[1] -            status = test_result['status'] -            if status in self.hide_status_list: -                continue -            name = test_result['test_class'] + '.' + test_result['test_name'] -            self.results_panel.append_line( -                '%s (%6.2f sec) %s' % -                (self.status_to_short_str( -                    status, -                    test_result), -                    test_result['elapsed_time'], -                    name)) -        if update: -            self.main_window.refresh() - -    def handle_event(self, test_event): -        with self.lock: -            super(Curses, self).handle_event(test_event) -            # for formatter in self.formatters: -            #     formatter.process_event(test_event) -            if self.have_curses: -                worker_index = -1 -                if 'worker_index' in test_event: -                    worker_index = test_event['worker_index'] -                if 'event' in test_event: -                    check_for_one_key = True -                    #print(str(test_event), file=self.events_file) -                    event = test_event['event'] -                    if self.status_panel: -                        self.status_panel.update_status( -                            'time', str( -                                datetime.timedelta( -                                    seconds=math.floor( -                                        time.time() - self.start_time)))) -                    if event == 'test_start': -                        name = test_event['test_class'] + \ -                            '.' + test_event['test_name'] -                        self.job_tests[worker_index] = test_event -                        if 'pid' in test_event: -                            line = 'pid: %5d ' % (test_event['pid']) + name -                        else: -                            line = name -                        self.job_panel.set_line(worker_index, line) -                        self.main_window.refresh() -                    elif event == 'test_result': -                        status = test_event['status'] -                        self.status_panel.increment_status(status) -                        if 'pid' in test_event: -                            line = 'pid: %5d ' % (test_event['pid']) -                        else: -                            line = '' -                        self.job_panel.set_line(worker_index, line) -                        name = test_event['test_class'] + \ -                            '.' + test_event['test_name'] -                        elapsed_time = test_event[ -                            'event_time'] - self.job_tests[worker_index]['event_time'] -                        if status not in self.hide_status_list: -                            self.results_panel.append_line( -                                '%s (%6.2f sec) %s' % -                                (self.status_to_short_str( -                                    status, test_event), elapsed_time, name)) -                        self.main_window.refresh() -                        # Append the result pairs -                        test_event['elapsed_time'] = elapsed_time -                        self.results.append( -                            [self.job_tests[worker_index], test_event]) -                        self.job_tests[worker_index] = '' -                    elif event == 'job_begin': -                        self.jobs[worker_index] = test_event -                        if 'pid' in test_event: -                            line = 'pid: %5d ' % (test_event['pid']) -                        else: -                            line = '' -                        self.job_panel.set_line(worker_index, line) -                    elif event == 'job_end': -                        self.jobs[worker_index] = '' -                        self.job_panel.set_line(worker_index, '') -                    elif event == 'initialize': -                        self.initialize_event = test_event -                        num_jobs = test_event['worker_count'] -                        job_frame = self.main_window.get_contained_rect( -                            height=num_jobs + 2) -                        results_frame = self.main_window.get_contained_rect( -                            top_inset=num_jobs + 2, bottom_inset=1) -                        status_frame = self.main_window.get_contained_rect( -                            height=1, top_inset=self.main_window.get_size().h - 1) -                        self.job_panel = lldbcurses.BoxedPanel( -                            frame=job_frame, title="Jobs") -                        self.results_panel = lldbcurses.BoxedPanel( -                            frame=results_frame, title="Results") - -                        self.results_panel.add_key_action( -                            curses.KEY_UP, -                            self.results_panel.select_prev, -                            "Select the previous list entry") -                        self.results_panel.add_key_action( -                            curses.KEY_DOWN, self.results_panel.select_next, "Select the next list entry") -                        self.results_panel.add_key_action( -                            curses.KEY_HOME, -                            self.results_panel.scroll_begin, -                            "Scroll to the start of the list") -                        self.results_panel.add_key_action( -                            curses.KEY_END, self.results_panel.scroll_end, "Scroll to the end of the list") -                        self.results_panel.add_key_action( -                            curses.KEY_ENTER, -                            self.show_info_panel, -                            "Display info for the selected result item") -                        self.results_panel.add_key_action( -                            '.', -                            lambda: self.toggle_status( -                                EventBuilder.STATUS_SUCCESS), -                            "Toggle showing/hiding tests whose status is 'success'") -                        self.results_panel.add_key_action( -                            'e', -                            lambda: self.toggle_status( -                                EventBuilder.STATUS_ERROR), -                            "Toggle showing/hiding tests whose status is 'error'") -                        self.results_panel.add_key_action( -                            'f', -                            lambda: self.toggle_status( -                                EventBuilder.STATUS_FAILURE), -                            "Toggle showing/hiding tests whose status is 'failure'") -                        self.results_panel.add_key_action('s', lambda: self.toggle_status( -                            EventBuilder.STATUS_SKIP), "Toggle showing/hiding tests whose status is 'skip'") -                        self.results_panel.add_key_action( -                            'x', -                            lambda: self.toggle_status( -                                EventBuilder.STATUS_EXPECTED_FAILURE), -                            "Toggle showing/hiding tests whose status is 'expected_failure'") -                        self.results_panel.add_key_action( -                            '?', -                            lambda: self.toggle_status( -                                EventBuilder.STATUS_UNEXPECTED_SUCCESS), -                            "Toggle showing/hiding tests whose status is 'unexpected_success'") -                        self.status_panel = lldbcurses.StatusPanel( -                            frame=status_frame) - -                        self.main_window.add_child(self.job_panel) -                        self.main_window.add_child(self.results_panel) -                        self.main_window.add_child(self.status_panel) -                        self.main_window.set_first_responder( -                            self.results_panel) - -                        self.status_panel.add_status_item( -                            name="time", -                            title="Elapsed", -                            format="%s", -                            width=20, -                            value="0:00:00", -                            update=False) -                        self.status_panel.add_status_item( -                            name=EventBuilder.STATUS_SUCCESS, -                            title="Success", -                            format="%u", -                            width=20, -                            value=0, -                            update=False) -                        self.status_panel.add_status_item( -                            name=EventBuilder.STATUS_FAILURE, -                            title="Failure", -                            format="%u", -                            width=20, -                            value=0, -                            update=False) -                        self.status_panel.add_status_item( -                            name=EventBuilder.STATUS_ERROR, -                            title="Error", -                            format="%u", -                            width=20, -                            value=0, -                            update=False) -                        self.status_panel.add_status_item( -                            name=EventBuilder.STATUS_SKIP, -                            title="Skipped", -                            format="%u", -                            width=20, -                            value=0, -                            update=True) -                        self.status_panel.add_status_item( -                            name=EventBuilder.STATUS_EXPECTED_FAILURE, -                            title="Expected Failure", -                            format="%u", -                            width=30, -                            value=0, -                            update=False) -                        self.status_panel.add_status_item( -                            name=EventBuilder.STATUS_UNEXPECTED_SUCCESS, -                            title="Unexpected Success", -                            format="%u", -                            width=30, -                            value=0, -                            update=False) -                        self.main_window.refresh() -                    elif event == 'terminate': -                        # self.main_window.key_event_loop() -                        lldbcurses.terminate_curses() -                        check_for_one_key = False -                        self.using_terminal = False -                        # Check for 1 keypress with no delay - -                    # Check for 1 keypress with no delay -                    if check_for_one_key: -                        self.main_window.key_event_loop(0, 1) diff --git a/packages/Python/lldbsuite/test_event/formatter/dump_formatter.py b/packages/Python/lldbsuite/test_event/formatter/dump_formatter.py deleted file mode 100644 index d42dcb18bb4f..000000000000 --- a/packages/Python/lldbsuite/test_event/formatter/dump_formatter.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -    The LLVM Compiler Infrastructure - -This file is distributed under the University of Illinois Open Source -License. See LICENSE.TXT for details. -""" - -from __future__ import print_function -from __future__ import absolute_import - -# System modules -import pprint - -# Our modules -from .results_formatter import ResultsFormatter - - -class DumpFormatter(ResultsFormatter): -    """Formats events to the file as their raw python dictionary format.""" - -    def handle_event(self, test_event): -        super(DumpFormatter, self).handle_event(test_event) -        self.out_file.write("\n" + pprint.pformat(test_event) + "\n") diff --git a/packages/Python/lldbsuite/test_event/formatter/pickled.py b/packages/Python/lldbsuite/test_event/formatter/pickled.py deleted file mode 100644 index 588614e2f7b4..000000000000 --- a/packages/Python/lldbsuite/test_event/formatter/pickled.py +++ /dev/null @@ -1,80 +0,0 @@ -""" -    The LLVM Compiler Infrastructure - -This file is distributed under the University of Illinois Open Source -License. See LICENSE.TXT for details. -""" - -from __future__ import print_function -from __future__ import absolute_import - -# System modules -import os - -# Our modules -from .results_formatter import ResultsFormatter -from six.moves import cPickle - - -class RawPickledFormatter(ResultsFormatter): -    """Formats events as a pickled stream. - -    The parallel test runner has inferiors pickle their results and send them -    over a socket back to the parallel test.  The parallel test runner then -    aggregates them into the final results formatter (e.g. xUnit). -    """ - -    @classmethod -    def arg_parser(cls): -        """@return arg parser used to parse formatter-specific options.""" -        parser = super(RawPickledFormatter, cls).arg_parser() -        return parser - -    class StreamSerializer(object): - -        @staticmethod -        def serialize(test_event, out_file): -            # Send it as -            # {serialized_length_of_serialized_bytes}{serialized_bytes} -            import struct -            msg = cPickle.dumps(test_event) -            packet = struct.pack("!I%ds" % len(msg), len(msg), msg) -            out_file.send(packet) - -    class BlockSerializer(object): - -        @staticmethod -        def serialize(test_event, out_file): -            cPickle.dump(test_event, out_file) - -    def __init__(self, out_file, options, file_is_stream): -        super( -            RawPickledFormatter, -            self).__init__( -            out_file, -            options, -            file_is_stream) -        self.pid = os.getpid() -        if file_is_stream: -            self.serializer = self.StreamSerializer() -        else: -            self.serializer = self.BlockSerializer() - -    def handle_event(self, test_event): -        super(RawPickledFormatter, self).handle_event(test_event) - -        # Convert initialize/terminate events into job_begin/job_end events. -        event_type = test_event["event"] -        if event_type is None: -            return - -        if event_type == "initialize": -            test_event["event"] = "job_begin" -        elif event_type == "terminate": -            test_event["event"] = "job_end" - -        # Tack on the pid. -        test_event["pid"] = self.pid - -        # Serialize the test event. -        self.serializer.serialize(test_event, self.out_file) diff --git a/packages/Python/lldbsuite/test_event/formatter/results_formatter.py b/packages/Python/lldbsuite/test_event/formatter/results_formatter.py deleted file mode 100644 index 8e341cb2ce84..000000000000 --- a/packages/Python/lldbsuite/test_event/formatter/results_formatter.py +++ /dev/null @@ -1,766 +0,0 @@ -""" -                     The LLVM Compiler Infrastructure - -This file is distributed under the University of Illinois Open Source -License. See LICENSE.TXT for details. - -Provides classes used by the test results reporting infrastructure -within the LLDB test suite. -""" - -from __future__ import print_function -from __future__ import absolute_import - -# System modules -import argparse -import os -import re -import sys -import threading - -# Third-party modules - - -# LLDB modules -from lldbsuite.test import configuration -from ..event_builder import EventBuilder - -import lldbsuite - - -FILE_LEVEL_KEY_RE = re.compile(r"^(.+\.py)[^.:]*$") - - -class ResultsFormatter(object): -    """Provides interface to formatting test results out to a file-like object. - -    This class allows the LLDB test framework's raw test-related -    events to be processed and formatted in any manner desired. -    Test events are represented by python dictionaries, formatted -    as in the EventBuilder class above. - -    ResultFormatter instances are given a file-like object in which -    to write their results. - -    ResultFormatter lifetime looks like the following: - -    # The result formatter is created. -    # The argparse options dictionary is generated from calling -    # the SomeResultFormatter.arg_parser() with the options data -    # passed to dotest.py via the "--results-formatter-options" -    # argument.  See the help on that for syntactic requirements -    # on getting that parsed correctly. -    formatter = SomeResultFormatter(file_like_object, argparse_options_dict) - -    # Single call to session start, before parsing any events. -    formatter.begin_session() - -    formatter.handle_event({"event":"initialize",...}) - -    # Zero or more calls specified for events recorded during the test session. -    # The parallel test runner manages getting results from all the inferior -    # dotest processes, so from a new format perspective, don't worry about -    # that.  The formatter will be presented with a single stream of events -    # sandwiched between a single begin_session()/end_session() pair in the -    # parallel test runner process/thread. -    for event in zero_or_more_test_events(): -        formatter.handle_event(event) - -    # Single call to terminate/wrap-up. For formatters that need all the -    # data before they can print a correct result (e.g. xUnit/JUnit), -    # this is where the final report can be generated. -    formatter.handle_event({"event":"terminate",...}) - -    It is not the formatter's responsibility to close the file_like_object. -    (i.e. do not close it). - -    The lldb test framework passes these test events in real time, so they -    arrive as they come in. - -    In the case of the parallel test runner, the dotest inferiors -    add a 'pid' field to the dictionary that indicates which inferior -    pid generated the event. - -    Note more events may be added in the future to support richer test -    reporting functionality. One example: creating a true flaky test -    result category so that unexpected successes really mean the test -    is marked incorrectly (either should be marked flaky, or is indeed -    passing consistently now and should have the xfail marker -    removed). In this case, a flaky_success and flaky_fail event -    likely will be added to capture these and support reporting things -    like percentages of flaky test passing so we can see if we're -    making some things worse/better with regards to failure rates. - -    Another example: announcing all the test methods that are planned -    to be run, so we can better support redo operations of various kinds -    (redo all non-run tests, redo non-run tests except the one that -    was running [perhaps crashed], etc.) - -    Implementers are expected to override all the public methods -    provided in this class. See each method's docstring to see -    expectations about when the call should be chained. - -    """ -    @classmethod -    def arg_parser(cls): -        """@return arg parser used to parse formatter-specific options.""" -        parser = argparse.ArgumentParser( -            description='{} options'.format(cls.__name__), -            usage=('dotest.py --results-formatter-options=' -                   '"--option1 value1 [--option2 value2 [...]]"')) -        parser.add_argument( -            "--dump-results", -            action="store_true", -            help=('dump the raw results data after printing ' -                  'the summary output.')) -        return parser - -    def __init__(self, out_file, options, file_is_stream): -        super(ResultsFormatter, self).__init__() -        self.out_file = out_file -        self.options = options -        self.using_terminal = False -        if not self.out_file: -            raise Exception("ResultsFormatter created with no file object") -        self.start_time_by_test = {} -        self.terminate_called = False -        self.file_is_stream = file_is_stream - -        # Track the most recent test start event by worker index. -        # We'll use this to assign TIMEOUT and exceptional -        # exits to the most recent test started on a given -        # worker index. -        self.started_tests_by_worker = {} - -        # Store the most recent test_method/job status. -        self.result_events = {} - -        # Track the number of test method reruns. -        self.test_method_rerun_count = 0 - -        # Lock that we use while mutating inner state, like the -        # total test count and the elements.  We minimize how -        # long we hold the lock just to keep inner state safe, not -        # entirely consistent from the outside. -        self.lock = threading.RLock() - -        # Keeps track of the test base filenames for tests that -        # are expected to timeout.  If a timeout occurs in any test -        # basename that matches this list, that result should be -        # converted into a non-issue.  We'll create an expected -        # timeout test status for this. -        self.expected_timeouts_by_basename = set() - -        # Tests which have reported that they are expecting to fail. These will -        # be marked as expected failures even if they return a failing status, -        # probably because they crashed or deadlocked. -        self.expected_failures = set() - -        # Keep track of rerun-eligible tests. -        # This is a set that contains tests saved as: -        # {test_filename}:{test_class}:{test_name} -        self.rerun_eligible_tests = set() - -        # A dictionary of test files that had a failing -        # test, in the format of: -        # key = test path, value = array of test methods that need rerun -        self.tests_for_rerun = {} - -    @classmethod -    def _make_key(cls, result_event): -        """Creates a key from a test or job result event. - -        This key attempts to be as unique as possible.  For -        test result events, it will be unique per test method. -        For job events (ones not promoted to a test result event), -        it will be unique per test case file. - -        @return a string-based key of the form -        {test_filename}:{test_class}.{test_name} -        """ -        if result_event is None: -            return None -        component_count = 0 -        if "test_filename" in result_event: -            key = result_event["test_filename"] -            component_count += 1 -        else: -            key = "<no_filename>" -        if "test_class" in result_event: -            if component_count > 0: -                key += ":" -            key += result_event["test_class"] -            component_count += 1 -        if "test_name" in result_event: -            if component_count > 0: -                key += "." -            key += result_event["test_name"] -            component_count += 1 -        return key - -    @classmethod -    def _is_file_level_issue(cls, key, event): -        """Returns whether a given key represents a file-level event. - -        @param cls this class.  Unused, but following PEP8 for -        preferring @classmethod over @staticmethod. - -        @param key the key for the issue being tested. - -        @param event the event for the issue being tested. - -        @return True when the given key (as made by _make_key()) -        represents an event that is at the test file level (i.e. -        it isn't scoped to a test class or method). -        """ -        if key is None: -            return False -        else: -            return FILE_LEVEL_KEY_RE.match(key) is not None - -    def _mark_test_as_expected_failure(self, test_result_event): -        key = self._make_key(test_result_event) -        if key is not None: -            self.expected_failures.add(key) -        else: -            sys.stderr.write( -                "\nerror: test marked as expected failure but " -                "failed to create key.\n") - -    def _mark_test_for_rerun_eligibility(self, test_result_event): -        key = self._make_key(test_result_event) -        if key is not None: -            self.rerun_eligible_tests.add(key) -        else: -            sys.stderr.write( -                "\nerror: test marked for re-run eligibility but " -                "failed to create key.\n") - -    def _maybe_add_test_to_rerun_list(self, result_event): -        key = self._make_key(result_event) -        if key is not None: -            if (key in self.rerun_eligible_tests or -                    configuration.rerun_all_issues): -                test_filename = result_event.get("test_filename", None) -                if test_filename is not None: -                    test_name = result_event.get("test_name", None) -                    if test_filename not in self.tests_for_rerun: -                        self.tests_for_rerun[test_filename] = [] -                    if test_name is not None: -                        self.tests_for_rerun[test_filename].append(test_name) -        else: -            sys.stderr.write( -                "\nerror: couldn't add testrun-failing test to rerun " -                "list because no eligibility key could be created.\n") - -    def _maybe_remap_job_result_event(self, test_event): -        """Remaps timeout/exceptional exit job results to last test method running. - -        @param test_event the job_result test event.  This is an in/out -        parameter.  It will be modified if it can be mapped to a test_result -        of the same status, using details from the last-running test method -        known to be most recently started on the same worker index. -        """ -        test_start = None - -        job_status = test_event["status"] -        if job_status in [ -                EventBuilder.STATUS_TIMEOUT, -                EventBuilder.STATUS_EXCEPTIONAL_EXIT]: -            worker_index = test_event.get("worker_index", None) -            if worker_index is not None: -                test_start = self.started_tests_by_worker.get( -                    worker_index, None) - -        # If we have a test start to remap, do it here. -        if test_start is not None: -            test_event["event"] = EventBuilder.TYPE_TEST_RESULT - -            # Fill in all fields from test start not present in -            # job status message. -            for (start_key, start_value) in test_start.items(): -                if start_key not in test_event: -                    test_event[start_key] = start_value - -    def _maybe_remap_expected_timeout(self, event): -        if event is None: -            return - -        status = event.get("status", None) -        if status is None or status != EventBuilder.STATUS_TIMEOUT: -            return - -        # Check if the timeout test's basename is in the expected timeout -        # list.  If so, convert to an expected timeout. -        basename = os.path.basename(event.get("test_filename", "")) -        if basename in self.expected_timeouts_by_basename: -            # Convert to an expected timeout. -            event["status"] = EventBuilder.STATUS_EXPECTED_TIMEOUT - -    def _maybe_remap_expected_failure(self, event): -        if event is None: -            return - -        key = self._make_key(event) -        if key not in self.expected_failures: -            return - -        status = event.get("status", None) -        if status in EventBuilder.TESTRUN_ERROR_STATUS_VALUES: -            event["status"] = EventBuilder.STATUS_EXPECTED_FAILURE -        elif status == EventBuilder.STATUS_SUCCESS: -            event["status"] = EventBuilder.STATUS_UNEXPECTED_SUCCESS - -    def handle_event(self, test_event): -        """Handles the test event for collection into the formatter output. - -        Derived classes may override this but should call down to this -        implementation first. - -        @param test_event the test event as formatted by one of the -        event_for_* calls. -        """ -        with self.lock: -            # Keep track of whether terminate was received.  We do this so -            # that a process can call the 'terminate' event on its own, to -            # close down a formatter at the appropriate time.  Then the -            # atexit() cleanup can call the "terminate if it hasn't been -            # called yet". -            if test_event is not None: -                event_type = test_event.get("event", "") -                # We intentionally allow event_type to be checked anew -                # after this check below since this check may rewrite -                # the event type -                if event_type == EventBuilder.TYPE_JOB_RESULT: -                    # Possibly convert the job status (timeout, -                    # exceptional exit) # to an appropriate test_result event. -                    self._maybe_remap_job_result_event(test_event) -                    event_type = test_event.get("event", "") - -                # Remap timeouts to expected timeouts. -                if event_type in EventBuilder.RESULT_TYPES: -                    self._maybe_remap_expected_timeout(test_event) -                    self._maybe_remap_expected_failure(test_event) -                    event_type = test_event.get("event", "") - -                if event_type == "terminate": -                    self.terminate_called = True -                elif event_type in EventBuilder.RESULT_TYPES: -                    # Clear the most recently started test for the related -                    # worker. -                    worker_index = test_event.get("worker_index", None) -                    if worker_index is not None: -                        self.started_tests_by_worker.pop(worker_index, None) -                    status = test_event["status"] -                    if status in EventBuilder.TESTRUN_ERROR_STATUS_VALUES: -                        # A test/job status value in any of those status values -                        # causes a testrun failure. If such a test fails, check -                        # whether it can be rerun. If it can be rerun, add it -                        # to the rerun job. -                        self._maybe_add_test_to_rerun_list(test_event) - -                    # Build the test key. -                    test_key = self._make_key(test_event) -                    if test_key is None: -                        raise Exception( -                            "failed to find test filename for " -                            "test event {}".format(test_event)) - -                    # Save the most recent test event for the test key. This -                    # allows a second test phase to overwrite the most recent -                    # result for the test key (unique per method). We do final -                    # reporting at the end, so we'll report based on final -                    # results. We do this so that a re-run caused by, perhaps, -                    # the need to run a low-load, single-worker test run can -                    # have the final run's results to always be used. -                    if test_key in self.result_events: -                        self.test_method_rerun_count += 1 -                    self.result_events[test_key] = test_event -                elif event_type == EventBuilder.TYPE_TEST_START: -                    # Track the start time for the test method. -                    self.track_start_time( -                        test_event["test_class"], -                        test_event["test_name"], -                        test_event["event_time"]) -                    # Track of the most recent test method start event -                    # for the related worker.  This allows us to figure -                    # out whether a process timeout or exceptional exit -                    # can be charged (i.e. assigned) to a test method. -                    worker_index = test_event.get("worker_index", None) -                    if worker_index is not None: -                        self.started_tests_by_worker[worker_index] = test_event - -                elif event_type == EventBuilder.TYPE_MARK_TEST_RERUN_ELIGIBLE: -                    self._mark_test_for_rerun_eligibility(test_event) -                elif (event_type == -                      EventBuilder.TYPE_MARK_TEST_EXPECTED_FAILURE): -                    self._mark_test_as_expected_failure(test_event) - -    def set_expected_timeouts_by_basename(self, basenames): -        """Specifies a list of test file basenames that are allowed to timeout -        without being called out as a timeout issue. - -        These fall into a new status category called STATUS_EXPECTED_TIMEOUT. -        """ -        if basenames is not None: -            for basename in basenames: -                self.expected_timeouts_by_basename.add(basename) - -    def track_start_time(self, test_class, test_name, start_time): -        """tracks the start time of a test so elapsed time can be computed. - -        this alleviates the need for test results to be processed serially -        by test.  it will save the start time for the test so that -        elapsed_time_for_test() can compute the elapsed time properly. -        """ -        if test_class is None or test_name is None: -            return - -        test_key = "{}.{}".format(test_class, test_name) -        self.start_time_by_test[test_key] = start_time - -    def elapsed_time_for_test(self, test_class, test_name, end_time): -        """returns the elapsed time for a test. - -        this function can only be called once per test and requires that -        the track_start_time() method be called sometime prior to calling -        this method. -        """ -        if test_class is None or test_name is None: -            return -2.0 - -        test_key = "{}.{}".format(test_class, test_name) -        if test_key not in self.start_time_by_test: -            return -1.0 -        else: -            start_time = self.start_time_by_test[test_key] -        del self.start_time_by_test[test_key] -        return end_time - start_time - -    def is_using_terminal(self): -        """returns true if this results formatter is using the terminal and -        output should be avoided.""" -        return self.using_terminal - -    def send_terminate_as_needed(self): -        """sends the terminate event if it hasn't been received yet.""" -        if not self.terminate_called: -            terminate_event = EventBuilder.bare_event("terminate") -            self.handle_event(terminate_event) - -    # Derived classes may require self access -    # pylint: disable=no-self-use -    # noinspection PyMethodMayBeStatic,PyMethodMayBeStatic -    def replaces_summary(self): -        """Returns whether the results formatter includes a summary -        suitable to replace the old lldb test run results. - -        @return True if the lldb test runner can skip its summary -        generation when using this results formatter; False otherwise. -        """ -        return False - -    def counts_by_test_result_status(self, status): -        """Returns number of test method results for the given status. - -        @status_result a test result status (e.g. success, fail, skip) -        as defined by the EventBuilder.STATUS_* class members. - -        @return an integer returning the number of test methods matching -        the given test result status. -        """ -        return len([ -            [key, event] for (key, event) in self.result_events.items() -            if event.get("status", "") == status]) - -    @classmethod -    def _event_sort_key(cls, event): -        """Returns the sort key to be used for a test event. - -        This method papers over the differences in a test method result vs. a -        job (i.e. inferior process) result. - -        @param event a test result or job result event. -        @return a key useful for sorting events by name (test name preferably, -        then by test filename). -        """ -        if "test_name" in event: -            return event["test_name"] -        else: -            return event.get("test_filename", None) - -    def _partition_results_by_status(self, categories): -        """Partitions the captured test results by event status. - -        This permits processing test results by the category ids. - -        @param categories the list of categories on which to partition. -        Follows the format described in _report_category_details(). - -        @return a dictionary where each key is the test result status, -        and each entry is a list containing all the test result events -        that matched that test result status.  Result status IDs with -        no matching entries will have a zero-length list. -        """ -        partitioned_events = {} -        for category in categories: -            result_status_id = category[0] -            matching_events = [ -                [key, event] for (key, event) in self.result_events.items() -                if event.get("status", "") == result_status_id] -            partitioned_events[result_status_id] = sorted( -                matching_events, -                key=lambda x: self._event_sort_key(x[1])) -        return partitioned_events - -    @staticmethod -    def _print_banner(out_file, banner_text): -        """Prints an ASCII banner around given text. - -        Output goes to the out file for the results formatter. - -        @param out_file a file-like object where output will be written. -        @param banner_text the text to display, with a banner -        of '=' around the line above and line below. -        """ -        banner_separator = "".ljust(len(banner_text), "=") - -        out_file.write("\n{}\n{}\n{}\n".format( -            banner_separator, -            banner_text, -            banner_separator)) - -    def _print_summary_counts( -            self, out_file, categories, result_events_by_status, extra_rows): -        """Prints summary counts for all categories. - -        @param out_file a file-like object used to print output. - -        @param categories the list of categories on which to partition. -        Follows the format described in _report_category_details(). - -        @param result_events_by_status the partitioned list of test -        result events in a dictionary, with the key set to the test -        result status id and the value set to the list of test method -        results that match the status id. -        """ - -        # Get max length for category printed name -        category_with_max_printed_name = max( -            categories, key=lambda x: len(x[1])) -        max_category_name_length = len(category_with_max_printed_name[1]) - -        # If we are provided with extra rows, consider these row name lengths. -        if extra_rows is not None: -            for row in extra_rows: -                name_length = len(row[0]) -                if name_length > max_category_name_length: -                    max_category_name_length = name_length - -        self._print_banner(out_file, "Test Result Summary") - -        # Prepend extra rows -        if extra_rows is not None: -            for row in extra_rows: -                extra_label = "{}:".format(row[0]).ljust( -                    max_category_name_length + 1) -                out_file.write("{} {:4}\n".format(extra_label, row[1])) - -        for category in categories: -            result_status_id = category[0] -            result_label = "{}:".format(category[1]).ljust( -                max_category_name_length + 1) -            count = len(result_events_by_status[result_status_id]) -            out_file.write("{} {:4}\n".format( -                result_label, -                count)) - -    @classmethod -    def _has_printable_details(cls, categories, result_events_by_status): -        """Returns whether there are any test result details that need to be printed. - -        This will spin through the results and see if any result in a category -        that is printable has any results to print. - -        @param categories the list of categories on which to partition. -        Follows the format described in _report_category_details(). - -        @param result_events_by_status the partitioned list of test -        result events in a dictionary, with the key set to the test -        result status id and the value set to the list of test method -        results that match the status id. - -        @return True if there are any details (i.e. test results -        for failures, errors, unexpected successes); False otherwise. -        """ -        for category in categories: -            result_status_id = category[0] -            print_matching_tests = category[2] -            if print_matching_tests: -                if len(result_events_by_status[result_status_id]) > 0: -                    # We found a printable details test result status -                    # that has details to print. -                    return True -        # We didn't find any test result category with printable -        # details. -        return False - -    @staticmethod -    def _report_category_details(out_file, category, result_events_by_status): -        """Reports all test results matching the given category spec. - -        @param out_file a file-like object used to print output. - -        @param category a category spec of the format [test_event_name, -        printed_category_name, print_matching_entries?] - -        @param result_events_by_status the partitioned list of test -        result events in a dictionary, with the key set to the test -        result status id and the value set to the list of test method -        results that match the status id. -        """ -        result_status_id = category[0] -        print_matching_tests = category[2] -        detail_label = category[3] - -        if print_matching_tests: -            # Sort by test name -            for (_, event) in result_events_by_status[result_status_id]: -                # Convert full test path into test-root-relative. -                test_relative_path = os.path.relpath( -                    os.path.realpath(event["test_filename"]), -                    lldbsuite.lldb_test_root) - -                # Create extra info component (used for exceptional exit info) -                if result_status_id == EventBuilder.STATUS_EXCEPTIONAL_EXIT: -                    extra_info = "[EXCEPTIONAL EXIT {} ({})] ".format( -                        event["exception_code"], -                        event["exception_description"]) -                else: -                    extra_info = "" - -                # Figure out the identity we will use for this test. -                if configuration.verbose and ("test_class" in event): -                    test_id = "{}.{}".format( -                        event["test_class"], event["test_name"]) -                elif "test_name" in event: -                    test_id = event["test_name"] -                else: -                    test_id = "<no_running_test_method>" - -                # Display the info. -                out_file.write("{}: {}{} ({})\n".format( -                    detail_label, -                    extra_info, -                    test_id, -                    test_relative_path)) - -    def print_results(self, out_file): -        """Writes the test result report to the output file. - -        @param out_file a file-like object used for printing summary -        results.  This is different than self.out_file, which might -        be something else for non-summary data. -        """ -        extra_results = [ -            # Total test methods processed, excluding reruns. -            ["Test Methods", len(self.result_events)], -            ["Reruns", self.test_method_rerun_count]] - -        # Output each of the test result entries. -        categories = [ -            # result id, printed name, print matching tests?, detail label -            [EventBuilder.STATUS_SUCCESS, -             "Success", False, None], -            [EventBuilder.STATUS_EXPECTED_FAILURE, -             "Expected Failure", False, None], -            [EventBuilder.STATUS_FAILURE, -             "Failure", True, "FAIL"], -            [EventBuilder.STATUS_ERROR, -             "Error", True, "ERROR"], -            [EventBuilder.STATUS_EXCEPTIONAL_EXIT, -             "Exceptional Exit", True, "ERROR"], -            [EventBuilder.STATUS_UNEXPECTED_SUCCESS, -             "Unexpected Success", True, "UNEXPECTED SUCCESS"], -            [EventBuilder.STATUS_SKIP, "Skip", False, None], -            [EventBuilder.STATUS_TIMEOUT, -             "Timeout", True, "TIMEOUT"], -            [EventBuilder.STATUS_EXPECTED_TIMEOUT, -             # Intentionally using the unusual hyphenation in TIME-OUT to -             # prevent buildbots from thinking it is an issue when scanning -             # for TIMEOUT. -             "Expected Timeout", True, "EXPECTED TIME-OUT"] -        ] - -        # Partition all the events by test result status -        result_events_by_status = self._partition_results_by_status( -            categories) - -        # Print the details -        have_details = self._has_printable_details( -            categories, result_events_by_status) -        if have_details: -            self._print_banner(out_file, "Issue Details") -            for category in categories: -                self._report_category_details( -                    out_file, category, result_events_by_status) - -        # Print the summary -        self._print_summary_counts( -            out_file, categories, result_events_by_status, extra_results) - -        if self.options.dump_results: -            # Debug dump of the key/result info for all categories. -            self._print_banner(out_file, "Results Dump") -            for status, events_by_key in result_events_by_status.items(): -                out_file.write("\nSTATUS: {}\n".format(status)) -                for key, event in events_by_key: -                    out_file.write("key:   {}\n".format(key)) -                    out_file.write("event: {}\n".format(event)) - -    def clear_file_level_issues(self, tests_for_rerun, out_file): -        """Clear file-charged issues in any of the test rerun files. - -        @param tests_for_rerun the list of test-dir-relative paths that have -        functions that require rerunning.  This is the test list -        returned by the results_formatter at the end of the previous run. - -        @return the number of file-level issues that were cleared. -        """ -        if tests_for_rerun is None: -            return 0 - -        cleared_file_level_issues = 0 -        # Find the unique set of files that are covered by the given tests -        # that are to be rerun.  We derive the files that are eligible for -        # having their markers cleared, because we support running in a mode -        # where only flaky tests are eligible for rerun.  If the file-level -        # issue occurred in a file that was not marked as flaky, then we -        # shouldn't be clearing the event here. -        basename_set = set() -        for test_file_relpath in tests_for_rerun: -            basename_set.add(os.path.basename(test_file_relpath)) - -        # Find all the keys for file-level events that are considered -        # test issues. -        file_level_issues = [(key, event) -                             for key, event in self.result_events.items() -                             if ResultsFormatter._is_file_level_issue( -                                     key, event) -                             and event.get("status", "") in -                             EventBuilder.TESTRUN_ERROR_STATUS_VALUES] - -        # Now remove any file-level error for the given test base name. -        for key, event in file_level_issues: -            # If the given file base name is in the rerun set, then we -            # clear that entry from the result set. -            if os.path.basename(key) in basename_set: -                self.result_events.pop(key, None) -                cleared_file_level_issues += 1 -                if out_file is not None: -                    out_file.write( -                        "clearing file-level issue for file {} " -                        "(issue type: {})" -                        .format(key, event.get("status", "<unset-status>"))) - -        return cleared_file_level_issues diff --git a/packages/Python/lldbsuite/test_event/formatter/xunit.py b/packages/Python/lldbsuite/test_event/formatter/xunit.py deleted file mode 100644 index 4c53ff8062d7..000000000000 --- a/packages/Python/lldbsuite/test_event/formatter/xunit.py +++ /dev/null @@ -1,596 +0,0 @@ -""" -                     The LLVM Compiler Infrastructure - -This file is distributed under the University of Illinois Open Source -License. See LICENSE.TXT for details. - -Provides an xUnit ResultsFormatter for integrating the LLDB -test suite with the Jenkins xUnit aggregator and other xUnit-compliant -test output processors. -""" -from __future__ import absolute_import -from __future__ import print_function - -# System modules -import re -import sys -import xml.sax.saxutils - -# Third-party modules -import six - -# Local modules -from ..event_builder import EventBuilder -from ..build_exception import BuildError -from .results_formatter import ResultsFormatter - - -class XunitFormatter(ResultsFormatter): -    """Provides xUnit-style formatted output. -    """ - -    # Result mapping arguments -    RM_IGNORE = 'ignore' -    RM_SUCCESS = 'success' -    RM_FAILURE = 'failure' -    RM_PASSTHRU = 'passthru' - -    @staticmethod -    def _build_illegal_xml_regex(): -        """Constructs a regex to match all illegal xml characters. - -        Expects to be used against a unicode string.""" -        # Construct the range pairs of invalid unicode characters. -        illegal_chars_u = [ -            (0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), (0x7F, 0x84), -            (0x86, 0x9F), (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF)] - -        # For wide builds, we have more. -        if sys.maxunicode >= 0x10000: -            illegal_chars_u.extend( -                [(0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF), (0x3FFFE, 0x3FFFF), -                 (0x4FFFE, 0x4FFFF), (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF), -                 (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF), (0x9FFFE, 0x9FFFF), -                 (0xAFFFE, 0xAFFFF), (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF), -                 (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF), (0xFFFFE, 0xFFFFF), -                 (0x10FFFE, 0x10FFFF)]) - -        # Build up an array of range expressions. -        illegal_ranges = [ -            "%s-%s" % (six.unichr(low), six.unichr(high)) -            for (low, high) in illegal_chars_u] - -        # Compile the regex -        return re.compile(six.u('[%s]') % six.u('').join(illegal_ranges)) - -    @staticmethod -    def _quote_attribute(text): -        """Returns the given text in a manner safe for usage in an XML attribute. - -        @param text the text that should appear within an XML attribute. -        @return the attribute-escaped version of the input text. -        """ -        return xml.sax.saxutils.quoteattr(text) - -    def _replace_invalid_xml(self, str_or_unicode): -        """Replaces invalid XML characters with a '?'. - -        @param str_or_unicode a string to replace invalid XML -        characters within.  Can be unicode or not.  If not unicode, -        assumes it is a byte string in utf-8 encoding. - -        @returns a utf-8-encoded byte string with invalid -        XML replaced with '?'. -        """ -        # Get the content into unicode -        if isinstance(str_or_unicode, str): -            # If we hit decoding errors due to data corruption, replace the -            # invalid characters with U+FFFD REPLACEMENT CHARACTER. -            unicode_content = str_or_unicode.decode('utf-8', 'replace') -        else: -            unicode_content = str_or_unicode -        return self.invalid_xml_re.sub( -            six.u('?'), unicode_content).encode('utf-8') - -    @classmethod -    def arg_parser(cls): -        """@return arg parser used to parse formatter-specific options.""" -        parser = super(XunitFormatter, cls).arg_parser() - -        # These are valid choices for results mapping. -        results_mapping_choices = [ -            XunitFormatter.RM_IGNORE, -            XunitFormatter.RM_SUCCESS, -            XunitFormatter.RM_FAILURE, -            XunitFormatter.RM_PASSTHRU] -        parser.add_argument( -            "--assert-on-unknown-events", -            action="store_true", -            help=('cause unknown test events to generate ' -                  'a python assert.  Default is to ignore.')) -        parser.add_argument( -            "--ignore-skip-name", -            "-n", -            metavar='PATTERN', -            action="append", -            dest='ignore_skip_name_patterns', -            help=('a python regex pattern, where ' -                  'any skipped test with a test method name where regex ' -                  'matches (via search) will be ignored for xUnit test ' -                  'result purposes.  Can be specified multiple times.')) -        parser.add_argument( -            "--ignore-skip-reason", -            "-r", -            metavar='PATTERN', -            action="append", -            dest='ignore_skip_reason_patterns', -            help=('a python regex pattern, where ' -                  'any skipped test with a skip reason where the regex ' -                  'matches (via search) will be ignored for xUnit test ' -                  'result purposes.  Can be specified multiple times.')) -        parser.add_argument( -            "--xpass", action="store", choices=results_mapping_choices, -            default=XunitFormatter.RM_FAILURE, -            help=('specify mapping from unexpected success to jUnit/xUnit ' -                  'result type')) -        parser.add_argument( -            "--xfail", action="store", choices=results_mapping_choices, -            default=XunitFormatter.RM_IGNORE, -            help=('specify mapping from expected failure to jUnit/xUnit ' -                  'result type')) -        return parser - -    @staticmethod -    def _build_regex_list_from_patterns(patterns): -        """Builds a list of compiled regular expressions from option value. - -        @param patterns contains a list of regular expression -        patterns. - -        @return list of compiled regular expressions, empty if no -        patterns provided. -        """ -        regex_list = [] -        if patterns is not None: -            for pattern in patterns: -                regex_list.append(re.compile(pattern)) -        return regex_list - -    def __init__(self, out_file, options, file_is_stream): -        """Initializes the XunitFormatter instance. -        @param out_file file-like object where formatted output is written. -        @param options specifies a dictionary of options for the -        formatter. -        """ -        # Initialize the parent -        super(XunitFormatter, self).__init__(out_file, options, file_is_stream) -        self.text_encoding = "UTF-8" -        self.invalid_xml_re = XunitFormatter._build_illegal_xml_regex() -        self.total_test_count = 0 -        self.ignore_skip_name_regexes = ( -            XunitFormatter._build_regex_list_from_patterns( -                options.ignore_skip_name_patterns)) -        self.ignore_skip_reason_regexes = ( -            XunitFormatter._build_regex_list_from_patterns( -                options.ignore_skip_reason_patterns)) - -        self.elements = { -            "successes": [], -            "errors": [], -            "failures": [], -            "skips": [], -            "unexpected_successes": [], -            "expected_failures": [], -            "all": [] -        } - -        self.status_handlers = { -            EventBuilder.STATUS_SUCCESS: self._handle_success, -            EventBuilder.STATUS_FAILURE: self._handle_failure, -            EventBuilder.STATUS_ERROR: self._handle_error, -            EventBuilder.STATUS_SKIP: self._handle_skip, -            EventBuilder.STATUS_EXPECTED_FAILURE: -                self._handle_expected_failure, -            EventBuilder.STATUS_EXPECTED_TIMEOUT: -                self._handle_expected_timeout, -            EventBuilder.STATUS_UNEXPECTED_SUCCESS: -                self._handle_unexpected_success, -            EventBuilder.STATUS_EXCEPTIONAL_EXIT: -                self._handle_exceptional_exit, -            EventBuilder.STATUS_TIMEOUT: -                self._handle_timeout -        } - -    RESULT_TYPES = { -        EventBuilder.TYPE_TEST_RESULT, -        EventBuilder.TYPE_JOB_RESULT} - -    def handle_event(self, test_event): -        super(XunitFormatter, self).handle_event(test_event) - -        event_type = test_event["event"] -        if event_type is None: -            return - -        if event_type == "terminate": -            # Process all the final result events into their -            # XML counterparts. -            for result_event in self.result_events.values(): -                self._process_test_result(result_event) -            self._finish_output() -        else: -            # This is an unknown event. -            if self.options.assert_on_unknown_events: -                raise Exception("unknown event type {} from {}\n".format( -                    event_type, test_event)) - -    def _handle_success(self, test_event): -        """Handles a test success. -        @param test_event the test event to handle. -        """ -        result = self._common_add_testcase_entry(test_event) -        with self.lock: -            self.elements["successes"].append(result) - -    def _handle_failure(self, test_event): -        """Handles a test failure. -        @param test_event the test event to handle. -        """ -        message = self._replace_invalid_xml(test_event["issue_message"]) -        backtrace = self._replace_invalid_xml( -            "".join(test_event.get("issue_backtrace", []))) - -        result = self._common_add_testcase_entry( -            test_event, -            inner_content=( -                '<failure type={} message={}><![CDATA[{}]]></failure>'.format( -                    XunitFormatter._quote_attribute(test_event["issue_class"]), -                    XunitFormatter._quote_attribute(message), -                    backtrace) -            )) -        with self.lock: -            self.elements["failures"].append(result) - -    def _handle_error_build(self, test_event): -        """Handles a test error. -        @param test_event the test event to handle. -        """ -        message = self._replace_invalid_xml(test_event["issue_message"]) -        build_issue_description = self._replace_invalid_xml( -            BuildError.format_build_error( -                test_event.get("build_command", "<None>"), -                test_event.get("build_error", "<None>"))) - -        result = self._common_add_testcase_entry( -            test_event, -            inner_content=( -                '<error type={} message={}><![CDATA[{}]]></error>'.format( -                    XunitFormatter._quote_attribute(test_event["issue_class"]), -                    XunitFormatter._quote_attribute(message), -                    build_issue_description) -            )) -        with self.lock: -            self.elements["errors"].append(result) - -    def _handle_error_standard(self, test_event): -        """Handles a test error. -        @param test_event the test event to handle. -        """ -        message = self._replace_invalid_xml(test_event["issue_message"]) -        backtrace = self._replace_invalid_xml( -            "".join(test_event.get("issue_backtrace", []))) - -        result = self._common_add_testcase_entry( -            test_event, -            inner_content=( -                '<error type={} message={}><![CDATA[{}]]></error>'.format( -                    XunitFormatter._quote_attribute(test_event["issue_class"]), -                    XunitFormatter._quote_attribute(message), -                    backtrace) -            )) -        with self.lock: -            self.elements["errors"].append(result) - -    def _handle_error(self, test_event): -        if test_event.get("issue_phase", None) == "build": -            self._handle_error_build(test_event) -        else: -            self._handle_error_standard(test_event) - -    def _handle_exceptional_exit(self, test_event): -        """Handles an exceptional exit. -        @param test_event the test method or job result event to handle. -        """ -        if "test_name" in test_event: -            name = test_event["test_name"] -        else: -            name = test_event.get("test_filename", "<unknown test/filename>") - -        message_text = "ERROR: {} ({}): {}".format( -            test_event.get("exception_code", 0), -            test_event.get("exception_description", ""), -            name) -        message = self._replace_invalid_xml(message_text) - -        result = self._common_add_testcase_entry( -            test_event, -            inner_content=( -                '<error type={} message={}></error>'.format( -                    "exceptional_exit", -                    XunitFormatter._quote_attribute(message)) -            )) -        with self.lock: -            self.elements["errors"].append(result) - -    def _handle_timeout(self, test_event): -        """Handles a test method or job timeout. -        @param test_event the test method or job result event to handle. -        """ -        if "test_name" in test_event: -            name = test_event["test_name"] -        else: -            name = test_event.get("test_filename", "<unknown test/filename>") - -        message_text = "TIMEOUT: {}".format(name) -        message = self._replace_invalid_xml(message_text) - -        result = self._common_add_testcase_entry( -            test_event, -            inner_content=( -                '<error type={} message={}></error>'.format( -                    XunitFormatter._quote_attribute("timeout"), -                    XunitFormatter._quote_attribute(message)) -            )) -        with self.lock: -            self.elements["errors"].append(result) - -    @staticmethod -    def _ignore_based_on_regex_list(test_event, test_key, regex_list): -        """Returns whether to ignore a test event based on patterns. - -        @param test_event the test event dictionary to check. -        @param test_key the key within the dictionary to check. -        @param regex_list a list of zero or more regexes.  May contain -        zero or more compiled regexes. - -        @return True if any o the regex list match based on the -        re.search() method; false otherwise. -        """ -        for regex in regex_list: -            match = regex.search(test_event.get(test_key, '')) -            if match: -                return True -        return False - -    def _handle_skip(self, test_event): -        """Handles a skipped test. -        @param test_event the test event to handle. -        """ - -        # Are we ignoring this test based on test name? -        if XunitFormatter._ignore_based_on_regex_list( -                test_event, 'test_name', self.ignore_skip_name_regexes): -            return - -        # Are we ignoring this test based on skip reason? -        if XunitFormatter._ignore_based_on_regex_list( -                test_event, 'skip_reason', self.ignore_skip_reason_regexes): -            return - -        # We're not ignoring this test.  Process the skip. -        reason = self._replace_invalid_xml(test_event.get("skip_reason", "")) -        result = self._common_add_testcase_entry( -            test_event, -            inner_content='<skipped message={} />'.format( -                XunitFormatter._quote_attribute(reason))) -        with self.lock: -            self.elements["skips"].append(result) - -    def _handle_expected_failure(self, test_event): -        """Handles a test that failed as expected. -        @param test_event the test event to handle. -        """ -        if self.options.xfail == XunitFormatter.RM_PASSTHRU: -            # This is not a natively-supported junit/xunit -            # testcase mode, so it might fail a validating -            # test results viewer. -            if "bugnumber" in test_event: -                bug_id_attribute = 'bug-id={} '.format( -                    XunitFormatter._quote_attribute(test_event["bugnumber"])) -            else: -                bug_id_attribute = '' - -            result = self._common_add_testcase_entry( -                test_event, -                inner_content=( -                    '<expected-failure {}type={} message={} />'.format( -                        bug_id_attribute, -                        XunitFormatter._quote_attribute( -                            test_event["issue_class"]), -                        XunitFormatter._quote_attribute( -                            test_event["issue_message"])) -                )) -            with self.lock: -                self.elements["expected_failures"].append(result) -        elif self.options.xfail == XunitFormatter.RM_SUCCESS: -            result = self._common_add_testcase_entry(test_event) -            with self.lock: -                self.elements["successes"].append(result) -        elif self.options.xfail == XunitFormatter.RM_FAILURE: -            result = self._common_add_testcase_entry( -                test_event, -                inner_content='<failure type={} message={} />'.format( -                    XunitFormatter._quote_attribute(test_event["issue_class"]), -                    XunitFormatter._quote_attribute( -                        test_event["issue_message"]))) -            with self.lock: -                self.elements["failures"].append(result) -        elif self.options.xfail == XunitFormatter.RM_IGNORE: -            pass -        else: -            raise Exception( -                "unknown xfail option: {}".format(self.options.xfail)) - -    @staticmethod -    def _handle_expected_timeout(test_event): -        """Handles expected_timeout. -        @param test_event the test event to handle. -        """ -        # We don't do anything with expected timeouts, not even report. -        pass - -    def _handle_unexpected_success(self, test_event): -        """Handles a test that passed but was expected to fail. -        @param test_event the test event to handle. -        """ -        if self.options.xpass == XunitFormatter.RM_PASSTHRU: -            # This is not a natively-supported junit/xunit -            # testcase mode, so it might fail a validating -            # test results viewer. -            result = self._common_add_testcase_entry( -                test_event, -                inner_content="<unexpected-success />") -            with self.lock: -                self.elements["unexpected_successes"].append(result) -        elif self.options.xpass == XunitFormatter.RM_SUCCESS: -            # Treat the xpass as a success. -            result = self._common_add_testcase_entry(test_event) -            with self.lock: -                self.elements["successes"].append(result) -        elif self.options.xpass == XunitFormatter.RM_FAILURE: -            # Treat the xpass as a failure. -            if "bugnumber" in test_event: -                message = "unexpected success (bug_id:{})".format( -                    test_event["bugnumber"]) -            else: -                message = "unexpected success (bug_id:none)" -            result = self._common_add_testcase_entry( -                test_event, -                inner_content='<failure type={} message={} />'.format( -                    XunitFormatter._quote_attribute("unexpected_success"), -                    XunitFormatter._quote_attribute(message))) -            with self.lock: -                self.elements["failures"].append(result) -        elif self.options.xpass == XunitFormatter.RM_IGNORE: -            # Ignore the xpass result as far as xUnit reporting goes. -            pass -        else: -            raise Exception("unknown xpass option: {}".format( -                self.options.xpass)) - -    def _process_test_result(self, test_event): -        """Processes the test_event known to be a test result. - -        This categorizes the event appropriately and stores the data needed -        to generate the final xUnit report.  This method skips events that -        cannot be represented in xUnit output. -        """ -        if "status" not in test_event: -            raise Exception("test event dictionary missing 'status' key") - -        status = test_event["status"] -        if status not in self.status_handlers: -            raise Exception("test event status '{}' unsupported".format( -                status)) - -        # Call the status handler for the test result. -        self.status_handlers[status](test_event) - -    def _common_add_testcase_entry(self, test_event, inner_content=None): -        """Registers a testcase result, and returns the text created. - -        The caller is expected to manage failure/skip/success counts -        in some kind of appropriate way.  This call simply constructs -        the XML and appends the returned result to the self.all_results -        list. - -        @param test_event the test event dictionary. - -        @param inner_content if specified, gets included in the <testcase> -        inner section, at the point before stdout and stderr would be -        included.  This is where a <failure/>, <skipped/>, <error/>, etc. -        could go. - -        @return the text of the xml testcase element. -        """ - -        # Get elapsed time. -        test_class = test_event.get("test_class", "<no_class>") -        test_name = test_event.get("test_name", "<no_test_method>") -        event_time = test_event["event_time"] -        time_taken = self.elapsed_time_for_test( -            test_class, test_name, event_time) - -        # Plumb in stdout/stderr once we shift over to only test results. -        test_stdout = '' -        test_stderr = '' - -        # Formulate the output xml. -        if not inner_content: -            inner_content = "" -        result = ( -            '<testcase classname="{}" name="{}" time="{:.3f}">' -            '{}{}{}</testcase>'.format( -                test_class, -                test_name, -                time_taken, -                inner_content, -                test_stdout, -                test_stderr)) - -        # Save the result, update total test count. -        with self.lock: -            self.total_test_count += 1 -            self.elements["all"].append(result) - -        return result - -    def _finish_output_no_lock(self): -        """Flushes out the report of test executions to form valid xml output. - -        xUnit output is in XML.  The reporting system cannot complete the -        formatting of the output without knowing when there is no more input. -        This call addresses notification of the completed test run and thus is -        when we can finish off the report output. -        """ - -        # Figure out the counts line for the testsuite.  If we have -        # been counting either unexpected successes or expected -        # failures, we'll output those in the counts, at the risk of -        # being invalidated by a validating test results viewer. -        # These aren't counted by default so they won't show up unless -        # the user specified a formatter option to include them. -        xfail_count = len(self.elements["expected_failures"]) -        xpass_count = len(self.elements["unexpected_successes"]) -        if xfail_count > 0 or xpass_count > 0: -            extra_testsuite_attributes = ( -                ' expected-failures="{}"' -                ' unexpected-successes="{}"'.format(xfail_count, xpass_count)) -        else: -            extra_testsuite_attributes = "" - -        # Output the header. -        self.out_file.write( -            '<?xml version="1.0" encoding="{}"?>\n' -            '<testsuites>' -            '<testsuite name="{}" tests="{}" errors="{}" failures="{}" ' -            'skip="{}"{}>\n'.format( -                self.text_encoding, -                "LLDB test suite", -                self.total_test_count, -                len(self.elements["errors"]), -                len(self.elements["failures"]), -                len(self.elements["skips"]), -                extra_testsuite_attributes)) - -        # Output each of the test result entries. -        for result in self.elements["all"]: -            self.out_file.write(result + '\n') - -        # Close off the test suite. -        self.out_file.write('</testsuite></testsuites>\n') - -    def _finish_output(self): -        """Finish writing output as all incoming events have arrived.""" -        with self.lock: -            self._finish_output_no_lock() diff --git a/packages/Python/lldbsuite/test_event/test/resources/invalid_decorator/TestInvalidDecorator.py b/packages/Python/lldbsuite/test_event/test/resources/invalid_decorator/TestInvalidDecorator.py deleted file mode 100644 index 7f5c4cb79cf5..000000000000 --- a/packages/Python/lldbsuite/test_event/test/resources/invalid_decorator/TestInvalidDecorator.py +++ /dev/null @@ -1,13 +0,0 @@ -from __future__ import print_function -from lldbsuite.test import lldbtest -from lldbsuite.test import decorators - - -class NonExistentDecoratorTestCase(lldbtest.TestBase): - -    mydir = lldbtest.TestBase.compute_mydir(__file__) - -    @decorators.nonExistentDecorator(bugnumber="yt/1300") -    def test(self): -        """Verify non-existent decorators are picked up by test runner.""" -        pass diff --git a/packages/Python/lldbsuite/test_event/test/src/TestCatchInvalidDecorator.py b/packages/Python/lldbsuite/test_event/test/src/TestCatchInvalidDecorator.py deleted file mode 100644 index 5b199defc5df..000000000000 --- a/packages/Python/lldbsuite/test_event/test/src/TestCatchInvalidDecorator.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env python -""" -Tests that the event system reports issues during decorator -handling as errors. -""" -# System-provided imports -import os -import unittest - -# Local-provided imports -import event_collector - - -class TestCatchInvalidDecorator(unittest.TestCase): - -    TEST_DIR = os.path.join( -        os.path.dirname(__file__), -        os.path.pardir, -        "resources", -        "invalid_decorator") - -    def test_with_whole_file(self): -        """ -        Test that a non-existent decorator generates a test-event error -        when running all tests in the file. -        """ -        # Determine the test case file we're using. -        test_file = os.path.join(self.TEST_DIR, "TestInvalidDecorator.py") - -        # Collect all test events generated for this file. -        error_results = _filter_error_results( -            event_collector.collect_events_whole_file(test_file)) - -        self.assertGreater( -            len(error_results), -            0, -            "At least one job or test error result should have been returned") - -    def test_with_function_filter(self): -        """ -        Test that a non-existent decorator generates a test-event error -        when running a filtered test. -        """ -        # Collect all test events generated during running of tests -        # in a given directory using a test name filter.  Internally, -        # this runs through a different code path that needs to be -        # set up to catch exceptions. -        error_results = _filter_error_results( -            event_collector.collect_events_for_directory_with_filter( -                self.TEST_DIR, -                "NonExistentDecoratorTestCase.test")) - -        self.assertGreater( -            len(error_results), -            0, -            "At least one job or test error result should have been returned") - - -def _filter_error_results(events): -    # Filter out job result events. -    return [ -        event -        for event in events -        if event.get("event", None) in ["job_result", "test_result"] and -        event.get("status", None) == "error" -    ] - - -if __name__ == "__main__": -    unittest.main() diff --git a/packages/Python/lldbsuite/test_event/test/src/event_collector.py b/packages/Python/lldbsuite/test_event/test/src/event_collector.py deleted file mode 100644 index 6b64cc71ac67..000000000000 --- a/packages/Python/lldbsuite/test_event/test/src/event_collector.py +++ /dev/null @@ -1,85 +0,0 @@ -from __future__ import absolute_import -from __future__ import print_function - -import os -import subprocess -import sys -import tempfile - -# noinspection PyUnresolvedReferences -from six.moves import cPickle - - -def path_to_dotest_py(): -    return os.path.join( -        os.path.dirname(__file__), -        os.path.pardir, -        os.path.pardir, -        os.path.pardir, -        os.path.pardir, -        os.path.pardir, -        os.path.pardir, -        "test", -        "dotest.py") - - -def _make_pickled_events_filename(): -    with tempfile.NamedTemporaryFile( -            prefix="lldb_test_event_pickled_event_output", -            delete=False) as temp_file: -        return temp_file.name - - -def _collect_events_with_command(command, events_filename): -    # Run the single test with dotest.py, outputting -    # the raw pickled events to a temp file. -    with open(os.devnull, 'w') as dev_null_file: -        subprocess.call( -            command, -            stdout=dev_null_file, -            stderr=dev_null_file) - -    # Unpickle the events -    events = [] -    if os.path.exists(events_filename): -        with open(events_filename, "rb") as events_file: -            while True: -                try: -                    # print("reading event") -                    event = cPickle.load(events_file) -                    # print("read event: {}".format(event)) -                    if event: -                        events.append(event) -                except EOFError: -                    # This is okay. -                    break -        os.remove(events_filename) -    return events - - -def collect_events_whole_file(test_filename): -    events_filename = _make_pickled_events_filename() -    command = [ -        sys.executable, -        path_to_dotest_py(), -        "--inferior", -        "--results-formatter=lldbsuite.test_event.formatter.pickled.RawPickledFormatter", -        "--results-file={}".format(events_filename), -        "-p", -        os.path.basename(test_filename), -        os.path.dirname(test_filename)] -    return _collect_events_with_command(command, events_filename) - - -def collect_events_for_directory_with_filter(test_filename, filter_desc): -    events_filename = _make_pickled_events_filename() -    command = [ -        sys.executable, -        path_to_dotest_py(), -        "--inferior", -        "--results-formatter=lldbsuite.test_event.formatter.pickled.RawPickledFormatter", -        "--results-file={}".format(events_filename), -        "-f", -        filter_desc, -        os.path.dirname(test_filename)] -    return _collect_events_with_command(command, events_filename) | 
