diff options
Diffstat (limited to 'packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py')
-rw-r--r-- | packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py | 1084 |
1 files changed, 1084 insertions, 0 deletions
diff --git a/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py b/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py new file mode 100644 index 000000000000..066158d0877f --- /dev/null +++ b/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py @@ -0,0 +1,1084 @@ +#!/usr/bin/env python + +import binascii +import json +import optparse +import os +import pprint +import socket +import string +import subprocess +import sys +import threading + + +def dump_memory(base_addr, data, num_per_line, outfile): + + data_len = len(data) + hex_string = binascii.hexlify(data) + addr = base_addr + ascii_str = '' + i = 0 + while i < data_len: + outfile.write('0x%8.8x: ' % (addr + i)) + bytes_left = data_len - i + if bytes_left >= num_per_line: + curr_data_len = num_per_line + else: + curr_data_len = bytes_left + hex_start_idx = i * 2 + hex_end_idx = hex_start_idx + curr_data_len * 2 + curr_hex_str = hex_string[hex_start_idx:hex_end_idx] + # 'curr_hex_str' now contains the hex byte string for the + # current line with no spaces between bytes + t = iter(curr_hex_str) + # Print hex bytes separated by space + outfile.write(' '.join(a + b for a, b in zip(t, t))) + # Print two spaces + outfile.write(' ') + # Calculate ASCII string for bytes into 'ascii_str' + ascii_str = '' + for j in range(i, i + curr_data_len): + ch = data[j] + if ch in string.printable and ch not in string.whitespace: + ascii_str += '%c' % (ch) + else: + ascii_str += '.' + # Print ASCII representation and newline + outfile.write(ascii_str) + i = i + curr_data_len + outfile.write('\n') + + +def read_packet(f, verbose=False, trace_file=None): + '''Decode a JSON packet that starts with the content length and is + followed by the JSON bytes from a file 'f' + ''' + line = f.readline() + if len(line) == 0: + return None + + # Watch for line that starts with the prefix + prefix = 'Content-Length: ' + if line.startswith(prefix): + # Decode length of JSON bytes + if verbose: + print('content: "%s"' % (line)) + length = int(line[len(prefix):]) + if verbose: + print('length: "%u"' % (length)) + # Skip empty line + line = f.readline() + if verbose: + print('empty: "%s"' % (line)) + # Read JSON bytes + json_str = f.read(length) + if verbose: + print('json: "%s"' % (json_str)) + if trace_file: + trace_file.write('from adaptor:\n%s\n' % (json_str)) + # Decode the JSON bytes into a python dictionary + return json.loads(json_str) + + return None + + +def packet_type_is(packet, packet_type): + return 'type' in packet and packet['type'] == packet_type + + +def read_packet_thread(vs_comm): + done = False + while not done: + packet = read_packet(vs_comm.recv, trace_file=vs_comm.trace_file) + if packet: + done = not vs_comm.handle_recv_packet(packet) + else: + done = True + + +class DebugCommunication(object): + + def __init__(self, recv, send): + self.trace_file = None + self.send = send + self.recv = recv + self.recv_packets = [] + self.recv_condition = threading.Condition() + self.recv_thread = threading.Thread(target=read_packet_thread, + args=(self,)) + self.process_event_body = None + self.exit_status = None + self.initialize_body = None + self.thread_stop_reasons = {} + self.sequence = 1 + self.threads = None + self.recv_thread.start() + self.output_condition = threading.Condition() + self.output = {} + self.configuration_done_sent = False + self.frame_scopes = {} + + @classmethod + def encode_content(cls, s): + return "Content-Length: %u\r\n\r\n%s" % (len(s), s) + + @classmethod + def validate_response(cls, command, response): + if command['command'] != response['command']: + raise ValueError('command mismatch in response') + if command['seq'] != response['request_seq']: + raise ValueError('seq mismatch in response') + + def get_output(self, category, timeout=0.0, clear=True): + self.output_condition.acquire() + output = None + if category in self.output: + output = self.output[category] + if clear: + del self.output[category] + elif timeout != 0.0: + self.output_condition.wait(timeout) + if category in self.output: + output = self.output[category] + if clear: + del self.output[category] + self.output_condition.release() + return output + + def handle_recv_packet(self, packet): + '''Called by the read thread that is waiting for all incoming packets + to store the incoming packet in "self.recv_packets" in a thread safe + way. This function will then signal the "self.recv_condition" to + indicate a new packet is available. Returns True if the caller + should keep calling this function for more packets. + ''' + # Check the packet to see if is an event packet + keepGoing = True + packet_type = packet['type'] + if packet_type == 'event': + event = packet['event'] + body = None + if 'body' in packet: + body = packet['body'] + # Handle the event packet and cache information from these packets + # as they come in + if event == 'output': + # Store any output we receive so clients can retrieve it later. + category = body['category'] + output = body['output'] + self.output_condition.acquire() + if category in self.output: + self.output[category] += output + else: + self.output[category] = output + self.output_condition.notify() + self.output_condition.release() + # no need to add 'output' packets to our packets list + return keepGoing + elif event == 'process': + # When a new process is attached or launched, remember the + # details that are available in the body of the event + self.process_event_body = body + elif event == 'stopped': + # Each thread that stops with a reason will send a + # 'stopped' event. We need to remember the thread stop + # reasons since the 'threads' command doesn't return + # that information. + self._process_stopped() + tid = body['threadId'] + self.thread_stop_reasons[tid] = body + elif packet_type == 'response': + if packet['command'] == 'disconnect': + keepGoing = False + self.recv_condition.acquire() + self.recv_packets.append(packet) + self.recv_condition.notify() + self.recv_condition.release() + return keepGoing + + def send_packet(self, command_dict, set_sequence=True): + '''Take the "command_dict" python dictionary and encode it as a JSON + string and send the contents as a packet to the VSCode debug + adaptor''' + # Set the sequence ID for this command automatically + if set_sequence: + command_dict['seq'] = self.sequence + self.sequence += 1 + # Encode our command dictionary as a JSON string + json_str = json.dumps(command_dict, separators=(',', ':')) + if self.trace_file: + self.trace_file.write('to adaptor:\n%s\n' % (json_str)) + length = len(json_str) + if length > 0: + # Send the encoded JSON packet and flush the 'send' file + self.send.write(self.encode_content(json_str)) + self.send.flush() + + def recv_packet(self, filter_type=None, filter_event=None, timeout=None): + '''Get a JSON packet from the VSCode debug adaptor. This function + assumes a thread that reads packets is running and will deliver + any received packets by calling handle_recv_packet(...). This + function will wait for the packet to arrive and return it when + it does.''' + while True: + self.recv_condition.acquire() + packet = None + while True: + for (i, curr_packet) in enumerate(self.recv_packets): + packet_type = curr_packet['type'] + if filter_type is None or packet_type in filter_type: + if (filter_event is None or + (packet_type == 'event' and + curr_packet['event'] in filter_event)): + packet = self.recv_packets.pop(i) + break + if packet: + break + # Sleep until packet is received + len_before = len(self.recv_packets) + self.recv_condition.wait(timeout) + len_after = len(self.recv_packets) + if len_before == len_after: + return None # Timed out + self.recv_condition.release() + return packet + + return None + + def send_recv(self, command): + '''Send a command python dictionary as JSON and receive the JSON + response. Validates that the response is the correct sequence and + command in the reply. Any events that are received are added to the + events list in this object''' + self.send_packet(command) + done = False + while not done: + response = self.recv_packet(filter_type='response') + if response is None: + desc = 'no response for "%s"' % (command['command']) + raise ValueError(desc) + self.validate_response(command, response) + return response + return None + + def wait_for_event(self, filter=None, timeout=None): + while True: + return self.recv_packet(filter_type='event', filter_event=filter, + timeout=timeout) + return None + + def wait_for_stopped(self, timeout=None): + stopped_events = [] + stopped_event = self.wait_for_event(filter=['stopped', 'exited'], + timeout=timeout) + exited = False + while stopped_event: + stopped_events.append(stopped_event) + # If we exited, then we are done + if stopped_event['event'] == 'exited': + self.exit_status = stopped_event['body']['exitCode'] + exited = True + break + # Otherwise we stopped and there might be one or more 'stopped' + # events for each thread that stopped with a reason, so keep + # checking for more 'stopped' events and return all of them + stopped_event = self.wait_for_event(filter='stopped', timeout=0.25) + if exited: + self.threads = [] + return stopped_events + + def wait_for_exited(self): + event_dict = self.wait_for_event('exited') + if event_dict is None: + raise ValueError("didn't get stopped event") + return event_dict + + def get_initialize_value(self, key): + '''Get a value for the given key if it there is a key/value pair in + the "initialize" request response body. + ''' + if self.initialize_body and key in self.initialize_body: + return self.initialize_body[key] + return None + + def get_threads(self): + if self.threads is None: + self.request_threads() + return self.threads + + def get_thread_id(self, threadIndex=0): + '''Utility function to get the first thread ID in the thread list. + If the thread list is empty, then fetch the threads. + ''' + if self.threads is None: + self.request_threads() + if self.threads and threadIndex < len(self.threads): + return self.threads[threadIndex]['id'] + return None + + def get_stackFrame(self, frameIndex=0, threadId=None): + '''Get a single "StackFrame" object from a "stackTrace" request and + return the "StackFrame as a python dictionary, or None on failure + ''' + if threadId is None: + threadId = self.get_thread_id() + if threadId is None: + print('invalid threadId') + return None + response = self.request_stackTrace(threadId, startFrame=frameIndex, + levels=1) + if response: + return response['body']['stackFrames'][0] + print('invalid response') + return None + + def get_scope_variables(self, scope_name, frameIndex=0, threadId=None): + stackFrame = self.get_stackFrame(frameIndex=frameIndex, + threadId=threadId) + if stackFrame is None: + return [] + frameId = stackFrame['id'] + if frameId in self.frame_scopes: + frame_scopes = self.frame_scopes[frameId] + else: + scopes_response = self.request_scopes(frameId) + frame_scopes = scopes_response['body']['scopes'] + self.frame_scopes[frameId] = frame_scopes + for scope in frame_scopes: + if scope['name'] == scope_name: + varRef = scope['variablesReference'] + variables_response = self.request_variables(varRef) + if variables_response: + if 'body' in variables_response: + body = variables_response['body'] + if 'variables' in body: + vars = body['variables'] + return vars + return [] + + def get_global_variables(self, frameIndex=0, threadId=None): + return self.get_scope_variables('Globals', frameIndex=frameIndex, + threadId=threadId) + + def get_local_variables(self, frameIndex=0, threadId=None): + return self.get_scope_variables('Locals', frameIndex=frameIndex, + threadId=threadId) + + def get_local_variable(self, name, frameIndex=0, threadId=None): + locals = self.get_local_variables(frameIndex=frameIndex, + threadId=threadId) + for local in locals: + if 'name' in local and local['name'] == name: + return local + return None + + def get_local_variable_value(self, name, frameIndex=0, threadId=None): + variable = self.get_local_variable(name, frameIndex=frameIndex, + threadId=threadId) + if variable and 'value' in variable: + return variable['value'] + return None + + def replay_packets(self, replay_file_path): + f = open(replay_file_path, 'r') + mode = 'invalid' + set_sequence = False + command_dict = None + while mode != 'eof': + if mode == 'invalid': + line = f.readline() + if line.startswith('to adapter:'): + mode = 'send' + elif line.startswith('from adapter:'): + mode = 'recv' + elif mode == 'send': + command_dict = read_packet(f) + # Skip the end of line that follows the JSON + f.readline() + if command_dict is None: + raise ValueError('decode packet failed from replay file') + print('Sending:') + pprint.PrettyPrinter(indent=2).pprint(command_dict) + # raw_input('Press ENTER to send:') + self.send_packet(command_dict, set_sequence) + mode = 'invalid' + elif mode == 'recv': + print('Replay response:') + replay_response = read_packet(f) + # Skip the end of line that follows the JSON + f.readline() + pprint.PrettyPrinter(indent=2).pprint(replay_response) + actual_response = self.recv_packet() + if actual_response: + type = actual_response['type'] + print('Actual response:') + if type == 'response': + self.validate_response(command_dict, actual_response) + pprint.PrettyPrinter(indent=2).pprint(actual_response) + else: + print("error: didn't get a valid response") + mode = 'invalid' + + def request_attach(self, program=None, pid=None, waitFor=None, trace=None, + initCommands=None, preRunCommands=None, + stopCommands=None, exitCommands=None, + attachCommands=None): + args_dict = {} + if pid is not None: + args_dict['pid'] = pid + if program is not None: + args_dict['program'] = program + if waitFor is not None: + args_dict['waitFor'] = waitFor + if trace: + args_dict['trace'] = trace + if initCommands: + args_dict['initCommands'] = initCommands + if preRunCommands: + args_dict['preRunCommands'] = preRunCommands + if stopCommands: + args_dict['stopCommands'] = stopCommands + if exitCommands: + args_dict['exitCommands'] = exitCommands + if attachCommands: + args_dict['attachCommands'] = attachCommands + command_dict = { + 'command': 'attach', + 'type': 'request', + 'arguments': args_dict + } + return self.send_recv(command_dict) + + def request_configurationDone(self): + command_dict = { + 'command': 'configurationDone', + 'type': 'request', + 'arguments': {} + } + response = self.send_recv(command_dict) + if response: + self.configuration_done_sent = True + return response + + def _process_stopped(self): + self.threads = None + self.frame_scopes = {} + + def request_continue(self, threadId=None): + if self.exit_status is not None: + raise ValueError('request_continue called after process exited') + # If we have launched or attached, then the first continue is done by + # sending the 'configurationDone' request + if not self.configuration_done_sent: + return self.request_configurationDone() + args_dict = {} + if threadId is None: + threadId = self.get_thread_id() + args_dict['threadId'] = threadId + command_dict = { + 'command': 'continue', + 'type': 'request', + 'arguments': args_dict + } + response = self.send_recv(command_dict) + recv_packets = [] + self.recv_condition.acquire() + for event in self.recv_packets: + if event['event'] != 'stopped': + recv_packets.append(event) + self.recv_packets = recv_packets + self.recv_condition.release() + return response + + def request_disconnect(self, terminateDebuggee=None): + args_dict = {} + if terminateDebuggee is not None: + if terminateDebuggee: + args_dict['terminateDebuggee'] = True + else: + args_dict['terminateDebuggee'] = False + command_dict = { + 'command': 'disconnect', + 'type': 'request', + 'arguments': args_dict + } + return self.send_recv(command_dict) + + def request_evaluate(self, expression, frameIndex=0, threadId=None): + stackFrame = self.get_stackFrame(frameIndex=frameIndex, + threadId=threadId) + if stackFrame is None: + return [] + args_dict = { + 'expression': expression, + 'frameId': stackFrame['id'], + } + command_dict = { + 'command': 'evaluate', + 'type': 'request', + 'arguments': args_dict + } + return self.send_recv(command_dict) + + def request_initialize(self): + command_dict = { + 'command': 'initialize', + 'type': 'request', + 'arguments': { + 'adapterID': 'lldb-native', + 'clientID': 'vscode', + 'columnsStartAt1': True, + 'linesStartAt1': True, + 'locale': 'en-us', + 'pathFormat': 'path', + 'supportsRunInTerminalRequest': True, + 'supportsVariablePaging': True, + 'supportsVariableType': True + } + } + response = self.send_recv(command_dict) + if response: + if 'body' in response: + self.initialize_body = response['body'] + return response + + def request_launch(self, program, args=None, cwd=None, env=None, + stopOnEntry=False, disableASLR=True, + disableSTDIO=False, shellExpandArguments=False, + trace=False, initCommands=None, preRunCommands=None, + stopCommands=None, exitCommands=None, sourcePath=None, + debuggerRoot=None): + args_dict = { + 'program': program + } + if args: + args_dict['args'] = args + if cwd: + args_dict['cwd'] = cwd + if env: + args_dict['env'] = env + if stopOnEntry: + args_dict['stopOnEntry'] = stopOnEntry + if disableASLR: + args_dict['disableASLR'] = disableASLR + if disableSTDIO: + args_dict['disableSTDIO'] = disableSTDIO + if shellExpandArguments: + args_dict['shellExpandArguments'] = shellExpandArguments + if trace: + args_dict['trace'] = trace + if initCommands: + args_dict['initCommands'] = initCommands + if preRunCommands: + args_dict['preRunCommands'] = preRunCommands + if stopCommands: + args_dict['stopCommands'] = stopCommands + if exitCommands: + args_dict['exitCommands'] = exitCommands + if sourcePath: + args_dict['sourcePath'] = sourcePath + if debuggerRoot: + args_dict['debuggerRoot'] = debuggerRoot + command_dict = { + 'command': 'launch', + 'type': 'request', + 'arguments': args_dict + } + response = self.send_recv(command_dict) + + # Wait for a 'process' and 'initialized' event in any order + self.wait_for_event(filter=['process', 'initialized']) + self.wait_for_event(filter=['process', 'initialized']) + return response + + def request_next(self, threadId): + if self.exit_status is not None: + raise ValueError('request_continue called after process exited') + args_dict = {'threadId': threadId} + command_dict = { + 'command': 'next', + 'type': 'request', + 'arguments': args_dict + } + return self.send_recv(command_dict) + + def request_stepIn(self, threadId): + if self.exit_status is not None: + raise ValueError('request_continue called after process exited') + args_dict = {'threadId': threadId} + command_dict = { + 'command': 'stepIn', + 'type': 'request', + 'arguments': args_dict + } + return self.send_recv(command_dict) + + def request_stepOut(self, threadId): + if self.exit_status is not None: + raise ValueError('request_continue called after process exited') + args_dict = {'threadId': threadId} + command_dict = { + 'command': 'stepOut', + 'type': 'request', + 'arguments': args_dict + } + return self.send_recv(command_dict) + + def request_pause(self, threadId=None): + if self.exit_status is not None: + raise ValueError('request_continue called after process exited') + if threadId is None: + threadId = self.get_thread_id() + args_dict = {'threadId': threadId} + command_dict = { + 'command': 'pause', + 'type': 'request', + 'arguments': args_dict + } + return self.send_recv(command_dict) + + def request_scopes(self, frameId): + args_dict = {'frameId': frameId} + command_dict = { + 'command': 'scopes', + 'type': 'request', + 'arguments': args_dict + } + return self.send_recv(command_dict) + + def request_setBreakpoints(self, file_path, line_array, condition=None, + hitCondition=None): + (dir, base) = os.path.split(file_path) + breakpoints = [] + for line in line_array: + bp = {'line': line} + if condition is not None: + bp['condition'] = condition + if hitCondition is not None: + bp['hitCondition'] = hitCondition + breakpoints.append(bp) + source_dict = { + 'name': base, + 'path': file_path + } + args_dict = { + 'source': source_dict, + 'breakpoints': breakpoints, + 'lines': '%s' % (line_array), + 'sourceModified': False, + } + command_dict = { + 'command': 'setBreakpoints', + 'type': 'request', + 'arguments': args_dict + } + return self.send_recv(command_dict) + + def request_setExceptionBreakpoints(self, filters): + args_dict = {'filters': filters} + command_dict = { + 'command': 'setExceptionBreakpoints', + 'type': 'request', + 'arguments': args_dict + } + return self.send_recv(command_dict) + + def request_setFunctionBreakpoints(self, names, condition=None, + hitCondition=None): + breakpoints = [] + for name in names: + bp = {'name': name} + if condition is not None: + bp['condition'] = condition + if hitCondition is not None: + bp['hitCondition'] = hitCondition + breakpoints.append(bp) + args_dict = {'breakpoints': breakpoints} + command_dict = { + 'command': 'setFunctionBreakpoints', + 'type': 'request', + 'arguments': args_dict + } + return self.send_recv(command_dict) + + def request_stackTrace(self, threadId=None, startFrame=None, levels=None, + dump=False): + if threadId is None: + threadId = self.get_thread_id() + args_dict = {'threadId': threadId} + if startFrame is not None: + args_dict['startFrame'] = startFrame + if levels is not None: + args_dict['levels'] = levels + command_dict = { + 'command': 'stackTrace', + 'type': 'request', + 'arguments': args_dict + } + response = self.send_recv(command_dict) + if dump: + for (idx, frame) in enumerate(response['body']['stackFrames']): + name = frame['name'] + if 'line' in frame and 'source' in frame: + source = frame['source'] + if 'sourceReference' not in source: + if 'name' in source: + source_name = source['name'] + line = frame['line'] + print("[%3u] %s @ %s:%u" % (idx, name, source_name, + line)) + continue + print("[%3u] %s" % (idx, name)) + return response + + def request_threads(self): + '''Request a list of all threads and combine any information from any + "stopped" events since those contain more information about why a + thread actually stopped. Returns an array of thread dictionaries + with information about all threads''' + command_dict = { + 'command': 'threads', + 'type': 'request', + 'arguments': {} + } + response = self.send_recv(command_dict) + body = response['body'] + # Fill in "self.threads" correctly so that clients that call + # self.get_threads() or self.get_thread_id(...) can get information + # on threads when the process is stopped. + if 'threads' in body: + self.threads = body['threads'] + for thread in self.threads: + # Copy the thread dictionary so we can add key/value pairs to + # it without affecfting the original info from the "threads" + # command. + tid = thread['id'] + if tid in self.thread_stop_reasons: + thread_stop_info = self.thread_stop_reasons[tid] + copy_keys = ['reason', 'description', 'text'] + for key in copy_keys: + if key in thread_stop_info: + thread[key] = thread_stop_info[key] + else: + self.threads = None + return response + + def request_variables(self, variablesReference, start=None, count=None): + args_dict = {'variablesReference': variablesReference} + if start is not None: + args_dict['start'] = start + if count is not None: + args_dict['count'] = count + command_dict = { + 'command': 'variables', + 'type': 'request', + 'arguments': args_dict + } + return self.send_recv(command_dict) + + def request_setVariable(self, containingVarRef, name, value, id=None): + args_dict = { + 'variablesReference': containingVarRef, + 'name': name, + 'value': str(value) + } + if id is not None: + args_dict['id'] = id + command_dict = { + 'command': 'setVariable', + 'type': 'request', + 'arguments': args_dict + } + return self.send_recv(command_dict) + + def request_testGetTargetBreakpoints(self): + '''A request packet used in the LLDB test suite to get all currently + set breakpoint infos for all breakpoints currently set in the + target. + ''' + command_dict = { + 'command': '_testGetTargetBreakpoints', + 'type': 'request', + 'arguments': {} + } + return self.send_recv(command_dict) + + def terminate(self): + self.send.close() + # self.recv.close() + + +class DebugAdaptor(DebugCommunication): + def __init__(self, executable=None, port=None): + self.process = None + if executable is not None: + self.process = subprocess.Popen([executable], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + DebugCommunication.__init__(self, self.process.stdout, + self.process.stdin) + elif port is not None: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(('127.0.0.1', port)) + DebugCommunication.__init__(self, s.makefile('r'), s.makefile('w')) + + def get_pid(self): + if self.process: + return self.process.pid + return -1 + + def terminate(self): + super(DebugAdaptor, self).terminate() + if self.process is not None: + self.process.terminate() + self.process.wait() + self.process = None + + +def attach_options_specified(options): + if options.pid is not None: + return True + if options.waitFor: + return True + if options.attach: + return True + if options.attachCmds: + return True + return False + + +def run_vscode(dbg, args, options): + dbg.request_initialize() + if attach_options_specified(options): + response = dbg.request_attach(program=options.program, + pid=options.pid, + waitFor=options.waitFor, + attachCommands=options.attachCmds, + initCommands=options.initCmds, + preRunCommands=options.preRunCmds, + stopCommands=options.stopCmds, + exitCommands=options.exitCmds) + else: + response = dbg.request_launch(options.program, + args=args, + env=options.envs, + cwd=options.workingDir, + debuggerRoot=options.debuggerRoot, + sourcePath=options.sourcePath, + initCommands=options.initCmds, + preRunCommands=options.preRunCmds, + stopCommands=options.stopCmds, + exitCommands=options.exitCmds) + + if response['success']: + if options.sourceBreakpoints: + source_to_lines = {} + for file_line in options.sourceBreakpoints: + (path, line) = file_line.split(':') + if len(path) == 0 or len(line) == 0: + print('error: invalid source with line "%s"' % + (file_line)) + + else: + if path in source_to_lines: + source_to_lines[path].append(int(line)) + else: + source_to_lines[path] = [int(line)] + for source in source_to_lines: + dbg.request_setBreakpoints(source, source_to_lines[source]) + if options.funcBreakpoints: + dbg.request_setFunctionBreakpoints(options.funcBreakpoints) + dbg.request_configurationDone() + dbg.wait_for_stopped() + else: + if 'message' in response: + print(response['message']) + dbg.request_disconnect(terminateDebuggee=True) + + +def main(): + parser = optparse.OptionParser( + description=('A testing framework for the Visual Studio Code Debug ' + 'Adaptor protocol')) + + parser.add_option( + '--vscode', + type='string', + dest='vscode_path', + help=('The path to the command line program that implements the ' + 'Visual Studio Code Debug Adaptor protocol.'), + default=None) + + parser.add_option( + '--program', + type='string', + dest='program', + help='The path to the program to debug.', + default=None) + + parser.add_option( + '--workingDir', + type='string', + dest='workingDir', + default=None, + help='Set the working directory for the process we launch.') + + parser.add_option( + '--sourcePath', + type='string', + dest='sourcePath', + default=None, + help=('Set the relative source root for any debug info that has ' + 'relative paths in it.')) + + parser.add_option( + '--debuggerRoot', + type='string', + dest='debuggerRoot', + default=None, + help=('Set the working directory for lldb-vscode for any object files ' + 'with relative paths in the Mach-o debug map.')) + + parser.add_option( + '-r', '--replay', + type='string', + dest='replay', + help=('Specify a file containing a packet log to replay with the ' + 'current Visual Studio Code Debug Adaptor executable.'), + default=None) + + parser.add_option( + '-g', '--debug', + action='store_true', + dest='debug', + default=False, + help='Pause waiting for a debugger to attach to the debug adaptor') + + parser.add_option( + '--port', + type='int', + dest='port', + help="Attach a socket to a port instead of using STDIN for VSCode", + default=None) + + parser.add_option( + '--pid', + type='int', + dest='pid', + help="The process ID to attach to", + default=None) + + parser.add_option( + '--attach', + action='store_true', + dest='attach', + default=False, + help=('Specify this option to attach to a process by name. The ' + 'process name is the basanme of the executable specified with ' + 'the --program option.')) + + parser.add_option( + '-f', '--function-bp', + type='string', + action='append', + dest='funcBreakpoints', + help=('Specify the name of a function to break at. ' + 'Can be specified more than once.'), + default=[]) + + parser.add_option( + '-s', '--source-bp', + type='string', + action='append', + dest='sourceBreakpoints', + default=[], + help=('Specify source breakpoints to set in the format of ' + '<source>:<line>. ' + 'Can be specified more than once.')) + + parser.add_option( + '--attachCommand', + type='string', + action='append', + dest='attachCmds', + default=[], + help=('Specify a LLDB command that will attach to a process. ' + 'Can be specified more than once.')) + + parser.add_option( + '--initCommand', + type='string', + action='append', + dest='initCmds', + default=[], + help=('Specify a LLDB command that will be executed before the target ' + 'is created. Can be specified more than once.')) + + parser.add_option( + '--preRunCommand', + type='string', + action='append', + dest='preRunCmds', + default=[], + help=('Specify a LLDB command that will be executed after the target ' + 'has been created. Can be specified more than once.')) + + parser.add_option( + '--stopCommand', + type='string', + action='append', + dest='stopCmds', + default=[], + help=('Specify a LLDB command that will be executed each time the' + 'process stops. Can be specified more than once.')) + + parser.add_option( + '--exitCommand', + type='string', + action='append', + dest='exitCmds', + default=[], + help=('Specify a LLDB command that will be executed when the process ' + 'exits. Can be specified more than once.')) + + parser.add_option( + '--env', + type='string', + action='append', + dest='envs', + default=[], + help=('Specify environment variables to pass to the launched ' + 'process.')) + + parser.add_option( + '--waitFor', + action='store_true', + dest='waitFor', + default=False, + help=('Wait for the next process to be launched whose name matches ' + 'the basename of the program specified with the --program ' + 'option')) + + (options, args) = parser.parse_args(sys.argv[1:]) + + if options.vscode_path is None and options.port is None: + print('error: must either specify a path to a Visual Studio Code ' + 'Debug Adaptor vscode executable path using the --vscode ' + 'option, or a port to attach to for an existing lldb-vscode ' + 'using the --port option') + return + dbg = DebugAdaptor(executable=options.vscode_path, port=options.port) + if options.debug: + raw_input('Waiting for debugger to attach pid "%i"' % ( + dbg.get_pid())) + if options.replay: + dbg.replay_packets(options.replay) + else: + run_vscode(dbg, args, options) + dbg.terminate() + + +if __name__ == '__main__': + main() |