diff options
Diffstat (limited to 'packages/Python/lldbsuite/test/tools/lldb-server/TestGdbRemoteThreadsInStopReply.py')
| -rw-r--r-- | packages/Python/lldbsuite/test/tools/lldb-server/TestGdbRemoteThreadsInStopReply.py | 119 | 
1 files changed, 116 insertions, 3 deletions
| diff --git a/packages/Python/lldbsuite/test/tools/lldb-server/TestGdbRemoteThreadsInStopReply.py b/packages/Python/lldbsuite/test/tools/lldb-server/TestGdbRemoteThreadsInStopReply.py index 57d4d5ab4bb1c..b361b9e6d9157 100644 --- a/packages/Python/lldbsuite/test/tools/lldb-server/TestGdbRemoteThreadsInStopReply.py +++ b/packages/Python/lldbsuite/test/tools/lldb-server/TestGdbRemoteThreadsInStopReply.py @@ -1,11 +1,13 @@  from __future__ import print_function +import json +import re +  import gdbremote_testcase  from lldbsuite.test.decorators import *  from lldbsuite.test.lldbtest import *  from lldbsuite.test import lldbutil -  class TestGdbRemoteThreadsInStopReply(          gdbremote_testcase.GdbRemoteTestCaseBase): @@ -16,7 +18,8 @@ class TestGdbRemoteThreadsInStopReply(          "send packet: $OK#00",      ] -    def gather_stop_reply_threads(self, post_startup_log_lines, thread_count): +    def gather_stop_reply_fields(self, post_startup_log_lines, thread_count, +            field_names):          # Set up the inferior args.          inferior_args = []          for i in range(thread_count - 1): @@ -25,6 +28,9 @@ class TestGdbRemoteThreadsInStopReply(          procs = self.prep_debug_monitor_and_inferior(              inferior_args=inferior_args) +        self.add_register_info_collection_packets() +        self.add_process_info_collection_packets() +          # Assumes test_sequence has anything added needed to setup the initial state.          # (Like optionally enabling QThreadsInStopReply.)          if post_startup_log_lines: @@ -34,6 +40,7 @@ class TestGdbRemoteThreadsInStopReply(          ], True)          context = self.expect_gdbremote_sequence()          self.assertIsNotNone(context) +        hw_info = self.parse_hw_info(context)          # Give threads time to start up, then break.          time.sleep(1) @@ -77,14 +84,89 @@ class TestGdbRemoteThreadsInStopReply(          kv_dict = self.parse_key_val_dict(key_vals_text)          self.assertIsNotNone(kv_dict) +        result = dict(); +        result["pc_register"] = hw_info["pc_register"] +        result["little_endian"] = hw_info["little_endian"] +        for key_field in field_names: +            result[key_field] = kv_dict.get(key_field) + +        return result + +    def gather_stop_reply_threads(self, post_startup_log_lines, thread_count):          # Pull out threads from stop response. -        stop_reply_threads_text = kv_dict.get("threads") +        stop_reply_threads_text = self.gather_stop_reply_fields( +                post_startup_log_lines, thread_count, ["threads"])["threads"]          if stop_reply_threads_text:              return [int(thread_id, 16)                      for thread_id in stop_reply_threads_text.split(",")]          else:              return [] +    def gather_stop_reply_pcs(self, post_startup_log_lines, thread_count): +        results = self.gather_stop_reply_fields( post_startup_log_lines, +                thread_count, ["threads", "thread-pcs"]) +        if not results: +            return [] + +        threads_text = results["threads"] +        pcs_text = results["thread-pcs"] +        thread_ids = threads_text.split(",") +        pcs = pcs_text.split(",") +        self.assertTrue(len(thread_ids) == len(pcs)) + +        thread_pcs = dict() +        for i in range(0, len(pcs)): +            thread_pcs[int(thread_ids[i], 16)] = pcs[i] + +        result = dict() +        result["thread_pcs"] = thread_pcs +        result["pc_register"] = results["pc_register"] +        result["little_endian"] = results["little_endian"] +        return result + +    def switch_endian(self, egg): +        return "".join(reversed(re.findall("..", egg))) + +    def parse_hw_info(self, context): +        self.assertIsNotNone(context) +        process_info = self.parse_process_info_response(context) +        endian = process_info.get("endian") +        reg_info = self.parse_register_info_packets(context) +        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_info) + +        hw_info = dict() +        hw_info["pc_register"] = pc_lldb_reg_index +        hw_info["little_endian"] = (endian == "little") +        return hw_info + +    def gather_threads_info_pcs(self, pc_register, little_endian): +        self.reset_test_sequence() +        self.test_sequence.add_log_lines( +                [ +                    "read packet: $jThreadsInfo#c1", +                    { +                        "direction": "send", +                        "regex": r"^\$(.*)#[0-9a-fA-F]{2}$", +                        "capture": { +                            1: "threads_info"}}, +                ], +                True) + +        context = self.expect_gdbremote_sequence() +        self.assertIsNotNone(context) +        threads_info = context.get("threads_info") +        register = str(pc_register) +        # The jThreadsInfo response is not valid JSON data, so we have to +        # clean it up first. +        jthreads_info = json.loads(re.sub(r"}]", "}", threads_info)) +        thread_pcs = dict() +        for thread_info in jthreads_info: +            tid = thread_info["tid"] +            pc = thread_info["registers"][register] +            thread_pcs[tid] = self.switch_endian(pc) if little_endian else pc + +        return thread_pcs +      def QListThreadsInStopReply_supported(self):          procs = self.prep_debug_monitor_and_inferior()          self.test_sequence.add_log_lines( @@ -183,3 +265,34 @@ class TestGdbRemoteThreadsInStopReply(          self.build()          self.set_inferior_startup_launch()          self.stop_reply_reports_correct_threads(5) + +    def stop_reply_contains_thread_pcs(self, thread_count): +        results = self.gather_stop_reply_pcs( +                self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count) +        stop_reply_pcs = results["thread_pcs"] +        pc_register = results["pc_register"] +        little_endian = results["little_endian"] +        self.assertEqual(len(stop_reply_pcs), thread_count) + +        threads_info_pcs = self.gather_threads_info_pcs(pc_register, +                little_endian) + +        self.assertEqual(len(threads_info_pcs), thread_count) +        for thread_id in stop_reply_pcs: +            self.assertTrue(thread_id in threads_info_pcs) +            self.assertTrue(int(stop_reply_pcs[thread_id], 16) +                    == int(threads_info_pcs[thread_id], 16)) + +    @llgs_test +    def test_stop_reply_contains_thread_pcs_llgs(self): +        self.init_llgs_test() +        self.build() +        self.set_inferior_startup_launch() +        self.stop_reply_contains_thread_pcs(5) + +    @debugserver_test +    def test_stop_reply_contains_thread_pcs_debugserver(self): +        self.init_debugserver_test() +        self.build() +        self.set_inferior_startup_launch() +        self.stop_reply_contains_thread_pcs(5) | 
