summaryrefslogtreecommitdiff
path: root/packages/Python/lldbsuite/test/python_api/event/TestEvents.py
diff options
context:
space:
mode:
Diffstat (limited to 'packages/Python/lldbsuite/test/python_api/event/TestEvents.py')
-rw-r--r--packages/Python/lldbsuite/test/python_api/event/TestEvents.py311
1 files changed, 0 insertions, 311 deletions
diff --git a/packages/Python/lldbsuite/test/python_api/event/TestEvents.py b/packages/Python/lldbsuite/test/python_api/event/TestEvents.py
deleted file mode 100644
index f5d143fbd40b..000000000000
--- a/packages/Python/lldbsuite/test/python_api/event/TestEvents.py
+++ /dev/null
@@ -1,311 +0,0 @@
-"""
-Test lldb Python event APIs.
-"""
-
-from __future__ import print_function
-
-
-import os
-import time
-import re
-import lldb
-from lldbsuite.test.decorators import *
-from lldbsuite.test.lldbtest import *
-from lldbsuite.test import lldbutil
-
-
-@skipIfLinux # llvm.org/pr25924, sometimes generating SIGSEGV
-@skipIfDarwin
-class EventAPITestCase(TestBase):
-
- mydir = TestBase.compute_mydir(__file__)
-
- def setUp(self):
- # Call super's setUp().
- TestBase.setUp(self)
- # Find the line number to of function 'c'.
- self.line = line_number(
- 'main.c', '// Find the line number of function "c" here.')
-
- @add_test_categories(['pyapi'])
- @expectedFailureAll(
- oslist=["linux"],
- bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases")
- def test_listen_for_and_print_event(self):
- """Exercise SBEvent API."""
- self.build()
- exe = self.getBuildArtifact("a.out")
-
- self.dbg.SetAsync(True)
-
- # Create a target by the debugger.
- target = self.dbg.CreateTarget(exe)
- self.assertTrue(target, VALID_TARGET)
-
- # Now create a breakpoint on main.c by name 'c'.
- breakpoint = target.BreakpointCreateByName('c', 'a.out')
-
- listener = lldb.SBListener("my listener")
-
- # Now launch the process, and do not stop at the entry point.
- error = lldb.SBError()
- process = target.Launch(listener,
- None, # argv
- None, # envp
- None, # stdin_path
- None, # stdout_path
- None, # stderr_path
- None, # working directory
- 0, # launch flags
- False, # Stop at entry
- error) # error
-
- self.assertTrue(
- process.GetState() == lldb.eStateStopped,
- PROCESS_STOPPED)
-
- # Create an empty event object.
- event = lldb.SBEvent()
-
- traceOn = self.TraceOn()
- if traceOn:
- lldbutil.print_stacktraces(process)
-
- # Create MyListeningThread class to wait for any kind of event.
- import threading
-
- class MyListeningThread(threading.Thread):
-
- def run(self):
- count = 0
- # Let's only try at most 4 times to retrieve any kind of event.
- # After that, the thread exits.
- while not count > 3:
- if traceOn:
- print("Try wait for event...")
- if listener.WaitForEvent(5, event):
- if traceOn:
- desc = lldbutil.get_description(event)
- print("Event description:", desc)
- print("Event data flavor:", event.GetDataFlavor())
- print(
- "Process state:",
- lldbutil.state_type_to_str(
- process.GetState()))
- print()
- else:
- if traceOn:
- print("timeout occurred waiting for event...")
- count = count + 1
- listener.Clear()
- return
-
- # Let's start the listening thread to retrieve the events.
- my_thread = MyListeningThread()
- my_thread.start()
-
- # Use Python API to continue the process. The listening thread should be
- # able to receive the state changed events.
- process.Continue()
-
- # Use Python API to kill the process. The listening thread should be
- # able to receive the state changed event, too.
- process.Kill()
-
- # Wait until the 'MyListeningThread' terminates.
- my_thread.join()
-
- # Shouldn't we be testing against some kind of expectation here?
-
- @add_test_categories(['pyapi'])
- @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases
- def test_wait_for_event(self):
- """Exercise SBListener.WaitForEvent() API."""
- self.build()
- exe = self.getBuildArtifact("a.out")
-
- self.dbg.SetAsync(True)
-
- # Create a target by the debugger.
- target = self.dbg.CreateTarget(exe)
- self.assertTrue(target, VALID_TARGET)
-
- # Now create a breakpoint on main.c by name 'c'.
- breakpoint = target.BreakpointCreateByName('c', 'a.out')
- #print("breakpoint:", breakpoint)
- self.assertTrue(breakpoint and
- breakpoint.GetNumLocations() == 1,
- VALID_BREAKPOINT)
-
- # Get the debugger listener.
- listener = self.dbg.GetListener()
-
- # Now launch the process, and do not stop at entry point.
- error = lldb.SBError()
- process = target.Launch(listener,
- None, # argv
- None, # envp
- None, # stdin_path
- None, # stdout_path
- None, # stderr_path
- None, # working directory
- 0, # launch flags
- False, # Stop at entry
- error) # error
- self.assertTrue(error.Success() and process, PROCESS_IS_VALID)
-
- # Create an empty event object.
- event = lldb.SBEvent()
- self.assertFalse(event, "Event should not be valid initially")
-
- # Create MyListeningThread to wait for any kind of event.
- import threading
-
- class MyListeningThread(threading.Thread):
-
- def run(self):
- count = 0
- # Let's only try at most 3 times to retrieve any kind of event.
- while not count > 3:
- if listener.WaitForEvent(5, event):
- #print("Got a valid event:", event)
- #print("Event data flavor:", event.GetDataFlavor())
- #print("Event type:", lldbutil.state_type_to_str(event.GetType()))
- listener.Clear()
- return
- count = count + 1
- print("Timeout: listener.WaitForEvent")
- listener.Clear()
- return
-
- # Use Python API to kill the process. The listening thread should be
- # able to receive a state changed event.
- process.Kill()
-
- # Let's start the listening thread to retrieve the event.
- my_thread = MyListeningThread()
- my_thread.start()
-
- # Wait until the 'MyListeningThread' terminates.
- my_thread.join()
-
- self.assertTrue(event,
- "My listening thread successfully received an event")
-
- @skipIfFreeBSD # llvm.org/pr21325
- @add_test_categories(['pyapi'])
- @expectedFailureAll(
- oslist=["linux"],
- bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases")
- def test_add_listener_to_broadcaster(self):
- """Exercise some SBBroadcaster APIs."""
- self.build()
- exe = self.getBuildArtifact("a.out")
-
- self.dbg.SetAsync(True)
-
- # Create a target by the debugger.
- target = self.dbg.CreateTarget(exe)
- self.assertTrue(target, VALID_TARGET)
-
- # Now create a breakpoint on main.c by name 'c'.
- breakpoint = target.BreakpointCreateByName('c', 'a.out')
- #print("breakpoint:", breakpoint)
- self.assertTrue(breakpoint and
- breakpoint.GetNumLocations() == 1,
- VALID_BREAKPOINT)
-
- listener = lldb.SBListener("my listener")
-
- # Now launch the process, and do not stop at the entry point.
- error = lldb.SBError()
- process = target.Launch(listener,
- None, # argv
- None, # envp
- None, # stdin_path
- None, # stdout_path
- None, # stderr_path
- None, # working directory
- 0, # launch flags
- False, # Stop at entry
- error) # error
-
- # Create an empty event object.
- event = lldb.SBEvent()
- self.assertFalse(event, "Event should not be valid initially")
-
- # The finite state machine for our custom listening thread, with an
- # initial state of None, which means no event has been received.
- # It changes to 'connected' after 'connected' event is received (for remote platforms)
- # It changes to 'running' after 'running' event is received (should happen only if the
- # currentstate is either 'None' or 'connected')
- # It changes to 'stopped' if a 'stopped' event is received (should happen only if the
- # current state is 'running'.)
- self.state = None
-
- # Create MyListeningThread to wait for state changed events.
- # By design, a "running" event is expected following by a "stopped"
- # event.
- import threading
-
- class MyListeningThread(threading.Thread):
-
- def run(self):
- #print("Running MyListeningThread:", self)
-
- # Regular expression pattern for the event description.
- pattern = re.compile("data = {.*, state = (.*)}$")
-
- # Let's only try at most 6 times to retrieve our events.
- count = 0
- while True:
- if listener.WaitForEvent(5, event):
- desc = lldbutil.get_description(event)
- #print("Event description:", desc)
- match = pattern.search(desc)
- if not match:
- break
- if match.group(1) == 'connected':
- # When debugging remote targets with lldb-server, we
- # first get the 'connected' event.
- self.context.assertTrue(self.context.state is None)
- self.context.state = 'connected'
- continue
- elif match.group(1) == 'running':
- self.context.assertTrue(
- self.context.state is None or self.context.state == 'connected')
- self.context.state = 'running'
- continue
- elif match.group(1) == 'stopped':
- self.context.assertTrue(
- self.context.state == 'running')
- # Whoopee, both events have been received!
- self.context.state = 'stopped'
- break
- else:
- break
- print("Timeout: listener.WaitForEvent")
- count = count + 1
- if count > 6:
- break
- listener.Clear()
- return
-
- # Use Python API to continue the process. The listening thread should be
- # able to receive the state changed events.
- process.Continue()
-
- # Start the listening thread to receive the "running" followed by the
- # "stopped" events.
- my_thread = MyListeningThread()
- # Supply the enclosing context so that our listening thread can access
- # the 'state' variable.
- my_thread.context = self
- my_thread.start()
-
- # Wait until the 'MyListeningThread' terminates.
- my_thread.join()
-
- # The final judgement. :-)
- self.assertTrue(self.state == 'stopped',
- "Both expected state changed events received")