-
Notifications
You must be signed in to change notification settings - Fork 8
/
test_file_watcher_integration.py
142 lines (116 loc) · 4.92 KB
/
test_file_watcher_integration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# License: MIT
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
"""Integration tests for the `util` module."""
import os
import pathlib
from datetime import timedelta
import pytest
from frequenz.channels import ReceiverStoppedError, select, selected_from
from frequenz.channels.file_watcher import Event, EventType, FileWatcher
from frequenz.channels.timer import SkipMissedAndDrift, Timer
@pytest.mark.integration
async def test_file_watcher(tmp_path: pathlib.Path) -> None:
    """Check that the file watcher reports the expected event for every write.

    A timer periodically (re)writes a file inside the watched directory. The first
    event received is expected to be a `CREATE` event and every later one a
    `MODIFY` event, always carrying the path of the written file.

    Args:
        tmp_path: A tmp directory to run the file watcher on. Created by pytest.
    """
    watched_file = tmp_path / "test-file"

    expected_events = 3
    received_events = 0

    file_watcher = FileWatcher(paths=[str(tmp_path)])
    timer = Timer(timedelta(seconds=0.1), SkipMissedAndDrift())

    async for selected in select(file_watcher, timer):
        if selected_from(selected, timer):
            # Every timer tick rewrites the file with the timer's message.
            watched_file.write_text(f"{selected.message}")
            continue

        if selected_from(selected, file_watcher):
            if received_events == 0:
                expected_type = EventType.CREATE
            else:
                expected_type = EventType.MODIFY
            assert selected.message == Event(type=expected_type, path=watched_file)
            received_events += 1

            # Stop iterating once the expected number of events was received.
            if received_events == expected_events:
                break

    assert received_events == expected_events
@pytest.mark.integration
async def test_file_watcher_deletes(tmp_path: pathlib.Path) -> None:
    """Ensure file watcher is returning paths only on the DELETE change.

    Also ensures that DELETE events are sent even if the file was recreated and even if
    the file doesn't exist.

    Args:
        tmp_path: A tmp directory to run the file watcher on. Created by pytest.
    """
    filename = tmp_path / "test-file"
    file_watcher = FileWatcher(
        paths=[str(tmp_path)],
        event_types={EventType.DELETE},
        force_polling=False,
    )
    write_timer = Timer(timedelta(seconds=0.1), SkipMissedAndDrift())
    deletion_timer = Timer(timedelta(seconds=0.25), SkipMissedAndDrift())

    number_of_writes = 0
    number_of_deletes = 0
    number_of_events = 0

    # We want to write to a file and then remove it, and then write again (create it
    # again) and remove it again and then stop.
    # Because awatch takes some time to get notified by the OS, we need to stop writing
    # while a delete was done, to make sure the file is not recreated before the
    # deletion event arrives.
    # For the second round of writes and then delete, we allow writing after the delete
    # was done as an extra test.
    #
    # This is an example timeline for this test:
    #
    # |-----|--.--|-----|---o-|-----|--.--|-----|--o--|-----|-----|-----|-----|-----|
    # W     W  D            E W     W  D  W     W  E
    #
    # Where:
    # W: Write
    # D: Delete
    # E: FileWatcher Event
    async for selected in select(file_watcher, write_timer, deletion_timer):
        if selected_from(selected, write_timer):
            # After the first two writes, pause writing until the first deletion
            # event arrives, so the file is not recreated in between.
            if number_of_writes >= 2 and number_of_events == 0:
                continue
            filename.write_text(f"{selected.message}")
            number_of_writes += 1
        elif selected_from(selected, deletion_timer):
            # Avoid removing the file twice
            if not filename.is_file():
                continue
            os.remove(filename)
            number_of_deletes += 1
        elif selected_from(selected, file_watcher):
            # Only DELETE events were requested, so the events caused by the writes
            # must not show up here.
            assert selected.message == Event(type=EventType.DELETE, path=filename)
            number_of_events += 1
            if number_of_events >= 2:
                break

    assert number_of_deletes == 2
    # Can be more because the watcher could take some time to trigger
    assert number_of_writes >= 3
    assert number_of_events == 2
@pytest.mark.integration
async def test_file_watcher_exit_iterator(tmp_path: pathlib.Path) -> None:
    """Check the file watcher after it was stopped while being iterated.

    Once the expected number of events was received, the watcher's stop event is
    set and the loop is left. After that, `ready()` must resolve to `False` and
    `consume()` must raise `ReceiverStoppedError`.

    Args:
        tmp_path: A tmp directory to run the file watcher on. Created by pytest.
    """
    watched_file = tmp_path / "test-file"

    expected_events = 3
    received_events = 0

    file_watcher = FileWatcher(
        paths=[str(tmp_path)],
        force_polling=True,
        polling_interval=timedelta(seconds=0.05),
    )
    timer = Timer(timedelta(seconds=0.1), SkipMissedAndDrift())

    async for selected in select(file_watcher, timer):
        if selected_from(selected, timer):
            # Every timer tick rewrites the file with the timer's message.
            watched_file.write_text(f"{selected.message}")
            continue

        if selected_from(selected, file_watcher):
            received_events += 1
            if received_events == expected_events:
                # Stop the watcher before leaving the loop.
                file_watcher._stop_event.set()  # pylint: disable=protected-access
                break

    is_ready = await file_watcher.ready()
    assert is_ready is False

    with pytest.raises(ReceiverStoppedError):
        file_watcher.consume()

    assert received_events == expected_events