diff options
Diffstat (limited to 'utils/lit/lit/run.py')
-rw-r--r-- | utils/lit/lit/run.py | 224 |
1 file changed, 196 insertions, 28 deletions
diff --git a/utils/lit/lit/run.py b/utils/lit/lit/run.py
index f7e84d316a7c..14d8ec98490e 100644
--- a/utils/lit/lit/run.py
+++ b/utils/lit/lit/run.py
@@ -1,4 +1,5 @@
 import os
+import sys
 import threading
 import time
 import traceback
@@ -84,11 +85,13 @@ class Tester(object):
     def run_test(self, test_index):
         test = self.run_instance.tests[test_index]
         try:
-            self.run_instance.execute_test(test)
+            execute_test(test, self.run_instance.lit_config,
+                         self.run_instance.parallelism_semaphores)
         except KeyboardInterrupt:
             # This is a sad hack. Unfortunately subprocess goes
             # bonkers with ctrl-c and we start forking merrily.
             print('\nCtrl-C detected, goodbye.')
+            sys.stdout.flush()
             os.kill(0,9)
         self.consumer.update(test_index, test)
 
@@ -167,6 +170,44 @@ class _Display(object):
 def handleFailures(provider, consumer, maxFailures):
     consumer.display = _Display(consumer.display, provider, maxFailures)
 
+def execute_test(test, lit_config, parallelism_semaphores):
+    """Execute one test"""
+    pg = test.config.parallelism_group
+    if callable(pg):
+        pg = pg(test)
+
+    result = None
+    semaphore = None
+    try:
+        if pg:
+            semaphore = parallelism_semaphores[pg]
+        if semaphore:
+            semaphore.acquire()
+        start_time = time.time()
+        result = test.config.test_format.execute(test, lit_config)
+        # Support deprecated result from execute() which returned the result
+        # code and additional output as a tuple.
+        if isinstance(result, tuple):
+            code, output = result
+            result = lit.Test.Result(code, output)
+        elif not isinstance(result, lit.Test.Result):
+            raise ValueError("unexpected result from test execution")
+        result.elapsed = time.time() - start_time
+    except KeyboardInterrupt:
+        raise
+    except:
+        if lit_config.debug:
+            raise
+        output = 'Exception during script execution:\n'
+        output += traceback.format_exc()
+        output += '\n'
+        result = lit.Test.Result(lit.Test.UNRESOLVED, output)
+    finally:
+        if semaphore:
+            semaphore.release()
+
+    test.setResult(result)
+
 class Run(object):
     """
     This class represents a concrete, configured testing run.
@@ -177,33 +218,10 @@ class Run(object):
         self.tests = tests
 
     def execute_test(self, test):
-        result = None
-        start_time = time.time()
-        try:
-            result = test.config.test_format.execute(test, self.lit_config)
-
-            # Support deprecated result from execute() which returned the result
-            # code and additional output as a tuple.
-            if isinstance(result, tuple):
-                code, output = result
-                result = lit.Test.Result(code, output)
-            elif not isinstance(result, lit.Test.Result):
-                raise ValueError("unexpected result from test execution")
-        except KeyboardInterrupt:
-            raise
-        except:
-            if self.lit_config.debug:
-                raise
-            output = 'Exception during script execution:\n'
-            output += traceback.format_exc()
-            output += '\n'
-            result = lit.Test.Result(lit.Test.UNRESOLVED, output)
-        result.elapsed = time.time() - start_time
-
-        test.setResult(result)
+        return execute_test(test, self.lit_config, self.parallelism_semaphores)
 
     def execute_tests(self, display, jobs, max_time=None,
-                      use_processes=False):
+                      execution_strategy=None):
         """
         execute_tests(display, jobs, [max_time])
 
