summaryrefslogtreecommitdiff
path: root/utils/lit/lit/run.py
diff options
context:
space:
mode:
Diffstat (limited to 'utils/lit/lit/run.py')
-rw-r--r--utils/lit/lit/run.py224
1 files changed, 196 insertions, 28 deletions
diff --git a/utils/lit/lit/run.py b/utils/lit/lit/run.py
index f7e84d316a7c..14d8ec98490e 100644
--- a/utils/lit/lit/run.py
+++ b/utils/lit/lit/run.py
@@ -1,4 +1,5 @@
import os
+import sys
import threading
import time
import traceback
@@ -84,11 +85,13 @@ class Tester(object):
def run_test(self, test_index):
test = self.run_instance.tests[test_index]
try:
- self.run_instance.execute_test(test)
+ execute_test(test, self.run_instance.lit_config,
+ self.run_instance.parallelism_semaphores)
except KeyboardInterrupt:
# This is a sad hack. Unfortunately subprocess goes
# bonkers with ctrl-c and we start forking merrily.
print('\nCtrl-C detected, goodbye.')
+ sys.stdout.flush()
os.kill(0,9)
self.consumer.update(test_index, test)
@@ -167,6 +170,44 @@ class _Display(object):
def handleFailures(provider, consumer, maxFailures):
consumer.display = _Display(consumer.display, provider, maxFailures)
+def execute_test(test, lit_config, parallelism_semaphores):
+ """Execute one test"""
+ pg = test.config.parallelism_group
+ if callable(pg):
+ pg = pg(test)
+
+ result = None
+ semaphore = None
+ try:
+ if pg:
+ semaphore = parallelism_semaphores[pg]
+ if semaphore:
+ semaphore.acquire()
+ start_time = time.time()
+ result = test.config.test_format.execute(test, lit_config)
+ # Support deprecated result from execute() which returned the result
+ # code and additional output as a tuple.
+ if isinstance(result, tuple):
+ code, output = result
+ result = lit.Test.Result(code, output)
+ elif not isinstance(result, lit.Test.Result):
+ raise ValueError("unexpected result from test execution")
+ result.elapsed = time.time() - start_time
+ except KeyboardInterrupt:
+ raise
+ except:
+ if lit_config.debug:
+ raise
+ output = 'Exception during script execution:\n'
+ output += traceback.format_exc()
+ output += '\n'
+ result = lit.Test.Result(lit.Test.UNRESOLVED, output)
+ finally:
+ if semaphore:
+ semaphore.release()
+
+ test.setResult(result)
+
class Run(object):
"""
This class represents a concrete, configured testing run.
@@ -177,33 +218,10 @@ class Run(object):
self.tests = tests
def execute_test(self, test):
- result = None
- start_time = time.time()
- try:
- result = test.config.test_format.execute(test, self.lit_config)
-
- # Support deprecated result from execute() which returned the result
- # code and additional output as a tuple.
- if isinstance(result, tuple):
- code, output = result
- result = lit.Test.Result(code, output)
- elif not isinstance(result, lit.Test.Result):
- raise ValueError("unexpected result from test execution")
- except KeyboardInterrupt:
- raise
- except:
- if self.lit_config.debug:
- raise
- output = 'Exception during script execution:\n'
- output += traceback.format_exc()
- output += '\n'
- result = lit.Test.Result(lit.Test.UNRESOLVED, output)
- result.elapsed = time.time() - start_time
-
- test.setResult(result)
+ return execute_test(test, self.lit_config, self.parallelism_semaphores)
def execute_tests(self, display, jobs, max_time=None,
- use_processes=False):
+ execution_strategy=None):
"""
execute_tests(display, jobs, [max_time])
@@ -225,12 +243,21 @@ class Run(object):
be given an UNRESOLVED result.
"""
+ if execution_strategy == 'PROCESS_POOL':
+ self.execute_tests_with_mp_pool(display, jobs, max_time)
+ return
+ # FIXME: Standardize on the PROCESS_POOL execution strategy and remove
+ # the other two strategies.
+
+ use_processes = execution_strategy == 'PROCESSES'
+
# Choose the appropriate parallel execution implementation.
consumer = None
if jobs != 1 and use_processes and multiprocessing:
try:
task_impl = multiprocessing.Process
queue_impl = multiprocessing.Queue
+ sem_impl = multiprocessing.Semaphore
canceled_flag = multiprocessing.Value('i', 0)
consumer = MultiprocessResultsConsumer(self, display, jobs)
except:
@@ -242,15 +269,19 @@ class Run(object):
if not consumer:
task_impl = threading.Thread
queue_impl = queue.Queue
+ sem_impl = threading.Semaphore
canceled_flag = LockedValue(0)
consumer = ThreadResultsConsumer(display)
+ self.parallelism_semaphores = {k: sem_impl(v)
+ for k, v in self.lit_config.parallelism_groups.items()}
+
# Create the test provider.
provider = TestProvider(queue_impl, canceled_flag)
handleFailures(provider, consumer, self.lit_config.maxFailures)
- # Queue the tests outside the main thread because we can't guarantee
- # that we can put() all the tests without blocking:
+ # Putting tasks into the threading or multiprocessing Queue may block,
+ # so do it in a separate thread.
# https://docs.python.org/2/library/multiprocessing.html
# e.g: On Mac OS X, we will hang if we put 2^15 elements in the queue
# without taking any out.
@@ -303,3 +334,140 @@ class Run(object):
# Wait for all the tasks to complete.
for t in tasks:
t.join()
+
+ def execute_tests_with_mp_pool(self, display, jobs, max_time=None):
+ # Don't do anything if we aren't going to run any tests.
+ if not self.tests or jobs == 0:
+ return
+
+ # Set up semaphores to limit parallelism of certain classes of tests.
+ # For example, some ASan tests require lots of virtual memory and run
+ # faster with less parallelism on OS X.
+ self.parallelism_semaphores = \
+ {k: multiprocessing.Semaphore(v) for k, v in
+ self.lit_config.parallelism_groups.items()}
+
+ # Install a console-control signal handler on Windows.
+ if win32api is not None:
+ def console_ctrl_handler(type):
+ print('\nCtrl-C detected, terminating.')
+ pool.terminate()
+ pool.join()
+ os.kill(0,9)
+ return True
+ win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
+
+ # Save the display object on the runner so that we can update it from
+ # our task completion callback.
+ self.display = display
+
+ # We need to issue many wait calls, so compute the final deadline and
+ # subtract time.time() from that as we go along.
+ deadline = None
+ if max_time:
+ deadline = time.time() + max_time
+
+ # Start a process pool. Copy over the data shared between all test runs.
+ pool = multiprocessing.Pool(jobs, worker_initializer,
+ (self.lit_config,
+ self.parallelism_semaphores))
+
+ try:
+ self.failure_count = 0
+ self.hit_max_failures = False
+ async_results = [pool.apply_async(worker_run_one_test,
+ args=(test_index, test),
+ callback=self.consume_test_result)
+ for test_index, test in enumerate(self.tests)]
+
+ # Wait for all results to come in. The callback that runs in the
+ # parent process will update the display.
+ for a in async_results:
+ if deadline:
+ a.wait(deadline - time.time())
+ else:
+ # Python condition variables cannot be interrupted unless
+ # they have a timeout. This can make lit unresponsive to
+ # KeyboardInterrupt, so do a busy wait with a timeout.
+ while not a.ready():
+ a.wait(1)
+ if not a.successful():
+ a.get() # Exceptions raised here come from the worker.
+ if self.hit_max_failures:
+ break
+ finally:
+ # Stop the workers and wait for any straggling results to come in
+ # if we exited without waiting on every async result.
+ pool.terminate()
+ pool.join()
+
+ # Mark any tests that weren't run as UNRESOLVED.
+ for test in self.tests:
+ if test.result is None:
+ test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
+
+ def consume_test_result(self, pool_result):
+ """Test completion callback for worker_run_one_test
+
+ Updates the test result status in the parent process. Each task in the
+ pool returns the test index and the result, and we use the index to look
+ up the original test object. Also updates the progress bar as tasks
+ complete.
+ """
+ # Don't add any more test results after we've hit the maximum failure
+ # count. Otherwise we're racing with the main thread, which is going
+ # to terminate the process pool soon.
+ if self.hit_max_failures:
+ return
+
+ (test_index, test_with_result) = pool_result
+ # Update the parent process copy of the test. This includes the result,
+ # XFAILS, REQUIRES, and UNSUPPORTED statuses.
+ assert self.tests[test_index].file_path == test_with_result.file_path, \
+ "parent and child disagree on test path"
+ self.tests[test_index] = test_with_result
+ self.display.update(test_with_result)
+
+ # If we've finished all the tests or too many tests have failed, notify
+ # the main thread that we've stopped testing.
+ self.failure_count += (test_with_result.result.code == lit.Test.FAIL)
+ if self.lit_config.maxFailures and \
+ self.failure_count == self.lit_config.maxFailures:
+ self.hit_max_failures = True
+
+child_lit_config = None
+child_parallelism_semaphores = None
+
+def worker_initializer(lit_config, parallelism_semaphores):
+ """Copy expensive repeated data into worker processes"""
+ global child_lit_config
+ child_lit_config = lit_config
+ global child_parallelism_semaphores
+ child_parallelism_semaphores = parallelism_semaphores
+
+def worker_run_one_test(test_index, test):
+ """Run one test in a multiprocessing.Pool
+
+ Side effects in this function and functions it calls are not visible in the
+ main lit process.
+
+ Arguments and results of this function are pickled, so they should be cheap
+ to copy. For efficiency, we copy all data needed to execute all tests into
+ each worker and store it in the child_* global variables. This reduces the
+ cost of each task.
+
+ Returns an index and a Result, which the parent process uses to update
+ the display.
+ """
+ try:
+ execute_test(test, child_lit_config, child_parallelism_semaphores)
+ return (test_index, test)
+ except KeyboardInterrupt as e:
+ # This is a sad hack. Unfortunately subprocess goes
+ # bonkers with ctrl-c and we start forking merrily.
+ print('\nCtrl-C detected, goodbye.')
+ traceback.print_exc()
+ sys.stdout.flush()
+ os.kill(0,9)
+ except:
+ traceback.print_exc()