Windowing Support for the Dask Runner #23913

Closed
wants to merge 139 commits
Commits (139)
bc6b525
WIP: Created a skeleton dask runner implementation.
alxmrs Jun 22, 2022
665ee61
WIP: Idea for a translation evaluator.
alxmrs Jun 23, 2022
d1095c7
Added overrides and a visitor that translates operations.
alxmrs Jul 2, 2022
fd13d40
Fixed a dataclass typo.
alxmrs Jul 2, 2022
e3d0c8a
Expanded translations.
alxmrs Jul 2, 2022
69a660f
Core idea seems to be kinda working...
alxmrs Jul 2, 2022
ab58334
First iteration on DaskRunnerResult (keep track of pipeline state).
alxmrs Jul 3, 2022
5391cd6
Added minimal set of DaskRunner options.
alxmrs Jul 4, 2022
5deb598
WIP: Alllmost got asserts to work! The current status is:
alxmrs Jul 8, 2022
1768e47
With a great 1-liner from @pabloem, groupby is fixed! Now, all three …
alxmrs Jul 8, 2022
e6c7106
Self-review: Cleaned up dask runner impl.
alxmrs Jul 8, 2022
1b58d4c
Self-review: Remove TODOs, delete commented out code, other cleanup.
alxmrs Jul 8, 2022
5fe9372
First pass at linting rules.
alxmrs Jul 9, 2022
b98330e
WIP, include dask dependencies + test setup.
alxmrs Jul 9, 2022
e48780a
WIP: maybe better dask deps?
alxmrs Jul 9, 2022
0dc9e23
Skip dask tests depending on successful import.
alxmrs Jul 10, 2022
326d3a3
Fixed setup.py (missing `,`).
alxmrs Jul 11, 2022
b4cc408
Added an additional comma.
alxmrs Jul 11, 2022
40a6ebe
Moved skipping logic to be above dask import.
alxmrs Jul 11, 2022
3c4204d
Fix lint issues with dask runner tests.
alxmrs Sep 5, 2022
41623ec
Adding destination for client address.
alxmrs Sep 20, 2022
676d752
Changing to async produces a timeout error instead of stuck in infini…
alxmrs Sep 21, 2022
09365f6
Close client during `wait_until_finish`; rm async.
alxmrs Sep 22, 2022
c6ba4ba
Revert "Close client during `wait_until_finish`; rm async."
pabloem Sep 28, 2022
a325356
Revert "Changing to async produces a timeout error instead of stuck i…
pabloem Sep 28, 2022
ea13125
Adding -dask tox targets onto the gradle build
pabloem Sep 28, 2022
f855ffc
Supporting side-inputs for ParDo.
alxmrs Oct 2, 2022
3fd966e
Merge remote-tracking branch 'origin/dask-runner-mvp' into dask-runne…
alxmrs Oct 2, 2022
173d79b
wip - added print stmt.
alxmrs Oct 2, 2022
dd2d15c
wip - prove side inputs is set.
alxmrs Oct 2, 2022
8756618
wip - prove side inputs is set in Pardo.
alxmrs Oct 2, 2022
8380d7b
wip - rm asserts, add print
alxmrs Oct 2, 2022
b908dc3
wip - adding named inputs...
alxmrs Oct 2, 2022
f444b1e
Experiments: non-named side inputs + del `None` in named inputs.
alxmrs Oct 2, 2022
174d6fd
None --> 'None'
alxmrs Oct 2, 2022
60b063a
No default side input.
alxmrs Oct 2, 2022
90ee474
Pass along args + kwargs.
alxmrs Oct 2, 2022
c62050e
WIP Windowing with dask runner.
alxmrs Oct 3, 2022
79d4603
WIP: Created a skeleton dask runner implementation.
alxmrs Jun 22, 2022
248ec70
WIP: Idea for a translation evaluator.
alxmrs Jun 23, 2022
42452ca
Added overrides and a visitor that translates operations.
alxmrs Jul 2, 2022
1da2ddd
Fixed a dataclass typo.
alxmrs Jul 2, 2022
14885a3
Expanded translations.
alxmrs Jul 2, 2022
fca2420
Core idea seems to be kinda working...
alxmrs Jul 2, 2022
6dd1ada
First iteration on DaskRunnerResult (keep track of pipeline state).
alxmrs Jul 3, 2022
6675687
Added minimal set of DaskRunner options.
alxmrs Jul 4, 2022
88ed36b
WIP: Alllmost got asserts to work! The current status is:
alxmrs Jul 8, 2022
2e3a126
With a great 1-liner from @pabloem, groupby is fixed! Now, all three …
alxmrs Jul 8, 2022
6467b0e
Self-review: Cleaned up dask runner impl.
alxmrs Jul 8, 2022
793ba86
Self-review: Remove TODOs, delete commented out code, other cleanup.
alxmrs Jul 8, 2022
e535792
First pass at linting rules.
alxmrs Jul 9, 2022
8e32668
WIP, include dask dependencies + test setup.
alxmrs Jul 9, 2022
318afc2
WIP: maybe better dask deps?
alxmrs Jul 9, 2022
b01855f
Skip dask tests depending on successful import.
alxmrs Jul 10, 2022
2c2eb8d
Fixed setup.py (missing `,`).
alxmrs Jul 11, 2022
e64e9eb
Added an additional comma.
alxmrs Jul 11, 2022
69b118b
Moved skipping logic to be above dask import.
alxmrs Jul 11, 2022
9ffc8d8
Fix lint issues with dask runner tests.
alxmrs Sep 5, 2022
8a2afb7
Adding destination for client address.
alxmrs Sep 20, 2022
93f02f1
Changing to async produces a timeout error instead of stuck in infini…
alxmrs Sep 21, 2022
afdcf1b
Close client during `wait_until_finish`; rm async.
alxmrs Sep 22, 2022
41b5267
Supporting side-inputs for ParDo.
alxmrs Oct 2, 2022
e3ac3f8
Revert "Close client during `wait_until_finish`; rm async."
pabloem Sep 28, 2022
3fddc81
Revert "Changing to async produces a timeout error instead of stuck i…
pabloem Sep 28, 2022
9eeb9ea
Adding -dask tox targets onto the gradle build
pabloem Sep 28, 2022
b4d0999
wip - added print stmt.
alxmrs Oct 2, 2022
0319ffd
wip - prove side inputs is set.
alxmrs Oct 2, 2022
0b13bb0
wip - prove side inputs is set in Pardo.
alxmrs Oct 2, 2022
1e7052b
wip - rm asserts, add print
alxmrs Oct 2, 2022
292e023
wip - adding named inputs...
alxmrs Oct 2, 2022
31c1e2b
Experiments: non-named side inputs + del `None` in named inputs.
alxmrs Oct 2, 2022
f4ecf2f
None --> 'None'
alxmrs Oct 2, 2022
4d24ed9
No default side input.
alxmrs Oct 2, 2022
ee62a4a
Pass along args + kwargs.
alxmrs Oct 2, 2022
506c719
Applied yapf to dask sources.
alxmrs Oct 9, 2022
cd0ba8b
Dask sources passing pylint.
alxmrs Oct 9, 2022
d0a7c63
Added dask extra to docs gen tox env.
alxmrs Oct 9, 2022
775bd07
Applied yapf from tox.
alxmrs Oct 9, 2022
efba1c9
Include dask in mypy checks.
alxmrs Oct 9, 2022
741d961
Upgrading mypy support to python 3.8 since py37 support is deprecated…
alxmrs Oct 9, 2022
f66458a
Manually installing an old version of dask before 3.7 support was dro…
alxmrs Oct 9, 2022
5dcf969
fix lint: line too long.
alxmrs Oct 9, 2022
ec5f613
Fixed type errors with DaskRunnerResult. Disabled mypy type checking …
alxmrs Oct 9, 2022
04b1f1a
Fix pytype errors (in transform_evaluator).
alxmrs Oct 9, 2022
712944b
Ran isort.
alxmrs Oct 9, 2022
567b72b
Ran yapf again.
alxmrs Oct 9, 2022
f53c0a4
Fix imports (one per line)
alxmrs Oct 10, 2022
fb280ad
isort -- alphabetical.
alxmrs Oct 10, 2022
80ddfec
Added feature to CHANGES.md.
alxmrs Oct 10, 2022
40c4e35
ran yapf via tox on linux machine
alxmrs Oct 10, 2022
a70c5f3
Merge branch 'master' into dask-runner-mvp
alxmrs Oct 13, 2022
9fb52e5
Change an import to pass CI.
alxmrs Oct 13, 2022
91115e0
WIP -- better structure in ParDo for windowing. Thanks @pabloem.
alxmrs Oct 13, 2022
26c6016
Skip isort error; needed to get CI to pass.
alxmrs Oct 13, 2022
aec19bf
Skip test logic may favor better with isort.
alxmrs Oct 13, 2022
0673235
(Maybe) the last isort fix.
alxmrs Oct 13, 2022
de03a32
Tested pipeline options (added one fix).
alxmrs Oct 14, 2022
7e0a2c7
Improve formatting of test.
alxmrs Oct 14, 2022
39b1e1c
Self-review: removing side inputs.
alxmrs Oct 14, 2022
6db49fa
add dask to coverage suite in tox.
alxmrs Oct 17, 2022
ed00139
Experiment: Windowed ParDo with @pabloem.
alxmrs Oct 17, 2022
2d8f5d6
Merge branch 'master' of github.com:apache/beam into dask-runner-wind…
alxmrs Oct 17, 2022
d35e9d6
add mandatory args for _OutputHandler
pabloem Oct 17, 2022
973d9f9
Merge pull request #79 from pabloem/dask-runner-windowing
alxmrs Oct 17, 2022
0e5d498
Merge branch 'dask-runner-windowing' of github.com:alxmrs/beam into d…
alxmrs Oct 17, 2022
3feeeac
Update: still need to pre-apply windowed values.
alxmrs Oct 17, 2022
036561c
Merge branch 'master' into dask-runner-mvp
alxmrs Oct 18, 2022
191580d
Capture value error in assert.
alxmrs Oct 18, 2022
365fc87
Merge branch 'master' of github.com:apache/beam into dask-runner-mvp
alxmrs Oct 18, 2022
085447e
Change timeout value to 600 seconds.
alxmrs Oct 18, 2022
1a60a5e
ignoring broken test
pabloem Oct 21, 2022
c1037f8
Update CHANGES.md
pabloem Oct 21, 2022
9e79ffd
Using reflection to test the Dask client constructor.
alxmrs Oct 24, 2022
3e2cc0f
Merge branch 'dask-runner-mvp' into dask-runner-windowing
alxmrs Oct 24, 2022
4edc970
Better method of inspecting the constructor parameters (thanks @TomAu…
alxmrs Oct 24, 2022
fd8e361
Merge branch 'master' of github.com:apache/beam into dask-runner-wind…
alxmrs Oct 25, 2022
d88e8a1
Merge branch 'master' of github.com:apache/beam into dask-runner-wind…
alxmrs Oct 28, 2022
36bea9a
Minor fixes, and now unit tests are passing!!
alxmrs Oct 31, 2022
df315c1
Ran yapf on Dask sources.
alxmrs Nov 1, 2022
ef0d2b6
Ran lint checks.
alxmrs Nov 1, 2022
6c2cc4e
(hopefully) final lint check.
alxmrs Nov 1, 2022
0fae761
Disabled additional ungrouped imports check.
alxmrs Nov 1, 2022
119666c
Ran yapf with correct version.
alxmrs Nov 1, 2022
2e46d88
mini self-review.
alxmrs Nov 2, 2022
2ed8b14
WIP: A more correct windowing implementation with failing tests.
alxmrs Nov 5, 2022
2f193d5
WIP: Further improvements, more correct windowing impl.
alxmrs Nov 5, 2022
577f30a
WIP: Passing initial tests, failing multiple -- drops random elements…
alxmrs Nov 5, 2022
b3a70f6
WIP: All tests are passing :)
alxmrs Nov 5, 2022
7acd8d5
Cleanup: removed variables for debugger.
alxmrs Nov 5, 2022
7e90e2b
Lint + YAPF
alxmrs Nov 5, 2022
6e33ce2
self-review.
alxmrs Nov 5, 2022
f54f14c
fix lint
pabloem Nov 18, 2022
8dd2cdb
fixup
pabloem Nov 18, 2022
518a8f0
ignore internal dask file for docs
pabloem Nov 19, 2022
801b131
fixing dask version
pabloem Nov 28, 2022
393c508
Merge remote-tracking branch 'origin/master' into dask-runner-windowing
pabloem Nov 28, 2022
f89f609
remove Python 3.7 which seems unsupported by newer Dask versions
pabloem Nov 28, 2022
d6486fe
reducing scope of Dask tests
pabloem Nov 30, 2022
6f05963
adding datafrems dep
pabloem Nov 30, 2022
9 changes: 2 additions & 7 deletions .github/workflows/dask_runner_tests.yml
@@ -65,9 +65,8 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
os: [ubuntu-latest, macos-latest]
params: [
{"py_ver": "3.7", "tox_env": "py37"},
{"py_ver": "3.8", "tox_env": "py38"},
{"py_ver": "3.9", "tox_env": "py39"},
{"py_ver": "3.10", "tox_env": "py310" },
@@ -86,15 +85,11 @@ jobs:
run: pip install tox
- name: Install SDK with dask
working-directory: ./sdks/python
run: pip install setuptools --upgrade && pip install -e .[gcp,dask,test]
run: pip install setuptools --upgrade && pip install -e .[dask,test,dataframes]
- name: Run tests basic unix
if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macos')
working-directory: ./sdks/python
run: tox -c tox.ini -e ${{ matrix.params.tox_env }}-dask
- name: Run tests basic windows
if: startsWith(matrix.os, 'windows')
working-directory: ./sdks/python
run: tox -c tox.ini -e ${{ matrix.params.tox_env }}-win-dask
- name: Upload test logs
uses: actions/upload-artifact@v3
if: always()
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/runners/dask/dask_runner.py
@@ -159,6 +159,8 @@ def is_fnapi_compatible():
return False

def run_pipeline(self, pipeline, options):
import dask

# TODO(alxr): Create interactive notebook support.
if is_in_notebook():
raise NotImplementedError('interactive support will come later!')
@@ -178,5 +180,6 @@ def run_pipeline(self, pipeline, options):
dask_visitor = self.to_dask_bag_visitor()
pipeline.visit(dask_visitor)

futures = client.compute(list(dask_visitor.bags.values()))
opt_graph = dask.optimize(dask_visitor.bags.values())
futures = client.compute(opt_graph)
return DaskRunnerResult(client, futures)
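
For context, the hunk above replaces a direct `client.compute(...)` call with an optimize-then-compute pattern. A minimal standalone sketch of that pattern in plain dask follows; the in-process `Client(processes=False)` and the toy bag are purely illustrative and are not Beam or PR code:

import dask
import dask.bag as db
from dask.distributed import Client

# Illustration only: optimize a bag's task graph before submitting it,
# mirroring the optimize-then-compute pattern used above.
client = Client(processes=False)  # local, in-process cluster

bag = db.from_sequence(range(10)).map(lambda x: x * 2).filter(lambda x: x > 5)

# dask.optimize returns a tuple of collections backed by one fused graph.
(optimized,) = dask.optimize(bag)

future = client.compute(optimized)  # returns a distributed Future
print(future.result())              # [6, 8, 10, 12, 14, 16, 18]
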
51 changes: 49 additions & 2 deletions sdks/python/apache_beam/runners/dask/dask_runner_test.py
@@ -24,10 +24,11 @@
from apache_beam.testing.util import equal_to

try:
from apache_beam.runners.dask.dask_runner import DaskOptions
from apache_beam.runners.dask.dask_runner import DaskRunner
import dask
import dask.distributed as ddist

from apache_beam.runners.dask.dask_runner import DaskOptions # pylint: disable=ungrouped-imports
from apache_beam.runners.dask.dask_runner import DaskRunner # pylint: disable=ungrouped-imports
except (ImportError, ModuleNotFoundError):
raise unittest.SkipTest('Dask must be installed to run tests.')

@@ -73,6 +74,11 @@ def test_create(self):
pcoll = p | beam.Create([1])
assert_that(pcoll, equal_to([1]))

def test_create_multiple(self):
with self.pipeline as p:
pcoll = p | beam.Create([1, 2, 3])
assert_that(pcoll, equal_to([1, 2, 3]))

def test_create_and_map(self):
def double(x):
return x * 2
@@ -81,6 +87,22 @@ def double(x):
pcoll = p | beam.Create([1]) | beam.Map(double)
assert_that(pcoll, equal_to([2]))

def test_create_and_map_multiple(self):
def double(x):
return x * 2

with self.pipeline as p:
pcoll = p | beam.Create([1, 2]) | beam.Map(double)
assert_that(pcoll, equal_to([2, 4]))

def test_create_and_map_many(self):
def double(x):
return x * 2

with self.pipeline as p:
pcoll = p | beam.Create(list(range(1, 11))) | beam.Map(double)
assert_that(pcoll, equal_to(list(range(2, 21, 2))))

def test_create_map_and_groupby(self):
def double(x):
return x * 2, x
@@ -89,6 +111,31 @@ def double(x):
pcoll = p | beam.Create([1]) | beam.Map(double) | beam.GroupByKey()
assert_that(pcoll, equal_to([(2, [1])]))

def test_create_map_and_groupby_multiple(self):
def double(x):
return x * 2, x

with self.pipeline as p:
pcoll = p | beam.Create([1, 2, 1, 2, 3
]) | beam.Map(double) | beam.GroupByKey()
assert_that(pcoll, equal_to([(2, [1, 1]), (4, [2, 2]), (6, [3])]))

def test_map_with_side_inputs(self):
def mult_by(x, y):
return x * y

with self.pipeline as p:
pcoll = p | beam.Create([1]) | beam.Map(mult_by, 3)
assert_that(pcoll, equal_to([3]))

def test_map_with_named_side_inputs(self):
def mult_by(x, y):
return x * y

with self.pipeline as p:
pcoll = p | beam.Create([1]) | beam.Map(mult_by, y=3)
assert_that(pcoll, equal_to([3]))


if __name__ == '__main__':
unittest.main()
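
The tests above exercise pipelines on the Dask runner. A rough sketch of the same kind of run outside the test harness, assuming `dask` and `distributed` are installed; the toy transforms are illustrative only:

import apache_beam as beam
from apache_beam.runners.dask.dask_runner import DaskRunner

# Illustrative pipeline run on the Dask runner; spins up a local dask client.
with beam.Pipeline(runner=DaskRunner()) as p:
    _ = (
        p
        | beam.Create([1, 2, 3])
        | beam.Map(lambda x: x * 2)
        | beam.Map(print))  # expected output: 2, 4, 6 (order not guaranteed)
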
7 changes: 5 additions & 2 deletions sdks/python/apache_beam/runners/dask/overrides.py
@@ -73,7 +73,6 @@ def infer_output_type(self, input_type):
@typehints.with_input_types(t.Tuple[K, t.Iterable[V]])
@typehints.with_output_types(t.Tuple[K, t.Iterable[V]])
class _GroupAlsoByWindow(beam.ParDo):
"""Not used yet..."""
def __init__(self, windowing):
super().__init__(_GroupAlsoByWindowDoFn(windowing))
self.windowing = windowing
@@ -86,7 +85,11 @@ def expand(self, input_or_inputs):
@typehints.with_output_types(t.Tuple[K, t.Iterable[V]])
class _GroupByKey(beam.PTransform):
def expand(self, input_or_inputs):
return input_or_inputs | "GroupByKey" >> _GroupByKeyOnly()
return (
input_or_inputs
| "ReifyWindows" >> beam.ParDo(beam.GroupByKey.ReifyWindows())
| "GroupByKey" >> _GroupByKeyOnly()
| "GroupByWindow" >> _GroupAlsoByWindow(input_or_inputs.windowing))


class _Flatten(beam.PTransform):
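
The reworked `_GroupByKey` expansion above (reify windows, group by key only, then group also by window) is what per-window grouping hinges on. As a rough illustration of the user-visible behavior it targets — this snippet is not taken from the PR and uses the default runner — equal keys landing in different fixed windows must come out as separate groups:

import apache_beam as beam
from apache_beam.transforms import window

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([('a', 1, 0), ('a', 2, 70)])  # (key, value, event time)
        | beam.Map(lambda kvt: window.TimestampedValue((kvt[0], kvt[1]), kvt[2]))
        | beam.WindowInto(window.FixedWindows(60))
        | beam.GroupByKey()  # expect ('a', [1]) and ('a', [2]), one per window
        | beam.Map(print))
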
116 changes: 106 additions & 10 deletions sdks/python/apache_beam/runners/dask/transform_evaluator.py
@@ -25,15 +25,73 @@
import abc
import dataclasses
import typing as t
from dataclasses import field

import apache_beam
import dask.bag as db
from apache_beam import DoFn
from apache_beam import TaggedOutput
from apache_beam.pipeline import AppliedPTransform
from apache_beam.runners.common import DoFnContext
from apache_beam.runners.common import DoFnInvoker
from apache_beam.runners.common import DoFnSignature
from apache_beam.runners.common import Receiver
from apache_beam.runners.common import _OutputHandler
from apache_beam.runners.dask.overrides import _Create
from apache_beam.runners.dask.overrides import _Flatten
from apache_beam.runners.dask.overrides import _GroupByKeyOnly
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms.window import WindowFn
from apache_beam.utils.windowed_value import WindowedValue

OpInput = t.Union[db.Bag, t.Sequence[db.Bag], None]
PCollVal = t.Union[WindowedValue, t.Any]


def get_windowed_value(item: t.Any, window_fn: WindowFn) -> WindowedValue:
if isinstance(item, TaggedOutput):
item = item.value

if isinstance(item, WindowedValue):
windowed_value = item
elif isinstance(item, TimestampedValue):
assign_context = WindowFn.AssignContext(item.timestamp, item.value)
windowed_value = WindowedValue(
item.value, item.timestamp, tuple(window_fn.assign(assign_context)))
else:
windowed_value = WindowedValue(item, 0, (GlobalWindow(), ))

return windowed_value


def defenestrate(x):
if isinstance(x, WindowedValue):
return x.value
return x


@dataclasses.dataclass
class TaggingReceiver(Receiver):
tag: str
values: t.List[PCollVal]

def receive(self, windowed_value: WindowedValue):
if self.tag:
output = TaggedOutput(self.tag, windowed_value)
else:
output = windowed_value
self.values.append(output)


@dataclasses.dataclass
class OneReceiver(dict):
values: t.List[PCollVal] = field(default_factory=list)

def __missing__(self, key):
if key not in self:
self[key] = TaggingReceiver(key, self.values)
return self[key]


@dataclasses.dataclass
@@ -65,15 +123,54 @@ def apply(self, input_bag: OpInput) -> db.Bag:
class ParDo(DaskBagOp):
def apply(self, input_bag: db.Bag) -> db.Bag:
transform = t.cast(apache_beam.ParDo, self.transform)
return input_bag.map(
transform.fn.process, *transform.args, **transform.kwargs).flatten()

args, kwargs = transform.raw_side_inputs
args = list(args)
main_input = next(iter(self.applied.main_inputs.values()))
window_fn = main_input.windowing.windowfn if hasattr(
main_input, "windowing") else None

class Map(DaskBagOp):
def apply(self, input_bag: db.Bag) -> db.Bag:
transform = t.cast(apache_beam.Map, self.transform)
return input_bag.map(
transform.fn.process, *transform.args, **transform.kwargs)
context = DoFnContext(transform.label, state=None)
bundle_finalizer_param = DoFn.BundleFinalizerParam()
do_fn_signature = DoFnSignature(transform.fn)

tagged_receivers = OneReceiver()

output_processor = _OutputHandler(
window_fn=window_fn,
main_receivers=tagged_receivers[None],
tagged_receivers=tagged_receivers,
per_element_output_counter=None,
output_batch_converter=None,
process_yields_batches=False,
process_batch_yields_elements=False)

do_fn_invoker = DoFnInvoker.create_invoker(
do_fn_signature,
output_processor,
context,
None,
args,
kwargs,
user_state_context=None,
bundle_finalizer_param=bundle_finalizer_param)

def apply_dofn_to_bundle(items):
do_fn_invoker.invoke_setup()
do_fn_invoker.invoke_start_bundle()

for it in items:
do_fn_invoker.invoke_process(it)

results = [v.value for v in tagged_receivers.values]

do_fn_invoker.invoke_finish_bundle()
do_fn_invoker.invoke_teardown()

return results

return input_bag.map(get_windowed_value,
window_fn).map_partitions(apply_dofn_to_bundle)


class GroupByKey(DaskBagOp):
@@ -83,21 +180,20 @@ def key(item):

def value(item):
k, v = item
return k, [elm[1] for elm in v]
return k, [defenestrate(elm[1]) for elm in v]

return input_bag.groupby(key).map(value)


class Flatten(DaskBagOp):
def apply(self, input_bag: OpInput) -> db.Bag:
def apply(self, input_bag: t.List[db.Bag]) -> db.Bag:
assert type(input_bag) is list, 'Must take a sequence of bags!'
return db.concat(input_bag)


TRANSLATIONS = {
_Create: Create,
apache_beam.ParDo: ParDo,
apache_beam.Map: Map,
_GroupByKeyOnly: GroupByKey,
_Flatten: Flatten,
}
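
The `ParDo` evaluator above rests on two ideas: wrap each element as a `WindowedValue`, then hand whole partitions to a `DoFnInvoker` so setup, bundle start/finish, and teardown each run once per partition. A stripped-down sketch of that bundle-per-partition pattern in plain dask; the `process_bundle` helper is hypothetical and involves no Beam code:

import dask.bag as db

def process_bundle(items):
    # Per-partition "bundle": set up once, process every element, tear down.
    # (Stand-in for invoke_setup/invoke_process/invoke_teardown above.)
    doubled = []            # setup
    for x in items:         # process
        doubled.append(x * 2)
    return doubled          # teardown would release resources here

bag = db.from_sequence(range(6), npartitions=2)
print(bag.map_partitions(process_bundle).compute())  # [0, 2, 4, 6, 8, 10]
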
1 change: 1 addition & 0 deletions sdks/python/scripts/generate_pydoc.sh
@@ -64,6 +64,7 @@ excluded_patterns=(
'apache_beam/runners/portability/'
'apache_beam/runners/test/'
'apache_beam/runners/worker/'
'apache_beam/runners/dask/transform_evaluator.*'
'apache_beam/testing/benchmarks/chicago_taxi/'
'apache_beam/testing/benchmarks/inference/'
'apache_beam/testing/benchmarks/data/'
4 changes: 2 additions & 2 deletions sdks/python/setup.py
@@ -352,8 +352,8 @@ def get_portability_package_data():
'dataframe': ['pandas>=1.0,<1.5;python_version<"3.10"',
'pandas>=1.4.3,<1.5;python_version>="3.10"'],
'dask': [
'dask >= 2022.6',
'distributed >= 2022.6',
'dask >= 2022.6.0',
'distributed >= 2022.6.0',
],
},
zip_safe=False,
5 changes: 3 additions & 2 deletions sdks/python/tox.ini
@@ -17,7 +17,7 @@

[tox]
# new environments will be excluded by default unless explicitly added to envlist.
envlist = py37,py38,py39,py310,py37-{cloud,cython,lint,mypy,dask},py38-{cloud,cython,docs,cloudcoverage,dask},py39-{cloud,cython},py310-{cloud,cython,dask},whitespacelint
envlist = py37,py38,py39,py310,py37-{cloud,cython,lint,mypy,dask},py38-{cloud,cython,docs,cloudcoverage,dask},py39-{cloud,cython,dask},py310-{cloud,cython,dask},whitespacelint
toxworkdir = {toxinidir}/target/{env:ENV_NAME:.tox}

[pycodestyle]
@@ -93,9 +93,10 @@ commands =
{toxinidir}/scripts/run_pytest.sh {envname} "{posargs}"

[testenv:py{37,38,39}-dask]
extras = test,dask
extras = test,dask,dataframes
commands =
{toxinidir}/scripts/run_pytest.sh {envname} "{posargs}"

[testenv:py38-cloudcoverage]
deps =
codecov