Skip to content

Commit

Permalink
Fix batching regression (explosion#12094)
Browse files Browse the repository at this point in the history
* Fix batching regression

Some time ago, the spaCy v4 branch switched to the new Thinc v9
schedule. However, this introduced an error in how batching is handed.

In the PR, the batchers were changed to keep track of their step,
so that the step can be passed to the schedule. However, the issue
is that the training loop repeatedly calls the batching functions
(rather than using an infinite generator/iterator). So, the step and
therefore the schedule would be reset each epoch. Before the schedule
switch we didn't have this issue, because the old schedules were
stateful.

This PR fixes this issue by reverting the batching functions to use
a (stateful) generator. Their registry functions do accept a `Schedule`
and we convert `Schedule`s to generators.

* Update batcher docs

* Docstring fixes

* Make minibatch take iterables again as well

* Bump thinc requirement to 9.0.0.dev2

* Use type declaration

* Convert another comment into a proper type declaration
  • Loading branch information
danieldk authored and jikanter committed May 10, 2024
1 parent 2ca6e61 commit 1743358
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 54 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ requires = [
"cymem>=2.0.2,<2.1.0",
"preshed>=3.0.2,<3.1.0",
"murmurhash>=0.28.0,<1.1.0",
"thinc>=9.0.0.dev1,<9.1.0",
"thinc>=9.0.0.dev2,<9.1.0",
"numpy>=1.15.0",
]
build-backend = "setuptools.build_meta"
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ spacy-legacy>=3.0.11,<3.1.0
spacy-loggers>=1.0.0,<2.0.0
cymem>=2.0.2,<2.1.0
preshed>=3.0.2,<3.1.0
thinc>=9.0.0.dev1,<9.1.0
thinc>=9.0.0.dev2,<9.1.0
ml_datasets>=0.2.0,<0.3.0
murmurhash>=0.28.0,<1.1.0
wasabi>=0.9.1,<1.2.0
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ install_requires =
murmurhash>=0.28.0,<1.1.0
cymem>=2.0.2,<2.1.0
preshed>=3.0.2,<3.1.0
thinc>=9.0.0.dev1,<9.1.0
thinc>=9.0.0.dev2,<9.1.0
wasabi>=0.9.1,<1.2.0
srsly>=2.4.3,<3.0.0
catalogue>=2.0.6,<2.1.0
Expand Down
4 changes: 3 additions & 1 deletion spacy/tests/pipeline/test_tagger.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ def test_issue4348():
optimizer = nlp.initialize()
for i in range(5):
losses = {}
batches = util.minibatch(TRAIN_DATA, size=compounding(4.0, 32.0, 1.001))
batches = util.minibatch(
TRAIN_DATA, size=compounding(4.0, 32.0, 1.001).to_generator()
)
for batch in batches:
nlp.update(batch, sgd=optimizer, losses=losses)

Expand Down
8 changes: 6 additions & 2 deletions spacy/tests/pipeline/test_textcat.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ def test_issue3611():
optimizer = nlp.initialize()
for i in range(3):
losses = {}
batches = util.minibatch(train_data, size=compounding(4.0, 32.0, 1.001))
batches = util.minibatch(
train_data, size=compounding(4.0, 32.0, 1.001).to_generator()
)

for batch in batches:
nlp.update(examples=batch, sgd=optimizer, drop=0.1, losses=losses)
Expand Down Expand Up @@ -128,7 +130,9 @@ def test_issue4030():
optimizer = nlp.initialize()
for i in range(3):
losses = {}
batches = util.minibatch(train_data, size=compounding(4.0, 32.0, 1.001))
batches = util.minibatch(
train_data, size=compounding(4.0, 32.0, 1.001).to_generator()
)

for batch in batches:
nlp.update(examples=batch, sgd=optimizer, drop=0.1, losses=losses)
Expand Down
4 changes: 3 additions & 1 deletion spacy/tests/training/test_training.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,9 @@ def _train_tuples(train_data):
optimizer = nlp.initialize()
for i in range(5):
losses = {}
batches = minibatch(train_examples, size=compounding(4.0, 32.0, 1.001))
batches = minibatch(
train_examples, size=compounding(4.0, 32.0, 1.001).to_generator()
)
for batch in batches:
nlp.update(batch, sgd=optimizer, losses=losses)

Expand Down
58 changes: 30 additions & 28 deletions spacy/training/batchers.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import itertools
from thinc.schedules import Schedule, constant as constant_schedule
from thinc.schedules import Schedule

from ..util import minibatch, registry

Sizing = Union[Sequence[int], int, Schedule[int]]
SizingSchedule = Union[Iterable[int], int, Schedule]
Sizing = Union[Iterable[int], int]
ItemT = TypeVar("ItemT")
BatcherT = Callable[[Iterable[ItemT]], Iterable[List[ItemT]]]


@registry.batchers("spacy.batch_by_padded.v1")
def configure_minibatch_by_padded_size(
*,
size: Sizing,
size: SizingSchedule,
buffer: int,
discard_oversize: bool,
get_length: Optional[Callable[[ItemT], int]] = None
Expand All @@ -21,8 +22,8 @@ def configure_minibatch_by_padded_size(
The padded size is defined as the maximum length of sequences within the
batch multiplied by the number of sequences in the batch.
size (int or Sequence[int]): The largest padded size to batch sequences into.
Can be a single integer, or a sequence, allowing for variable batch sizes.
size (int, Iterable[int] or Schedule): The largest padded size to batch sequences
into. Can be a single integer, or a sequence, allowing for variable batch sizes.
buffer (int): The number of sequences to accumulate before sorting by length.
A larger buffer will result in more even sizing, but if the buffer is
very large, the iteration order will be less random, which can result
Expand All @@ -36,7 +37,7 @@ def configure_minibatch_by_padded_size(
optionals = {"get_length": get_length} if get_length is not None else {}
return partial(
minibatch_by_padded_size,
size=size,
size=_schedule_to_sizing(size),
buffer=buffer,
discard_oversize=discard_oversize,
**optionals
Expand All @@ -46,14 +47,14 @@ def configure_minibatch_by_padded_size(
@registry.batchers("spacy.batch_by_words.v1")
def configure_minibatch_by_words(
*,
size: Sizing,
size: SizingSchedule,
tolerance: float,
discard_oversize: bool,
get_length: Optional[Callable[[ItemT], int]] = None
) -> BatcherT:
"""Create a batcher that uses the "minibatch by words" strategy.
size (int or Sequence[int]): The target number of words per batch.
size (int, Iterable[int] or Schedule): The target number of words per batch.
Can be a single integer, or a sequence, allowing for variable batch sizes.
tolerance (float): What percentage of the size to allow batches to exceed.
discard_oversize (bool): Whether to discard sequences that by themselves
Expand All @@ -64,7 +65,7 @@ def configure_minibatch_by_words(
optionals = {"get_length": get_length} if get_length is not None else {}
return partial(
minibatch_by_words,
size=size,
size=_schedule_to_sizing(size),
tolerance=tolerance,
discard_oversize=discard_oversize,
**optionals
Expand All @@ -73,15 +74,15 @@ def configure_minibatch_by_words(

@registry.batchers("spacy.batch_by_sequence.v1")
def configure_minibatch(
size: Sizing, get_length: Optional[Callable[[ItemT], int]] = None
size: SizingSchedule, get_length: Optional[Callable[[ItemT], int]] = None
) -> BatcherT:
"""Create a batcher that creates batches of the specified size.
size (int or Sequence[int]): The target number of items per batch.
size (int, Iterable[int] or Schedule): The target number of items per batch.
Can be a single integer, or a sequence, allowing for variable batch sizes.
"""
optionals = {"get_length": get_length} if get_length is not None else {}
return partial(minibatch, size=size, **optionals)
return partial(minibatch, size=_schedule_to_sizing(size), **optionals)


def minibatch_by_padded_size(
Expand All @@ -97,7 +98,7 @@ def minibatch_by_padded_size(
The padded size is defined as the maximum length of sequences within the
batch multiplied by the number of sequences in the batch.
size (int or Sequence[int]): The largest padded size to batch sequences into.
size (int or Iterable[int]): The largest padded size to batch sequences into.
buffer (int): The number of sequences to accumulate before sorting by length.
A larger buffer will result in more even sizing, but if the buffer is
very large, the iteration order will be less random, which can result
Expand All @@ -108,13 +109,12 @@ def minibatch_by_padded_size(
The `len` function is used by default.
"""
if isinstance(size, int):
size_ = constant_schedule(size)
size_: Iterator[int] = itertools.repeat(size)
else:
assert isinstance(size, Schedule)
size_ = size
for step, outer_batch in enumerate(minibatch(seqs, size=buffer)):
size_ = iter(size)
for outer_batch in minibatch(seqs, size=buffer):
outer_batch = list(outer_batch)
target_size = size_(step)
target_size = next(size_)
for indices in _batch_by_length(outer_batch, target_size, get_length):
subbatch = [outer_batch[i] for i in indices]
padded_size = max(len(seq) for seq in subbatch) * len(subbatch)
Expand All @@ -136,7 +136,7 @@ def minibatch_by_words(
themselves, or be discarded if discard_oversize=True.
seqs (Iterable[Sequence]): The sequences to minibatch.
size (int or Sequence[int]): The target number of words per batch.
size (int or Iterable[int]): The target number of words per batch.
Can be a single integer, or a sequence, allowing for variable batch sizes.
tolerance (float): What percentage of the size to allow batches to exceed.
discard_oversize (bool): Whether to discard sequences that by themselves
Expand All @@ -145,12 +145,10 @@ def minibatch_by_words(
item. The `len` function is used by default.
"""
if isinstance(size, int):
size_ = constant_schedule(size)
size_: Iterator[int] = itertools.repeat(size)
else:
assert isinstance(size, Schedule)
size_ = size
step = 0
target_size = size_(step)
size_ = iter(size)
target_size = next(size_)
tol_size = target_size * tolerance
batch = []
overflow = []
Expand All @@ -175,8 +173,7 @@ def minibatch_by_words(
else:
if batch:
yield batch
step += 1
target_size = size_(step)
target_size = next(size_)
tol_size = target_size * tolerance
batch = overflow
batch_size = overflow_size
Expand All @@ -194,8 +191,7 @@ def minibatch_by_words(
else:
if batch:
yield batch
step += 1
target_size = size_(step)
target_size = next(size_)
tol_size = target_size * tolerance
batch = [seq]
batch_size = n_words
Expand Down Expand Up @@ -232,3 +228,9 @@ def _batch_by_length(
batches = [list(sorted(batch)) for batch in batches]
batches.reverse()
return batches


def _schedule_to_sizing(size: SizingSchedule) -> Sizing:
if isinstance(size, Schedule):
return size.to_generator()
return size
8 changes: 4 additions & 4 deletions spacy/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1642,12 +1642,12 @@ def minibatch(items, size):
so that batch-size can vary on each step.
"""
if isinstance(size, int):
size_ = constant_schedule(size)
size_ = itertools.repeat(size)
else:
size_ = size
size_ = iter(size)
items = iter(items)
for step in itertools.count():
batch_size = size_(step)
while True:
batch_size = next(size_)
batch = list(itertools.islice(items, int(batch_size)))
if len(batch) == 0:
break
Expand Down
30 changes: 15 additions & 15 deletions website/docs/api/top-level.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -878,14 +878,14 @@ themselves, or be discarded if `discard_oversize` is set to `True`. The argument
> get_length = null
> ```
| Name | Description |
| ------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `seqs` | The sequences to minibatch. ~~Iterable[Any]~~ |
| `size` | The target number of words per batch. Can also be a block referencing a schedule, e.g. [`compounding`](https://thinc.ai/docs/api-schedules/#compounding). ~~Union[int, Sequence[int]]~~ |
| `tolerance` | What percentage of the size to allow batches to exceed. ~~float~~ |
| `discard_oversize` | Whether to discard sequences that by themselves exceed the tolerated size. ~~bool~~ |
| `get_length` | Optional function that receives a sequence item and returns its length. Defaults to the built-in `len()` if not set. ~~Optional[Callable[[Any], int]]~~ |
| **CREATES** | The batcher that takes an iterable of items and returns batches. ~~Callable[[Iterable[Any]], Iterable[List[Any]]]~~ |
| Name | Description |
| ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `seqs` | The sequences to minibatch. ~~Iterable[Any]~~ |
| `size` | The target number of words per batch. Can also be a block referencing a schedule, e.g. [`compounding`](https://thinc.ai/docs/api-schedules/#compounding). ~~Union[int, Iterable[int], Schedule]~~ |
| `tolerance` | What percentage of the size to allow batches to exceed. ~~float~~ |
| `discard_oversize` | Whether to discard sequences that by themselves exceed the tolerated size. ~~bool~~ |
| `get_length` | Optional function that receives a sequence item and returns its length. Defaults to the built-in `len()` if not set. ~~Optional[Callable[[Any], int]]~~ |
| **CREATES** | The batcher that takes an iterable of items and returns batches. ~~Callable[[Iterable[Any]], Iterable[List[Any]]]~~ |
### spacy.batch_by_sequence.v1 {id="batch_by_sequence",tag="registered function"}
Expand All @@ -900,11 +900,11 @@ themselves, or be discarded if `discard_oversize` is set to `True`. The argument
Create a batcher that creates batches of the specified size.
| Name | Description |
| ------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `size` | The target number of items per batch. Can also be a block referencing a schedule, e.g. [`compounding`](https://thinc.ai/docs/api-schedules/#compounding). ~~Union[int, Sequence[int]]~~ |
| `get_length` | Optional function that receives a sequence item and returns its length. Defaults to the built-in `len()` if not set. ~~Optional[Callable[[Any], int]]~~ |
| **CREATES** | The batcher that takes an iterable of items and returns batches. ~~Callable[[Iterable[Any]], Iterable[List[Any]]]~~ |
| Name | Description |
| ------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `size` | The target number of items per batch. Can also be a block referencing a schedule, e.g. [`compounding`](https://thinc.ai/docs/api-schedules/#compounding). ~~Union[int, Iterable[int], Schedule]~~ |
| `get_length` | Optional function that receives a sequence item and returns its length. Defaults to the built-in `len()` if not set. ~~Optional[Callable[[Any], int]]~~ |
| **CREATES** | The batcher that takes an iterable of items and returns batches. ~~Callable[[Iterable[Any]], Iterable[List[Any]]]~~ |
### spacy.batch_by_padded.v1 {id="batch_by_padded",tag="registered function"}
Expand All @@ -926,7 +926,7 @@ sequences in the batch.
| Name | Description |
| ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `size` | The largest padded size to batch sequences into. Can also be a block referencing a schedule, e.g. [`compounding`](https://thinc.ai/docs/api-schedules/#compounding). ~~Union[int, Sequence[int]]~~ |
| `size` | The largest padded size to batch sequences into. Can also be a block referencing a schedule, e.g. [`compounding`](https://thinc.ai/docs/api-schedules/#compounding). ~~Union[int, Iterable[int], Schedule]~~ |
| `buffer` | The number of sequences to accumulate before sorting by length. A larger buffer will result in more even sizing, but if the buffer is very large, the iteration order will be less random, which can result in suboptimal training. ~~int~~ |
| `discard_oversize` | Whether to discard sequences that are by themselves longer than the largest padded batch size. ~~bool~~ |
| `get_length` | Optional function that receives a sequence item and returns its length. Defaults to the built-in `len()` if not set. ~~Optional[Callable[[Any], int]]~~ |
Expand Down Expand Up @@ -1528,7 +1528,7 @@ vary on each step.
| Name | Description |
| ---------- | ------------------------------------------------ |
| `items` | The items to batch up. ~~Iterable[Any]~~ |
| `size` | The batch size(s). ~~Union[int, Sequence[int]]~~ |
| `size` | The batch size(s). ~~Union[int, Iterable[int]]~~ |
| **YIELDS** | The batches. |
### util.filter_spans {id="util.filter_spans",tag="function",version="2.1.4"}
Expand Down

0 comments on commit 1743358

Please sign in to comment.