diff options
Diffstat (limited to 'packages/Python/lldbsuite/test/python_api/event/TestEvents.py')
-rw-r--r-- | packages/Python/lldbsuite/test/python_api/event/TestEvents.py | 311 |
1 files changed, 0 insertions, 311 deletions
diff --git a/packages/Python/lldbsuite/test/python_api/event/TestEvents.py b/packages/Python/lldbsuite/test/python_api/event/TestEvents.py deleted file mode 100644 index f5d143fbd40b..000000000000 --- a/packages/Python/lldbsuite/test/python_api/event/TestEvents.py +++ /dev/null @@ -1,311 +0,0 @@ -""" -Test lldb Python event APIs. -""" - -from __future__ import print_function - - -import os -import time -import re -import lldb -from lldbsuite.test.decorators import * -from lldbsuite.test.lldbtest import * -from lldbsuite.test import lldbutil - - -@skipIfLinux # llvm.org/pr25924, sometimes generating SIGSEGV -@skipIfDarwin -class EventAPITestCase(TestBase): - - mydir = TestBase.compute_mydir(__file__) - - def setUp(self): - # Call super's setUp(). - TestBase.setUp(self) - # Find the line number to of function 'c'. - self.line = line_number( - 'main.c', '// Find the line number of function "c" here.') - - @add_test_categories(['pyapi']) - @expectedFailureAll( - oslist=["linux"], - bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases") - def test_listen_for_and_print_event(self): - """Exercise SBEvent API.""" - self.build() - exe = self.getBuildArtifact("a.out") - - self.dbg.SetAsync(True) - - # Create a target by the debugger. - target = self.dbg.CreateTarget(exe) - self.assertTrue(target, VALID_TARGET) - - # Now create a breakpoint on main.c by name 'c'. - breakpoint = target.BreakpointCreateByName('c', 'a.out') - - listener = lldb.SBListener("my listener") - - # Now launch the process, and do not stop at the entry point. - error = lldb.SBError() - process = target.Launch(listener, - None, # argv - None, # envp - None, # stdin_path - None, # stdout_path - None, # stderr_path - None, # working directory - 0, # launch flags - False, # Stop at entry - error) # error - - self.assertTrue( - process.GetState() == lldb.eStateStopped, - PROCESS_STOPPED) - - # Create an empty event object. - event = lldb.SBEvent() - - traceOn = self.TraceOn() - if traceOn: - lldbutil.print_stacktraces(process) - - # Create MyListeningThread class to wait for any kind of event. - import threading - - class MyListeningThread(threading.Thread): - - def run(self): - count = 0 - # Let's only try at most 4 times to retrieve any kind of event. - # After that, the thread exits. - while not count > 3: - if traceOn: - print("Try wait for event...") - if listener.WaitForEvent(5, event): - if traceOn: - desc = lldbutil.get_description(event) - print("Event description:", desc) - print("Event data flavor:", event.GetDataFlavor()) - print( - "Process state:", - lldbutil.state_type_to_str( - process.GetState())) - print() - else: - if traceOn: - print("timeout occurred waiting for event...") - count = count + 1 - listener.Clear() - return - - # Let's start the listening thread to retrieve the events. - my_thread = MyListeningThread() - my_thread.start() - - # Use Python API to continue the process. The listening thread should be - # able to receive the state changed events. - process.Continue() - - # Use Python API to kill the process. The listening thread should be - # able to receive the state changed event, too. - process.Kill() - - # Wait until the 'MyListeningThread' terminates. - my_thread.join() - - # Shouldn't we be testing against some kind of expectation here? - - @add_test_categories(['pyapi']) - @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases - def test_wait_for_event(self): - """Exercise SBListener.WaitForEvent() API.""" - self.build() - exe = self.getBuildArtifact("a.out") - - self.dbg.SetAsync(True) - - # Create a target by the debugger. - target = self.dbg.CreateTarget(exe) - self.assertTrue(target, VALID_TARGET) - - # Now create a breakpoint on main.c by name 'c'. - breakpoint = target.BreakpointCreateByName('c', 'a.out') - #print("breakpoint:", breakpoint) - self.assertTrue(breakpoint and - breakpoint.GetNumLocations() == 1, - VALID_BREAKPOINT) - - # Get the debugger listener. - listener = self.dbg.GetListener() - - # Now launch the process, and do not stop at entry point. - error = lldb.SBError() - process = target.Launch(listener, - None, # argv - None, # envp - None, # stdin_path - None, # stdout_path - None, # stderr_path - None, # working directory - 0, # launch flags - False, # Stop at entry - error) # error - self.assertTrue(error.Success() and process, PROCESS_IS_VALID) - - # Create an empty event object. - event = lldb.SBEvent() - self.assertFalse(event, "Event should not be valid initially") - - # Create MyListeningThread to wait for any kind of event. - import threading - - class MyListeningThread(threading.Thread): - - def run(self): - count = 0 - # Let's only try at most 3 times to retrieve any kind of event. - while not count > 3: - if listener.WaitForEvent(5, event): - #print("Got a valid event:", event) - #print("Event data flavor:", event.GetDataFlavor()) - #print("Event type:", lldbutil.state_type_to_str(event.GetType())) - listener.Clear() - return - count = count + 1 - print("Timeout: listener.WaitForEvent") - listener.Clear() - return - - # Use Python API to kill the process. The listening thread should be - # able to receive a state changed event. - process.Kill() - - # Let's start the listening thread to retrieve the event. - my_thread = MyListeningThread() - my_thread.start() - - # Wait until the 'MyListeningThread' terminates. - my_thread.join() - - self.assertTrue(event, - "My listening thread successfully received an event") - - @skipIfFreeBSD # llvm.org/pr21325 - @add_test_categories(['pyapi']) - @expectedFailureAll( - oslist=["linux"], - bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases") - def test_add_listener_to_broadcaster(self): - """Exercise some SBBroadcaster APIs.""" - self.build() - exe = self.getBuildArtifact("a.out") - - self.dbg.SetAsync(True) - - # Create a target by the debugger. - target = self.dbg.CreateTarget(exe) - self.assertTrue(target, VALID_TARGET) - - # Now create a breakpoint on main.c by name 'c'. - breakpoint = target.BreakpointCreateByName('c', 'a.out') - #print("breakpoint:", breakpoint) - self.assertTrue(breakpoint and - breakpoint.GetNumLocations() == 1, - VALID_BREAKPOINT) - - listener = lldb.SBListener("my listener") - - # Now launch the process, and do not stop at the entry point. - error = lldb.SBError() - process = target.Launch(listener, - None, # argv - None, # envp - None, # stdin_path - None, # stdout_path - None, # stderr_path - None, # working directory - 0, # launch flags - False, # Stop at entry - error) # error - - # Create an empty event object. - event = lldb.SBEvent() - self.assertFalse(event, "Event should not be valid initially") - - # The finite state machine for our custom listening thread, with an - # initial state of None, which means no event has been received. - # It changes to 'connected' after 'connected' event is received (for remote platforms) - # It changes to 'running' after 'running' event is received (should happen only if the - # currentstate is either 'None' or 'connected') - # It changes to 'stopped' if a 'stopped' event is received (should happen only if the - # current state is 'running'.) - self.state = None - - # Create MyListeningThread to wait for state changed events. - # By design, a "running" event is expected following by a "stopped" - # event. - import threading - - class MyListeningThread(threading.Thread): - - def run(self): - #print("Running MyListeningThread:", self) - - # Regular expression pattern for the event description. - pattern = re.compile("data = {.*, state = (.*)}$") - - # Let's only try at most 6 times to retrieve our events. - count = 0 - while True: - if listener.WaitForEvent(5, event): - desc = lldbutil.get_description(event) - #print("Event description:", desc) - match = pattern.search(desc) - if not match: - break - if match.group(1) == 'connected': - # When debugging remote targets with lldb-server, we - # first get the 'connected' event. - self.context.assertTrue(self.context.state is None) - self.context.state = 'connected' - continue - elif match.group(1) == 'running': - self.context.assertTrue( - self.context.state is None or self.context.state == 'connected') - self.context.state = 'running' - continue - elif match.group(1) == 'stopped': - self.context.assertTrue( - self.context.state == 'running') - # Whoopee, both events have been received! - self.context.state = 'stopped' - break - else: - break - print("Timeout: listener.WaitForEvent") - count = count + 1 - if count > 6: - break - listener.Clear() - return - - # Use Python API to continue the process. The listening thread should be - # able to receive the state changed events. - process.Continue() - - # Start the listening thread to receive the "running" followed by the - # "stopped" events. - my_thread = MyListeningThread() - # Supply the enclosing context so that our listening thread can access - # the 'state' variable. - my_thread.context = self - my_thread.start() - - # Wait until the 'MyListeningThread' terminates. - my_thread.join() - - # The final judgement. :-) - self.assertTrue(self.state == 'stopped', - "Both expected state changed events received") |