summaryrefslogtreecommitdiff
path: root/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py
diff options
context:
space:
mode:
Diffstat (limited to 'packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py')
-rw-r--r--packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py1084
1 files changed, 1084 insertions, 0 deletions
diff --git a/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py b/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py
new file mode 100644
index 000000000000..066158d0877f
--- /dev/null
+++ b/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py
@@ -0,0 +1,1084 @@
+#!/usr/bin/env python
+
+import binascii
+import json
+import optparse
+import os
+import pprint
+import socket
+import string
+import subprocess
+import sys
+import threading
+
+
+def dump_memory(base_addr, data, num_per_line, outfile):
+
+ data_len = len(data)
+ hex_string = binascii.hexlify(data)
+ addr = base_addr
+ ascii_str = ''
+ i = 0
+ while i < data_len:
+ outfile.write('0x%8.8x: ' % (addr + i))
+ bytes_left = data_len - i
+ if bytes_left >= num_per_line:
+ curr_data_len = num_per_line
+ else:
+ curr_data_len = bytes_left
+ hex_start_idx = i * 2
+ hex_end_idx = hex_start_idx + curr_data_len * 2
+ curr_hex_str = hex_string[hex_start_idx:hex_end_idx]
+ # 'curr_hex_str' now contains the hex byte string for the
+ # current line with no spaces between bytes
+ t = iter(curr_hex_str)
+ # Print hex bytes separated by space
+ outfile.write(' '.join(a + b for a, b in zip(t, t)))
+ # Print two spaces
+ outfile.write(' ')
+ # Calculate ASCII string for bytes into 'ascii_str'
+ ascii_str = ''
+ for j in range(i, i + curr_data_len):
+ ch = data[j]
+ if ch in string.printable and ch not in string.whitespace:
+ ascii_str += '%c' % (ch)
+ else:
+ ascii_str += '.'
+ # Print ASCII representation and newline
+ outfile.write(ascii_str)
+ i = i + curr_data_len
+ outfile.write('\n')
+
+
+def read_packet(f, verbose=False, trace_file=None):
+ '''Decode a JSON packet that starts with the content length and is
+ followed by the JSON bytes from a file 'f'
+ '''
+ line = f.readline()
+ if len(line) == 0:
+ return None
+
+ # Watch for line that starts with the prefix
+ prefix = 'Content-Length: '
+ if line.startswith(prefix):
+ # Decode length of JSON bytes
+ if verbose:
+ print('content: "%s"' % (line))
+ length = int(line[len(prefix):])
+ if verbose:
+ print('length: "%u"' % (length))
+ # Skip empty line
+ line = f.readline()
+ if verbose:
+ print('empty: "%s"' % (line))
+ # Read JSON bytes
+ json_str = f.read(length)
+ if verbose:
+ print('json: "%s"' % (json_str))
+ if trace_file:
+ trace_file.write('from adaptor:\n%s\n' % (json_str))
+ # Decode the JSON bytes into a python dictionary
+ return json.loads(json_str)
+
+ return None
+
+
+def packet_type_is(packet, packet_type):
+ return 'type' in packet and packet['type'] == packet_type
+
+
+def read_packet_thread(vs_comm):
+ done = False
+ while not done:
+ packet = read_packet(vs_comm.recv, trace_file=vs_comm.trace_file)
+ if packet:
+ done = not vs_comm.handle_recv_packet(packet)
+ else:
+ done = True
+
+
+class DebugCommunication(object):
+
+ def __init__(self, recv, send):
+ self.trace_file = None
+ self.send = send
+ self.recv = recv
+ self.recv_packets = []
+ self.recv_condition = threading.Condition()
+ self.recv_thread = threading.Thread(target=read_packet_thread,
+ args=(self,))
+ self.process_event_body = None
+ self.exit_status = None
+ self.initialize_body = None
+ self.thread_stop_reasons = {}
+ self.sequence = 1
+ self.threads = None
+ self.recv_thread.start()
+ self.output_condition = threading.Condition()
+ self.output = {}
+ self.configuration_done_sent = False
+ self.frame_scopes = {}
+
+ @classmethod
+ def encode_content(cls, s):
+ return "Content-Length: %u\r\n\r\n%s" % (len(s), s)
+
+ @classmethod
+ def validate_response(cls, command, response):
+ if command['command'] != response['command']:
+ raise ValueError('command mismatch in response')
+ if command['seq'] != response['request_seq']:
+ raise ValueError('seq mismatch in response')
+
+ def get_output(self, category, timeout=0.0, clear=True):
+ self.output_condition.acquire()
+ output = None
+ if category in self.output:
+ output = self.output[category]
+ if clear:
+ del self.output[category]
+ elif timeout != 0.0:
+ self.output_condition.wait(timeout)
+ if category in self.output:
+ output = self.output[category]
+ if clear:
+ del self.output[category]
+ self.output_condition.release()
+ return output
+
+ def handle_recv_packet(self, packet):
+ '''Called by the read thread that is waiting for all incoming packets
+ to store the incoming packet in "self.recv_packets" in a thread safe
+ way. This function will then signal the "self.recv_condition" to
+ indicate a new packet is available. Returns True if the caller
+ should keep calling this function for more packets.
+ '''
+ # Check the packet to see if is an event packet
+ keepGoing = True
+ packet_type = packet['type']
+ if packet_type == 'event':
+ event = packet['event']
+ body = None
+ if 'body' in packet:
+ body = packet['body']
+ # Handle the event packet and cache information from these packets
+ # as they come in
+ if event == 'output':
+ # Store any output we receive so clients can retrieve it later.
+ category = body['category']
+ output = body['output']
+ self.output_condition.acquire()
+ if category in self.output:
+ self.output[category] += output
+ else:
+ self.output[category] = output
+ self.output_condition.notify()
+ self.output_condition.release()
+ # no need to add 'output' packets to our packets list
+ return keepGoing
+ elif event == 'process':
+ # When a new process is attached or launched, remember the
+ # details that are available in the body of the event
+ self.process_event_body = body
+ elif event == 'stopped':
+ # Each thread that stops with a reason will send a
+ # 'stopped' event. We need to remember the thread stop
+ # reasons since the 'threads' command doesn't return
+ # that information.
+ self._process_stopped()
+ tid = body['threadId']
+ self.thread_stop_reasons[tid] = body
+ elif packet_type == 'response':
+ if packet['command'] == 'disconnect':
+ keepGoing = False
+ self.recv_condition.acquire()
+ self.recv_packets.append(packet)
+ self.recv_condition.notify()
+ self.recv_condition.release()
+ return keepGoing
+
+ def send_packet(self, command_dict, set_sequence=True):
+ '''Take the "command_dict" python dictionary and encode it as a JSON
+ string and send the contents as a packet to the VSCode debug
+ adaptor'''
+ # Set the sequence ID for this command automatically
+ if set_sequence:
+ command_dict['seq'] = self.sequence
+ self.sequence += 1
+ # Encode our command dictionary as a JSON string
+ json_str = json.dumps(command_dict, separators=(',', ':'))
+ if self.trace_file:
+ self.trace_file.write('to adaptor:\n%s\n' % (json_str))
+ length = len(json_str)
+ if length > 0:
+ # Send the encoded JSON packet and flush the 'send' file
+ self.send.write(self.encode_content(json_str))
+ self.send.flush()
+
+ def recv_packet(self, filter_type=None, filter_event=None, timeout=None):
+ '''Get a JSON packet from the VSCode debug adaptor. This function
+ assumes a thread that reads packets is running and will deliver
+ any received packets by calling handle_recv_packet(...). This
+ function will wait for the packet to arrive and return it when
+ it does.'''
+ while True:
+ self.recv_condition.acquire()
+ packet = None
+ while True:
+ for (i, curr_packet) in enumerate(self.recv_packets):
+ packet_type = curr_packet['type']
+ if filter_type is None or packet_type in filter_type:
+ if (filter_event is None or
+ (packet_type == 'event' and
+ curr_packet['event'] in filter_event)):
+ packet = self.recv_packets.pop(i)
+ break
+ if packet:
+ break
+ # Sleep until packet is received
+ len_before = len(self.recv_packets)
+ self.recv_condition.wait(timeout)
+ len_after = len(self.recv_packets)
+ if len_before == len_after:
+ return None # Timed out
+ self.recv_condition.release()
+ return packet
+
+ return None
+
+ def send_recv(self, command):
+ '''Send a command python dictionary as JSON and receive the JSON
+ response. Validates that the response is the correct sequence and
+ command in the reply. Any events that are received are added to the
+ events list in this object'''
+ self.send_packet(command)
+ done = False
+ while not done:
+ response = self.recv_packet(filter_type='response')
+ if response is None:
+ desc = 'no response for "%s"' % (command['command'])
+ raise ValueError(desc)
+ self.validate_response(command, response)
+ return response
+ return None
+
+ def wait_for_event(self, filter=None, timeout=None):
+ while True:
+ return self.recv_packet(filter_type='event', filter_event=filter,
+ timeout=timeout)
+ return None
+
+ def wait_for_stopped(self, timeout=None):
+ stopped_events = []
+ stopped_event = self.wait_for_event(filter=['stopped', 'exited'],
+ timeout=timeout)
+ exited = False
+ while stopped_event:
+ stopped_events.append(stopped_event)
+ # If we exited, then we are done
+ if stopped_event['event'] == 'exited':
+ self.exit_status = stopped_event['body']['exitCode']
+ exited = True
+ break
+ # Otherwise we stopped and there might be one or more 'stopped'
+ # events for each thread that stopped with a reason, so keep
+ # checking for more 'stopped' events and return all of them
+ stopped_event = self.wait_for_event(filter='stopped', timeout=0.25)
+ if exited:
+ self.threads = []
+ return stopped_events
+
+ def wait_for_exited(self):
+ event_dict = self.wait_for_event('exited')
+ if event_dict is None:
+ raise ValueError("didn't get stopped event")
+ return event_dict
+
+ def get_initialize_value(self, key):
+ '''Get a value for the given key if it there is a key/value pair in
+ the "initialize" request response body.
+ '''
+ if self.initialize_body and key in self.initialize_body:
+ return self.initialize_body[key]
+ return None
+
+ def get_threads(self):
+ if self.threads is None:
+ self.request_threads()
+ return self.threads
+
+ def get_thread_id(self, threadIndex=0):
+ '''Utility function to get the first thread ID in the thread list.
+ If the thread list is empty, then fetch the threads.
+ '''
+ if self.threads is None:
+ self.request_threads()
+ if self.threads and threadIndex < len(self.threads):
+ return self.threads[threadIndex]['id']
+ return None
+
+ def get_stackFrame(self, frameIndex=0, threadId=None):
+ '''Get a single "StackFrame" object from a "stackTrace" request and
+ return the "StackFrame as a python dictionary, or None on failure
+ '''
+ if threadId is None:
+ threadId = self.get_thread_id()
+ if threadId is None:
+ print('invalid threadId')
+ return None
+ response = self.request_stackTrace(threadId, startFrame=frameIndex,
+ levels=1)
+ if response:
+ return response['body']['stackFrames'][0]
+ print('invalid response')
+ return None
+
+ def get_scope_variables(self, scope_name, frameIndex=0, threadId=None):
+ stackFrame = self.get_stackFrame(frameIndex=frameIndex,
+ threadId=threadId)
+ if stackFrame is None:
+ return []
+ frameId = stackFrame['id']
+ if frameId in self.frame_scopes:
+ frame_scopes = self.frame_scopes[frameId]
+ else:
+ scopes_response = self.request_scopes(frameId)
+ frame_scopes = scopes_response['body']['scopes']
+ self.frame_scopes[frameId] = frame_scopes
+ for scope in frame_scopes:
+ if scope['name'] == scope_name:
+ varRef = scope['variablesReference']
+ variables_response = self.request_variables(varRef)
+ if variables_response:
+ if 'body' in variables_response:
+ body = variables_response['body']
+ if 'variables' in body:
+ vars = body['variables']
+ return vars
+ return []
+
+ def get_global_variables(self, frameIndex=0, threadId=None):
+ return self.get_scope_variables('Globals', frameIndex=frameIndex,
+ threadId=threadId)
+
+ def get_local_variables(self, frameIndex=0, threadId=None):
+ return self.get_scope_variables('Locals', frameIndex=frameIndex,
+ threadId=threadId)
+
+ def get_local_variable(self, name, frameIndex=0, threadId=None):
+ locals = self.get_local_variables(frameIndex=frameIndex,
+ threadId=threadId)
+ for local in locals:
+ if 'name' in local and local['name'] == name:
+ return local
+ return None
+
+ def get_local_variable_value(self, name, frameIndex=0, threadId=None):
+ variable = self.get_local_variable(name, frameIndex=frameIndex,
+ threadId=threadId)
+ if variable and 'value' in variable:
+ return variable['value']
+ return None
+
+ def replay_packets(self, replay_file_path):
+ f = open(replay_file_path, 'r')
+ mode = 'invalid'
+ set_sequence = False
+ command_dict = None
+ while mode != 'eof':
+ if mode == 'invalid':
+ line = f.readline()
+ if line.startswith('to adapter:'):
+ mode = 'send'
+ elif line.startswith('from adapter:'):
+ mode = 'recv'
+ elif mode == 'send':
+ command_dict = read_packet(f)
+ # Skip the end of line that follows the JSON
+ f.readline()
+ if command_dict is None:
+ raise ValueError('decode packet failed from replay file')
+ print('Sending:')
+ pprint.PrettyPrinter(indent=2).pprint(command_dict)
+ # raw_input('Press ENTER to send:')
+ self.send_packet(command_dict, set_sequence)
+ mode = 'invalid'
+ elif mode == 'recv':
+ print('Replay response:')
+ replay_response = read_packet(f)
+ # Skip the end of line that follows the JSON
+ f.readline()
+ pprint.PrettyPrinter(indent=2).pprint(replay_response)
+ actual_response = self.recv_packet()
+ if actual_response:
+ type = actual_response['type']
+ print('Actual response:')
+ if type == 'response':
+ self.validate_response(command_dict, actual_response)
+ pprint.PrettyPrinter(indent=2).pprint(actual_response)
+ else:
+ print("error: didn't get a valid response")
+ mode = 'invalid'
+
+ def request_attach(self, program=None, pid=None, waitFor=None, trace=None,
+ initCommands=None, preRunCommands=None,
+ stopCommands=None, exitCommands=None,
+ attachCommands=None):
+ args_dict = {}
+ if pid is not None:
+ args_dict['pid'] = pid
+ if program is not None:
+ args_dict['program'] = program
+ if waitFor is not None:
+ args_dict['waitFor'] = waitFor
+ if trace:
+ args_dict['trace'] = trace
+ if initCommands:
+ args_dict['initCommands'] = initCommands
+ if preRunCommands:
+ args_dict['preRunCommands'] = preRunCommands
+ if stopCommands:
+ args_dict['stopCommands'] = stopCommands
+ if exitCommands:
+ args_dict['exitCommands'] = exitCommands
+ if attachCommands:
+ args_dict['attachCommands'] = attachCommands
+ command_dict = {
+ 'command': 'attach',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ return self.send_recv(command_dict)
+
+ def request_configurationDone(self):
+ command_dict = {
+ 'command': 'configurationDone',
+ 'type': 'request',
+ 'arguments': {}
+ }
+ response = self.send_recv(command_dict)
+ if response:
+ self.configuration_done_sent = True
+ return response
+
+ def _process_stopped(self):
+ self.threads = None
+ self.frame_scopes = {}
+
+ def request_continue(self, threadId=None):
+ if self.exit_status is not None:
+ raise ValueError('request_continue called after process exited')
+ # If we have launched or attached, then the first continue is done by
+ # sending the 'configurationDone' request
+ if not self.configuration_done_sent:
+ return self.request_configurationDone()
+ args_dict = {}
+ if threadId is None:
+ threadId = self.get_thread_id()
+ args_dict['threadId'] = threadId
+ command_dict = {
+ 'command': 'continue',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ response = self.send_recv(command_dict)
+ recv_packets = []
+ self.recv_condition.acquire()
+ for event in self.recv_packets:
+ if event['event'] != 'stopped':
+ recv_packets.append(event)
+ self.recv_packets = recv_packets
+ self.recv_condition.release()
+ return response
+
+ def request_disconnect(self, terminateDebuggee=None):
+ args_dict = {}
+ if terminateDebuggee is not None:
+ if terminateDebuggee:
+ args_dict['terminateDebuggee'] = True
+ else:
+ args_dict['terminateDebuggee'] = False
+ command_dict = {
+ 'command': 'disconnect',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ return self.send_recv(command_dict)
+
+ def request_evaluate(self, expression, frameIndex=0, threadId=None):
+ stackFrame = self.get_stackFrame(frameIndex=frameIndex,
+ threadId=threadId)
+ if stackFrame is None:
+ return []
+ args_dict = {
+ 'expression': expression,
+ 'frameId': stackFrame['id'],
+ }
+ command_dict = {
+ 'command': 'evaluate',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ return self.send_recv(command_dict)
+
+ def request_initialize(self):
+ command_dict = {
+ 'command': 'initialize',
+ 'type': 'request',
+ 'arguments': {
+ 'adapterID': 'lldb-native',
+ 'clientID': 'vscode',
+ 'columnsStartAt1': True,
+ 'linesStartAt1': True,
+ 'locale': 'en-us',
+ 'pathFormat': 'path',
+ 'supportsRunInTerminalRequest': True,
+ 'supportsVariablePaging': True,
+ 'supportsVariableType': True
+ }
+ }
+ response = self.send_recv(command_dict)
+ if response:
+ if 'body' in response:
+ self.initialize_body = response['body']
+ return response
+
+ def request_launch(self, program, args=None, cwd=None, env=None,
+ stopOnEntry=False, disableASLR=True,
+ disableSTDIO=False, shellExpandArguments=False,
+ trace=False, initCommands=None, preRunCommands=None,
+ stopCommands=None, exitCommands=None, sourcePath=None,
+ debuggerRoot=None):
+ args_dict = {
+ 'program': program
+ }
+ if args:
+ args_dict['args'] = args
+ if cwd:
+ args_dict['cwd'] = cwd
+ if env:
+ args_dict['env'] = env
+ if stopOnEntry:
+ args_dict['stopOnEntry'] = stopOnEntry
+ if disableASLR:
+ args_dict['disableASLR'] = disableASLR
+ if disableSTDIO:
+ args_dict['disableSTDIO'] = disableSTDIO
+ if shellExpandArguments:
+ args_dict['shellExpandArguments'] = shellExpandArguments
+ if trace:
+ args_dict['trace'] = trace
+ if initCommands:
+ args_dict['initCommands'] = initCommands
+ if preRunCommands:
+ args_dict['preRunCommands'] = preRunCommands
+ if stopCommands:
+ args_dict['stopCommands'] = stopCommands
+ if exitCommands:
+ args_dict['exitCommands'] = exitCommands
+ if sourcePath:
+ args_dict['sourcePath'] = sourcePath
+ if debuggerRoot:
+ args_dict['debuggerRoot'] = debuggerRoot
+ command_dict = {
+ 'command': 'launch',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ response = self.send_recv(command_dict)
+
+ # Wait for a 'process' and 'initialized' event in any order
+ self.wait_for_event(filter=['process', 'initialized'])
+ self.wait_for_event(filter=['process', 'initialized'])
+ return response
+
+ def request_next(self, threadId):
+ if self.exit_status is not None:
+ raise ValueError('request_continue called after process exited')
+ args_dict = {'threadId': threadId}
+ command_dict = {
+ 'command': 'next',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ return self.send_recv(command_dict)
+
+ def request_stepIn(self, threadId):
+ if self.exit_status is not None:
+ raise ValueError('request_continue called after process exited')
+ args_dict = {'threadId': threadId}
+ command_dict = {
+ 'command': 'stepIn',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ return self.send_recv(command_dict)
+
+ def request_stepOut(self, threadId):
+ if self.exit_status is not None:
+ raise ValueError('request_continue called after process exited')
+ args_dict = {'threadId': threadId}
+ command_dict = {
+ 'command': 'stepOut',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ return self.send_recv(command_dict)
+
+ def request_pause(self, threadId=None):
+ if self.exit_status is not None:
+ raise ValueError('request_continue called after process exited')
+ if threadId is None:
+ threadId = self.get_thread_id()
+ args_dict = {'threadId': threadId}
+ command_dict = {
+ 'command': 'pause',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ return self.send_recv(command_dict)
+
+ def request_scopes(self, frameId):
+ args_dict = {'frameId': frameId}
+ command_dict = {
+ 'command': 'scopes',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ return self.send_recv(command_dict)
+
+ def request_setBreakpoints(self, file_path, line_array, condition=None,
+ hitCondition=None):
+ (dir, base) = os.path.split(file_path)
+ breakpoints = []
+ for line in line_array:
+ bp = {'line': line}
+ if condition is not None:
+ bp['condition'] = condition
+ if hitCondition is not None:
+ bp['hitCondition'] = hitCondition
+ breakpoints.append(bp)
+ source_dict = {
+ 'name': base,
+ 'path': file_path
+ }
+ args_dict = {
+ 'source': source_dict,
+ 'breakpoints': breakpoints,
+ 'lines': '%s' % (line_array),
+ 'sourceModified': False,
+ }
+ command_dict = {
+ 'command': 'setBreakpoints',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ return self.send_recv(command_dict)
+
+ def request_setExceptionBreakpoints(self, filters):
+ args_dict = {'filters': filters}
+ command_dict = {
+ 'command': 'setExceptionBreakpoints',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ return self.send_recv(command_dict)
+
+ def request_setFunctionBreakpoints(self, names, condition=None,
+ hitCondition=None):
+ breakpoints = []
+ for name in names:
+ bp = {'name': name}
+ if condition is not None:
+ bp['condition'] = condition
+ if hitCondition is not None:
+ bp['hitCondition'] = hitCondition
+ breakpoints.append(bp)
+ args_dict = {'breakpoints': breakpoints}
+ command_dict = {
+ 'command': 'setFunctionBreakpoints',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ return self.send_recv(command_dict)
+
+ def request_stackTrace(self, threadId=None, startFrame=None, levels=None,
+ dump=False):
+ if threadId is None:
+ threadId = self.get_thread_id()
+ args_dict = {'threadId': threadId}
+ if startFrame is not None:
+ args_dict['startFrame'] = startFrame
+ if levels is not None:
+ args_dict['levels'] = levels
+ command_dict = {
+ 'command': 'stackTrace',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ response = self.send_recv(command_dict)
+ if dump:
+ for (idx, frame) in enumerate(response['body']['stackFrames']):
+ name = frame['name']
+ if 'line' in frame and 'source' in frame:
+ source = frame['source']
+ if 'sourceReference' not in source:
+ if 'name' in source:
+ source_name = source['name']
+ line = frame['line']
+ print("[%3u] %s @ %s:%u" % (idx, name, source_name,
+ line))
+ continue
+ print("[%3u] %s" % (idx, name))
+ return response
+
+ def request_threads(self):
+ '''Request a list of all threads and combine any information from any
+ "stopped" events since those contain more information about why a
+ thread actually stopped. Returns an array of thread dictionaries
+ with information about all threads'''
+ command_dict = {
+ 'command': 'threads',
+ 'type': 'request',
+ 'arguments': {}
+ }
+ response = self.send_recv(command_dict)
+ body = response['body']
+ # Fill in "self.threads" correctly so that clients that call
+ # self.get_threads() or self.get_thread_id(...) can get information
+ # on threads when the process is stopped.
+ if 'threads' in body:
+ self.threads = body['threads']
+ for thread in self.threads:
+ # Copy the thread dictionary so we can add key/value pairs to
+ # it without affecfting the original info from the "threads"
+ # command.
+ tid = thread['id']
+ if tid in self.thread_stop_reasons:
+ thread_stop_info = self.thread_stop_reasons[tid]
+ copy_keys = ['reason', 'description', 'text']
+ for key in copy_keys:
+ if key in thread_stop_info:
+ thread[key] = thread_stop_info[key]
+ else:
+ self.threads = None
+ return response
+
+ def request_variables(self, variablesReference, start=None, count=None):
+ args_dict = {'variablesReference': variablesReference}
+ if start is not None:
+ args_dict['start'] = start
+ if count is not None:
+ args_dict['count'] = count
+ command_dict = {
+ 'command': 'variables',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ return self.send_recv(command_dict)
+
+ def request_setVariable(self, containingVarRef, name, value, id=None):
+ args_dict = {
+ 'variablesReference': containingVarRef,
+ 'name': name,
+ 'value': str(value)
+ }
+ if id is not None:
+ args_dict['id'] = id
+ command_dict = {
+ 'command': 'setVariable',
+ 'type': 'request',
+ 'arguments': args_dict
+ }
+ return self.send_recv(command_dict)
+
+ def request_testGetTargetBreakpoints(self):
+ '''A request packet used in the LLDB test suite to get all currently
+ set breakpoint infos for all breakpoints currently set in the
+ target.
+ '''
+ command_dict = {
+ 'command': '_testGetTargetBreakpoints',
+ 'type': 'request',
+ 'arguments': {}
+ }
+ return self.send_recv(command_dict)
+
+ def terminate(self):
+ self.send.close()
+ # self.recv.close()
+
+
+class DebugAdaptor(DebugCommunication):
+ def __init__(self, executable=None, port=None):
+ self.process = None
+ if executable is not None:
+ self.process = subprocess.Popen([executable],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ DebugCommunication.__init__(self, self.process.stdout,
+ self.process.stdin)
+ elif port is not None:
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(('127.0.0.1', port))
+ DebugCommunication.__init__(self, s.makefile('r'), s.makefile('w'))
+
+ def get_pid(self):
+ if self.process:
+ return self.process.pid
+ return -1
+
+ def terminate(self):
+ super(DebugAdaptor, self).terminate()
+ if self.process is not None:
+ self.process.terminate()
+ self.process.wait()
+ self.process = None
+
+
+def attach_options_specified(options):
+ if options.pid is not None:
+ return True
+ if options.waitFor:
+ return True
+ if options.attach:
+ return True
+ if options.attachCmds:
+ return True
+ return False
+
+
+def run_vscode(dbg, args, options):
+ dbg.request_initialize()
+ if attach_options_specified(options):
+ response = dbg.request_attach(program=options.program,
+ pid=options.pid,
+ waitFor=options.waitFor,
+ attachCommands=options.attachCmds,
+ initCommands=options.initCmds,
+ preRunCommands=options.preRunCmds,
+ stopCommands=options.stopCmds,
+ exitCommands=options.exitCmds)
+ else:
+ response = dbg.request_launch(options.program,
+ args=args,
+ env=options.envs,
+ cwd=options.workingDir,
+ debuggerRoot=options.debuggerRoot,
+ sourcePath=options.sourcePath,
+ initCommands=options.initCmds,
+ preRunCommands=options.preRunCmds,
+ stopCommands=options.stopCmds,
+ exitCommands=options.exitCmds)
+
+ if response['success']:
+ if options.sourceBreakpoints:
+ source_to_lines = {}
+ for file_line in options.sourceBreakpoints:
+ (path, line) = file_line.split(':')
+ if len(path) == 0 or len(line) == 0:
+ print('error: invalid source with line "%s"' %
+ (file_line))
+
+ else:
+ if path in source_to_lines:
+ source_to_lines[path].append(int(line))
+ else:
+ source_to_lines[path] = [int(line)]
+ for source in source_to_lines:
+ dbg.request_setBreakpoints(source, source_to_lines[source])
+ if options.funcBreakpoints:
+ dbg.request_setFunctionBreakpoints(options.funcBreakpoints)
+ dbg.request_configurationDone()
+ dbg.wait_for_stopped()
+ else:
+ if 'message' in response:
+ print(response['message'])
+ dbg.request_disconnect(terminateDebuggee=True)
+
+
+def main():
+ parser = optparse.OptionParser(
+ description=('A testing framework for the Visual Studio Code Debug '
+ 'Adaptor protocol'))
+
+ parser.add_option(
+ '--vscode',
+ type='string',
+ dest='vscode_path',
+ help=('The path to the command line program that implements the '
+ 'Visual Studio Code Debug Adaptor protocol.'),
+ default=None)
+
+ parser.add_option(
+ '--program',
+ type='string',
+ dest='program',
+ help='The path to the program to debug.',
+ default=None)
+
+ parser.add_option(
+ '--workingDir',
+ type='string',
+ dest='workingDir',
+ default=None,
+ help='Set the working directory for the process we launch.')
+
+ parser.add_option(
+ '--sourcePath',
+ type='string',
+ dest='sourcePath',
+ default=None,
+ help=('Set the relative source root for any debug info that has '
+ 'relative paths in it.'))
+
+ parser.add_option(
+ '--debuggerRoot',
+ type='string',
+ dest='debuggerRoot',
+ default=None,
+ help=('Set the working directory for lldb-vscode for any object files '
+ 'with relative paths in the Mach-o debug map.'))
+
+ parser.add_option(
+ '-r', '--replay',
+ type='string',
+ dest='replay',
+ help=('Specify a file containing a packet log to replay with the '
+ 'current Visual Studio Code Debug Adaptor executable.'),
+ default=None)
+
+ parser.add_option(
+ '-g', '--debug',
+ action='store_true',
+ dest='debug',
+ default=False,
+ help='Pause waiting for a debugger to attach to the debug adaptor')
+
+ parser.add_option(
+ '--port',
+ type='int',
+ dest='port',
+ help="Attach a socket to a port instead of using STDIN for VSCode",
+ default=None)
+
+ parser.add_option(
+ '--pid',
+ type='int',
+ dest='pid',
+ help="The process ID to attach to",
+ default=None)
+
+ parser.add_option(
+ '--attach',
+ action='store_true',
+ dest='attach',
+ default=False,
+ help=('Specify this option to attach to a process by name. The '
+ 'process name is the basanme of the executable specified with '
+ 'the --program option.'))
+
+ parser.add_option(
+ '-f', '--function-bp',
+ type='string',
+ action='append',
+ dest='funcBreakpoints',
+ help=('Specify the name of a function to break at. '
+ 'Can be specified more than once.'),
+ default=[])
+
+ parser.add_option(
+ '-s', '--source-bp',
+ type='string',
+ action='append',
+ dest='sourceBreakpoints',
+ default=[],
+ help=('Specify source breakpoints to set in the format of '
+ '<source>:<line>. '
+ 'Can be specified more than once.'))
+
+ parser.add_option(
+ '--attachCommand',
+ type='string',
+ action='append',
+ dest='attachCmds',
+ default=[],
+ help=('Specify a LLDB command that will attach to a process. '
+ 'Can be specified more than once.'))
+
+ parser.add_option(
+ '--initCommand',
+ type='string',
+ action='append',
+ dest='initCmds',
+ default=[],
+ help=('Specify a LLDB command that will be executed before the target '
+ 'is created. Can be specified more than once.'))
+
+ parser.add_option(
+ '--preRunCommand',
+ type='string',
+ action='append',
+ dest='preRunCmds',
+ default=[],
+ help=('Specify a LLDB command that will be executed after the target '
+ 'has been created. Can be specified more than once.'))
+
+ parser.add_option(
+ '--stopCommand',
+ type='string',
+ action='append',
+ dest='stopCmds',
+ default=[],
+ help=('Specify a LLDB command that will be executed each time the'
+ 'process stops. Can be specified more than once.'))
+
+ parser.add_option(
+ '--exitCommand',
+ type='string',
+ action='append',
+ dest='exitCmds',
+ default=[],
+ help=('Specify a LLDB command that will be executed when the process '
+ 'exits. Can be specified more than once.'))
+
+ parser.add_option(
+ '--env',
+ type='string',
+ action='append',
+ dest='envs',
+ default=[],
+ help=('Specify environment variables to pass to the launched '
+ 'process.'))
+
+ parser.add_option(
+ '--waitFor',
+ action='store_true',
+ dest='waitFor',
+ default=False,
+ help=('Wait for the next process to be launched whose name matches '
+ 'the basename of the program specified with the --program '
+ 'option'))
+
+ (options, args) = parser.parse_args(sys.argv[1:])
+
+ if options.vscode_path is None and options.port is None:
+ print('error: must either specify a path to a Visual Studio Code '
+ 'Debug Adaptor vscode executable path using the --vscode '
+ 'option, or a port to attach to for an existing lldb-vscode '
+ 'using the --port option')
+ return
+ dbg = DebugAdaptor(executable=options.vscode_path, port=options.port)
+ if options.debug:
+ raw_input('Waiting for debugger to attach pid "%i"' % (
+ dbg.get_pid()))
+ if options.replay:
+ dbg.replay_packets(options.replay)
+ else:
+ run_vscode(dbg, args, options)
+ dbg.terminate()
+
+
+if __name__ == '__main__':
+ main()