Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Stop ignoring tests/server.py #15084

Merged
merged 21 commits into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion tests/http/federation/test_matrix_federation_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
IOpenSSLClientConnectionCreator,
IProtocolFactory,
)
from twisted.internet.protocol import Factory
from twisted.internet.protocol import Factory, Protocol
from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol
from twisted.web._newclient import ResponseNeverReceived
from twisted.web.client import Agent
Expand Down Expand Up @@ -466,7 +466,10 @@ def _do_get_via_proxy(
else:
assert isinstance(proxy_server_transport, FakeTransport)
client_protocol = proxy_server_transport.other
assert isinstance(client_protocol, Protocol)
c2s_transport = client_protocol.transport
assert c2s_transport is not None
assert isinstance(c2s_transport, FakeTransport)
clokep marked this conversation as resolved.
Show resolved Hide resolved
c2s_transport.other = server_ssl_protocol

self.reactor.advance(0)
Expand Down
5 changes: 4 additions & 1 deletion tests/http/test_proxyagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
_WrappingProtocol,
)
from twisted.internet.interfaces import IProtocol, IProtocolFactory
from twisted.internet.protocol import Factory
from twisted.internet.protocol import Factory, Protocol
from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol
from twisted.web.http import HTTPChannel

Expand Down Expand Up @@ -644,7 +644,10 @@ def _do_https_request_via_proxy(
else:
assert isinstance(proxy_server_transport, FakeTransport)
client_protocol = proxy_server_transport.other
assert isinstance(client_protocol, Protocol)
c2s_transport = client_protocol.transport
assert c2s_transport is not None
assert isinstance(c2s_transport, FakeTransport)
c2s_transport.other = server_ssl_protocol

self.reactor.advance(0)
Expand Down
43 changes: 22 additions & 21 deletions tests/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
List,
MutableMapping,
Optional,
Sequence,
Tuple,
Type,
Union,
Expand Down Expand Up @@ -573,7 +574,7 @@ def get_clock() -> Tuple[ThreadedMemoryReactorClock, Clock]:


@implementer(ITransport)
@attr.s(cmp=False)
@attr.s(cmp=False, auto_attribs=True)
class FakeTransport:
"""
A twisted.internet.interfaces.ITransport implementation which sends all its data
Expand All @@ -588,48 +589,48 @@ class FakeTransport:
If you want bidirectional communication, you'll need two instances.
"""

other = attr.ib()
other: IProtocol
"""The Protocol object which will receive any data written to this transport.

:type: twisted.internet.interfaces.IProtocol
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""

_reactor = attr.ib()
_reactor: IReactorTime
"""Test reactor

:type: twisted.internet.interfaces.IReactorTime
"""

_protocol = attr.ib(default=None)
_protocol: Optional[IProtocol] = None
"""The Protocol which is producing data for this transport. Optional, but if set
will get called back for connectionLost() notifications etc.
"""

_peer_address: Optional[IAddress] = attr.ib(default=None)
_peer_address: Optional[IAddress] = None
"""The value to be returned by getPeer"""

_host_address: Optional[IAddress] = attr.ib(default=None)
_host_address: Optional[IAddress] = None
"""The value to be returned by getHost"""

disconnecting = False
disconnected = False
connected = True
buffer = attr.ib(default=b"")
producer = attr.ib(default=None)
autoflush = attr.ib(default=True)
buffer: bytes = attr.Factory(bytes)
clokep marked this conversation as resolved.
Show resolved Hide resolved
producer: Optional[IPushProducer] = None
autoflush: bool = True

def getPeer(self) -> Optional[IAddress]:
return self._peer_address

def getHost(self) -> Optional[IAddress]:
return self._host_address

def loseConnection(self, reason=None):
def loseConnection(self, reason: Optional[Failure] = None) -> None:
if not self.disconnecting:
logger.info("FakeTransport: loseConnection(%s)", reason)
self.disconnecting = True
if self._protocol:
self._protocol.connectionLost(reason)
self._protocol.connectionLost(reason) # type: ignore[arg-type]
clokep marked this conversation as resolved.
Show resolved Hide resolved

# if we still have data to write, delay until that is done
if self.buffer:
Expand All @@ -640,38 +641,38 @@ def loseConnection(self, reason=None):
self.connected = False
self.disconnected = True

def abortConnection(self):
def abortConnection(self) -> None:
logger.info("FakeTransport: abortConnection()")

if not self.disconnecting:
self.disconnecting = True
if self._protocol:
self._protocol.connectionLost(None)
self._protocol.connectionLost(None) # type: ignore[arg-type]

self.disconnected = True

def pauseProducing(self):
def pauseProducing(self) -> None:
if not self.producer:
return

self.producer.pauseProducing()

def resumeProducing(self):
def resumeProducing(self) -> None:
if not self.producer:
return
self.producer.resumeProducing()

def unregisterProducer(self):
def unregisterProducer(self) -> None:
if not self.producer:
return

self.producer = None

def registerProducer(self, producer, streaming):
def registerProducer(self, producer: IPushProducer, streaming: bool) -> None:
self.producer = producer
self.producerStreaming = streaming

def _produce():
def _produce() -> None:
if not self.producer:
# we've been unregistered
return
Expand All @@ -683,7 +684,7 @@ def _produce():
if not streaming:
self._reactor.callLater(0.0, _produce)

def write(self, byt):
def write(self, byt: bytes) -> None:
if self.disconnecting:
raise Exception("Writing to disconnecting FakeTransport")

Expand All @@ -695,11 +696,11 @@ def write(self, byt):
if self.autoflush:
self._reactor.callLater(0.0, self.flush)

def writeSequence(self, seq):
def writeSequence(self, seq: Iterable[bytes]) -> None:
for x in seq:
self.write(x)

def flush(self, maxbytes=None):
def flush(self, maxbytes: Optional[int] = None) -> None:
if not self.buffer:
# nothing to do. Don't write empty buffers: it upsets the
# TLSMemoryBIOProtocol
Expand Down