Skip to content

Commit

Permalink
gh-105069: Add a readline-like callable to the tokenizer to consume input iteratively
Browse files Browse the repository at this point in the history

Signed-off-by: Pablo Galindo <pablogsal@gmail.com>
  • Loading branch information
pablogsal committed May 29, 2023
1 parent 39f6a04 commit 0a661c4
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 97 deletions.
113 changes: 64 additions & 49 deletions Lib/test/test_tokenize.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from test import support
from test.support import os_helper
from tokenize import (tokenize, _tokenize, untokenize, NUMBER, NAME, OP,
from tokenize import (tokenize, untokenize, NUMBER, NAME, OP,
STRING, ENDMARKER, ENCODING, tok_name, detect_encoding,
open as tokenize_open, Untokenizer, generate_tokens,
NEWLINE, _generate_tokens_from_c_tokenizer, DEDENT, TokenInfo)
Expand Down Expand Up @@ -50,6 +50,13 @@ def check_tokenize(self, s, expected):
self.assertEqual(result,
[" ENCODING 'utf-8' (0, 0) (0, 0)"] +
expected.rstrip().splitlines())

def test_invalid_readline(self):
def gen():
yield "sdfosdg"
yield "sdfosdg"
with self.assertRaises(TypeError):
list(tokenize(gen().__next__))

def test_implicit_newline(self):
# Make sure that the tokenizer puts in an implicit NEWLINE
Expand Down Expand Up @@ -1154,7 +1161,8 @@ class TestTokenizerAdheresToPep0263(TestCase):

def _testFile(self, filename):
path = os.path.join(os.path.dirname(__file__), filename)
TestRoundtrip.check_roundtrip(self, open(path, 'rb'))
with open(path, 'rb') as f:
TestRoundtrip.check_roundtrip(self, f)

def test_utf8_coding_cookie_and_no_utf8_bom(self):
f = 'tokenize_tests-utf8-coding-cookie-and-no-utf8-bom-sig.txt'
Expand Down Expand Up @@ -1199,7 +1207,8 @@ def readline():
yield b''

# skip the initial encoding token and the end tokens
tokens = list(_tokenize(readline(), encoding='utf-8'))[:-2]
tokens = list(_generate_tokens_from_c_tokenizer(readline().__next__, encoding='utf-8',
extra_tokens=True))[:-2]
expected_tokens = [TokenInfo(3, '"ЉЊЈЁЂ"', (1, 0), (1, 7), '"ЉЊЈЁЂ"\n')]
self.assertEqual(tokens, expected_tokens,
"bytes not decoded with encoding")
Expand Down Expand Up @@ -1468,13 +1477,13 @@ def test_tokenize(self):
def mock_detect_encoding(readline):
return encoding, [b'first', b'second']

def mock__tokenize(readline, encoding):
def mock__tokenize(readline, encoding, **kwargs):
nonlocal encoding_used
encoding_used = encoding
out = []
while True:
try:
next_line = next(readline)
next_line = readline()
except StopIteration:
return out
if next_line:
Expand All @@ -1491,16 +1500,16 @@ def mock_readline():
return str(counter).encode()

orig_detect_encoding = tokenize_module.detect_encoding
orig__tokenize = tokenize_module._tokenize
orig_c_token = tokenize_module._generate_tokens_from_c_tokenizer
tokenize_module.detect_encoding = mock_detect_encoding
tokenize_module._tokenize = mock__tokenize
tokenize_module._generate_tokens_from_c_tokenizer = mock__tokenize
try:
results = tokenize(mock_readline)
self.assertEqual(list(results)[1:],
[b'first', b'second', b'1', b'2', b'3', b'4'])
finally:
tokenize_module.detect_encoding = orig_detect_encoding
tokenize_module._tokenize = orig__tokenize
tokenize_module._generate_tokens_from_c_tokenizer = orig_c_token

self.assertEqual(encoding_used, encoding)

Expand Down Expand Up @@ -1827,9 +1836,10 @@ class CTokenizeTest(TestCase):
def check_tokenize(self, s, expected):
# Format the tokens in s in a table format.
# The ENDMARKER and final NEWLINE are omitted.
f = StringIO(s)
with self.subTest(source=s):
result = stringify_tokens_from_source(
_generate_tokens_from_c_tokenizer(s), s
_generate_tokens_from_c_tokenizer(f.readline), s
)
self.assertEqual(result, expected.rstrip().splitlines())

Expand Down Expand Up @@ -2668,43 +2678,44 @@ def test_unicode(self):

def test_invalid_syntax(self):
def get_tokens(string):
return list(_generate_tokens_from_c_tokenizer(string))

self.assertRaises(SyntaxError, get_tokens, "(1+2]")
self.assertRaises(SyntaxError, get_tokens, "(1+2}")
self.assertRaises(SyntaxError, get_tokens, "{1+2]")

self.assertRaises(SyntaxError, get_tokens, "1_")
self.assertRaises(SyntaxError, get_tokens, "1.2_")
self.assertRaises(SyntaxError, get_tokens, "1e2_")
self.assertRaises(SyntaxError, get_tokens, "1e+")

self.assertRaises(SyntaxError, get_tokens, "\xa0")
self.assertRaises(SyntaxError, get_tokens, "€")

self.assertRaises(SyntaxError, get_tokens, "0b12")
self.assertRaises(SyntaxError, get_tokens, "0b1_2")
self.assertRaises(SyntaxError, get_tokens, "0b2")
self.assertRaises(SyntaxError, get_tokens, "0b1_")
self.assertRaises(SyntaxError, get_tokens, "0b")
self.assertRaises(SyntaxError, get_tokens, "0o18")
self.assertRaises(SyntaxError, get_tokens, "0o1_8")
self.assertRaises(SyntaxError, get_tokens, "0o8")
self.assertRaises(SyntaxError, get_tokens, "0o1_")
self.assertRaises(SyntaxError, get_tokens, "0o")
self.assertRaises(SyntaxError, get_tokens, "0x1_")
self.assertRaises(SyntaxError, get_tokens, "0x")
self.assertRaises(SyntaxError, get_tokens, "1_")
self.assertRaises(SyntaxError, get_tokens, "012")
self.assertRaises(SyntaxError, get_tokens, "1.2_")
self.assertRaises(SyntaxError, get_tokens, "1e2_")
self.assertRaises(SyntaxError, get_tokens, "1e+")

self.assertRaises(SyntaxError, get_tokens, "'sdfsdf")
self.assertRaises(SyntaxError, get_tokens, "'''sdfsdf''")

self.assertRaises(SyntaxError, get_tokens, "("*1000+"a"+")"*1000)
self.assertRaises(SyntaxError, get_tokens, "]")
the_string = StringIO(string)
return list(_generate_tokens_from_c_tokenizer(the_string.readline))

for case in [
"(1+2]",
"(1+2}",
"{1+2]",
"1_",
"1.2_",
"1e2_",
"1e+",

"\xa0",
"€",
"0b12",
"0b1_2",
"0b2",
"0b1_",
"0b",
"0o18",
"0o1_8",
"0o8",
"0o1_",
"0o",
"0x1_",
"0x",
"1_",
"012",
"1.2_",
"1e2_",
"1e+",
"'sdfsdf",
"'''sdfsdf''",
"("*1000+"a"+")"*1000,
"]",
]:
with self.subTest(case=case):
self.assertRaises(SyntaxError, get_tokens, case)

def test_max_indent(self):
MAXINDENT = 100
Expand All @@ -2715,20 +2726,24 @@ def generate_source(indents):
return source

valid = generate_source(MAXINDENT - 1)
tokens = list(_generate_tokens_from_c_tokenizer(valid))
the_input = StringIO(valid)
tokens = list(_generate_tokens_from_c_tokenizer(the_input.readline))
self.assertEqual(tokens[-2].type, DEDENT)
self.assertEqual(tokens[-1].type, ENDMARKER)
compile(valid, "<string>", "exec")

invalid = generate_source(MAXINDENT)
self.assertRaises(SyntaxError, lambda: list(_generate_tokens_from_c_tokenizer(invalid)))
the_input = StringIO(invalid)
self.assertRaises(SyntaxError, lambda: list(_generate_tokens_from_c_tokenizer(the_input.readline)))
self.assertRaises(
IndentationError, compile, invalid, "<string>", "exec"
)

def test_continuation_lines_indentation(self):
def get_tokens(string):
return [(kind, string) for (kind, string, *_) in _generate_tokens_from_c_tokenizer(string)]
the_string = StringIO(string)
return [(kind, string) for (kind, string, *_)
in _generate_tokens_from_c_tokenizer(the_string.readline)]

code = dedent("""
def fib(n):
Expand Down
32 changes: 11 additions & 21 deletions Lib/tokenize.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import sys
from token import *
from token import EXACT_TOKEN_TYPES
import _tokenize as c_tokenizer

cookie_re = re.compile(r'^[ \t\f]*#.*?coding[:=][ \t]*([-\w.]+)', re.ASCII)
blank_re = re.compile(br'^[ \t\f]*(?:[#\r\n]|$)', re.ASCII)
Expand Down Expand Up @@ -443,29 +444,15 @@ def tokenize(readline):
# BOM will already have been stripped.
encoding = "utf-8"
yield TokenInfo(ENCODING, encoding, (0, 0), (0, 0), '')
yield from _tokenize(rl_gen, encoding)

def _tokenize(rl_gen, encoding):
source = b"".join(rl_gen).decode(encoding)
for token in _generate_tokens_from_c_tokenizer(source, extra_tokens=True):
yield token
yield from _generate_tokens_from_c_tokenizer(rl_gen.__next__, encoding, extra_tokens=True)

def generate_tokens(readline):
"""Tokenize a source reading Python code as unicode strings.
This has the same API as tokenize(), except that it expects the *readline*
callable to return str objects instead of bytes.
"""
def _gen():
while True:
try:
line = readline()
except StopIteration:
return
if not line:
return
yield line.encode()
return _tokenize(_gen(), 'utf-8')
return _generate_tokens_from_c_tokenizer(readline, extra_tokens=True)

def main():
import argparse
Expand Down Expand Up @@ -502,9 +489,9 @@ def error(message, filename=None, location=None):
tokens = list(tokenize(f.readline))
else:
filename = "<stdin>"
tokens = _tokenize(
tokens = _generate_tokens_from_c_tokenizer(
(x.encode('utf-8') for x in iter(sys.stdin.readline, "")
), "utf-8")
), "utf-8", extra_tokens=True)


# Output the tokenization
Expand All @@ -531,10 +518,13 @@ def error(message, filename=None, location=None):
perror("unexpected error: %s" % err)
raise

def _generate_tokens_from_c_tokenizer(source, extra_tokens=False):
def _generate_tokens_from_c_tokenizer(source, encoding=None, extra_tokens=False):
"""Tokenize a source reading Python code as unicode strings using the internal C tokenizer"""
import _tokenize as c_tokenizer
for info in c_tokenizer.TokenizerIter(source, extra_tokens=extra_tokens):
if encoding is None:
it = c_tokenizer.TokenizerIter(source, extra_tokens=extra_tokens)
else:
it = c_tokenizer.TokenizerIter(source, encoding=encoding, extra_tokens=extra_tokens)
for info in it:
yield TokenInfo._make(info)


Expand Down
Loading

0 comments on commit 0a661c4

Please sign in to comment.