@@ -225,12 +243,21 @@ class Run(object):
         be given an UNRESOLVED result.
         """
 
+        if execution_strategy == 'PROCESS_POOL':
+            self.execute_tests_with_mp_pool(display, jobs, max_time)
+            return
+        # FIXME: Standardize on the PROCESS_POOL execution strategy and remove
+        # the other two strategies.
+
+        use_processes = execution_strategy == 'PROCESSES'
+
         # Choose the appropriate parallel execution implementation.
         consumer = None
         if jobs != 1 and use_processes and multiprocessing:
             try:
                 task_impl = multiprocessing.Process
                 queue_impl = multiprocessing.Queue
+                sem_impl = multiprocessing.Semaphore
                 canceled_flag = multiprocessing.Value('i', 0)
                 consumer = MultiprocessResultsConsumer(self, display, jobs)
             except:
@@ -242,15 +269,19 @@ class Run(object):
         if not consumer:
             task_impl = threading.Thread
             queue_impl = queue.Queue
+            sem_impl = threading.Semaphore
             canceled_flag = LockedValue(0)
             consumer = ThreadResultsConsumer(display)
 
+        self.parallelism_semaphores = {k: sem_impl(v)
+            for k, v in self.lit_config.parallelism_groups.items()}
+
         # Create the test provider.
         provider = TestProvider(queue_impl, canceled_flag)
         handleFailures(provider, consumer, self.lit_config.maxFailures)
 
-        # Queue the tests outside the main thread because we can't guarantee
-        # that we can put() all the tests without blocking:
+        # Putting tasks into the threading or multiprocessing Queue may block,
+        # so do it in a separate thread.
         # https://docs.python.org/2/library/multiprocessing.html
         # e.g: On Mac OS X, we will hang if we put 2^15 elements in the queue
         # without taking any out.
@@ -303,3 +334,140 @@ class Run(object):
         # Wait for all the tasks to complete.
         for t in tasks:
             t.join()
+
+    def execute_tests_with_mp_pool(self, display, jobs, max_time=None):
+        # Don't do anything if we aren't going to run any tests.
+        if not self.tests or jobs == 0:
+            return
+
+        # Set up semaphores to limit parallelism of certain classes of tests.
+        # For example, some ASan tests require lots of virtual memory and run
+        # faster with less parallelism on OS X.
+        self.parallelism_semaphores = \
+                {k: multiprocessing.Semaphore(v) for k, v in
+                 self.lit_config.parallelism_groups.items()}
+
+        # Install a console-control signal handler on Windows.
+        if win32api is not None:
+            def console_ctrl_handler(type):
+                print('\nCtrl-C detected, terminating.')
+                pool.terminate()
+                pool.join()
+                os.kill(0,9)
+                return True
+            win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
+
+        # Save the display object on the runner so that we can update it from
+        # our task completion callback.
+        self.display = display
+
+        # We need to issue many wait calls, so compute the final deadline and
+        # subtract time.time() from that as we go along.
+        deadline = None
+        if max_time:
+            deadline = time.time() + max_time
+
+        # Start a process pool. Copy over the data shared between all test runs.
+        pool = multiprocessing.Pool(jobs, worker_initializer,
+                                    (self.lit_config,
+                                     self.parallelism_semaphores))
+
+        try:
+            self.failure_count = 0
+            self.hit_max_failures = False
+            async_results = [pool.apply_async(worker_run_one_test,
+                                              args=(test_index, test),
+                                              callback=self.consume_test_result)
+                             for test_index, test in enumerate(self.tests)]
+
+            # Wait for all results to come in. The callback that runs in the
+            # parent process will update the display.
+            for a in async_results:
+                if deadline:
+                    a.wait(deadline - time.time())
+                else:
+                    # Python condition variables cannot be interrupted unless
+                    # they have a timeout. This can make lit unresponsive to
+                    # KeyboardInterrupt, so do a busy wait with a timeout.
+                    while not a.ready():
+                        a.wait(1)
+                if not a.successful():
+                    a.get() # Exceptions raised here come from the worker.
+                if self.hit_max_failures:
+                    break
+        finally:
+            # Stop the workers and wait for any straggling results to come in
+            # if we exited without waiting on every async result.
+            pool.terminate()
+            pool.join()
+
+        # Mark any tests that weren't run as UNRESOLVED.
+        for test in self.tests:
+            if test.result is None:
+                test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
+
+    def consume_test_result(self, pool_result):
+        """Test completion callback for worker_run_one_test
+
+        Updates the test result status in the parent process. Each task in the
+        pool returns the test index and the result, and we use the index to look
+        up the original test object. Also updates the progress bar as tasks
+        complete.
+        """
+        # Don't add any more test results after we've hit the maximum failure
+        # count. Otherwise we're racing with the main thread, which is going
+        # to terminate the process pool soon.
+        if self.hit_max_failures:
+            return
+
+        (test_index, test_with_result) = pool_result
+        # Update the parent process copy of the test. This includes the result,
+        # XFAILS, REQUIRES, and UNSUPPORTED statuses.
+        assert self.tests[test_index].file_path == test_with_result.file_path, \
+                "parent and child disagree on test path"
+        self.tests[test_index] = test_with_result
+        self.display.update(test_with_result)
+
+        # If we've finished all the tests or too many tests have failed, notify
+        # the main thread that we've stopped testing.
+        self.failure_count += (test_with_result.result.code == lit.Test.FAIL)
+        if self.lit_config.maxFailures and \
+           self.failure_count == self.lit_config.maxFailures:
+            self.hit_max_failures = True
+
+child_lit_config = None
+child_parallelism_semaphores = None
+
+def worker_initializer(lit_config, parallelism_semaphores):
+    """Copy expensive repeated data into worker processes"""
+    global child_lit_config
+    child_lit_config = lit_config
+    global child_parallelism_semaphores
+    child_parallelism_semaphores = parallelism_semaphores
+
+def worker_run_one_test(test_index, test):
+    """Run one test in a multiprocessing.Pool
+
+    Side effects in this function and functions it calls are not visible in the
+    main lit process.
+
+    Arguments and results of this function are pickled, so they should be cheap
+    to copy. For efficiency, we copy all data needed to execute all tests into
+    each worker and store it in the child_* global variables. This reduces the
+    cost of each task.
+
+    Returns an index and a Result, which the parent process uses to update
+    the display.
+    """
+    try:
+        execute_test(test, child_lit_config, child_parallelism_semaphores)
+        return (test_index, test)
+    except KeyboardInterrupt as e:
+        # This is a sad hack. Unfortunately subprocess goes
+        # bonkers with ctrl-c and we start forking merrily.
+        print('\nCtrl-C detected, goodbye.')
+        traceback.print_exc()
+        sys.stdout.flush()
+        os.kill(0,9)
+    except:
+        traceback.print_exc()