diff options
Diffstat (limited to 'packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py')
| -rw-r--r-- | packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py | 1084 | 
1 files changed, 0 insertions, 1084 deletions
| diff --git a/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py b/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py deleted file mode 100644 index 066158d0877fb..0000000000000 --- a/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py +++ /dev/null @@ -1,1084 +0,0 @@ -#!/usr/bin/env python - -import binascii -import json -import optparse -import os -import pprint -import socket -import string -import subprocess -import sys -import threading - - -def dump_memory(base_addr, data, num_per_line, outfile): - -    data_len = len(data) -    hex_string = binascii.hexlify(data) -    addr = base_addr -    ascii_str = '' -    i = 0 -    while i < data_len: -        outfile.write('0x%8.8x: ' % (addr + i)) -        bytes_left = data_len - i -        if bytes_left >= num_per_line: -            curr_data_len = num_per_line -        else: -            curr_data_len = bytes_left -        hex_start_idx = i * 2 -        hex_end_idx = hex_start_idx + curr_data_len * 2 -        curr_hex_str = hex_string[hex_start_idx:hex_end_idx] -        # 'curr_hex_str' now contains the hex byte string for the -        # current line with no spaces between bytes -        t = iter(curr_hex_str) -        # Print hex bytes separated by space -        outfile.write(' '.join(a + b for a, b in zip(t, t))) -        # Print two spaces -        outfile.write('  ') -        # Calculate ASCII string for bytes into 'ascii_str' -        ascii_str = '' -        for j in range(i, i + curr_data_len): -            ch = data[j] -            if ch in string.printable and ch not in string.whitespace: -                ascii_str += '%c' % (ch) -            else: -                ascii_str += '.' -        # Print ASCII representation and newline -        outfile.write(ascii_str) -        i = i + curr_data_len -        outfile.write('\n') - - -def read_packet(f, verbose=False, trace_file=None): -    '''Decode a JSON packet that starts with the content length and is -       followed by the JSON bytes from a file 'f' -    ''' -    line = f.readline() -    if len(line) == 0: -        return None - -    # Watch for line that starts with the prefix -    prefix = 'Content-Length: ' -    if line.startswith(prefix): -        # Decode length of JSON bytes -        if verbose: -            print('content: "%s"' % (line)) -        length = int(line[len(prefix):]) -        if verbose: -            print('length: "%u"' % (length)) -        # Skip empty line -        line = f.readline() -        if verbose: -            print('empty: "%s"' % (line)) -        # Read JSON bytes -        json_str = f.read(length) -        if verbose: -            print('json: "%s"' % (json_str)) -        if trace_file: -            trace_file.write('from adaptor:\n%s\n' % (json_str)) -        # Decode the JSON bytes into a python dictionary -        return json.loads(json_str) - -    return None - - -def packet_type_is(packet, packet_type): -    return 'type' in packet and packet['type'] == packet_type - - -def read_packet_thread(vs_comm): -    done = False -    while not done: -        packet = read_packet(vs_comm.recv, trace_file=vs_comm.trace_file) -        if packet: -            done = not vs_comm.handle_recv_packet(packet) -        else: -            done = True - - -class DebugCommunication(object): - -    def __init__(self, recv, send): -        self.trace_file = None -        self.send = send -        self.recv = recv -        self.recv_packets = [] -        self.recv_condition = threading.Condition() -        self.recv_thread = threading.Thread(target=read_packet_thread, -                                            args=(self,)) -        self.process_event_body = None -        self.exit_status = None -        self.initialize_body = None -        self.thread_stop_reasons = {} -        self.sequence = 1 -        self.threads = None -        self.recv_thread.start() -        self.output_condition = threading.Condition() -        self.output = {} -        self.configuration_done_sent = False -        self.frame_scopes = {} - -    @classmethod -    def encode_content(cls, s): -        return "Content-Length: %u\r\n\r\n%s" % (len(s), s) - -    @classmethod -    def validate_response(cls, command, response): -        if command['command'] != response['command']: -            raise ValueError('command mismatch in response') -        if command['seq'] != response['request_seq']: -            raise ValueError('seq mismatch in response') - -    def get_output(self, category, timeout=0.0, clear=True): -        self.output_condition.acquire() -        output = None -        if category in self.output: -            output = self.output[category] -            if clear: -                del self.output[category] -        elif timeout != 0.0: -            self.output_condition.wait(timeout) -            if category in self.output: -                output = self.output[category] -                if clear: -                    del self.output[category] -        self.output_condition.release() -        return output - -    def handle_recv_packet(self, packet): -        '''Called by the read thread that is waiting for all incoming packets -           to store the incoming packet in "self.recv_packets" in a thread safe -           way. This function will then signal the "self.recv_condition" to -           indicate a new packet is available. Returns True if the caller -           should keep calling this function for more packets. -        ''' -        # Check the packet to see if is an event packet -        keepGoing = True -        packet_type = packet['type'] -        if packet_type == 'event': -            event = packet['event'] -            body = None -            if 'body' in packet: -                body = packet['body'] -            # Handle the event packet and cache information from these packets -            # as they come in -            if event == 'output': -                # Store any output we receive so clients can retrieve it later. -                category = body['category'] -                output = body['output'] -                self.output_condition.acquire() -                if category in self.output: -                    self.output[category] += output -                else: -                    self.output[category] = output -                self.output_condition.notify() -                self.output_condition.release() -                # no need to add 'output' packets to our packets list -                return keepGoing -            elif event == 'process': -                # When a new process is attached or launched, remember the -                # details that are available in the body of the event -                self.process_event_body = body -            elif event == 'stopped': -                # Each thread that stops with a reason will send a -                # 'stopped' event. We need to remember the thread stop -                # reasons since the 'threads' command doesn't return -                # that information. -                self._process_stopped() -                tid = body['threadId'] -                self.thread_stop_reasons[tid] = body -        elif packet_type == 'response': -            if packet['command'] == 'disconnect': -                keepGoing = False -        self.recv_condition.acquire() -        self.recv_packets.append(packet) -        self.recv_condition.notify() -        self.recv_condition.release() -        return keepGoing - -    def send_packet(self, command_dict, set_sequence=True): -        '''Take the "command_dict" python dictionary and encode it as a JSON -           string and send the contents as a packet to the VSCode debug -           adaptor''' -        # Set the sequence ID for this command automatically -        if set_sequence: -            command_dict['seq'] = self.sequence -            self.sequence += 1 -        # Encode our command dictionary as a JSON string -        json_str = json.dumps(command_dict, separators=(',', ':')) -        if self.trace_file: -            self.trace_file.write('to adaptor:\n%s\n' % (json_str)) -        length = len(json_str) -        if length > 0: -            # Send the encoded JSON packet and flush the 'send' file -            self.send.write(self.encode_content(json_str)) -            self.send.flush() - -    def recv_packet(self, filter_type=None, filter_event=None, timeout=None): -        '''Get a JSON packet from the VSCode debug adaptor. This function -           assumes a thread that reads packets is running and will deliver -           any received packets by calling handle_recv_packet(...). This -           function will wait for the packet to arrive and return it when -           it does.''' -        while True: -            self.recv_condition.acquire() -            packet = None -            while True: -                for (i, curr_packet) in enumerate(self.recv_packets): -                    packet_type = curr_packet['type'] -                    if filter_type is None or packet_type in filter_type: -                        if (filter_event is None or -                            (packet_type == 'event' and -                             curr_packet['event'] in filter_event)): -                            packet = self.recv_packets.pop(i) -                            break -                if packet: -                    break -                # Sleep until packet is received -                len_before = len(self.recv_packets) -                self.recv_condition.wait(timeout) -                len_after = len(self.recv_packets) -                if len_before == len_after: -                    return None  # Timed out -            self.recv_condition.release() -            return packet - -        return None - -    def send_recv(self, command): -        '''Send a command python dictionary as JSON and receive the JSON -           response. Validates that the response is the correct sequence and -           command in the reply. Any events that are received are added to the -           events list in this object''' -        self.send_packet(command) -        done = False -        while not done: -            response = self.recv_packet(filter_type='response') -            if response is None: -                desc = 'no response for "%s"' % (command['command']) -                raise ValueError(desc) -            self.validate_response(command, response) -            return response -        return None - -    def wait_for_event(self, filter=None, timeout=None): -        while True: -            return self.recv_packet(filter_type='event', filter_event=filter, -                                    timeout=timeout) -        return None - -    def wait_for_stopped(self, timeout=None): -        stopped_events = [] -        stopped_event = self.wait_for_event(filter=['stopped', 'exited'], -                                            timeout=timeout) -        exited = False -        while stopped_event: -            stopped_events.append(stopped_event) -            # If we exited, then we are done -            if stopped_event['event'] == 'exited': -                self.exit_status = stopped_event['body']['exitCode'] -                exited = True -                break -            # Otherwise we stopped and there might be one or more 'stopped' -            # events for each thread that stopped with a reason, so keep -            # checking for more 'stopped' events and return all of them -            stopped_event = self.wait_for_event(filter='stopped', timeout=0.25) -        if exited: -            self.threads = [] -        return stopped_events - -    def wait_for_exited(self): -        event_dict = self.wait_for_event('exited') -        if event_dict is None: -            raise ValueError("didn't get stopped event") -        return event_dict - -    def get_initialize_value(self, key): -        '''Get a value for the given key if it there is a key/value pair in -           the "initialize" request response body. -        ''' -        if self.initialize_body and key in self.initialize_body: -            return self.initialize_body[key] -        return None - -    def get_threads(self): -        if self.threads is None: -            self.request_threads() -        return self.threads - -    def get_thread_id(self, threadIndex=0): -        '''Utility function to get the first thread ID in the thread list. -           If the thread list is empty, then fetch the threads. -        ''' -        if self.threads is None: -            self.request_threads() -        if self.threads and threadIndex < len(self.threads): -            return self.threads[threadIndex]['id'] -        return None - -    def get_stackFrame(self, frameIndex=0, threadId=None): -        '''Get a single "StackFrame" object from a "stackTrace" request and -           return the "StackFrame as a python dictionary, or None on failure -        ''' -        if threadId is None: -            threadId = self.get_thread_id() -        if threadId is None: -            print('invalid threadId') -            return None -        response = self.request_stackTrace(threadId, startFrame=frameIndex, -                                           levels=1) -        if response: -            return response['body']['stackFrames'][0] -        print('invalid response') -        return None - -    def get_scope_variables(self, scope_name, frameIndex=0, threadId=None): -        stackFrame = self.get_stackFrame(frameIndex=frameIndex, -                                         threadId=threadId) -        if stackFrame is None: -            return [] -        frameId = stackFrame['id'] -        if frameId in self.frame_scopes: -            frame_scopes = self.frame_scopes[frameId] -        else: -            scopes_response = self.request_scopes(frameId) -            frame_scopes = scopes_response['body']['scopes'] -            self.frame_scopes[frameId] = frame_scopes -        for scope in frame_scopes: -            if scope['name'] == scope_name: -                varRef = scope['variablesReference'] -                variables_response = self.request_variables(varRef) -                if variables_response: -                    if 'body' in variables_response: -                        body = variables_response['body'] -                        if 'variables' in body: -                            vars = body['variables'] -                            return vars -        return [] - -    def get_global_variables(self, frameIndex=0, threadId=None): -        return self.get_scope_variables('Globals', frameIndex=frameIndex, -                                        threadId=threadId) - -    def get_local_variables(self, frameIndex=0, threadId=None): -        return self.get_scope_variables('Locals', frameIndex=frameIndex, -                                        threadId=threadId) - -    def get_local_variable(self, name, frameIndex=0, threadId=None): -        locals = self.get_local_variables(frameIndex=frameIndex, -                                          threadId=threadId) -        for local in locals: -            if 'name' in local and local['name'] == name: -                return local -        return None - -    def get_local_variable_value(self, name, frameIndex=0, threadId=None): -        variable = self.get_local_variable(name, frameIndex=frameIndex, -                                           threadId=threadId) -        if variable and 'value' in variable: -            return variable['value'] -        return None - -    def replay_packets(self, replay_file_path): -        f = open(replay_file_path, 'r') -        mode = 'invalid' -        set_sequence = False -        command_dict = None -        while mode != 'eof': -            if mode == 'invalid': -                line = f.readline() -                if line.startswith('to adapter:'): -                    mode = 'send' -                elif line.startswith('from adapter:'): -                    mode = 'recv' -            elif mode == 'send': -                command_dict = read_packet(f) -                # Skip the end of line that follows the JSON -                f.readline() -                if command_dict is None: -                    raise ValueError('decode packet failed from replay file') -                print('Sending:') -                pprint.PrettyPrinter(indent=2).pprint(command_dict) -                # raw_input('Press ENTER to send:') -                self.send_packet(command_dict, set_sequence) -                mode = 'invalid' -            elif mode == 'recv': -                print('Replay response:') -                replay_response = read_packet(f) -                # Skip the end of line that follows the JSON -                f.readline() -                pprint.PrettyPrinter(indent=2).pprint(replay_response) -                actual_response = self.recv_packet() -                if actual_response: -                    type = actual_response['type'] -                    print('Actual response:') -                    if type == 'response': -                        self.validate_response(command_dict, actual_response) -                    pprint.PrettyPrinter(indent=2).pprint(actual_response) -                else: -                    print("error: didn't get a valid response") -                mode = 'invalid' - -    def request_attach(self, program=None, pid=None, waitFor=None, trace=None, -                       initCommands=None, preRunCommands=None, -                       stopCommands=None, exitCommands=None, -                       attachCommands=None): -        args_dict = {} -        if pid is not None: -            args_dict['pid'] = pid -        if program is not None: -            args_dict['program'] = program -        if waitFor is not None: -            args_dict['waitFor'] = waitFor -        if trace: -            args_dict['trace'] = trace -        if initCommands: -            args_dict['initCommands'] = initCommands -        if preRunCommands: -            args_dict['preRunCommands'] = preRunCommands -        if stopCommands: -            args_dict['stopCommands'] = stopCommands -        if exitCommands: -            args_dict['exitCommands'] = exitCommands -        if attachCommands: -            args_dict['attachCommands'] = attachCommands -        command_dict = { -            'command': 'attach', -            'type': 'request', -            'arguments': args_dict -        } -        return self.send_recv(command_dict) - -    def request_configurationDone(self): -        command_dict = { -            'command': 'configurationDone', -            'type': 'request', -            'arguments': {} -        } -        response = self.send_recv(command_dict) -        if response: -            self.configuration_done_sent = True -        return response - -    def _process_stopped(self): -        self.threads = None -        self.frame_scopes = {} - -    def request_continue(self, threadId=None): -        if self.exit_status is not None: -            raise ValueError('request_continue called after process exited') -        # If we have launched or attached, then the first continue is done by -        # sending the 'configurationDone' request -        if not self.configuration_done_sent: -            return self.request_configurationDone() -        args_dict = {} -        if threadId is None: -            threadId = self.get_thread_id() -        args_dict['threadId'] = threadId -        command_dict = { -            'command': 'continue', -            'type': 'request', -            'arguments': args_dict -        } -        response = self.send_recv(command_dict) -        recv_packets = [] -        self.recv_condition.acquire() -        for event in self.recv_packets: -            if event['event'] != 'stopped': -                recv_packets.append(event) -        self.recv_packets = recv_packets -        self.recv_condition.release() -        return response - -    def request_disconnect(self, terminateDebuggee=None): -        args_dict = {} -        if terminateDebuggee is not None: -            if terminateDebuggee: -                args_dict['terminateDebuggee'] = True -            else: -                args_dict['terminateDebuggee'] = False -        command_dict = { -            'command': 'disconnect', -            'type': 'request', -            'arguments': args_dict -        } -        return self.send_recv(command_dict) - -    def request_evaluate(self, expression, frameIndex=0, threadId=None): -        stackFrame = self.get_stackFrame(frameIndex=frameIndex, -                                         threadId=threadId) -        if stackFrame is None: -            return [] -        args_dict = { -            'expression': expression, -            'frameId': stackFrame['id'], -        } -        command_dict = { -            'command': 'evaluate', -            'type': 'request', -            'arguments': args_dict -        } -        return self.send_recv(command_dict) - -    def request_initialize(self): -        command_dict = { -            'command': 'initialize', -            'type': 'request', -            'arguments': { -                'adapterID': 'lldb-native', -                'clientID': 'vscode', -                'columnsStartAt1': True, -                'linesStartAt1': True, -                'locale': 'en-us', -                'pathFormat': 'path', -                'supportsRunInTerminalRequest': True, -                'supportsVariablePaging': True, -                'supportsVariableType': True -            } -        } -        response = self.send_recv(command_dict) -        if response: -            if 'body' in response: -                self.initialize_body = response['body'] -        return response - -    def request_launch(self, program, args=None, cwd=None, env=None, -                       stopOnEntry=False, disableASLR=True, -                       disableSTDIO=False, shellExpandArguments=False, -                       trace=False, initCommands=None, preRunCommands=None, -                       stopCommands=None, exitCommands=None, sourcePath=None, -                       debuggerRoot=None): -        args_dict = { -            'program': program -        } -        if args: -            args_dict['args'] = args -        if cwd: -            args_dict['cwd'] = cwd -        if env: -            args_dict['env'] = env -        if stopOnEntry: -            args_dict['stopOnEntry'] = stopOnEntry -        if disableASLR: -            args_dict['disableASLR'] = disableASLR -        if disableSTDIO: -            args_dict['disableSTDIO'] = disableSTDIO -        if shellExpandArguments: -            args_dict['shellExpandArguments'] = shellExpandArguments -        if trace: -            args_dict['trace'] = trace -        if initCommands: -            args_dict['initCommands'] = initCommands -        if preRunCommands: -            args_dict['preRunCommands'] = preRunCommands -        if stopCommands: -            args_dict['stopCommands'] = stopCommands -        if exitCommands: -            args_dict['exitCommands'] = exitCommands -        if sourcePath: -            args_dict['sourcePath'] = sourcePath -        if debuggerRoot: -            args_dict['debuggerRoot'] = debuggerRoot -        command_dict = { -            'command': 'launch', -            'type': 'request', -            'arguments': args_dict -        } -        response = self.send_recv(command_dict) - -        # Wait for a 'process' and 'initialized' event in any order -        self.wait_for_event(filter=['process', 'initialized']) -        self.wait_for_event(filter=['process', 'initialized']) -        return response - -    def request_next(self, threadId): -        if self.exit_status is not None: -            raise ValueError('request_continue called after process exited') -        args_dict = {'threadId': threadId} -        command_dict = { -            'command': 'next', -            'type': 'request', -            'arguments': args_dict -        } -        return self.send_recv(command_dict) - -    def request_stepIn(self, threadId): -        if self.exit_status is not None: -            raise ValueError('request_continue called after process exited') -        args_dict = {'threadId': threadId} -        command_dict = { -            'command': 'stepIn', -            'type': 'request', -            'arguments': args_dict -        } -        return self.send_recv(command_dict) - -    def request_stepOut(self, threadId): -        if self.exit_status is not None: -            raise ValueError('request_continue called after process exited') -        args_dict = {'threadId': threadId} -        command_dict = { -            'command': 'stepOut', -            'type': 'request', -            'arguments': args_dict -        } -        return self.send_recv(command_dict) - -    def request_pause(self, threadId=None): -        if self.exit_status is not None: -            raise ValueError('request_continue called after process exited') -        if threadId is None: -            threadId = self.get_thread_id() -        args_dict = {'threadId': threadId} -        command_dict = { -            'command': 'pause', -            'type': 'request', -            'arguments': args_dict -        } -        return self.send_recv(command_dict) - -    def request_scopes(self, frameId): -        args_dict = {'frameId': frameId} -        command_dict = { -            'command': 'scopes', -            'type': 'request', -            'arguments': args_dict -        } -        return self.send_recv(command_dict) - -    def request_setBreakpoints(self, file_path, line_array, condition=None, -                               hitCondition=None): -        (dir, base) = os.path.split(file_path) -        breakpoints = [] -        for line in line_array: -            bp = {'line': line} -            if condition is not None: -                bp['condition'] = condition -            if hitCondition is not None: -                bp['hitCondition'] = hitCondition -            breakpoints.append(bp) -        source_dict = { -            'name': base, -            'path': file_path -        } -        args_dict = { -            'source': source_dict, -            'breakpoints': breakpoints, -            'lines': '%s' % (line_array), -            'sourceModified': False, -        } -        command_dict = { -            'command': 'setBreakpoints', -            'type': 'request', -            'arguments': args_dict -        } -        return self.send_recv(command_dict) - -    def request_setExceptionBreakpoints(self, filters): -        args_dict = {'filters': filters} -        command_dict = { -            'command': 'setExceptionBreakpoints', -            'type': 'request', -            'arguments': args_dict -        } -        return self.send_recv(command_dict) - -    def request_setFunctionBreakpoints(self, names, condition=None, -                                       hitCondition=None): -        breakpoints = [] -        for name in names: -            bp = {'name': name} -            if condition is not None: -                bp['condition'] = condition -            if hitCondition is not None: -                bp['hitCondition'] = hitCondition -            breakpoints.append(bp) -        args_dict = {'breakpoints': breakpoints} -        command_dict = { -            'command': 'setFunctionBreakpoints', -            'type': 'request', -            'arguments': args_dict -        } -        return self.send_recv(command_dict) - -    def request_stackTrace(self, threadId=None, startFrame=None, levels=None, -                           dump=False): -        if threadId is None: -            threadId = self.get_thread_id() -        args_dict = {'threadId': threadId} -        if startFrame is not None: -            args_dict['startFrame'] = startFrame -        if levels is not None: -            args_dict['levels'] = levels -        command_dict = { -            'command': 'stackTrace', -            'type': 'request', -            'arguments': args_dict -        } -        response = self.send_recv(command_dict) -        if dump: -            for (idx, frame) in enumerate(response['body']['stackFrames']): -                name = frame['name'] -                if 'line' in frame and 'source' in frame: -                    source = frame['source'] -                    if 'sourceReference' not in source: -                        if 'name' in source: -                            source_name = source['name'] -                            line = frame['line'] -                            print("[%3u] %s @ %s:%u" % (idx, name, source_name, -                                                        line)) -                            continue -                print("[%3u] %s" % (idx, name)) -        return response - -    def request_threads(self): -        '''Request a list of all threads and combine any information from any -           "stopped" events since those contain more information about why a -           thread actually stopped. Returns an array of thread dictionaries -           with information about all threads''' -        command_dict = { -            'command': 'threads', -            'type': 'request', -            'arguments': {} -        } -        response = self.send_recv(command_dict) -        body = response['body'] -        # Fill in "self.threads" correctly so that clients that call -        # self.get_threads() or self.get_thread_id(...) can get information -        # on threads when the process is stopped. -        if 'threads' in body: -            self.threads = body['threads'] -            for thread in self.threads: -                # Copy the thread dictionary so we can add key/value pairs to -                # it without affecfting the original info from the "threads" -                # command. -                tid = thread['id'] -                if tid in self.thread_stop_reasons: -                    thread_stop_info = self.thread_stop_reasons[tid] -                    copy_keys = ['reason', 'description', 'text'] -                    for key in copy_keys: -                        if key in thread_stop_info: -                            thread[key] = thread_stop_info[key] -        else: -            self.threads = None -        return response - -    def request_variables(self, variablesReference, start=None, count=None): -        args_dict = {'variablesReference': variablesReference} -        if start is not None: -            args_dict['start'] = start -        if count is not None: -            args_dict['count'] = count -        command_dict = { -            'command': 'variables', -            'type': 'request', -            'arguments': args_dict -        } -        return self.send_recv(command_dict) - -    def request_setVariable(self, containingVarRef, name, value, id=None): -        args_dict = { -            'variablesReference': containingVarRef, -            'name': name, -            'value': str(value) -        } -        if id is not None: -            args_dict['id'] = id -        command_dict = { -            'command': 'setVariable', -            'type': 'request', -            'arguments': args_dict -        } -        return self.send_recv(command_dict) - -    def request_testGetTargetBreakpoints(self): -        '''A request packet used in the LLDB test suite to get all currently -           set breakpoint infos for all breakpoints currently set in the -           target. -        ''' -        command_dict = { -            'command': '_testGetTargetBreakpoints', -            'type': 'request', -            'arguments': {} -        } -        return self.send_recv(command_dict) - -    def terminate(self): -        self.send.close() -        # self.recv.close() - - -class DebugAdaptor(DebugCommunication): -    def __init__(self, executable=None, port=None): -        self.process = None -        if executable is not None: -            self.process = subprocess.Popen([executable], -                                            stdin=subprocess.PIPE, -                                            stdout=subprocess.PIPE, -                                            stderr=subprocess.PIPE) -            DebugCommunication.__init__(self, self.process.stdout, -                                        self.process.stdin) -        elif port is not None: -            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -            s.connect(('127.0.0.1', port)) -            DebugCommunication.__init__(self, s.makefile('r'), s.makefile('w')) - -    def get_pid(self): -        if self.process: -            return self.process.pid -        return -1 - -    def terminate(self): -        super(DebugAdaptor, self).terminate() -        if self.process is not None: -            self.process.terminate() -            self.process.wait() -            self.process = None - - -def attach_options_specified(options): -    if options.pid is not None: -        return True -    if options.waitFor: -        return True -    if options.attach: -        return True -    if options.attachCmds: -        return True -    return False - - -def run_vscode(dbg, args, options): -    dbg.request_initialize() -    if attach_options_specified(options): -        response = dbg.request_attach(program=options.program, -                                      pid=options.pid, -                                      waitFor=options.waitFor, -                                      attachCommands=options.attachCmds, -                                      initCommands=options.initCmds, -                                      preRunCommands=options.preRunCmds, -                                      stopCommands=options.stopCmds, -                                      exitCommands=options.exitCmds) -    else: -        response = dbg.request_launch(options.program, -                                      args=args, -                                      env=options.envs, -                                      cwd=options.workingDir, -                                      debuggerRoot=options.debuggerRoot, -                                      sourcePath=options.sourcePath, -                                      initCommands=options.initCmds, -                                      preRunCommands=options.preRunCmds, -                                      stopCommands=options.stopCmds, -                                      exitCommands=options.exitCmds) - -    if response['success']: -        if options.sourceBreakpoints: -            source_to_lines = {} -            for file_line in options.sourceBreakpoints: -                (path, line) = file_line.split(':') -                if len(path) == 0 or len(line) == 0: -                    print('error: invalid source with line "%s"' % -                          (file_line)) - -                else: -                    if path in source_to_lines: -                        source_to_lines[path].append(int(line)) -                    else: -                        source_to_lines[path] = [int(line)] -            for source in source_to_lines: -                dbg.request_setBreakpoints(source, source_to_lines[source]) -        if options.funcBreakpoints: -            dbg.request_setFunctionBreakpoints(options.funcBreakpoints) -        dbg.request_configurationDone() -        dbg.wait_for_stopped() -    else: -        if 'message' in response: -            print(response['message']) -    dbg.request_disconnect(terminateDebuggee=True) - - -def main(): -    parser = optparse.OptionParser( -        description=('A testing framework for the Visual Studio Code Debug ' -                     'Adaptor protocol')) - -    parser.add_option( -        '--vscode', -        type='string', -        dest='vscode_path', -        help=('The path to the command line program that implements the ' -              'Visual Studio Code Debug Adaptor protocol.'), -        default=None) - -    parser.add_option( -        '--program', -        type='string', -        dest='program', -        help='The path to the program to debug.', -        default=None) - -    parser.add_option( -        '--workingDir', -        type='string', -        dest='workingDir', -        default=None, -        help='Set the working directory for the process we launch.') - -    parser.add_option( -        '--sourcePath', -        type='string', -        dest='sourcePath', -        default=None, -        help=('Set the relative source root for any debug info that has ' -              'relative paths in it.')) - -    parser.add_option( -        '--debuggerRoot', -        type='string', -        dest='debuggerRoot', -        default=None, -        help=('Set the working directory for lldb-vscode for any object files ' -              'with relative paths in the Mach-o debug map.')) - -    parser.add_option( -        '-r', '--replay', -        type='string', -        dest='replay', -        help=('Specify a file containing a packet log to replay with the ' -              'current Visual Studio Code Debug Adaptor executable.'), -        default=None) - -    parser.add_option( -        '-g', '--debug', -        action='store_true', -        dest='debug', -        default=False, -        help='Pause waiting for a debugger to attach to the debug adaptor') - -    parser.add_option( -        '--port', -        type='int', -        dest='port', -        help="Attach a socket to a port instead of using STDIN for VSCode", -        default=None) - -    parser.add_option( -        '--pid', -        type='int', -        dest='pid', -        help="The process ID to attach to", -        default=None) - -    parser.add_option( -        '--attach', -        action='store_true', -        dest='attach', -        default=False, -        help=('Specify this option to attach to a process by name. The ' -              'process name is the basanme of the executable specified with ' -              'the --program option.')) - -    parser.add_option( -        '-f', '--function-bp', -        type='string', -        action='append', -        dest='funcBreakpoints', -        help=('Specify the name of a function to break at. ' -              'Can be specified more than once.'), -        default=[]) - -    parser.add_option( -        '-s', '--source-bp', -        type='string', -        action='append', -        dest='sourceBreakpoints', -        default=[], -        help=('Specify source breakpoints to set in the format of ' -              '<source>:<line>. ' -              'Can be specified more than once.')) - -    parser.add_option( -        '--attachCommand', -        type='string', -        action='append', -        dest='attachCmds', -        default=[], -        help=('Specify a LLDB command that will attach to a process. ' -              'Can be specified more than once.')) - -    parser.add_option( -        '--initCommand', -        type='string', -        action='append', -        dest='initCmds', -        default=[], -        help=('Specify a LLDB command that will be executed before the target ' -              'is created. Can be specified more than once.')) - -    parser.add_option( -        '--preRunCommand', -        type='string', -        action='append', -        dest='preRunCmds', -        default=[], -        help=('Specify a LLDB command that will be executed after the target ' -              'has been created. Can be specified more than once.')) - -    parser.add_option( -        '--stopCommand', -        type='string', -        action='append', -        dest='stopCmds', -        default=[], -        help=('Specify a LLDB command that will be executed each time the' -              'process stops. Can be specified more than once.')) - -    parser.add_option( -        '--exitCommand', -        type='string', -        action='append', -        dest='exitCmds', -        default=[], -        help=('Specify a LLDB command that will be executed when the process ' -              'exits. Can be specified more than once.')) - -    parser.add_option( -        '--env', -        type='string', -        action='append', -        dest='envs', -        default=[], -        help=('Specify environment variables to pass to the launched ' -              'process.')) - -    parser.add_option( -        '--waitFor', -        action='store_true', -        dest='waitFor', -        default=False, -        help=('Wait for the next process to be launched whose name matches ' -              'the basename of the program specified with the --program ' -              'option')) - -    (options, args) = parser.parse_args(sys.argv[1:]) - -    if options.vscode_path is None and options.port is None: -        print('error: must either specify a path to a Visual Studio Code ' -              'Debug Adaptor vscode executable path using the --vscode ' -              'option, or a port to attach to for an existing lldb-vscode ' -              'using the --port option') -        return -    dbg = DebugAdaptor(executable=options.vscode_path, port=options.port) -    if options.debug: -        raw_input('Waiting for debugger to attach pid "%i"' % ( -                  dbg.get_pid())) -    if options.replay: -        dbg.replay_packets(options.replay) -    else: -        run_vscode(dbg, args, options) -    dbg.terminate() - - -if __name__ == '__main__': -    main() | 
