summaryrefslogtreecommitdiffstats
path: root/lib/python/qmk/cli
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python/qmk/cli')
-rw-r--r--lib/python/qmk/cli/__init__.py72
-rw-r--r--lib/python/qmk/cli/config.py116
-rw-r--r--lib/python/qmk/cli/console.py302
3 files changed, 345 insertions, 145 deletions
diff --git a/lib/python/qmk/cli/__init__.py b/lib/python/qmk/cli/__init__.py
index 6fe769fe7b..02b721f342 100644
--- a/lib/python/qmk/cli/__init__.py
+++ b/lib/python/qmk/cli/__init__.py
@@ -12,6 +12,20 @@ from subprocess import run
from milc import cli, __VERSION__
from milc.questions import yesno
+import_names = {
+ # A mapping of package name to importable name
+ 'pep8-naming': 'pep8ext_naming',
+ 'pyusb': 'usb.core',
+}
+
+safe_commands = [
+ # A list of subcommands we always run, even when the module imports fail
+ 'clone',
+ 'config',
+ 'env',
+ 'setup',
+]
+
def _run_cmd(*command):
"""Run a command in a subshell.
@@ -50,8 +64,8 @@ def _find_broken_requirements(requirements):
module_import = module_name.replace('-', '_')
# Not every module is importable by its own name.
- if module_name == "pep8-naming":
- module_import = "pep8ext_naming"
+ if module_name in import_names:
+ module_import = import_names[module_name]
if not find_spec(module_import):
broken_modules.append(module_name)
@@ -107,32 +121,31 @@ if int(milc_version[0]) < 2 and int(milc_version[1]) < 3:
# Check to make sure we have all our dependencies
msg_install = 'Please run `python3 -m pip install -r %s` to install required python dependencies.'
-
-if _broken_module_imports('requirements.txt'):
- if yesno('Would you like to install the required Python modules?'):
- _run_cmd(sys.executable, '-m', 'pip', 'install', '-r', 'requirements.txt')
- else:
- print()
- print(msg_install % (str(Path('requirements.txt').resolve()),))
- print()
- exit(1)
-
-if cli.config.user.developer:
- args = sys.argv[1:]
- while args and args[0][0] == '-':
- del args[0]
- if not args or args[0] != 'config':
- if _broken_module_imports('requirements-dev.txt'):
- if yesno('Would you like to install the required developer Python modules?'):
- _run_cmd(sys.executable, '-m', 'pip', 'install', '-r', 'requirements-dev.txt')
- elif yesno('Would you like to disable developer mode?'):
- _run_cmd(sys.argv[0], 'config', 'user.developer=None')
- else:
- print()
- print(msg_install % (str(Path('requirements-dev.txt').resolve()),))
- print('You can also turn off developer mode: qmk config user.developer=None')
- print()
- exit(1)
+args = sys.argv[1:]
+while args and args[0][0] == '-':
+ del args[0]
+
+if not args or args[0] not in safe_commands:
+ if _broken_module_imports('requirements.txt'):
+ if yesno('Would you like to install the required Python modules?'):
+ _run_cmd(sys.executable, '-m', 'pip', 'install', '-r', 'requirements.txt')
+ else:
+ print()
+ print(msg_install % (str(Path('requirements.txt').resolve()),))
+ print()
+ exit(1)
+
+ if cli.config.user.developer and _broken_module_imports('requirements-dev.txt'):
+ if yesno('Would you like to install the required developer Python modules?'):
+ _run_cmd(sys.executable, '-m', 'pip', 'install', '-r', 'requirements-dev.txt')
+ elif yesno('Would you like to disable developer mode?'):
+ _run_cmd(sys.argv[0], 'config', 'user.developer=None')
+ else:
+ print()
+ print(msg_install % (str(Path('requirements-dev.txt').resolve()),))
+ print('You can also turn off developer mode: qmk config user.developer=None')
+ print()
+ exit(1)
# Import our subcommands
from . import c2json # noqa
@@ -140,7 +153,8 @@ from . import cformat # noqa
from . import chibios # noqa
from . import clean # noqa
from . import compile # noqa
-from . import config # noqa
+from milc.subcommand import config # noqa
+from . import console # noqa
from . import docs # noqa
from . import doctor # noqa
from . import fileformat # noqa
diff --git a/lib/python/qmk/cli/config.py b/lib/python/qmk/cli/config.py
deleted file mode 100644
index e17d8bb9ba..0000000000
--- a/lib/python/qmk/cli/config.py
+++ /dev/null
@@ -1,116 +0,0 @@
-"""Read and write configuration settings
-"""
-from milc import cli
-
-
-def print_config(section, key):
- """Print a single config setting to stdout.
- """
- cli.echo('%s.%s{fg_cyan}={fg_reset}%s', section, key, cli.config[section][key])
-
-
-def show_config():
- """Print the current configuration to stdout.
- """
- for section in cli.config:
- for key in cli.config[section]:
- print_config(section, key)
-
-
-def parse_config_token(config_token):
- """Split a user-supplied configuration-token into its components.
- """
- section = option = value = None
-
- if '=' in config_token and '.' not in config_token:
- cli.log.error('Invalid configuration token, the key must be of the form <section>.<option>: %s', config_token)
- return section, option, value
-
- # Separate the key (<section>.<option>) from the value
- if '=' in config_token:
- key, value = config_token.split('=')
- else:
- key = config_token
-
- # Extract the section and option from the key
- if '.' in key:
- section, option = key.split('.', 1)
- else:
- section = key
-
- return section, option, value
-
-
-def set_config(section, option, value):
- """Set a config key in the running config.
- """
- log_string = '%s.%s{fg_cyan}:{fg_reset} %s {fg_cyan}->{fg_reset} %s'
- if cli.args.read_only:
- log_string += ' {fg_red}(change not written)'
-
- cli.echo(log_string, section, option, cli.config[section][option], value)
-
- if not cli.args.read_only:
- if value == 'None':
- del cli.config[section][option]
- else:
- cli.config[section][option] = value
-
-
-@cli.argument('-ro', '--read-only', arg_only=True, action='store_true', help='Operate in read-only mode.')
-@cli.argument('configs', nargs='*', arg_only=True, help='Configuration options to read or write.')
-@cli.subcommand("Read and write configuration settings.")
-def config(cli):
- """Read and write config settings.
-
- This script iterates over the config_tokens supplied as argument. Each config_token has the following form:
-
- section[.key][=value]
-
- If only a section (EG 'compile') is supplied all keys for that section will be displayed.
-
- If section.key is supplied the value for that single key will be displayed.
-
- If section.key=value is supplied the value for that single key will be set.
-
- If section.key=None is supplied the key will be deleted.
-
- No validation is done to ensure that the supplied section.key is actually used by qmk scripts.
- """
- if not cli.args.configs:
- return show_config()
-
- # Process config_tokens
- save_config = False
-
- for argument in cli.args.configs:
- # Split on space in case they quoted multiple config tokens
- for config_token in argument.split(' '):
- section, option, value = parse_config_token(config_token)
-
- # Validation
- if option and '.' in option:
- cli.log.error('Config keys may not have more than one period! "%s" is not valid.', config_token)
- return False
-
- # Do what the user wants
- if section and option and value:
- # Write a configuration option
- set_config(section, option, value)
- if not cli.args.read_only:
- save_config = True
-
- elif section and option:
- # Display a single key
- print_config(section, option)
-
- elif section:
- # Display an entire section
- for key in cli.config[section]:
- print_config(section, key)
-
- # Ending actions
- if save_config:
- cli.save_config()
-
- return True
diff --git a/lib/python/qmk/cli/console.py b/lib/python/qmk/cli/console.py
new file mode 100644
index 0000000000..45ff0c8bee
--- /dev/null
+++ b/lib/python/qmk/cli/console.py
@@ -0,0 +1,302 @@
+"""Acquire debugging information from usb hid devices
+
+cli implementation of https://www.pjrc.com/teensy/hid_listen.html
+"""
+from pathlib import Path
+from threading import Thread
+from time import sleep, strftime
+
+import hid
+import usb.core
+
+from milc import cli
+
+LOG_COLOR = {
+ 'next': 0,
+ 'colors': [
+ '{fg_blue}',
+ '{fg_cyan}',
+ '{fg_green}',
+ '{fg_magenta}',
+ '{fg_red}',
+ '{fg_yellow}',
+ ],
+}
+
+KNOWN_BOOTLOADERS = {
+ # VID , PID
+ ('03EB', '2FEF'): 'atmel-dfu: ATmega16U2',
+ ('03EB', '2FF0'): 'atmel-dfu: ATmega32U2',
+ ('03EB', '2FF3'): 'atmel-dfu: ATmega16U4',
+ ('03EB', '2FF4'): 'atmel-dfu: ATmega32U4',
+ ('03EB', '2FF9'): 'atmel-dfu: AT90USB64',
+ ('03EB', '2FFA'): 'atmel-dfu: AT90USB162',
+ ('03EB', '2FFB'): 'atmel-dfu: AT90USB128',
+ ('03EB', '6124'): 'Microchip SAM-BA',
+ ('0483', 'DF11'): 'stm32-dfu: STM32 BOOTLOADER',
+ ('16C0', '05DC'): 'USBasp: USBaspLoader',
+ ('16C0', '05DF'): 'bootloadHID: HIDBoot',
+ ('16C0', '0478'): 'halfkay: Teensy Halfkay',
+ ('1B4F', '9203'): 'caterina: Pro Micro 3.3V',
+ ('1B4F', '9205'): 'caterina: Pro Micro 5V',
+ ('1B4F', '9207'): 'caterina: LilyPadUSB',
+ ('1C11', 'B007'): 'kiibohd: Kiibohd DFU Bootloader',
+ ('1EAF', '0003'): 'stm32duino: Maple 003',
+ ('1FFB', '0101'): 'caterina: Polou A-Star 32U4 Bootloader',
+ ('2341', '0036'): 'caterina: Arduino Leonardo',
+ ('2341', '0037'): 'caterina: Arduino Micro',
+ ('239A', '000C'): 'caterina: Adafruit Feather 32U4',
+ ('239A', '000D'): 'caterina: Adafruit ItsyBitsy 32U4 3v',
+ ('239A', '000E'): 'caterina: Adafruit ItsyBitsy 32U4 5v',
+ ('239A', '000E'): 'caterina: Adafruit ItsyBitsy 32U4 5v',
+ ('2A03', '0036'): 'caterina: Arduino Leonardo',
+ ('2A03', '0037'): 'caterina: Arduino Micro',
+ ('314B', '0106'): 'apm32-dfu: APM32 DFU ISP Mode'
+}
+
+
+class MonitorDevice(object):
+ def __init__(self, hid_device, numeric):
+ self.hid_device = hid_device
+ self.numeric = numeric
+ self.device = hid.Device(path=hid_device['path'])
+ self.current_line = ''
+
+ cli.log.info('Console Connected: %(color)s%(manufacturer_string)s %(product_string)s{style_reset_all} (%(color)s%(vendor_id)04X:%(product_id)04X:%(index)d{style_reset_all})', hid_device)
+
+ def read(self, size, encoding='ascii', timeout=1):
+ """Read size bytes from the device.
+ """
+ return self.device.read(size, timeout).decode(encoding)
+
+ def read_line(self):
+ """Read from the device's console until we get a \n.
+ """
+ while '\n' not in self.current_line:
+ self.current_line += self.read(32).replace('\x00', '')
+
+ lines = self.current_line.split('\n', 1)
+ self.current_line = lines[1]
+
+ return lines[0]
+
+ def run_forever(self):
+ while True:
+ try:
+ message = {**self.hid_device, 'text': self.read_line()}
+ identifier = (int2hex(message['vendor_id']), int2hex(message['product_id'])) if self.numeric else (message['manufacturer_string'], message['product_string'])
+ message['identifier'] = ':'.join(identifier)
+ message['ts'] = '{style_dim}{fg_green}%s{style_reset_all} ' % (strftime(cli.config.general.datetime_fmt),) if cli.args.timestamp else ''
+
+ cli.echo('%(ts)s%(color)s%(identifier)s:%(index)d{style_reset_all}: %(text)s' % message)
+
+ except hid.HIDException:
+ break
+
+
+class FindDevices(object):
+ def __init__(self, vid, pid, index, numeric):
+ self.vid = vid
+ self.pid = pid
+ self.index = index
+ self.numeric = numeric
+
+ def run_forever(self):
+ """Process messages from our queue in a loop.
+ """
+ live_devices = {}
+ live_bootloaders = {}
+
+ while True:
+ try:
+ for device in list(live_devices):
+ if not live_devices[device]['thread'].is_alive():
+ cli.log.info('Console Disconnected: %(color)s%(manufacturer_string)s %(product_string)s{style_reset_all} (%(color)s%(vendor_id)04X:%(product_id)04X:%(index)d{style_reset_all})', live_devices[device])
+ del live_devices[device]
+
+ for device in self.find_devices():
+ if device['path'] not in live_devices:
+ device['color'] = LOG_COLOR['colors'][LOG_COLOR['next']]
+ LOG_COLOR['next'] = (LOG_COLOR['next'] + 1) % len(LOG_COLOR['colors'])
+ live_devices[device['path']] = device
+
+ try:
+ monitor = MonitorDevice(device, self.numeric)
+ device['thread'] = Thread(target=monitor.run_forever, daemon=True)
+
+ device['thread'].start()
+ except Exception as e:
+ device['e'] = e
+ device['e_name'] = e.__class__.__name__
+ cli.log.error("Could not connect to %(color)s%(manufacturer_string)s %(product_string)s{style_reset_all} (%(color)s:%(vendor_id)04X:%(product_id)04X:%(index)d): %(e_name)s: %(e)s", device)
+ if cli.config.general.verbose:
+ cli.log.exception(e)
+ del live_devices[device['path']]
+
+ if cli.args.bootloaders:
+ for device in self.find_bootloaders():
+ if device.address in live_bootloaders:
+ live_bootloaders[device.address]._qmk_found = True
+ else:
+ name = KNOWN_BOOTLOADERS[(int2hex(device.idVendor), int2hex(device.idProduct))]
+ cli.log.info('Bootloader Connected: {style_bright}{fg_magenta}%s', name)
+ device._qmk_found = True
+ live_bootloaders[device.address] = device
+
+ for device in list(live_bootloaders):
+ if live_bootloaders[device]._qmk_found:
+ live_bootloaders[device]._qmk_found = False
+ else:
+ name = KNOWN_BOOTLOADERS[(int2hex(live_bootloaders[device].idVendor), int2hex(live_bootloaders[device].idProduct))]
+ cli.log.info('Bootloader Disconnected: {style_bright}{fg_magenta}%s', name)
+ del live_bootloaders[device]
+
+ sleep(.1)
+
+ except KeyboardInterrupt:
+ break
+
+ def is_bootloader(self, hid_device):
+ """Returns true if the device in question matches a known bootloader vid/pid.
+ """
+ return (int2hex(hid_device.idVendor), int2hex(hid_device.idProduct)) in KNOWN_BOOTLOADERS
+
+ def is_console_hid(self, hid_device):
+ """Returns true when the usage page indicates it's a teensy-style console.
+ """
+ return hid_device['usage_page'] == 0xFF31 and hid_device['usage'] == 0x0074
+
+ def is_filtered_device(self, hid_device):
+ """Returns True if the device should be included in the list of available consoles.
+ """
+ return int2hex(hid_device['vendor_id']) == self.vid and int2hex(hid_device['product_id']) == self.pid
+
+ def find_devices_by_report(self, hid_devices):
+ """Returns a list of available teensy-style consoles by doing a brute-force search.
+
+ Some versions of linux don't report usage and usage_page. In that case we fallback to reading the report (possibly inaccurately) ourselves.
+ """
+ devices = []
+
+ for device in hid_devices:
+ path = device['path'].decode('utf-8')
+
+ if path.startswith('/dev/hidraw'):
+ number = path[11:]
+ report = Path(f'/sys/class/hidraw/hidraw{number}/device/report_descriptor')
+
+ if report.exists():
+ rp = report.read_bytes()
+
+ if rp[1] == 0x31 and rp[3] == 0x09:
+ devices.append(device)
+
+ return devices
+
+ def find_bootloaders(self):
+ """Returns a list of available bootloader devices.
+ """
+ return list(filter(self.is_bootloader, usb.core.find(find_all=True)))
+
+ def find_devices(self):
+ """Returns a list of available teensy-style consoles.
+ """
+ hid_devices = hid.enumerate()
+ devices = list(filter(self.is_console_hid, hid_devices))
+
+ if not devices:
+ devices = self.find_devices_by_report(hid_devices)
+
+ if self.vid and self.pid:
+ devices = list(filter(self.is_filtered_device, devices))
+
+ # Add index numbers
+ device_index = {}
+ for device in devices:
+ id = ':'.join((int2hex(device['vendor_id']), int2hex(device['product_id'])))
+
+ if id not in device_index:
+ device_index[id] = 0
+
+ device_index[id] += 1
+ device['index'] = device_index[id]
+
+ return devices
+
+
+def int2hex(number):
+ """Returns a string representation of the number as hex.
+ """
+ return "%04X" % number
+
+
+def list_devices(device_finder):
+ """Show the user a nicely formatted list of devices.
+ """
+ devices = device_finder.find_devices()
+
+ if devices:
+ cli.log.info('Available devices:')
+ for dev in devices:
+ color = LOG_COLOR['colors'][LOG_COLOR['next']]
+ LOG_COLOR['next'] = (LOG_COLOR['next'] + 1) % len(LOG_COLOR['colors'])
+ cli.log.info("\t%s%s:%s:%d{style_reset_all}\t%s %s", color, int2hex(dev['vendor_id']), int2hex(dev['product_id']), dev['index'], dev['manufacturer_string'], dev['product_string'])
+
+ if cli.args.bootloaders:
+ bootloaders = device_finder.find_bootloaders()
+
+ if bootloaders:
+ cli.log.info('Available Bootloaders:')
+
+ for dev in bootloaders:
+ cli.log.info("\t%s:%s\t%s", int2hex(dev.idVendor), int2hex(dev.idProduct), KNOWN_BOOTLOADERS[(int2hex(dev.idVendor), int2hex(dev.idProduct))])
+
+
+@cli.argument('--bootloaders', arg_only=True, default=True, action='store_boolean', help='displaying bootloaders.')
+@cli.argument('-d', '--device', help='Device to select - uses format <pid>:<vid>[:<index>].')
+@cli.argument('-l', '--list', arg_only=True, action='store_true', help='List available hid_listen devices.')
+@cli.argument('-n', '--numeric', arg_only=True, action='store_true', help='Show VID/PID instead of names.')
+@cli.argument('-t', '--timestamp', arg_only=True, action='store_true', help='Print the timestamp for received messages as well.')
+@cli.argument('-w', '--wait', type=int, default=1, help="How many seconds to wait between checks (Default: 1)")
+@cli.subcommand('Acquire debugging information from usb hid devices.', hidden=False if cli.config.user.developer else True)
+def console(cli):
+ """Acquire debugging information from usb hid devices
+ """
+ vid = None
+ pid = None
+ index = 1
+
+ if cli.config.console.device:
+ device = cli.config.console.device.split(':')
+
+ if len(device) == 2:
+ vid, pid = device
+
+ elif len(device) == 3:
+ vid, pid, index = device
+
+ if not index.isdigit():
+ cli.log.error('Device index must be a number! Got "%s" instead.', index)
+ exit(1)
+
+ index = int(index)
+
+ if index < 1:
+ cli.log.error('Device index must be greater than 0! Got %s', index)
+ exit(1)
+
+ else:
+ cli.log.error('Invalid format for device, expected "<pid>:<vid>[:<index>]" but got "%s".', cli.config.console.device)
+ cli.print_help()
+ exit(1)
+
+ vid = vid.upper()
+ pid = pid.upper()
+
+ device_finder = FindDevices(vid, pid, index, cli.args.numeric)
+
+ if cli.args.list:
+ return list_devices(device_finder)
+
+ print('Looking for devices...', flush=True)
+ device_finder.run_forever()