diff --git a/src/assets/plugins/autopep8.py b/src/assets/plugins/autopep8.py new file mode 100644 index 0000000..1e9218e --- /dev/null +++ b/src/assets/plugins/autopep8.py @@ -0,0 +1,4196 @@ +#!/usr/bin/env python + +# Copyright (C) 2010-2011 Hideo Hattori +# Copyright (C) 2011-2013 Hideo Hattori, Steven Myint +# Copyright (C) 2013-2016 Hideo Hattori, Steven Myint, Bill Wendling +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Automatically formats Python code to conform to the PEP 8 style guide. + +Fixes that only need be done once can be added by adding a function of the form +"fix_(source)" to this module. They should return the fixed source code. +These fixes are picked up by apply_global_fixes(). + +Fixes that depend on pycodestyle should be added as methods to FixPEP8. See the +class documentation for more information. + +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +import argparse +import codecs +import collections +import copy +import difflib +import fnmatch +import inspect +import io +import keyword +import locale +import os +import re +import signal +import sys +import textwrap +import token +import tokenize + +import pycodestyle + + +try: + unicode +except NameError: + unicode = str + + +__version__ = '1.4.3' + + +CR = '\r' +LF = '\n' +CRLF = '\r\n' + + +PYTHON_SHEBANG_REGEX = re.compile(r'^#!.*\bpython[23]?\b\s*$') +LAMBDA_REGEX = re.compile(r'([\w.]+)\s=\slambda\s*([\(\)=\w,\s.]*):') +COMPARE_NEGATIVE_REGEX = re.compile(r'\b(not)\s+([^][)(}{]+?)\s+(in|is)\s') +COMPARE_NEGATIVE_REGEX_THROUGH = re.compile(r'\b(not\s+in|is\s+not)\s') +BARE_EXCEPT_REGEX = re.compile(r'except\s*:') +STARTSWITH_DEF_REGEX = re.compile(r'^(async\s+def|def)\s.*\):') + +EXIT_CODE_OK = 0 +EXIT_CODE_ERROR = 1 +EXIT_CODE_EXISTS_DIFF = 2 + +# For generating line shortening candidates. +SHORTEN_OPERATOR_GROUPS = frozenset([ + frozenset([',']), + frozenset(['%']), + frozenset([',', '(', '[', '{']), + frozenset(['%', '(', '[', '{']), + frozenset([',', '(', '[', '{', '%', '+', '-', '*', '/', '//']), + frozenset(['%', '+', '-', '*', '/', '//']), +]) + + +DEFAULT_IGNORE = 'E226,E24,W50,W690' # TODO: use pycodestyle.DEFAULT_IGNORE +DEFAULT_INDENT_SIZE = 4 + +SELECTED_GLOBAL_FIXED_METHOD_CODES = ['W602', ] + +# W602 is handled separately due to the need to avoid "with_traceback". +CODE_TO_2TO3 = { + 'E231': ['ws_comma'], + 'E721': ['idioms'], + 'W601': ['has_key'], + 'W603': ['ne'], + 'W604': ['repr'], + 'W690': ['apply', + 'except', + 'exitfunc', + 'numliterals', + 'operator', + 'paren', + 'reduce', + 'renames', + 'standarderror', + 'sys_exc', + 'throw', + 'tuple_params', + 'xreadlines']} + + +if sys.platform == 'win32': # pragma: no cover + DEFAULT_CONFIG = os.path.expanduser(r'~\.pycodestyle') +else: + DEFAULT_CONFIG = os.path.join(os.getenv('XDG_CONFIG_HOME') or + os.path.expanduser('~/.config'), + 'pycodestyle') +# fallback, use .pep8 +if not os.path.exists(DEFAULT_CONFIG): # pragma: no cover + if sys.platform == 'win32': + DEFAULT_CONFIG = os.path.expanduser(r'~\.pep8') + else: + DEFAULT_CONFIG = os.path.join(os.path.expanduser('~/.config'), 'pep8') +PROJECT_CONFIG = ('setup.cfg', 'tox.ini', '.pep8', '.flake8') + + +MAX_PYTHON_FILE_DETECTION_BYTES = 1024 + + +def open_with_encoding(filename, + encoding=None, mode='r', limit_byte_check=-1): + """Return opened file with a specific encoding.""" + if not encoding: + encoding = detect_encoding(filename, limit_byte_check=limit_byte_check) + + return io.open(filename, mode=mode, encoding=encoding, + newline='') # Preserve line endings + + +def detect_encoding(filename, limit_byte_check=-1): + """Return file encoding.""" + try: + with open(filename, 'rb') as input_file: + from lib2to3.pgen2 import tokenize as lib2to3_tokenize + encoding = lib2to3_tokenize.detect_encoding(input_file.readline)[0] + + with open_with_encoding(filename, encoding) as test_file: + test_file.read(limit_byte_check) + + return encoding + except (LookupError, SyntaxError, UnicodeDecodeError): + return 'latin-1' + + +def readlines_from_file(filename): + """Return contents of file.""" + with open_with_encoding(filename) as input_file: + return input_file.readlines() + + +def extended_blank_lines(logical_line, + blank_lines, + blank_before, + indent_level, + previous_logical): + """Check for missing blank lines after class declaration.""" + if previous_logical.startswith('def '): + if blank_lines and pycodestyle.DOCSTRING_REGEX.match(logical_line): + yield (0, 'E303 too many blank lines ({})'.format(blank_lines)) + elif pycodestyle.DOCSTRING_REGEX.match(previous_logical): + # Missing blank line between class docstring and method declaration. + if ( + indent_level and + not blank_lines and + not blank_before and + logical_line.startswith(('def ')) and + '(self' in logical_line + ): + yield (0, 'E301 expected 1 blank line, found 0') + + +pycodestyle.register_check(extended_blank_lines) + + +def continued_indentation(logical_line, tokens, indent_level, hang_closing, + indent_char, noqa): + """Override pycodestyle's function to provide indentation information.""" + first_row = tokens[0][2][0] + nrows = 1 + tokens[-1][2][0] - first_row + if noqa or nrows == 1: + return + + # indent_next tells us whether the next block is indented. Assuming + # that it is indented by 4 spaces, then we should not allow 4-space + # indents on the final continuation line. In turn, some other + # indents are allowed to have an extra 4 spaces. + indent_next = logical_line.endswith(':') + + row = depth = 0 + valid_hangs = ( + (DEFAULT_INDENT_SIZE,) + if indent_char != '\t' else (DEFAULT_INDENT_SIZE, + 2 * DEFAULT_INDENT_SIZE) + ) + + # Remember how many brackets were opened on each line. + parens = [0] * nrows + + # Relative indents of physical lines. + rel_indent = [0] * nrows + + # For each depth, collect a list of opening rows. + open_rows = [[0]] + # For each depth, memorize the hanging indentation. + hangs = [None] + + # Visual indents. + indent_chances = {} + last_indent = tokens[0][2] + indent = [last_indent[1]] + + last_token_multiline = None + line = None + last_line = '' + last_line_begins_with_multiline = False + for token_type, text, start, end, line in tokens: + + newline = row < start[0] - first_row + if newline: + row = start[0] - first_row + newline = (not last_token_multiline and + token_type not in (tokenize.NL, tokenize.NEWLINE)) + last_line_begins_with_multiline = last_token_multiline + + if newline: + # This is the beginning of a continuation line. + last_indent = start + + # Record the initial indent. + rel_indent[row] = pycodestyle.expand_indent(line) - indent_level + + # Identify closing bracket. + close_bracket = (token_type == tokenize.OP and text in ']})') + + # Is the indent relative to an opening bracket line? + for open_row in reversed(open_rows[depth]): + hang = rel_indent[row] - rel_indent[open_row] + hanging_indent = hang in valid_hangs + if hanging_indent: + break + if hangs[depth]: + hanging_indent = (hang == hangs[depth]) + + visual_indent = (not close_bracket and hang > 0 and + indent_chances.get(start[1])) + + if close_bracket and indent[depth]: + # Closing bracket for visual indent. + if start[1] != indent[depth]: + yield (start, 'E124 {}'.format(indent[depth])) + elif close_bracket and not hang: + # closing bracket matches indentation of opening bracket's line + if hang_closing: + yield (start, 'E133 {}'.format(indent[depth])) + elif indent[depth] and start[1] < indent[depth]: + # Visual indent is broken. + yield (start, 'E128 {}'.format(indent[depth])) + elif (hanging_indent or + (indent_next and + rel_indent[row] == 2 * DEFAULT_INDENT_SIZE)): + # Hanging indent is verified. + if close_bracket and not hang_closing: + yield (start, 'E123 {}'.format(indent_level + + rel_indent[open_row])) + hangs[depth] = hang + elif visual_indent is True: + # Visual indent is verified. + indent[depth] = start[1] + elif visual_indent in (text, unicode): + # Ignore token lined up with matching one from a previous line. + pass + else: + one_indented = (indent_level + rel_indent[open_row] + + DEFAULT_INDENT_SIZE) + # Indent is broken. + if hang <= 0: + error = ('E122', one_indented) + elif indent[depth]: + error = ('E127', indent[depth]) + elif not close_bracket and hangs[depth]: + error = ('E131', one_indented) + elif hang > DEFAULT_INDENT_SIZE: + error = ('E126', one_indented) + else: + hangs[depth] = hang + error = ('E121', one_indented) + + yield (start, '{} {}'.format(*error)) + + # Look for visual indenting. + if ( + parens[row] and + token_type not in (tokenize.NL, tokenize.COMMENT) and + not indent[depth] + ): + indent[depth] = start[1] + indent_chances[start[1]] = True + # Deal with implicit string concatenation. + elif (token_type in (tokenize.STRING, tokenize.COMMENT) or + text in ('u', 'ur', 'b', 'br')): + indent_chances[start[1]] = unicode + # Special case for the "if" statement because len("if (") is equal to + # 4. + elif not indent_chances and not row and not depth and text == 'if': + indent_chances[end[1] + 1] = True + elif text == ':' and line[end[1]:].isspace(): + open_rows[depth].append(row) + + # Keep track of bracket depth. + if token_type == tokenize.OP: + if text in '([{': + depth += 1 + indent.append(0) + hangs.append(None) + if len(open_rows) == depth: + open_rows.append([]) + open_rows[depth].append(row) + parens[row] += 1 + elif text in ')]}' and depth > 0: + # Parent indents should not be more than this one. + prev_indent = indent.pop() or last_indent[1] + hangs.pop() + for d in range(depth): + if indent[d] > prev_indent: + indent[d] = 0 + for ind in list(indent_chances): + if ind >= prev_indent: + del indent_chances[ind] + del open_rows[depth + 1:] + depth -= 1 + if depth: + indent_chances[indent[depth]] = True + for idx in range(row, -1, -1): + if parens[idx]: + parens[idx] -= 1 + break + assert len(indent) == depth + 1 + if ( + start[1] not in indent_chances and + # This is for purposes of speeding up E121 (GitHub #90). + not last_line.rstrip().endswith(',') + ): + # Allow to line up tokens. + indent_chances[start[1]] = text + + last_token_multiline = (start[0] != end[0]) + if last_token_multiline: + rel_indent[end[0] - first_row] = rel_indent[row] + + last_line = line + + if ( + indent_next and + not last_line_begins_with_multiline and + pycodestyle.expand_indent(line) == indent_level + DEFAULT_INDENT_SIZE + ): + pos = (start[0], indent[0] + 4) + desired_indent = indent_level + 2 * DEFAULT_INDENT_SIZE + if visual_indent: + yield (pos, 'E129 {}'.format(desired_indent)) + else: + yield (pos, 'E125 {}'.format(desired_indent)) + + +del pycodestyle._checks['logical_line'][pycodestyle.continued_indentation] +pycodestyle.register_check(continued_indentation) + + +class FixPEP8(object): + + """Fix invalid code. + + Fixer methods are prefixed "fix_". The _fix_source() method looks for these + automatically. + + The fixer method can take either one or two arguments (in addition to + self). The first argument is "result", which is the error information from + pycodestyle. The second argument, "logical", is required only for + logical-line fixes. + + The fixer method can return the list of modified lines or None. An empty + list would mean that no changes were made. None would mean that only the + line reported in the pycodestyle error was modified. Note that the modified + line numbers that are returned are indexed at 1. This typically would + correspond with the line number reported in the pycodestyle error + information. + + [fixed method list] + - e111,e114,e115,e116 + - e121,e122,e123,e124,e125,e126,e127,e128,e129 + - e201,e202,e203 + - e211 + - e221,e222,e223,e224,e225 + - e231 + - e251,e252 + - e261,e262 + - e271,e272,e273,e274 + - e301,e302,e303,e304,e305,e306 + - e401,e402 + - e502 + - e701,e702,e703,e704 + - e711,e712,e713,e714 + - e722 + - e731 + - w291 + - w503,504 + + """ + + def __init__(self, filename, + options, + contents=None, + long_line_ignore_cache=None): + self.filename = filename + if contents is None: + self.source = readlines_from_file(filename) + else: + sio = io.StringIO(contents) + self.source = sio.readlines() + self.options = options + self.indent_word = _get_indentword(''.join(self.source)) + + # collect imports line + self.imports = {} + for i, line in enumerate(self.source): + if (line.find("import ") == 0 or line.find("from ") == 0) and \ + line not in self.imports: + # collect only import statements that first appeared + self.imports[line] = i + + self.long_line_ignore_cache = ( + set() if long_line_ignore_cache is None + else long_line_ignore_cache) + + # Many fixers are the same even though pycodestyle categorizes them + # differently. + self.fix_e115 = self.fix_e112 + self.fix_e121 = self._fix_reindent + self.fix_e122 = self._fix_reindent + self.fix_e123 = self._fix_reindent + self.fix_e124 = self._fix_reindent + self.fix_e126 = self._fix_reindent + self.fix_e127 = self._fix_reindent + self.fix_e128 = self._fix_reindent + self.fix_e129 = self._fix_reindent + self.fix_e133 = self.fix_e131 + self.fix_e202 = self.fix_e201 + self.fix_e203 = self.fix_e201 + self.fix_e211 = self.fix_e201 + self.fix_e221 = self.fix_e271 + self.fix_e222 = self.fix_e271 + self.fix_e223 = self.fix_e271 + self.fix_e226 = self.fix_e225 + self.fix_e227 = self.fix_e225 + self.fix_e228 = self.fix_e225 + self.fix_e241 = self.fix_e271 + self.fix_e242 = self.fix_e224 + self.fix_e252 = self.fix_e225 + self.fix_e261 = self.fix_e262 + self.fix_e272 = self.fix_e271 + self.fix_e273 = self.fix_e271 + self.fix_e274 = self.fix_e271 + self.fix_e306 = self.fix_e301 + self.fix_e501 = ( + self.fix_long_line_logically if + options and (options.aggressive >= 2 or options.experimental) else + self.fix_long_line_physically) + self.fix_e703 = self.fix_e702 + self.fix_w293 = self.fix_w291 + + def _fix_source(self, results): + try: + (logical_start, logical_end) = _find_logical(self.source) + logical_support = True + except (SyntaxError, tokenize.TokenError): # pragma: no cover + logical_support = False + + completed_lines = set() + for result in sorted(results, key=_priority_key): + if result['line'] in completed_lines: + continue + + fixed_methodname = 'fix_' + result['id'].lower() + if hasattr(self, fixed_methodname): + fix = getattr(self, fixed_methodname) + + line_index = result['line'] - 1 + original_line = self.source[line_index] + + is_logical_fix = len(_get_parameters(fix)) > 2 + if is_logical_fix: + logical = None + if logical_support: + logical = _get_logical(self.source, + result, + logical_start, + logical_end) + if logical and set(range( + logical[0][0] + 1, + logical[1][0] + 1)).intersection( + completed_lines): + continue + + modified_lines = fix(result, logical) + else: + modified_lines = fix(result) + + if modified_lines is None: + # Force logical fixes to report what they modified. + assert not is_logical_fix + + if self.source[line_index] == original_line: + modified_lines = [] + + if modified_lines: + completed_lines.update(modified_lines) + elif modified_lines == []: # Empty list means no fix + if self.options.verbose >= 2: + print( + '---> Not fixing {error} on line {line}'.format( + error=result['id'], line=result['line']), + file=sys.stderr) + else: # We assume one-line fix when None. + completed_lines.add(result['line']) + else: + if self.options.verbose >= 3: + print( + "---> '{}' is not defined.".format(fixed_methodname), + file=sys.stderr) + + info = result['info'].strip() + print('---> {}:{}:{}:{}'.format(self.filename, + result['line'], + result['column'], + info), + file=sys.stderr) + + def fix(self): + """Return a version of the source code with PEP 8 violations fixed.""" + pep8_options = { + 'ignore': self.options.ignore, + 'select': self.options.select, + 'max_line_length': self.options.max_line_length, + 'hang_closing': self.options.hang_closing, + } + results = _execute_pep8(pep8_options, self.source) + + if self.options.verbose: + progress = {} + for r in results: + if r['id'] not in progress: + progress[r['id']] = set() + progress[r['id']].add(r['line']) + print('---> {n} issue(s) to fix {progress}'.format( + n=len(results), progress=progress), file=sys.stderr) + + if self.options.line_range: + start, end = self.options.line_range + results = [r for r in results + if start <= r['line'] <= end] + + self._fix_source(filter_results(source=''.join(self.source), + results=results, + aggressive=self.options.aggressive)) + + if self.options.line_range: + # If number of lines has changed then change line_range. + count = sum(sline.count('\n') + for sline in self.source[start - 1:end]) + self.options.line_range[1] = start + count - 1 + + return ''.join(self.source) + + def _fix_reindent(self, result): + """Fix a badly indented line. + + This is done by adding or removing from its initial indent only. + + """ + num_indent_spaces = int(result['info'].split()[1]) + line_index = result['line'] - 1 + target = self.source[line_index] + + self.source[line_index] = ' ' * num_indent_spaces + target.lstrip() + + def fix_e112(self, result): + """Fix under-indented comments.""" + line_index = result['line'] - 1 + target = self.source[line_index] + + if not target.lstrip().startswith('#'): + # Don't screw with invalid syntax. + return [] + + self.source[line_index] = self.indent_word + target + + def fix_e113(self, result): + """Fix unexpected indentation.""" + line_index = result['line'] - 1 + target = self.source[line_index] + indent = _get_indentation(target) + stripped = target.lstrip() + self.source[line_index] = indent[1:] + stripped + + def fix_e116(self, result): + """Fix over-indented comments.""" + line_index = result['line'] - 1 + target = self.source[line_index] + + indent = _get_indentation(target) + stripped = target.lstrip() + + if not stripped.startswith('#'): + # Don't screw with invalid syntax. + return [] + + self.source[line_index] = indent[1:] + stripped + + def fix_e125(self, result): + """Fix indentation undistinguish from the next logical line.""" + num_indent_spaces = int(result['info'].split()[1]) + line_index = result['line'] - 1 + target = self.source[line_index] + + spaces_to_add = num_indent_spaces - len(_get_indentation(target)) + indent = len(_get_indentation(target)) + modified_lines = [] + + while len(_get_indentation(self.source[line_index])) >= indent: + self.source[line_index] = (' ' * spaces_to_add + + self.source[line_index]) + modified_lines.append(1 + line_index) # Line indexed at 1. + line_index -= 1 + + return modified_lines + + def fix_e131(self, result): + """Fix indentation undistinguish from the next logical line.""" + num_indent_spaces = int(result['info'].split()[1]) + line_index = result['line'] - 1 + target = self.source[line_index] + + spaces_to_add = num_indent_spaces - len(_get_indentation(target)) + + if spaces_to_add >= 0: + self.source[line_index] = (' ' * spaces_to_add + + self.source[line_index]) + else: + offset = abs(spaces_to_add) + self.source[line_index] = self.source[line_index][offset:] + + def fix_e201(self, result): + """Remove extraneous whitespace.""" + line_index = result['line'] - 1 + target = self.source[line_index] + offset = result['column'] - 1 + + fixed = fix_whitespace(target, + offset=offset, + replacement='') + + self.source[line_index] = fixed + + def fix_e224(self, result): + """Remove extraneous whitespace around operator.""" + target = self.source[result['line'] - 1] + offset = result['column'] - 1 + fixed = target[:offset] + target[offset:].replace('\t', ' ') + self.source[result['line'] - 1] = fixed + + def fix_e225(self, result): + """Fix missing whitespace around operator.""" + target = self.source[result['line'] - 1] + offset = result['column'] - 1 + fixed = target[:offset] + ' ' + target[offset:] + + # Only proceed if non-whitespace characters match. + # And make sure we don't break the indentation. + if ( + fixed.replace(' ', '') == target.replace(' ', '') and + _get_indentation(fixed) == _get_indentation(target) + ): + self.source[result['line'] - 1] = fixed + error_code = result.get('id', 0) + try: + ts = generate_tokens(fixed) + except (SyntaxError, tokenize.TokenError): + return + if not check_syntax(fixed.lstrip()): + return + errors = list( + pycodestyle.missing_whitespace_around_operator(fixed, ts)) + for e in reversed(errors): + if error_code != e[1].split()[0]: + continue + offset = e[0][1] + fixed = fixed[:offset] + ' ' + fixed[offset:] + self.source[result['line'] - 1] = fixed + else: + return [] + + def fix_e231(self, result): + """Add missing whitespace.""" + line_index = result['line'] - 1 + target = self.source[line_index] + offset = result['column'] + fixed = target[:offset].rstrip() + ' ' + target[offset:].lstrip() + self.source[line_index] = fixed + + def fix_e251(self, result): + """Remove whitespace around parameter '=' sign.""" + line_index = result['line'] - 1 + target = self.source[line_index] + + # This is necessary since pycodestyle sometimes reports columns that + # goes past the end of the physical line. This happens in cases like, + # foo(bar\n=None) + c = min(result['column'] - 1, + len(target) - 1) + + if target[c].strip(): + fixed = target + else: + fixed = target[:c].rstrip() + target[c:].lstrip() + + # There could be an escaped newline + # + # def foo(a=\ + # 1) + if fixed.endswith(('=\\\n', '=\\\r\n', '=\\\r')): + self.source[line_index] = fixed.rstrip('\n\r \t\\') + self.source[line_index + 1] = self.source[line_index + 1].lstrip() + return [line_index + 1, line_index + 2] # Line indexed at 1 + + self.source[result['line'] - 1] = fixed + + def fix_e262(self, result): + """Fix spacing after comment hash.""" + target = self.source[result['line'] - 1] + offset = result['column'] + + code = target[:offset].rstrip(' \t#') + comment = target[offset:].lstrip(' \t#') + + fixed = code + (' # ' + comment if comment.strip() else '\n') + + self.source[result['line'] - 1] = fixed + + def fix_e271(self, result): + """Fix extraneous whitespace around keywords.""" + line_index = result['line'] - 1 + target = self.source[line_index] + offset = result['column'] - 1 + + fixed = fix_whitespace(target, + offset=offset, + replacement=' ') + + if fixed == target: + return [] + else: + self.source[line_index] = fixed + + def fix_e301(self, result): + """Add missing blank line.""" + cr = '\n' + self.source[result['line'] - 1] = cr + self.source[result['line'] - 1] + + def fix_e302(self, result): + """Add missing 2 blank lines.""" + add_linenum = 2 - int(result['info'].split()[-1]) + cr = '\n' * add_linenum + self.source[result['line'] - 1] = cr + self.source[result['line'] - 1] + + def fix_e303(self, result): + """Remove extra blank lines.""" + delete_linenum = int(result['info'].split('(')[1].split(')')[0]) - 2 + delete_linenum = max(1, delete_linenum) + + # We need to count because pycodestyle reports an offset line number if + # there are comments. + cnt = 0 + line = result['line'] - 2 + modified_lines = [] + while cnt < delete_linenum and line >= 0: + if not self.source[line].strip(): + self.source[line] = '' + modified_lines.append(1 + line) # Line indexed at 1 + cnt += 1 + line -= 1 + + return modified_lines + + def fix_e304(self, result): + """Remove blank line following function decorator.""" + line = result['line'] - 2 + if not self.source[line].strip(): + self.source[line] = '' + + def fix_e305(self, result): + """Add missing 2 blank lines after end of function or class.""" + add_delete_linenum = 2 - int(result['info'].split()[-1]) + cnt = 0 + offset = result['line'] - 2 + modified_lines = [] + if add_delete_linenum < 0: + # delete cr + add_delete_linenum = abs(add_delete_linenum) + while cnt < add_delete_linenum and offset >= 0: + if not self.source[offset].strip(): + self.source[offset] = '' + modified_lines.append(1 + offset) # Line indexed at 1 + cnt += 1 + offset -= 1 + else: + # add cr + cr = '\n' + # check comment line + while True: + if offset < 0: + break + line = self.source[offset].lstrip() + if not line: + break + if line[0] != '#': + break + offset -= 1 + offset += 1 + self.source[offset] = cr + self.source[offset] + modified_lines.append(1 + offset) # Line indexed at 1. + return modified_lines + + def fix_e401(self, result): + """Put imports on separate lines.""" + line_index = result['line'] - 1 + target = self.source[line_index] + offset = result['column'] - 1 + + if not target.lstrip().startswith('import'): + return [] + + indentation = re.split(pattern=r'\bimport\b', + string=target, maxsplit=1)[0] + fixed = (target[:offset].rstrip('\t ,') + '\n' + + indentation + 'import ' + target[offset:].lstrip('\t ,')) + self.source[line_index] = fixed + + def fix_e402(self, result): + (line_index, offset, target) = get_index_offset_contents(result, + self.source) + for i in range(1, 100): + line = "".join(self.source[line_index:line_index+i]) + try: + generate_tokens("".join(line)) + except (SyntaxError, tokenize.TokenError): + continue + break + if not (target in self.imports and self.imports[target] != line_index): + mod_offset = get_module_imports_on_top_of_file(self.source, + line_index) + self.source[mod_offset] = line + self.source[mod_offset] + for offset in range(i): + self.source[line_index+offset] = '' + + def fix_long_line_logically(self, result, logical): + """Try to make lines fit within --max-line-length characters.""" + if ( + not logical or + len(logical[2]) == 1 or + self.source[result['line'] - 1].lstrip().startswith('#') + ): + return self.fix_long_line_physically(result) + + start_line_index = logical[0][0] + end_line_index = logical[1][0] + logical_lines = logical[2] + + previous_line = get_item(self.source, start_line_index - 1, default='') + next_line = get_item(self.source, end_line_index + 1, default='') + + single_line = join_logical_line(''.join(logical_lines)) + + try: + fixed = self.fix_long_line( + target=single_line, + previous_line=previous_line, + next_line=next_line, + original=''.join(logical_lines)) + except (SyntaxError, tokenize.TokenError): + return self.fix_long_line_physically(result) + + if fixed: + for line_index in range(start_line_index, end_line_index + 1): + self.source[line_index] = '' + self.source[start_line_index] = fixed + return range(start_line_index + 1, end_line_index + 1) + + return [] + + def fix_long_line_physically(self, result): + """Try to make lines fit within --max-line-length characters.""" + line_index = result['line'] - 1 + target = self.source[line_index] + + previous_line = get_item(self.source, line_index - 1, default='') + next_line = get_item(self.source, line_index + 1, default='') + + try: + fixed = self.fix_long_line( + target=target, + previous_line=previous_line, + next_line=next_line, + original=target) + except (SyntaxError, tokenize.TokenError): + return [] + + if fixed: + self.source[line_index] = fixed + return [line_index + 1] + + return [] + + def fix_long_line(self, target, previous_line, + next_line, original): + cache_entry = (target, previous_line, next_line) + if cache_entry in self.long_line_ignore_cache: + return [] + + if target.lstrip().startswith('#'): + if self.options.aggressive: + # Wrap commented lines. + return shorten_comment( + line=target, + max_line_length=self.options.max_line_length, + last_comment=not next_line.lstrip().startswith('#')) + return [] + + fixed = get_fixed_long_line( + target=target, + previous_line=previous_line, + original=original, + indent_word=self.indent_word, + max_line_length=self.options.max_line_length, + aggressive=self.options.aggressive, + experimental=self.options.experimental, + verbose=self.options.verbose) + + if fixed and not code_almost_equal(original, fixed): + return fixed + + self.long_line_ignore_cache.add(cache_entry) + return None + + def fix_e502(self, result): + """Remove extraneous escape of newline.""" + (line_index, _, target) = get_index_offset_contents(result, + self.source) + self.source[line_index] = target.rstrip('\n\r \t\\') + '\n' + + def fix_e701(self, result): + """Put colon-separated compound statement on separate lines.""" + line_index = result['line'] - 1 + target = self.source[line_index] + c = result['column'] + + fixed_source = (target[:c] + '\n' + + _get_indentation(target) + self.indent_word + + target[c:].lstrip('\n\r \t\\')) + self.source[result['line'] - 1] = fixed_source + return [result['line'], result['line'] + 1] + + def fix_e702(self, result, logical): + """Put semicolon-separated compound statement on separate lines.""" + if not logical: + return [] # pragma: no cover + logical_lines = logical[2] + + # Avoid applying this when indented. + # https://docs.python.org/reference/compound_stmts.html + for line in logical_lines: + if (result['id'] == 'E702' and ':' in line + and STARTSWITH_DEF_REGEX.match(line)): + return [] + + line_index = result['line'] - 1 + target = self.source[line_index] + + if target.rstrip().endswith('\\'): + # Normalize '1; \\\n2' into '1; 2'. + self.source[line_index] = target.rstrip('\n \r\t\\') + self.source[line_index + 1] = self.source[line_index + 1].lstrip() + return [line_index + 1, line_index + 2] + + if target.rstrip().endswith(';'): + self.source[line_index] = target.rstrip('\n \r\t;') + '\n' + return [line_index + 1] + + offset = result['column'] - 1 + first = target[:offset].rstrip(';').rstrip() + second = (_get_indentation(logical_lines[0]) + + target[offset:].lstrip(';').lstrip()) + + # Find inline comment. + inline_comment = None + if target[offset:].lstrip(';').lstrip()[:2] == '# ': + inline_comment = target[offset:].lstrip(';') + + if inline_comment: + self.source[line_index] = first + inline_comment + else: + self.source[line_index] = first + '\n' + second + return [line_index + 1] + + def fix_e704(self, result): + """Fix multiple statements on one line def""" + (line_index, _, target) = get_index_offset_contents(result, + self.source) + match = STARTSWITH_DEF_REGEX.match(target) + if match: + self.source[line_index] = '{}\n{}{}'.format( + match.group(0), + _get_indentation(target) + self.indent_word, + target[match.end(0):].lstrip()) + + def fix_e711(self, result): + """Fix comparison with None.""" + (line_index, offset, target) = get_index_offset_contents(result, + self.source) + + right_offset = offset + 2 + if right_offset >= len(target): + return [] + + left = target[:offset].rstrip() + center = target[offset:right_offset] + right = target[right_offset:].lstrip() + + if not right.startswith('None'): + return [] + + if center.strip() == '==': + new_center = 'is' + elif center.strip() == '!=': + new_center = 'is not' + else: + return [] + + self.source[line_index] = ' '.join([left, new_center, right]) + + def fix_e712(self, result): + """Fix (trivial case of) comparison with boolean.""" + (line_index, offset, target) = get_index_offset_contents(result, + self.source) + + # Handle very easy "not" special cases. + if re.match(r'^\s*if [\w."\'\[\]]+ == False:$', target): + self.source[line_index] = re.sub(r'if ([\w."\'\[\]]+) == False:', + r'if not \1:', target, count=1) + elif re.match(r'^\s*if [\w."\'\[\]]+ != True:$', target): + self.source[line_index] = re.sub(r'if ([\w."\'\[\]]+) != True:', + r'if not \1:', target, count=1) + else: + right_offset = offset + 2 + if right_offset >= len(target): + return [] + + left = target[:offset].rstrip() + center = target[offset:right_offset] + right = target[right_offset:].lstrip() + + # Handle simple cases only. + new_right = None + if center.strip() == '==': + if re.match(r'\bTrue\b', right): + new_right = re.sub(r'\bTrue\b *', '', right, count=1) + elif center.strip() == '!=': + if re.match(r'\bFalse\b', right): + new_right = re.sub(r'\bFalse\b *', '', right, count=1) + + if new_right is None: + return [] + + if new_right[0].isalnum(): + new_right = ' ' + new_right + + self.source[line_index] = left + new_right + + def fix_e713(self, result): + """Fix (trivial case of) non-membership check.""" + (line_index, offset, target) = get_index_offset_contents(result, + self.source) + + # to convert once 'not in' -> 'in' + before_target = target[:offset] + target = target[offset:] + match_notin = COMPARE_NEGATIVE_REGEX_THROUGH.search(target) + notin_pos_start, notin_pos_end = 0, 0 + if match_notin: + notin_pos_start = match_notin.start(1) + notin_pos_end = match_notin.end() + target = '{}{} {}'.format( + target[:notin_pos_start], 'in', target[notin_pos_end:]) + + # fix 'not in' + match = COMPARE_NEGATIVE_REGEX.search(target) + if match: + if match.group(3) == 'in': + pos_start = match.start(1) + new_target = '{5}{0}{1} {2} {3} {4}'.format( + target[:pos_start], match.group(2), match.group(1), + match.group(3), target[match.end():], before_target) + if match_notin: + # revert 'in' -> 'not in' + pos_start = notin_pos_start + offset + pos_end = notin_pos_end + offset - 4 # len('not ') + new_target = '{}{} {}'.format( + new_target[:pos_start], 'not in', new_target[pos_end:]) + self.source[line_index] = new_target + + def fix_e714(self, result): + """Fix object identity should be 'is not' case.""" + (line_index, offset, target) = get_index_offset_contents(result, + self.source) + + # to convert once 'is not' -> 'is' + before_target = target[:offset] + target = target[offset:] + match_isnot = COMPARE_NEGATIVE_REGEX_THROUGH.search(target) + isnot_pos_start, isnot_pos_end = 0, 0 + if match_isnot: + isnot_pos_start = match_isnot.start(1) + isnot_pos_end = match_isnot.end() + target = '{}{} {}'.format( + target[:isnot_pos_start], 'in', target[isnot_pos_end:]) + + match = COMPARE_NEGATIVE_REGEX.search(target) + if match: + if match.group(3).startswith('is'): + pos_start = match.start(1) + new_target = '{5}{0}{1} {2} {3} {4}'.format( + target[:pos_start], match.group(2), match.group(3), + match.group(1), target[match.end():], before_target) + if match_isnot: + # revert 'is' -> 'is not' + pos_start = isnot_pos_start + offset + pos_end = isnot_pos_end + offset - 4 # len('not ') + new_target = '{}{} {}'.format( + new_target[:pos_start], 'is not', new_target[pos_end:]) + self.source[line_index] = new_target + + def fix_e722(self, result): + """fix bare except""" + (line_index, _, target) = get_index_offset_contents(result, + self.source) + match = BARE_EXCEPT_REGEX.search(target) + if match: + self.source[line_index] = '{}{}{}'.format( + target[:result['column'] - 1], "except BaseException:", + target[match.end():]) + + def fix_e731(self, result): + """Fix do not assign a lambda expression check.""" + (line_index, _, target) = get_index_offset_contents(result, + self.source) + match = LAMBDA_REGEX.search(target) + if match: + end = match.end() + self.source[line_index] = '{}def {}({}): return {}'.format( + target[:match.start(0)], match.group(1), match.group(2), + target[end:].lstrip()) + + def fix_w291(self, result): + """Remove trailing whitespace.""" + fixed_line = self.source[result['line'] - 1].rstrip() + self.source[result['line'] - 1] = fixed_line + '\n' + + def fix_w391(self, _): + """Remove trailing blank lines.""" + blank_count = 0 + for line in reversed(self.source): + line = line.rstrip() + if line: + break + else: + blank_count += 1 + + original_length = len(self.source) + self.source = self.source[:original_length - blank_count] + return range(1, 1 + original_length) + + def fix_w503(self, result): + (line_index, _, target) = get_index_offset_contents(result, + self.source) + one_string_token = target.split()[0] + try: + ts = generate_tokens(one_string_token) + except (SyntaxError, tokenize.TokenError): + return + if not _is_binary_operator(ts[0][0], one_string_token): + return + # find comment + comment_index = 0 + found_not_comment_only_line = False + comment_only_linenum = 0 + for i in range(5): + # NOTE: try to parse code in 5 times + if (line_index - i) < 0: + break + from_index = line_index - i - 1 + if from_index < 0 or len(self.source) <= from_index: + break + to_index = line_index + 1 + strip_line = self.source[from_index].lstrip() + if ( + not found_not_comment_only_line and + strip_line and strip_line[0] == '#' + ): + comment_only_linenum += 1 + continue + found_not_comment_only_line = True + try: + ts = generate_tokens("".join(self.source[from_index:to_index])) + except (SyntaxError, tokenize.TokenError): + continue + newline_count = 0 + newline_index = [] + for index, t in enumerate(ts): + if t[0] in (tokenize.NEWLINE, tokenize.NL): + newline_index.append(index) + newline_count += 1 + if newline_count > 2: + tts = ts[newline_index[-3]:] + else: + tts = ts + old = None + for t in tts: + if t[0] in (tokenize.NEWLINE, tokenize.NL): + newline_count -= 1 + if newline_count <= 1: + break + if tokenize.COMMENT == t[0] and old and old[0] != tokenize.NL: + comment_index = old[3][1] + break + old = t + break + i = target.index(one_string_token) + fix_target_line = line_index - 1 - comment_only_linenum + self.source[line_index] = '{}{}'.format( + target[:i], target[i + len(one_string_token):].lstrip()) + nl = find_newline(self.source[fix_target_line:line_index]) + before_line = self.source[fix_target_line] + bl = before_line.index(nl) + if comment_index: + self.source[fix_target_line] = '{} {} {}'.format( + before_line[:comment_index], one_string_token, + before_line[comment_index + 1:]) + else: + self.source[fix_target_line] = '{} {}{}'.format( + before_line[:bl], one_string_token, before_line[bl:]) + + def fix_w504(self, result): + (line_index, _, target) = get_index_offset_contents(result, + self.source) + # NOTE: is not collect pointed out in pycodestyle==2.4.0 + comment_index = 0 + operator_position = None # (start_position, end_position) + for i in range(1, 6): + to_index = line_index + i + try: + ts = generate_tokens("".join(self.source[line_index:to_index])) + except (SyntaxError, tokenize.TokenError): + continue + newline_count = 0 + newline_index = [] + for index, t in enumerate(ts): + if _is_binary_operator(t[0], t[1]): + if t[2][0] == 1 and t[3][0] == 1: + operator_position = (t[2][1], t[3][1]) + elif t[0] == tokenize.NAME and t[1] in ("and", "or"): + if t[2][0] == 1 and t[3][0] == 1: + operator_position = (t[2][1], t[3][1]) + elif t[0] in (tokenize.NEWLINE, tokenize.NL): + newline_index.append(index) + newline_count += 1 + if newline_count > 2: + tts = ts[:newline_index[-3]] + else: + tts = ts + old = [] + for t in tts: + if tokenize.COMMENT == t[0] and old: + comment_index = old[3][1] + break + old = t + break + if not operator_position: + return + target_operator = target[operator_position[0]:operator_position[1]] + if comment_index: + self.source[line_index] = '{}{}'.format( + target[:operator_position[0]].rstrip(), + target[comment_index:]) + else: + self.source[line_index] = '{}{}{}'.format( + target[:operator_position[0]].rstrip(), + target[operator_position[1]:].lstrip(), + target[operator_position[1]:]) + next_line = self.source[line_index + 1] + next_line_indent = 0 + m = re.match(r'\s*', next_line) + if m: + next_line_indent = m.span()[1] + self.source[line_index + 1] = '{}{} {}'.format( + next_line[:next_line_indent], target_operator, + next_line[next_line_indent:]) + + def fix_w605(self, result): + (line_index, _, target) = get_index_offset_contents(result, + self.source) + try: + tokens = list(generate_tokens(target)) + except (SyntaxError, tokenize.TokenError): + return + for (pos, _msg) in get_w605_position(tokens): + self.source[line_index] = '{}r{}'.format( + target[:pos], target[pos:]) + + +def get_w605_position(tokens): + """workaround get pointing out position by W605.""" + # TODO: When this PR(*) change is released, use pos of pycodestyle + # *: https://github.com/PyCQA/pycodestyle/pull/747 + valid = [ + '\n', '\\', '\'', '"', 'a', 'b', 'f', 'n', 'r', 't', 'v', + '0', '1', '2', '3', '4', '5', '6', '7', 'x', + + # Escape sequences only recognized in string literals + 'N', 'u', 'U', + ] + + for token_type, text, start_pos, end_pos, line in tokens: + if token_type == tokenize.STRING: + quote = text[-3:] if text[-3:] in ('"""', "'''") else text[-1] + # Extract string modifiers (e.g. u or r) + quote_pos = text.index(quote) + prefix = text[:quote_pos].lower() + start = quote_pos + len(quote) + string = text[start:-len(quote)] + + if 'r' not in prefix: + pos = string.find('\\') + while pos >= 0: + pos += 1 + if string[pos] not in valid: + yield ( + # No need to search line, token stores position + start_pos[1], + "W605 invalid escape sequence '\\%s'" % + string[pos], + ) + pos = string.find('\\', pos + 1) + + +def get_module_imports_on_top_of_file(source, import_line_index): + """return import or from keyword position + + example: + > 0: import sys + 1: import os + 2: + 3: def function(): + """ + def is_string_literal(line): + if line[0] in 'uUbB': + line = line[1:] + if line and line[0] in 'rR': + line = line[1:] + return line and (line[0] == '"' or line[0] == "'") + allowed_try_keywords = ('try', 'except', 'else', 'finally') + for cnt, line in enumerate(source): + if not line.rstrip(): + continue + elif line.startswith('#'): + continue + if line.startswith('import ') or line.startswith('from '): + if cnt == import_line_index: + continue + return cnt + elif pycodestyle.DUNDER_REGEX.match(line): + continue + elif any(line.startswith(kw) for kw in allowed_try_keywords): + continue + elif is_string_literal(line): + return cnt + else: + return cnt + return 0 + + +def get_index_offset_contents(result, source): + """Return (line_index, column_offset, line_contents).""" + line_index = result['line'] - 1 + return (line_index, + result['column'] - 1, + source[line_index]) + + +def get_fixed_long_line(target, previous_line, original, + indent_word=' ', max_line_length=79, + aggressive=False, experimental=False, verbose=False): + """Break up long line and return result. + + Do this by generating multiple reformatted candidates and then + ranking the candidates to heuristically select the best option. + + """ + indent = _get_indentation(target) + source = target[len(indent):] + assert source.lstrip() == source + assert not target.lstrip().startswith('#') + + # Check for partial multiline. + tokens = list(generate_tokens(source)) + + candidates = shorten_line( + tokens, source, indent, + indent_word, + max_line_length, + aggressive=aggressive, + experimental=experimental, + previous_line=previous_line) + + # Also sort alphabetically as a tie breaker (for determinism). + candidates = sorted( + sorted(set(candidates).union([target, original])), + key=lambda x: line_shortening_rank( + x, + indent_word, + max_line_length, + experimental=experimental)) + + if verbose >= 4: + print(('-' * 79 + '\n').join([''] + candidates + ['']), + file=wrap_output(sys.stderr, 'utf-8')) + + if candidates: + best_candidate = candidates[0] + + # Don't allow things to get longer. + if longest_line_length(best_candidate) > longest_line_length(original): + return None + + return best_candidate + + +def longest_line_length(code): + """Return length of longest line.""" + return max(len(line) for line in code.splitlines()) + + +def join_logical_line(logical_line): + """Return single line based on logical line input.""" + indentation = _get_indentation(logical_line) + + return indentation + untokenize_without_newlines( + generate_tokens(logical_line.lstrip())) + '\n' + + +def untokenize_without_newlines(tokens): + """Return source code based on tokens.""" + text = '' + last_row = 0 + last_column = -1 + + for t in tokens: + token_string = t[1] + (start_row, start_column) = t[2] + (end_row, end_column) = t[3] + + if start_row > last_row: + last_column = 0 + if ( + (start_column > last_column or token_string == '\n') and + not text.endswith(' ') + ): + text += ' ' + + if token_string != '\n': + text += token_string + + last_row = end_row + last_column = end_column + + return text.rstrip() + + +def _find_logical(source_lines): + # Make a variable which is the index of all the starts of lines. + logical_start = [] + logical_end = [] + last_newline = True + parens = 0 + for t in generate_tokens(''.join(source_lines)): + if t[0] in [tokenize.COMMENT, tokenize.DEDENT, + tokenize.INDENT, tokenize.NL, + tokenize.ENDMARKER]: + continue + if not parens and t[0] in [tokenize.NEWLINE, tokenize.SEMI]: + last_newline = True + logical_end.append((t[3][0] - 1, t[2][1])) + continue + if last_newline and not parens: + logical_start.append((t[2][0] - 1, t[2][1])) + last_newline = False + if t[0] == tokenize.OP: + if t[1] in '([{': + parens += 1 + elif t[1] in '}])': + parens -= 1 + return (logical_start, logical_end) + + +def _get_logical(source_lines, result, logical_start, logical_end): + """Return the logical line corresponding to the result. + + Assumes input is already E702-clean. + + """ + row = result['line'] - 1 + col = result['column'] - 1 + ls = None + le = None + for i in range(0, len(logical_start), 1): + assert logical_end + x = logical_end[i] + if x[0] > row or (x[0] == row and x[1] > col): + le = x + ls = logical_start[i] + break + if ls is None: + return None + original = source_lines[ls[0]:le[0] + 1] + return ls, le, original + + +def get_item(items, index, default=None): + if 0 <= index < len(items): + return items[index] + + return default + + +def reindent(source, indent_size): + """Reindent all lines.""" + reindenter = Reindenter(source) + return reindenter.run(indent_size) + + +def code_almost_equal(a, b): + """Return True if code is similar. + + Ignore whitespace when comparing specific line. + + """ + split_a = split_and_strip_non_empty_lines(a) + split_b = split_and_strip_non_empty_lines(b) + + if len(split_a) != len(split_b): + return False + + for (index, _) in enumerate(split_a): + if ''.join(split_a[index].split()) != ''.join(split_b[index].split()): + return False + + return True + + +def split_and_strip_non_empty_lines(text): + """Return lines split by newline. + + Ignore empty lines. + + """ + return [line.strip() for line in text.splitlines() if line.strip()] + + +def fix_e265(source, aggressive=False): # pylint: disable=unused-argument + """Format block comments.""" + if '#' not in source: + # Optimization. + return source + + ignored_line_numbers = multiline_string_lines( + source, + include_docstrings=True) | set(commented_out_code_lines(source)) + + fixed_lines = [] + sio = io.StringIO(source) + for (line_number, line) in enumerate(sio.readlines(), start=1): + if ( + line.lstrip().startswith('#') and + line_number not in ignored_line_numbers and + not pycodestyle.noqa(line) + ): + indentation = _get_indentation(line) + line = line.lstrip() + + # Normalize beginning if not a shebang. + if len(line) > 1: + pos = next((index for index, c in enumerate(line) + if c != '#')) + if ( + # Leave multiple spaces like '# ' alone. + (line[:pos].count('#') > 1 or line[1].isalnum() or + not line[1].isspace()) and + line[1] not in ':!' and + # Leave stylistic outlined blocks alone. + not line.rstrip().endswith('#') + ): + line = '# ' + line.lstrip('# \t') + + fixed_lines.append(indentation + line) + else: + fixed_lines.append(line) + + return ''.join(fixed_lines) + + +def refactor(source, fixer_names, ignore=None, filename=''): + """Return refactored code using lib2to3. + + Skip if ignore string is produced in the refactored code. + + """ + from lib2to3 import pgen2 + try: + new_text = refactor_with_2to3(source, + fixer_names=fixer_names, + filename=filename) + except (pgen2.parse.ParseError, + SyntaxError, + UnicodeDecodeError, + UnicodeEncodeError): + return source + + if ignore: + if ignore in new_text and ignore not in source: + return source + + return new_text + + +def code_to_2to3(select, ignore, where='', verbose=False): + fixes = set() + for code, fix in CODE_TO_2TO3.items(): + if code_match(code, select=select, ignore=ignore): + if verbose: + print('---> Applying {} fix for {}'.format(where, + code.upper()), + file=sys.stderr) + fixes |= set(fix) + return fixes + + +def fix_2to3(source, + aggressive=True, select=None, ignore=None, filename='', + where='global', verbose=False): + """Fix various deprecated code (via lib2to3).""" + if not aggressive: + return source + + select = select or [] + ignore = ignore or [] + + return refactor(source, + code_to_2to3(select=select, + ignore=ignore, + where=where, + verbose=verbose), + filename=filename) + + +def fix_w602(source, aggressive=True): + """Fix deprecated form of raising exception.""" + if not aggressive: + return source + + return refactor(source, ['raise'], ignore='with_traceback') + + +def find_newline(source): + """Return type of newline used in source. + + Input is a list of lines. + + """ + assert not isinstance(source, unicode) + + counter = collections.defaultdict(int) + for line in source: + if line.endswith(CRLF): + counter[CRLF] += 1 + elif line.endswith(CR): + counter[CR] += 1 + elif line.endswith(LF): + counter[LF] += 1 + + return (sorted(counter, key=counter.get, reverse=True) or [LF])[0] + + +def _get_indentword(source): + """Return indentation type.""" + indent_word = ' ' # Default in case source has no indentation + try: + for t in generate_tokens(source): + if t[0] == token.INDENT: + indent_word = t[1] + break + except (SyntaxError, tokenize.TokenError): + pass + return indent_word + + +def _get_indentation(line): + """Return leading whitespace.""" + if line.strip(): + non_whitespace_index = len(line) - len(line.lstrip()) + return line[:non_whitespace_index] + + return '' + + +def get_diff_text(old, new, filename): + """Return text of unified diff between old and new.""" + newline = '\n' + diff = difflib.unified_diff( + old, new, + 'original/' + filename, + 'fixed/' + filename, + lineterm=newline) + + text = '' + for line in diff: + text += line + + # Work around missing newline (http://bugs.python.org/issue2142). + if text and not line.endswith(newline): + text += newline + r'\ No newline at end of file' + newline + + return text + + +def _priority_key(pep8_result): + """Key for sorting PEP8 results. + + Global fixes should be done first. This is important for things like + indentation. + + """ + priority = [ + # Fix multiline colon-based before semicolon based. + 'e701', + # Break multiline statements early. + 'e702', + # Things that make lines longer. + 'e225', 'e231', + # Remove extraneous whitespace before breaking lines. + 'e201', + # Shorten whitespace in comment before resorting to wrapping. + 'e262' + ] + middle_index = 10000 + lowest_priority = [ + # We need to shorten lines last since the logical fixer can get in a + # loop, which causes us to exit early. + 'e501', + ] + key = pep8_result['id'].lower() + try: + return priority.index(key) + except ValueError: + try: + return middle_index + lowest_priority.index(key) + 1 + except ValueError: + return middle_index + + +def shorten_line(tokens, source, indentation, indent_word, max_line_length, + aggressive=False, experimental=False, previous_line=''): + """Separate line at OPERATOR. + + Multiple candidates will be yielded. + + """ + for candidate in _shorten_line(tokens=tokens, + source=source, + indentation=indentation, + indent_word=indent_word, + aggressive=aggressive, + previous_line=previous_line): + yield candidate + + if aggressive: + for key_token_strings in SHORTEN_OPERATOR_GROUPS: + shortened = _shorten_line_at_tokens( + tokens=tokens, + source=source, + indentation=indentation, + indent_word=indent_word, + key_token_strings=key_token_strings, + aggressive=aggressive) + + if shortened is not None and shortened != source: + yield shortened + + if experimental: + for shortened in _shorten_line_at_tokens_new( + tokens=tokens, + source=source, + indentation=indentation, + max_line_length=max_line_length): + + yield shortened + + +def _shorten_line(tokens, source, indentation, indent_word, + aggressive=False, previous_line=''): + """Separate line at OPERATOR. + + The input is expected to be free of newlines except for inside multiline + strings and at the end. + + Multiple candidates will be yielded. + + """ + for (token_type, + token_string, + start_offset, + end_offset) in token_offsets(tokens): + + if ( + token_type == tokenize.COMMENT and + not is_probably_part_of_multiline(previous_line) and + not is_probably_part_of_multiline(source) and + not source[start_offset + 1:].strip().lower().startswith( + ('noqa', 'pragma:', 'pylint:')) + ): + # Move inline comments to previous line. + first = source[:start_offset] + second = source[start_offset:] + yield (indentation + second.strip() + '\n' + + indentation + first.strip() + '\n') + elif token_type == token.OP and token_string != '=': + # Don't break on '=' after keyword as this violates PEP 8. + + assert token_type != token.INDENT + + first = source[:end_offset] + + second_indent = indentation + if (first.rstrip().endswith('(') and + source[end_offset:].lstrip().startswith(')')): + pass + elif first.rstrip().endswith('('): + second_indent += indent_word + elif '(' in first: + second_indent += ' ' * (1 + first.find('(')) + else: + second_indent += indent_word + + second = (second_indent + source[end_offset:].lstrip()) + if ( + not second.strip() or + second.lstrip().startswith('#') + ): + continue + + # Do not begin a line with a comma + if second.lstrip().startswith(','): + continue + # Do end a line with a dot + if first.rstrip().endswith('.'): + continue + if token_string in '+-*/': + fixed = first + ' \\' + '\n' + second + else: + fixed = first + '\n' + second + + # Only fix if syntax is okay. + if check_syntax(normalize_multiline(fixed) + if aggressive else fixed): + yield indentation + fixed + + +def _is_binary_operator(token_type, text): + return ((token_type == tokenize.OP or text in ['and', 'or']) and + text not in '()[]{},:.;@=%~') + + +# A convenient way to handle tokens. +Token = collections.namedtuple('Token', ['token_type', 'token_string', + 'spos', 'epos', 'line']) + + +class ReformattedLines(object): + + """The reflowed lines of atoms. + + Each part of the line is represented as an "atom." They can be moved + around when need be to get the optimal formatting. + + """ + + ########################################################################### + # Private Classes + + class _Indent(object): + + """Represent an indentation in the atom stream.""" + + def __init__(self, indent_amt): + self._indent_amt = indent_amt + + def emit(self): + return ' ' * self._indent_amt + + @property + def size(self): + return self._indent_amt + + class _Space(object): + + """Represent a space in the atom stream.""" + + def emit(self): + return ' ' + + @property + def size(self): + return 1 + + class _LineBreak(object): + + """Represent a line break in the atom stream.""" + + def emit(self): + return '\n' + + @property + def size(self): + return 0 + + def __init__(self, max_line_length): + self._max_line_length = max_line_length + self._lines = [] + self._bracket_depth = 0 + self._prev_item = None + self._prev_prev_item = None + + def __repr__(self): + return self.emit() + + ########################################################################### + # Public Methods + + def add(self, obj, indent_amt, break_after_open_bracket): + if isinstance(obj, Atom): + self._add_item(obj, indent_amt) + return + + self._add_container(obj, indent_amt, break_after_open_bracket) + + def add_comment(self, item): + num_spaces = 2 + if len(self._lines) > 1: + if isinstance(self._lines[-1], self._Space): + num_spaces -= 1 + if len(self._lines) > 2: + if isinstance(self._lines[-2], self._Space): + num_spaces -= 1 + + while num_spaces > 0: + self._lines.append(self._Space()) + num_spaces -= 1 + self._lines.append(item) + + def add_indent(self, indent_amt): + self._lines.append(self._Indent(indent_amt)) + + def add_line_break(self, indent): + self._lines.append(self._LineBreak()) + self.add_indent(len(indent)) + + def add_line_break_at(self, index, indent_amt): + self._lines.insert(index, self._LineBreak()) + self._lines.insert(index + 1, self._Indent(indent_amt)) + + def add_space_if_needed(self, curr_text, equal=False): + if ( + not self._lines or isinstance( + self._lines[-1], (self._LineBreak, self._Indent, self._Space)) + ): + return + + prev_text = unicode(self._prev_item) + prev_prev_text = ( + unicode(self._prev_prev_item) if self._prev_prev_item else '') + + if ( + # The previous item was a keyword or identifier and the current + # item isn't an operator that doesn't require a space. + ((self._prev_item.is_keyword or self._prev_item.is_string or + self._prev_item.is_name or self._prev_item.is_number) and + (curr_text[0] not in '([{.,:}])' or + (curr_text[0] == '=' and equal))) or + + # Don't place spaces around a '.', unless it's in an 'import' + # statement. + ((prev_prev_text != 'from' and prev_text[-1] != '.' and + curr_text != 'import') and + + # Don't place a space before a colon. + curr_text[0] != ':' and + + # Don't split up ending brackets by spaces. + ((prev_text[-1] in '}])' and curr_text[0] not in '.,}])') or + + # Put a space after a colon or comma. + prev_text[-1] in ':,' or + + # Put space around '=' if asked to. + (equal and prev_text == '=') or + + # Put spaces around non-unary arithmetic operators. + ((self._prev_prev_item and + (prev_text not in '+-' and + (self._prev_prev_item.is_name or + self._prev_prev_item.is_number or + self._prev_prev_item.is_string)) and + prev_text in ('+', '-', '%', '*', '/', '//', '**', 'in'))))) + ): + self._lines.append(self._Space()) + + def previous_item(self): + """Return the previous non-whitespace item.""" + return self._prev_item + + def fits_on_current_line(self, item_extent): + return self.current_size() + item_extent <= self._max_line_length + + def current_size(self): + """The size of the current line minus the indentation.""" + size = 0 + for item in reversed(self._lines): + size += item.size + if isinstance(item, self._LineBreak): + break + + return size + + def line_empty(self): + return (self._lines and + isinstance(self._lines[-1], + (self._LineBreak, self._Indent))) + + def emit(self): + string = '' + for item in self._lines: + if isinstance(item, self._LineBreak): + string = string.rstrip() + string += item.emit() + + return string.rstrip() + '\n' + + ########################################################################### + # Private Methods + + def _add_item(self, item, indent_amt): + """Add an item to the line. + + Reflow the line to get the best formatting after the item is + inserted. The bracket depth indicates if the item is being + inserted inside of a container or not. + + """ + if self._prev_item and self._prev_item.is_string and item.is_string: + # Place consecutive string literals on separate lines. + self._lines.append(self._LineBreak()) + self._lines.append(self._Indent(indent_amt)) + + item_text = unicode(item) + if self._lines and self._bracket_depth: + # Adding the item into a container. + self._prevent_default_initializer_splitting(item, indent_amt) + + if item_text in '.,)]}': + self._split_after_delimiter(item, indent_amt) + + elif self._lines and not self.line_empty(): + # Adding the item outside of a container. + if self.fits_on_current_line(len(item_text)): + self._enforce_space(item) + + else: + # Line break for the new item. + self._lines.append(self._LineBreak()) + self._lines.append(self._Indent(indent_amt)) + + self._lines.append(item) + self._prev_item, self._prev_prev_item = item, self._prev_item + + if item_text in '([{': + self._bracket_depth += 1 + + elif item_text in '}])': + self._bracket_depth -= 1 + assert self._bracket_depth >= 0 + + def _add_container(self, container, indent_amt, break_after_open_bracket): + actual_indent = indent_amt + 1 + + if ( + unicode(self._prev_item) != '=' and + not self.line_empty() and + not self.fits_on_current_line( + container.size + self._bracket_depth + 2) + ): + + if unicode(container)[0] == '(' and self._prev_item.is_name: + # Don't split before the opening bracket of a call. + break_after_open_bracket = True + actual_indent = indent_amt + 4 + elif ( + break_after_open_bracket or + unicode(self._prev_item) not in '([{' + ): + # If the container doesn't fit on the current line and the + # current line isn't empty, place the container on the next + # line. + self._lines.append(self._LineBreak()) + self._lines.append(self._Indent(indent_amt)) + break_after_open_bracket = False + else: + actual_indent = self.current_size() + 1 + break_after_open_bracket = False + + if isinstance(container, (ListComprehension, IfExpression)): + actual_indent = indent_amt + + # Increase the continued indentation only if recursing on a + # container. + container.reflow(self, ' ' * actual_indent, + break_after_open_bracket=break_after_open_bracket) + + def _prevent_default_initializer_splitting(self, item, indent_amt): + """Prevent splitting between a default initializer. + + When there is a default initializer, it's best to keep it all on + the same line. It's nicer and more readable, even if it goes + over the maximum allowable line length. This goes back along the + current line to determine if we have a default initializer, and, + if so, to remove extraneous whitespaces and add a line + break/indent before it if needed. + + """ + if unicode(item) == '=': + # This is the assignment in the initializer. Just remove spaces for + # now. + self._delete_whitespace() + return + + if (not self._prev_item or not self._prev_prev_item or + unicode(self._prev_item) != '='): + return + + self._delete_whitespace() + prev_prev_index = self._lines.index(self._prev_prev_item) + + if ( + isinstance(self._lines[prev_prev_index - 1], self._Indent) or + self.fits_on_current_line(item.size + 1) + ): + # The default initializer is already the only item on this line. + # Don't insert a newline here. + return + + # Replace the space with a newline/indent combo. + if isinstance(self._lines[prev_prev_index - 1], self._Space): + del self._lines[prev_prev_index - 1] + + self.add_line_break_at(self._lines.index(self._prev_prev_item), + indent_amt) + + def _split_after_delimiter(self, item, indent_amt): + """Split the line only after a delimiter.""" + self._delete_whitespace() + + if self.fits_on_current_line(item.size): + return + + last_space = None + for current_item in reversed(self._lines): + if ( + last_space and + (not isinstance(current_item, Atom) or + not current_item.is_colon) + ): + break + else: + last_space = None + if isinstance(current_item, self._Space): + last_space = current_item + if isinstance(current_item, (self._LineBreak, self._Indent)): + return + + if not last_space: + return + + self.add_line_break_at(self._lines.index(last_space), indent_amt) + + def _enforce_space(self, item): + """Enforce a space in certain situations. + + There are cases where we will want a space where normally we + wouldn't put one. This just enforces the addition of a space. + + """ + if isinstance(self._lines[-1], + (self._Space, self._LineBreak, self._Indent)): + return + + if not self._prev_item: + return + + item_text = unicode(item) + prev_text = unicode(self._prev_item) + + # Prefer a space around a '.' in an import statement, and between the + # 'import' and '('. + if ( + (item_text == '.' and prev_text == 'from') or + (item_text == 'import' and prev_text == '.') or + (item_text == '(' and prev_text == 'import') + ): + self._lines.append(self._Space()) + + def _delete_whitespace(self): + """Delete all whitespace from the end of the line.""" + while isinstance(self._lines[-1], (self._Space, self._LineBreak, + self._Indent)): + del self._lines[-1] + + +class Atom(object): + + """The smallest unbreakable unit that can be reflowed.""" + + def __init__(self, atom): + self._atom = atom + + def __repr__(self): + return self._atom.token_string + + def __len__(self): + return self.size + + def reflow( + self, reflowed_lines, continued_indent, extent, + break_after_open_bracket=False, + is_list_comp_or_if_expr=False, + next_is_dot=False + ): + if self._atom.token_type == tokenize.COMMENT: + reflowed_lines.add_comment(self) + return + + total_size = extent if extent else self.size + + if self._atom.token_string not in ',:([{}])': + # Some atoms will need an extra 1-sized space token after them. + total_size += 1 + + prev_item = reflowed_lines.previous_item() + if ( + not is_list_comp_or_if_expr and + not reflowed_lines.fits_on_current_line(total_size) and + not (next_is_dot and + reflowed_lines.fits_on_current_line(self.size + 1)) and + not reflowed_lines.line_empty() and + not self.is_colon and + not (prev_item and prev_item.is_name and + unicode(self) == '(') + ): + # Start a new line if there is already something on the line and + # adding this atom would make it go over the max line length. + reflowed_lines.add_line_break(continued_indent) + else: + reflowed_lines.add_space_if_needed(unicode(self)) + + reflowed_lines.add(self, len(continued_indent), + break_after_open_bracket) + + def emit(self): + return self.__repr__() + + @property + def is_keyword(self): + return keyword.iskeyword(self._atom.token_string) + + @property + def is_string(self): + return self._atom.token_type == tokenize.STRING + + @property + def is_name(self): + return self._atom.token_type == tokenize.NAME + + @property + def is_number(self): + return self._atom.token_type == tokenize.NUMBER + + @property + def is_comma(self): + return self._atom.token_string == ',' + + @property + def is_colon(self): + return self._atom.token_string == ':' + + @property + def size(self): + return len(self._atom.token_string) + + +class Container(object): + + """Base class for all container types.""" + + def __init__(self, items): + self._items = items + + def __repr__(self): + string = '' + last_was_keyword = False + + for item in self._items: + if item.is_comma: + string += ', ' + elif item.is_colon: + string += ': ' + else: + item_string = unicode(item) + if ( + string and + (last_was_keyword or + (not string.endswith(tuple('([{,.:}]) ')) and + not item_string.startswith(tuple('([{,.:}])')))) + ): + string += ' ' + string += item_string + + last_was_keyword = item.is_keyword + return string + + def __iter__(self): + for element in self._items: + yield element + + def __getitem__(self, idx): + return self._items[idx] + + def reflow(self, reflowed_lines, continued_indent, + break_after_open_bracket=False): + last_was_container = False + for (index, item) in enumerate(self._items): + next_item = get_item(self._items, index + 1) + + if isinstance(item, Atom): + is_list_comp_or_if_expr = ( + isinstance(self, (ListComprehension, IfExpression))) + item.reflow(reflowed_lines, continued_indent, + self._get_extent(index), + is_list_comp_or_if_expr=is_list_comp_or_if_expr, + next_is_dot=(next_item and + unicode(next_item) == '.')) + if last_was_container and item.is_comma: + reflowed_lines.add_line_break(continued_indent) + last_was_container = False + else: # isinstance(item, Container) + reflowed_lines.add(item, len(continued_indent), + break_after_open_bracket) + last_was_container = not isinstance(item, (ListComprehension, + IfExpression)) + + if ( + break_after_open_bracket and index == 0 and + # Prefer to keep empty containers together instead of + # separating them. + unicode(item) == self.open_bracket and + (not next_item or unicode(next_item) != self.close_bracket) and + (len(self._items) != 3 or not isinstance(next_item, Atom)) + ): + reflowed_lines.add_line_break(continued_indent) + break_after_open_bracket = False + else: + next_next_item = get_item(self._items, index + 2) + if ( + unicode(item) not in ['.', '%', 'in'] and + next_item and not isinstance(next_item, Container) and + unicode(next_item) != ':' and + next_next_item and (not isinstance(next_next_item, Atom) or + unicode(next_item) == 'not') and + not reflowed_lines.line_empty() and + not reflowed_lines.fits_on_current_line( + self._get_extent(index + 1) + 2) + ): + reflowed_lines.add_line_break(continued_indent) + + def _get_extent(self, index): + """The extent of the full element. + + E.g., the length of a function call or keyword. + + """ + extent = 0 + prev_item = get_item(self._items, index - 1) + seen_dot = prev_item and unicode(prev_item) == '.' + while index < len(self._items): + item = get_item(self._items, index) + index += 1 + + if isinstance(item, (ListComprehension, IfExpression)): + break + + if isinstance(item, Container): + if prev_item and prev_item.is_name: + if seen_dot: + extent += 1 + else: + extent += item.size + + prev_item = item + continue + elif (unicode(item) not in ['.', '=', ':', 'not'] and + not item.is_name and not item.is_string): + break + + if unicode(item) == '.': + seen_dot = True + + extent += item.size + prev_item = item + + return extent + + @property + def is_string(self): + return False + + @property + def size(self): + return len(self.__repr__()) + + @property + def is_keyword(self): + return False + + @property + def is_name(self): + return False + + @property + def is_comma(self): + return False + + @property + def is_colon(self): + return False + + @property + def open_bracket(self): + return None + + @property + def close_bracket(self): + return None + + +class Tuple(Container): + + """A high-level representation of a tuple.""" + + @property + def open_bracket(self): + return '(' + + @property + def close_bracket(self): + return ')' + + +class List(Container): + + """A high-level representation of a list.""" + + @property + def open_bracket(self): + return '[' + + @property + def close_bracket(self): + return ']' + + +class DictOrSet(Container): + + """A high-level representation of a dictionary or set.""" + + @property + def open_bracket(self): + return '{' + + @property + def close_bracket(self): + return '}' + + +class ListComprehension(Container): + + """A high-level representation of a list comprehension.""" + + @property + def size(self): + length = 0 + for item in self._items: + if isinstance(item, IfExpression): + break + length += item.size + return length + + +class IfExpression(Container): + + """A high-level representation of an if-expression.""" + + +def _parse_container(tokens, index, for_or_if=None): + """Parse a high-level container, such as a list, tuple, etc.""" + + # Store the opening bracket. + items = [Atom(Token(*tokens[index]))] + index += 1 + + num_tokens = len(tokens) + while index < num_tokens: + tok = Token(*tokens[index]) + + if tok.token_string in ',)]}': + # First check if we're at the end of a list comprehension or + # if-expression. Don't add the ending token as part of the list + # comprehension or if-expression, because they aren't part of those + # constructs. + if for_or_if == 'for': + return (ListComprehension(items), index - 1) + + elif for_or_if == 'if': + return (IfExpression(items), index - 1) + + # We've reached the end of a container. + items.append(Atom(tok)) + + # If not, then we are at the end of a container. + if tok.token_string == ')': + # The end of a tuple. + return (Tuple(items), index) + + elif tok.token_string == ']': + # The end of a list. + return (List(items), index) + + elif tok.token_string == '}': + # The end of a dictionary or set. + return (DictOrSet(items), index) + + elif tok.token_string in '([{': + # A sub-container is being defined. + (container, index) = _parse_container(tokens, index) + items.append(container) + + elif tok.token_string == 'for': + (container, index) = _parse_container(tokens, index, 'for') + items.append(container) + + elif tok.token_string == 'if': + (container, index) = _parse_container(tokens, index, 'if') + items.append(container) + + else: + items.append(Atom(tok)) + + index += 1 + + return (None, None) + + +def _parse_tokens(tokens): + """Parse the tokens. + + This converts the tokens into a form where we can manipulate them + more easily. + + """ + + index = 0 + parsed_tokens = [] + + num_tokens = len(tokens) + while index < num_tokens: + tok = Token(*tokens[index]) + + assert tok.token_type != token.INDENT + if tok.token_type == tokenize.NEWLINE: + # There's only one newline and it's at the end. + break + + if tok.token_string in '([{': + (container, index) = _parse_container(tokens, index) + if not container: + return None + parsed_tokens.append(container) + else: + parsed_tokens.append(Atom(tok)) + + index += 1 + + return parsed_tokens + + +def _reflow_lines(parsed_tokens, indentation, max_line_length, + start_on_prefix_line): + """Reflow the lines so that it looks nice.""" + + if unicode(parsed_tokens[0]) == 'def': + # A function definition gets indented a bit more. + continued_indent = indentation + ' ' * 2 * DEFAULT_INDENT_SIZE + else: + continued_indent = indentation + ' ' * DEFAULT_INDENT_SIZE + + break_after_open_bracket = not start_on_prefix_line + + lines = ReformattedLines(max_line_length) + lines.add_indent(len(indentation.lstrip('\r\n'))) + + if not start_on_prefix_line: + # If splitting after the opening bracket will cause the first element + # to be aligned weirdly, don't try it. + first_token = get_item(parsed_tokens, 0) + second_token = get_item(parsed_tokens, 1) + + if ( + first_token and second_token and + unicode(second_token)[0] == '(' and + len(indentation) + len(first_token) + 1 == len(continued_indent) + ): + return None + + for item in parsed_tokens: + lines.add_space_if_needed(unicode(item), equal=True) + + save_continued_indent = continued_indent + if start_on_prefix_line and isinstance(item, Container): + start_on_prefix_line = False + continued_indent = ' ' * (lines.current_size() + 1) + + item.reflow(lines, continued_indent, break_after_open_bracket) + continued_indent = save_continued_indent + + return lines.emit() + + +def _shorten_line_at_tokens_new(tokens, source, indentation, + max_line_length): + """Shorten the line taking its length into account. + + The input is expected to be free of newlines except for inside + multiline strings and at the end. + + """ + # Yield the original source so to see if it's a better choice than the + # shortened candidate lines we generate here. + yield indentation + source + + parsed_tokens = _parse_tokens(tokens) + + if parsed_tokens: + # Perform two reflows. The first one starts on the same line as the + # prefix. The second starts on the line after the prefix. + fixed = _reflow_lines(parsed_tokens, indentation, max_line_length, + start_on_prefix_line=True) + if fixed and check_syntax(normalize_multiline(fixed.lstrip())): + yield fixed + + fixed = _reflow_lines(parsed_tokens, indentation, max_line_length, + start_on_prefix_line=False) + if fixed and check_syntax(normalize_multiline(fixed.lstrip())): + yield fixed + + +def _shorten_line_at_tokens(tokens, source, indentation, indent_word, + key_token_strings, aggressive): + """Separate line by breaking at tokens in key_token_strings. + + The input is expected to be free of newlines except for inside + multiline strings and at the end. + + """ + offsets = [] + for (index, _t) in enumerate(token_offsets(tokens)): + (token_type, + token_string, + start_offset, + end_offset) = _t + + assert token_type != token.INDENT + + if token_string in key_token_strings: + # Do not break in containers with zero or one items. + unwanted_next_token = { + '(': ')', + '[': ']', + '{': '}'}.get(token_string) + if unwanted_next_token: + if ( + get_item(tokens, + index + 1, + default=[None, None])[1] == unwanted_next_token or + get_item(tokens, + index + 2, + default=[None, None])[1] == unwanted_next_token + ): + continue + + if ( + index > 2 and token_string == '(' and + tokens[index - 1][1] in ',(%[' + ): + # Don't split after a tuple start, or before a tuple start if + # the tuple is in a list. + continue + + if end_offset < len(source) - 1: + # Don't split right before newline. + offsets.append(end_offset) + else: + # Break at adjacent strings. These were probably meant to be on + # separate lines in the first place. + previous_token = get_item(tokens, index - 1) + if ( + token_type == tokenize.STRING and + previous_token and previous_token[0] == tokenize.STRING + ): + offsets.append(start_offset) + + current_indent = None + fixed = None + for line in split_at_offsets(source, offsets): + if fixed: + fixed += '\n' + current_indent + line + + for symbol in '([{': + if line.endswith(symbol): + current_indent += indent_word + else: + # First line. + fixed = line + assert not current_indent + current_indent = indent_word + + assert fixed is not None + + if check_syntax(normalize_multiline(fixed) + if aggressive > 1 else fixed): + return indentation + fixed + + return None + + +def token_offsets(tokens): + """Yield tokens and offsets.""" + end_offset = 0 + previous_end_row = 0 + previous_end_column = 0 + for t in tokens: + token_type = t[0] + token_string = t[1] + (start_row, start_column) = t[2] + (end_row, end_column) = t[3] + + # Account for the whitespace between tokens. + end_offset += start_column + if previous_end_row == start_row: + end_offset -= previous_end_column + + # Record the start offset of the token. + start_offset = end_offset + + # Account for the length of the token itself. + end_offset += len(token_string) + + yield (token_type, + token_string, + start_offset, + end_offset) + + previous_end_row = end_row + previous_end_column = end_column + + +def normalize_multiline(line): + """Normalize multiline-related code that will cause syntax error. + + This is for purposes of checking syntax. + + """ + if line.startswith('def ') and line.rstrip().endswith(':'): + return line + ' pass' + elif line.startswith('return '): + return 'def _(): ' + line + elif line.startswith('@'): + return line + 'def _(): pass' + elif line.startswith('class '): + return line + ' pass' + elif line.startswith(('if ', 'elif ', 'for ', 'while ')): + return line + ' pass' + + return line + + +def fix_whitespace(line, offset, replacement): + """Replace whitespace at offset and return fixed line.""" + # Replace escaped newlines too + left = line[:offset].rstrip('\n\r \t\\') + right = line[offset:].lstrip('\n\r \t\\') + if right.startswith('#'): + return line + + return left + replacement + right + + +def _execute_pep8(pep8_options, source): + """Execute pycodestyle via python method calls.""" + class QuietReport(pycodestyle.BaseReport): + + """Version of checker that does not print.""" + + def __init__(self, options): + super(QuietReport, self).__init__(options) + self.__full_error_results = [] + + def error(self, line_number, offset, text, check): + """Collect errors.""" + code = super(QuietReport, self).error(line_number, + offset, + text, + check) + if code: + self.__full_error_results.append( + {'id': code, + 'line': line_number, + 'column': offset + 1, + 'info': text}) + + def full_error_results(self): + """Return error results in detail. + + Results are in the form of a list of dictionaries. Each + dictionary contains 'id', 'line', 'column', and 'info'. + + """ + return self.__full_error_results + + checker = pycodestyle.Checker('', lines=source, reporter=QuietReport, + **pep8_options) + checker.check_all() + return checker.report.full_error_results() + + +def _remove_leading_and_normalize(line): + # ignore FF in first lstrip() + return line.lstrip(' \t\v').rstrip(CR + LF) + '\n' + + +class Reindenter(object): + + """Reindents badly-indented code to uniformly use four-space indentation. + + Released to the public domain, by Tim Peters, 03 October 2000. + + """ + + def __init__(self, input_text): + sio = io.StringIO(input_text) + source_lines = sio.readlines() + + self.string_content_line_numbers = multiline_string_lines(input_text) + + # File lines, rstripped & tab-expanded. Dummy at start is so + # that we can use tokenize's 1-based line numbering easily. + # Note that a line is all-blank iff it is a newline. + self.lines = [] + for line_number, line in enumerate(source_lines, start=1): + # Do not modify if inside a multiline string. + if line_number in self.string_content_line_numbers: + self.lines.append(line) + else: + # Only expand leading tabs. + self.lines.append(_get_indentation(line).expandtabs() + + _remove_leading_and_normalize(line)) + + self.lines.insert(0, None) + self.index = 1 # index into self.lines of next line + self.input_text = input_text + + def run(self, indent_size=DEFAULT_INDENT_SIZE): + """Fix indentation and return modified line numbers. + + Line numbers are indexed at 1. + + """ + if indent_size < 1: + return self.input_text + + try: + stats = _reindent_stats(tokenize.generate_tokens(self.getline)) + except (SyntaxError, tokenize.TokenError): + return self.input_text + # Remove trailing empty lines. + lines = self.lines + # Sentinel. + stats.append((len(lines), 0)) + # Map count of leading spaces to # we want. + have2want = {} + # Program after transformation. + after = [] + # Copy over initial empty lines -- there's nothing to do until + # we see a line with *something* on it. + i = stats[0][0] + after.extend(lines[1:i]) + for i in range(len(stats) - 1): + thisstmt, thislevel = stats[i] + nextstmt = stats[i + 1][0] + have = _leading_space_count(lines[thisstmt]) + want = thislevel * indent_size + if want < 0: + # A comment line. + if have: + # An indented comment line. If we saw the same + # indentation before, reuse what it most recently + # mapped to. + want = have2want.get(have, -1) + if want < 0: + # Then it probably belongs to the next real stmt. + for j in range(i + 1, len(stats) - 1): + jline, jlevel = stats[j] + if jlevel >= 0: + if have == _leading_space_count(lines[jline]): + want = jlevel * indent_size + break + if want < 0: # Maybe it's a hanging + # comment like this one, + # in which case we should shift it like its base + # line got shifted. + for j in range(i - 1, -1, -1): + jline, jlevel = stats[j] + if jlevel >= 0: + want = (have + _leading_space_count( + after[jline - 1]) - + _leading_space_count(lines[jline])) + break + if want < 0: + # Still no luck -- leave it alone. + want = have + else: + want = 0 + assert want >= 0 + have2want[have] = want + diff = want - have + if diff == 0 or have == 0: + after.extend(lines[thisstmt:nextstmt]) + else: + for line_number, line in enumerate(lines[thisstmt:nextstmt], + start=thisstmt): + if line_number in self.string_content_line_numbers: + after.append(line) + elif diff > 0: + if line == '\n': + after.append(line) + else: + after.append(' ' * diff + line) + else: + remove = min(_leading_space_count(line), -diff) + after.append(line[remove:]) + + return ''.join(after) + + def getline(self): + """Line-getter for tokenize.""" + if self.index >= len(self.lines): + line = '' + else: + line = self.lines[self.index] + self.index += 1 + return line + + +def _reindent_stats(tokens): + """Return list of (lineno, indentlevel) pairs. + + One for each stmt and comment line. indentlevel is -1 for comment + lines, as a signal that tokenize doesn't know what to do about them; + indeed, they're our headache! + + """ + find_stmt = 1 # Next token begins a fresh stmt? + level = 0 # Current indent level. + stats = [] + + for t in tokens: + token_type = t[0] + sline = t[2][0] + line = t[4] + + if token_type == tokenize.NEWLINE: + # A program statement, or ENDMARKER, will eventually follow, + # after some (possibly empty) run of tokens of the form + # (NL | COMMENT)* (INDENT | DEDENT+)? + find_stmt = 1 + + elif token_type == tokenize.INDENT: + find_stmt = 1 + level += 1 + + elif token_type == tokenize.DEDENT: + find_stmt = 1 + level -= 1 + + elif token_type == tokenize.COMMENT: + if find_stmt: + stats.append((sline, -1)) + # But we're still looking for a new stmt, so leave + # find_stmt alone. + + elif token_type == tokenize.NL: + pass + + elif find_stmt: + # This is the first "real token" following a NEWLINE, so it + # must be the first token of the next program statement, or an + # ENDMARKER. + find_stmt = 0 + if line: # Not endmarker. + stats.append((sline, level)) + + return stats + + +def _leading_space_count(line): + """Return number of leading spaces in line.""" + i = 0 + while i < len(line) and line[i] == ' ': + i += 1 + return i + + +def refactor_with_2to3(source_text, fixer_names, filename=''): + """Use lib2to3 to refactor the source. + + Return the refactored source code. + + """ + from lib2to3.refactor import RefactoringTool + fixers = ['lib2to3.fixes.fix_' + name for name in fixer_names] + tool = RefactoringTool(fixer_names=fixers, explicit=fixers) + + from lib2to3.pgen2 import tokenize as lib2to3_tokenize + try: + # The name parameter is necessary particularly for the "import" fixer. + return unicode(tool.refactor_string(source_text, name=filename)) + except lib2to3_tokenize.TokenError: + return source_text + + +def check_syntax(code): + """Return True if syntax is okay.""" + try: + return compile(code, '', 'exec', dont_inherit=True) + except (SyntaxError, TypeError, ValueError): + return False + + +def filter_results(source, results, aggressive): + """Filter out spurious reports from pycodestyle. + + If aggressive is True, we allow possibly unsafe fixes (E711, E712). + + """ + non_docstring_string_line_numbers = multiline_string_lines( + source, include_docstrings=False) + all_string_line_numbers = multiline_string_lines( + source, include_docstrings=True) + + commented_out_code_line_numbers = commented_out_code_lines(source) + + has_e901 = any(result['id'].lower() == 'e901' for result in results) + + for r in results: + issue_id = r['id'].lower() + + if r['line'] in non_docstring_string_line_numbers: + if issue_id.startswith(('e1', 'e501', 'w191')): + continue + + if r['line'] in all_string_line_numbers: + if issue_id in ['e501']: + continue + + # We must offset by 1 for lines that contain the trailing contents of + # multiline strings. + if not aggressive and (r['line'] + 1) in all_string_line_numbers: + # Do not modify multiline strings in non-aggressive mode. Remove + # trailing whitespace could break doctests. + if issue_id.startswith(('w29', 'w39')): + continue + + if aggressive <= 0: + if issue_id.startswith(('e711', 'e72', 'w6')): + continue + + if aggressive <= 1: + if issue_id.startswith(('e712', 'e713', 'e714')): + continue + + if aggressive <= 2: + if issue_id.startswith(('e704')): + continue + + if r['line'] in commented_out_code_line_numbers: + if issue_id.startswith(('e26', 'e501')): + continue + + # Do not touch indentation if there is a token error caused by + # incomplete multi-line statement. Otherwise, we risk screwing up the + # indentation. + if has_e901: + if issue_id.startswith(('e1', 'e7')): + continue + + yield r + + +def multiline_string_lines(source, include_docstrings=False): + """Return line numbers that are within multiline strings. + + The line numbers are indexed at 1. + + Docstrings are ignored. + + """ + line_numbers = set() + previous_token_type = '' + try: + for t in generate_tokens(source): + token_type = t[0] + start_row = t[2][0] + end_row = t[3][0] + + if token_type == tokenize.STRING and start_row != end_row: + if ( + include_docstrings or + previous_token_type != tokenize.INDENT + ): + # We increment by one since we want the contents of the + # string. + line_numbers |= set(range(1 + start_row, 1 + end_row)) + + previous_token_type = token_type + except (SyntaxError, tokenize.TokenError): + pass + + return line_numbers + + +def commented_out_code_lines(source): + """Return line numbers of comments that are likely code. + + Commented-out code is bad practice, but modifying it just adds even + more clutter. + + """ + line_numbers = [] + try: + for t in generate_tokens(source): + token_type = t[0] + token_string = t[1] + start_row = t[2][0] + line = t[4] + + # Ignore inline comments. + if not line.lstrip().startswith('#'): + continue + + if token_type == tokenize.COMMENT: + stripped_line = token_string.lstrip('#').strip() + if ( + ' ' in stripped_line and + '#' not in stripped_line and + check_syntax(stripped_line) + ): + line_numbers.append(start_row) + except (SyntaxError, tokenize.TokenError): + pass + + return line_numbers + + +def shorten_comment(line, max_line_length, last_comment=False): + """Return trimmed or split long comment line. + + If there are no comments immediately following it, do a text wrap. + Doing this wrapping on all comments in general would lead to jagged + comment text. + + """ + assert len(line) > max_line_length + line = line.rstrip() + + # PEP 8 recommends 72 characters for comment text. + indentation = _get_indentation(line) + '# ' + max_line_length = min(max_line_length, + len(indentation) + 72) + + MIN_CHARACTER_REPEAT = 5 + if ( + len(line) - len(line.rstrip(line[-1])) >= MIN_CHARACTER_REPEAT and + not line[-1].isalnum() + ): + # Trim comments that end with things like --------- + return line[:max_line_length] + '\n' + elif last_comment and re.match(r'\s*#+\s*\w+', line): + split_lines = textwrap.wrap(line.lstrip(' \t#'), + initial_indent=indentation, + subsequent_indent=indentation, + width=max_line_length, + break_long_words=False, + break_on_hyphens=False) + return '\n'.join(split_lines) + '\n' + + return line + '\n' + + +def normalize_line_endings(lines, newline): + """Return fixed line endings. + + All lines will be modified to use the most common line ending. + + """ + return [line.rstrip('\n\r') + newline for line in lines] + + +def mutual_startswith(a, b): + return b.startswith(a) or a.startswith(b) + + +def code_match(code, select, ignore): + if ignore: + assert not isinstance(ignore, unicode) + for ignored_code in [c.strip() for c in ignore]: + if mutual_startswith(code.lower(), ignored_code.lower()): + return False + + if select: + assert not isinstance(select, unicode) + for selected_code in [c.strip() for c in select]: + if mutual_startswith(code.lower(), selected_code.lower()): + return True + return False + + return True + + +def fix_code(source, options=None, encoding=None, apply_config=False): + """Return fixed source code. + + "encoding" will be used to decode "source" if it is a byte string. + + """ + options = _get_options(options, apply_config) + + if not isinstance(source, unicode): + source = source.decode(encoding or get_encoding()) + + sio = io.StringIO(source) + return fix_lines(sio.readlines(), options=options) + + +def _get_options(raw_options, apply_config): + """Return parsed options.""" + if not raw_options: + return parse_args([''], apply_config=apply_config) + + if isinstance(raw_options, dict): + options = parse_args([''], apply_config=apply_config) + for name, value in raw_options.items(): + if not hasattr(options, name): + raise ValueError("No such option '{}'".format(name)) + + # Check for very basic type errors. + expected_type = type(getattr(options, name)) + if not isinstance(expected_type, (str, unicode)): + if isinstance(value, (str, unicode)): + raise ValueError( + "Option '{}' should not be a string".format(name)) + setattr(options, name, value) + else: + options = raw_options + + return options + + +def fix_lines(source_lines, options, filename=''): + """Return fixed source code.""" + # Transform everything to line feed. Then change them back to original + # before returning fixed source code. + original_newline = find_newline(source_lines) + tmp_source = ''.join(normalize_line_endings(source_lines, '\n')) + + # Keep a history to break out of cycles. + previous_hashes = set() + + if options.line_range: + # Disable "apply_local_fixes()" for now due to issue #175. + fixed_source = tmp_source + else: + pep8_options = { + 'ignore': options.ignore, + 'select': options.select, + 'max_line_length': options.max_line_length, + 'hang_closing': options.hang_closing, + } + sio = io.StringIO(tmp_source) + contents = sio.readlines() + results = _execute_pep8(pep8_options, contents) + codes = {result['id'] for result in results + if result['id'] in SELECTED_GLOBAL_FIXED_METHOD_CODES} + # Apply global fixes only once (for efficiency). + fixed_source = apply_global_fixes(tmp_source, + options, + filename=filename, + codes=codes) + + passes = 0 + long_line_ignore_cache = set() + while hash(fixed_source) not in previous_hashes: + if options.pep8_passes >= 0 and passes > options.pep8_passes: + break + passes += 1 + + previous_hashes.add(hash(fixed_source)) + + tmp_source = copy.copy(fixed_source) + + fix = FixPEP8( + filename, + options, + contents=tmp_source, + long_line_ignore_cache=long_line_ignore_cache) + + fixed_source = fix.fix() + + sio = io.StringIO(fixed_source) + return ''.join(normalize_line_endings(sio.readlines(), original_newline)) + + +def fix_file(filename, options=None, output=None, apply_config=False): + if not options: + options = parse_args([filename], apply_config=apply_config) + + original_source = readlines_from_file(filename) + + fixed_source = original_source + + if options.in_place or output: + encoding = detect_encoding(filename) + + if output: + output = LineEndingWrapper(wrap_output(output, encoding=encoding)) + + fixed_source = fix_lines(fixed_source, options, filename=filename) + + if options.diff: + new = io.StringIO(fixed_source) + new = new.readlines() + diff = get_diff_text(original_source, new, filename) + if output: + output.write(diff) + output.flush() + return diff + elif options.in_place: + fp = open_with_encoding(filename, encoding=encoding, mode='w') + fp.write(fixed_source) + fp.close() + original = "".join(original_source).splitlines() + fixed = fixed_source.splitlines() + if original != fixed: + return fixed_source + else: + return '' + else: + if output: + output.write(fixed_source) + output.flush() + return fixed_source + + +def global_fixes(): + """Yield multiple (code, function) tuples.""" + for function in list(globals().values()): + if inspect.isfunction(function): + arguments = _get_parameters(function) + if arguments[:1] != ['source']: + continue + + code = extract_code_from_function(function) + if code: + yield (code, function) + + +def _get_parameters(function): + # pylint: disable=deprecated-method + if sys.version_info.major >= 3: + # We need to match "getargspec()", which includes "self" as the first + # value for methods. + # https://bugs.python.org/issue17481#msg209469 + if inspect.ismethod(function): + function = function.__func__ + + return list(inspect.signature(function).parameters) + else: + return inspect.getargspec(function)[0] + + +def apply_global_fixes(source, options, where='global', filename='', + codes=None): + """Run global fixes on source code. + + These are fixes that only need be done once (unlike those in + FixPEP8, which are dependent on pycodestyle). + + """ + if codes is None: + codes = [] + if any(code_match(code, select=options.select, ignore=options.ignore) + for code in ['E101', 'E111']): + source = reindent(source, + indent_size=options.indent_size) + + for (code, function) in global_fixes(): + if code.upper() in SELECTED_GLOBAL_FIXED_METHOD_CODES \ + and code.upper() not in codes: + continue + if code_match(code, select=options.select, ignore=options.ignore): + if options.verbose: + print('---> Applying {} fix for {}'.format(where, + code.upper()), + file=sys.stderr) + source = function(source, + aggressive=options.aggressive) + + source = fix_2to3(source, + aggressive=options.aggressive, + select=options.select, + ignore=options.ignore, + filename=filename, + where=where, + verbose=options.verbose) + + return source + + +def extract_code_from_function(function): + """Return code handled by function.""" + if not function.__name__.startswith('fix_'): + return None + + code = re.sub('^fix_', '', function.__name__) + if not code: + return None + + try: + int(code[1:]) + except ValueError: + return None + + return code + + +def _get_package_version(): + packages = ["pycodestyle: {}".format(pycodestyle.__version__)] + return ", ".join(packages) + + +def create_parser(): + """Return command-line parser.""" + parser = argparse.ArgumentParser(description=docstring_summary(__doc__), + prog='autopep8') + parser.add_argument('--version', action='version', + version='%(prog)s {} ({})'.format( + __version__, _get_package_version())) + parser.add_argument('-v', '--verbose', action='count', + default=0, + help='print verbose messages; ' + 'multiple -v result in more verbose messages') + parser.add_argument('-d', '--diff', action='store_true', + help='print the diff for the fixed source') + parser.add_argument('-i', '--in-place', action='store_true', + help='make changes to files in place') + parser.add_argument('--global-config', metavar='filename', + default=DEFAULT_CONFIG, + help='path to a global pep8 config file; if this file ' + 'does not exist then this is ignored ' + '(default: {})'.format(DEFAULT_CONFIG)) + parser.add_argument('--ignore-local-config', action='store_true', + help="don't look for and apply local config files; " + 'if not passed, defaults are updated with any ' + "config files in the project's root directory") + parser.add_argument('-r', '--recursive', action='store_true', + help='run recursively over directories; ' + 'must be used with --in-place or --diff') + parser.add_argument('-j', '--jobs', type=int, metavar='n', default=1, + help='number of parallel jobs; ' + 'match CPU count if value is less than 1') + parser.add_argument('-p', '--pep8-passes', metavar='n', + default=-1, type=int, + help='maximum number of additional pep8 passes ' + '(default: infinite)') + parser.add_argument('-a', '--aggressive', action='count', default=0, + help='enable non-whitespace changes; ' + 'multiple -a result in more aggressive changes') + parser.add_argument('--experimental', action='store_true', + help='enable experimental fixes') + parser.add_argument('--exclude', metavar='globs', + help='exclude file/directory names that match these ' + 'comma-separated globs') + parser.add_argument('--list-fixes', action='store_true', + help='list codes for fixes; ' + 'used by --ignore and --select') + parser.add_argument('--ignore', metavar='errors', default='', + help='do not fix these errors/warnings ' + '(default: {})'.format(DEFAULT_IGNORE)) + parser.add_argument('--select', metavar='errors', default='', + help='fix only these errors/warnings (e.g. E4,W)') + parser.add_argument('--max-line-length', metavar='n', default=79, type=int, + help='set maximum allowed line length ' + '(default: %(default)s)') + parser.add_argument('--line-range', '--range', metavar='line', + default=None, type=int, nargs=2, + help='only fix errors found within this inclusive ' + 'range of line numbers (e.g. 1 99); ' + 'line numbers are indexed at 1') + parser.add_argument('--indent-size', default=DEFAULT_INDENT_SIZE, + type=int, help=argparse.SUPPRESS) + parser.add_argument('--hang-closing', action='store_true', + help='hang-closing option passed to pycodestyle') + parser.add_argument('--exit-code', action='store_true', + help='change to behavior of exit code.' + ' default behavior of return value, 0 is no ' + 'differences, 1 is error exit. return 2 when' + ' add this option. 2 is exists differences.') + parser.add_argument('files', nargs='*', + help="files to format or '-' for standard in") + + return parser + + +def parse_args(arguments, apply_config=False): + """Parse command-line options.""" + parser = create_parser() + args = parser.parse_args(arguments) + + if not args.files and not args.list_fixes: + parser.error('incorrect number of arguments') + + args.files = [decode_filename(name) for name in args.files] + + if apply_config: + parser = read_config(args, parser) + args = parser.parse_args(arguments) + args.files = [decode_filename(name) for name in args.files] + + if '-' in args.files: + if len(args.files) > 1: + parser.error('cannot mix stdin and regular files') + + if args.diff: + parser.error('--diff cannot be used with standard input') + + if args.in_place: + parser.error('--in-place cannot be used with standard input') + + if args.recursive: + parser.error('--recursive cannot be used with standard input') + + if len(args.files) > 1 and not (args.in_place or args.diff): + parser.error('autopep8 only takes one filename as argument ' + 'unless the "--in-place" or "--diff" args are ' + 'used') + + if args.recursive and not (args.in_place or args.diff): + parser.error('--recursive must be used with --in-place or --diff') + + if args.in_place and args.diff: + parser.error('--in-place and --diff are mutually exclusive') + + if args.max_line_length <= 0: + parser.error('--max-line-length must be greater than 0') + + if args.select: + args.select = _split_comma_separated(args.select) + + if args.ignore: + args.ignore = _split_comma_separated(args.ignore) + elif not args.select: + if args.aggressive: + # Enable everything by default if aggressive. + args.select = {'E', 'W1', 'W2', 'W3', 'W6'} + else: + args.ignore = _split_comma_separated(DEFAULT_IGNORE) + + if args.exclude: + args.exclude = _split_comma_separated(args.exclude) + else: + args.exclude = {} + + if args.jobs < 1: + # Do not import multiprocessing globally in case it is not supported + # on the platform. + import multiprocessing + args.jobs = multiprocessing.cpu_count() + + if args.jobs > 1 and not args.in_place: + parser.error('parallel jobs requires --in-place') + + if args.line_range: + if args.line_range[0] <= 0: + parser.error('--range must be positive numbers') + if args.line_range[0] > args.line_range[1]: + parser.error('First value of --range should be less than or equal ' + 'to the second') + + return args + + +def read_config(args, parser): + """Read both user configuration and local configuration.""" + try: + from configparser import ConfigParser as SafeConfigParser + from configparser import Error + except ImportError: + from ConfigParser import SafeConfigParser + from ConfigParser import Error + + config = SafeConfigParser() + + try: + config.read(args.global_config) + + if not args.ignore_local_config: + parent = tail = args.files and os.path.abspath( + os.path.commonprefix(args.files)) + while tail: + if config.read([os.path.join(parent, fn) + for fn in PROJECT_CONFIG]): + break + (parent, tail) = os.path.split(parent) + + defaults = {} + option_list = {o.dest: o.type or type(o.default) + for o in parser._actions} + + for section in ['pep8', 'pycodestyle', 'flake8']: + if not config.has_section(section): + continue + for (k, _) in config.items(section): + norm_opt = k.lstrip('-').replace('-', '_') + if not option_list.get(norm_opt): + continue + opt_type = option_list[norm_opt] + if opt_type is int: + value = config.getint(section, k) + elif opt_type is bool: + value = config.getboolean(section, k) + else: + value = config.get(section, k) + if args.verbose: + print("enable config: section={}, key={}, value={}".format( + section, k, value)) + defaults[norm_opt] = value + + parser.set_defaults(**defaults) + except Error: + # Ignore for now. + pass + + return parser + + +def _split_comma_separated(string): + """Return a set of strings.""" + return {text.strip() for text in string.split(',') if text.strip()} + + +def decode_filename(filename): + """Return Unicode filename.""" + if isinstance(filename, unicode): + return filename + + return filename.decode(sys.getfilesystemencoding()) + + +def supported_fixes(): + """Yield pep8 error codes that autopep8 fixes. + + Each item we yield is a tuple of the code followed by its + description. + + """ + yield ('E101', docstring_summary(reindent.__doc__)) + + instance = FixPEP8(filename=None, options=None, contents='') + for attribute in dir(instance): + code = re.match('fix_([ew][0-9][0-9][0-9])', attribute) + if code: + yield ( + code.group(1).upper(), + re.sub(r'\s+', ' ', + docstring_summary(getattr(instance, attribute).__doc__)) + ) + + for (code, function) in sorted(global_fixes()): + yield (code.upper() + (4 - len(code)) * ' ', + re.sub(r'\s+', ' ', docstring_summary(function.__doc__))) + + for code in sorted(CODE_TO_2TO3): + yield (code.upper() + (4 - len(code)) * ' ', + re.sub(r'\s+', ' ', docstring_summary(fix_2to3.__doc__))) + + +def docstring_summary(docstring): + """Return summary of docstring.""" + return docstring.split('\n')[0] if docstring else '' + + +def line_shortening_rank(candidate, indent_word, max_line_length, + experimental=False): + """Return rank of candidate. + + This is for sorting candidates. + + """ + if not candidate.strip(): + return 0 + + rank = 0 + lines = candidate.rstrip().split('\n') + + offset = 0 + if ( + not lines[0].lstrip().startswith('#') and + lines[0].rstrip()[-1] not in '([{' + ): + for (opening, closing) in ('()', '[]', '{}'): + # Don't penalize empty containers that aren't split up. Things like + # this "foo(\n )" aren't particularly good. + opening_loc = lines[0].find(opening) + closing_loc = lines[0].find(closing) + if opening_loc >= 0: + if closing_loc < 0 or closing_loc != opening_loc + 1: + offset = max(offset, 1 + opening_loc) + + current_longest = max(offset + len(x.strip()) for x in lines) + + rank += 4 * max(0, current_longest - max_line_length) + + rank += len(lines) + + # Too much variation in line length is ugly. + rank += 2 * standard_deviation(len(line) for line in lines) + + bad_staring_symbol = { + '(': ')', + '[': ']', + '{': '}'}.get(lines[0][-1]) + + if len(lines) > 1: + if ( + bad_staring_symbol and + lines[1].lstrip().startswith(bad_staring_symbol) + ): + rank += 20 + + for lineno, current_line in enumerate(lines): + current_line = current_line.strip() + + if current_line.startswith('#'): + continue + + for bad_start in ['.', '%', '+', '-', '/']: + if current_line.startswith(bad_start): + rank += 100 + + # Do not tolerate operators on their own line. + if current_line == bad_start: + rank += 1000 + + if ( + current_line.endswith(('.', '%', '+', '-', '/')) and + "': " in current_line + ): + rank += 1000 + + if current_line.endswith(('(', '[', '{', '.')): + # Avoid lonely opening. They result in longer lines. + if len(current_line) <= len(indent_word): + rank += 100 + + # Avoid the ugliness of ", (\n". + if ( + current_line.endswith('(') and + current_line[:-1].rstrip().endswith(',') + ): + rank += 100 + + # Avoid the ugliness of "something[\n" and something[index][\n. + if ( + current_line.endswith('[') and + len(current_line) > 1 and + (current_line[-2].isalnum() or current_line[-2] in ']') + ): + rank += 300 + + # Also avoid the ugliness of "foo.\nbar" + if current_line.endswith('.'): + rank += 100 + + if has_arithmetic_operator(current_line): + rank += 100 + + # Avoid breaking at unary operators. + if re.match(r'.*[(\[{]\s*[\-\+~]$', current_line.rstrip('\\ ')): + rank += 1000 + + if re.match(r'.*lambda\s*\*$', current_line.rstrip('\\ ')): + rank += 1000 + + if current_line.endswith(('%', '(', '[', '{')): + rank -= 20 + + # Try to break list comprehensions at the "for". + if current_line.startswith('for '): + rank -= 50 + + if current_line.endswith('\\'): + # If a line ends in \-newline, it may be part of a + # multiline string. In that case, we would like to know + # how long that line is without the \-newline. If it's + # longer than the maximum, or has comments, then we assume + # that the \-newline is an okay candidate and only + # penalize it a bit. + total_len = len(current_line) + lineno += 1 + while lineno < len(lines): + total_len += len(lines[lineno]) + + if lines[lineno].lstrip().startswith('#'): + total_len = max_line_length + break + + if not lines[lineno].endswith('\\'): + break + + lineno += 1 + + if total_len < max_line_length: + rank += 10 + else: + rank += 100 if experimental else 1 + + # Prefer breaking at commas rather than colon. + if ',' in current_line and current_line.endswith(':'): + rank += 10 + + # Avoid splitting dictionaries between key and value. + if current_line.endswith(':'): + rank += 100 + + rank += 10 * count_unbalanced_brackets(current_line) + + return max(0, rank) + + +def standard_deviation(numbers): + """Return standard deviation.""" + numbers = list(numbers) + if not numbers: + return 0 + mean = sum(numbers) / len(numbers) + return (sum((n - mean) ** 2 for n in numbers) / + len(numbers)) ** .5 + + +def has_arithmetic_operator(line): + """Return True if line contains any arithmetic operators.""" + for operator in pycodestyle.ARITHMETIC_OP: + if operator in line: + return True + + return False + + +def count_unbalanced_brackets(line): + """Return number of unmatched open/close brackets.""" + count = 0 + for opening, closing in ['()', '[]', '{}']: + count += abs(line.count(opening) - line.count(closing)) + + return count + + +def split_at_offsets(line, offsets): + """Split line at offsets. + + Return list of strings. + + """ + result = [] + + previous_offset = 0 + current_offset = 0 + for current_offset in sorted(offsets): + if current_offset < len(line) and previous_offset != current_offset: + result.append(line[previous_offset:current_offset].strip()) + previous_offset = current_offset + + result.append(line[current_offset:]) + + return result + + +class LineEndingWrapper(object): + + r"""Replace line endings to work with sys.stdout. + + It seems that sys.stdout expects only '\n' as the line ending, no matter + the platform. Otherwise, we get repeated line endings. + + """ + + def __init__(self, output): + self.__output = output + + def write(self, s): + self.__output.write(s.replace('\r\n', '\n').replace('\r', '\n')) + + def flush(self): + self.__output.flush() + + +def match_file(filename, exclude): + """Return True if file is okay for modifying/recursing.""" + base_name = os.path.basename(filename) + + if base_name.startswith('.'): + return False + + for pattern in exclude: + if fnmatch.fnmatch(base_name, pattern): + return False + if fnmatch.fnmatch(filename, pattern): + return False + + if not os.path.isdir(filename) and not is_python_file(filename): + return False + + return True + + +def find_files(filenames, recursive, exclude): + """Yield filenames.""" + while filenames: + name = filenames.pop(0) + if recursive and os.path.isdir(name): + for root, directories, children in os.walk(name): + filenames += [os.path.join(root, f) for f in children + if match_file(os.path.join(root, f), + exclude)] + directories[:] = [d for d in directories + if match_file(os.path.join(root, d), + exclude)] + else: + yield name + + +def _fix_file(parameters): + """Helper function for optionally running fix_file() in parallel.""" + if parameters[1].verbose: + print('[file:{}]'.format(parameters[0]), file=sys.stderr) + try: + return fix_file(*parameters) + except IOError as error: + print(unicode(error), file=sys.stderr) + + +def fix_multiple_files(filenames, options, output=None): + """Fix list of files. + + Optionally fix files recursively. + + """ + results = [] + filenames = find_files(filenames, options.recursive, options.exclude) + if options.jobs > 1: + import multiprocessing + pool = multiprocessing.Pool(options.jobs) + ret = pool.map(_fix_file, [(name, options) for name in filenames]) + results.extend(filter(lambda x: x is not None, ret)) + else: + for name in filenames: + ret = _fix_file((name, options, output)) + if ret is None: + continue + if options.diff or options.in_place: + if ret != '': + results.append(ret) + else: + original_source = readlines_from_file(name) + if "".join(original_source).splitlines() != ret.splitlines(): + results.append(ret) + return results + + +def is_python_file(filename): + """Return True if filename is Python file.""" + if filename.endswith('.py'): + return True + + try: + with open_with_encoding( + filename, + limit_byte_check=MAX_PYTHON_FILE_DETECTION_BYTES) as f: + text = f.read(MAX_PYTHON_FILE_DETECTION_BYTES) + if not text: + return False + first_line = text.splitlines()[0] + except (IOError, IndexError): + return False + + if not PYTHON_SHEBANG_REGEX.match(first_line): + return False + + return True + + +def is_probably_part_of_multiline(line): + """Return True if line is likely part of a multiline string. + + When multiline strings are involved, pep8 reports the error as being + at the start of the multiline string, which doesn't work for us. + + """ + return ( + '"""' in line or + "'''" in line or + line.rstrip().endswith('\\') + ) + + +def wrap_output(output, encoding): + """Return output with specified encoding.""" + return codecs.getwriter(encoding)(output.buffer + if hasattr(output, 'buffer') + else output) + + +def get_encoding(): + """Return preferred encoding.""" + return locale.getpreferredencoding() or sys.getdefaultencoding() + + +def main(argv=None, apply_config=True): + """Command-line entry.""" + if argv is None: + argv = sys.argv + + try: + # Exit on broken pipe. + signal.signal(signal.SIGPIPE, signal.SIG_DFL) + except AttributeError: # pragma: no cover + # SIGPIPE is not available on Windows. + pass + + try: + args = parse_args(argv[1:], apply_config=apply_config) + + if args.list_fixes: + for code, description in sorted(supported_fixes()): + print('{code} - {description}'.format( + code=code, description=description)) + return EXIT_CODE_OK + + if args.files == ['-']: + assert not args.in_place + + encoding = sys.stdin.encoding or get_encoding() + + # LineEndingWrapper is unnecessary here due to the symmetry between + # standard in and standard out. + wrap_output(sys.stdout, encoding=encoding).write( + fix_code(sys.stdin.read(), args, encoding=encoding)) + else: + if args.in_place or args.diff: + args.files = list(set(args.files)) + else: + assert len(args.files) == 1 + assert not args.recursive + + ret = fix_multiple_files(args.files, args, sys.stdout) + if args.exit_code and len(ret) > 0: + return EXIT_CODE_EXISTS_DIFF + except KeyboardInterrupt: + return EXIT_CODE_ERROR # pragma: no cover + + +class CachedTokenizer(object): + + """A one-element cache around tokenize.generate_tokens(). + + Original code written by Ned Batchelder, in coverage.py. + + """ + + def __init__(self): + self.last_text = None + self.last_tokens = None + + def generate_tokens(self, text): + """A stand-in for tokenize.generate_tokens().""" + if text != self.last_text: + string_io = io.StringIO(text) + self.last_tokens = list( + tokenize.generate_tokens(string_io.readline) + ) + self.last_text = text + return self.last_tokens + + +_cached_tokenizer = CachedTokenizer() +generate_tokens = _cached_tokenizer.generate_tokens + + +if __name__ == '__main__': + sys.exit(main())