Skip to content

Commit

Permalink
feat(Net): Non-blocking WebSocket #4773
Browse files Browse the repository at this point in the history
  • Loading branch information
obiltschnig committed Nov 16, 2024
1 parent 5d69308 commit cedd086
Show file tree
Hide file tree
Showing 6 changed files with 436 additions and 107 deletions.
10 changes: 8 additions & 2 deletions Net/include/Poco/Net/WebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,21 @@ class Net_API WebSocket: public StreamSocket

#endif //POCO_NEW_STATE_ON_MOVE

void shutdown();
int shutdown();
/// Sends a Close control frame to the server end of
/// the connection to initiate an orderly shutdown
/// of the connection.
///
/// Returns the number of bytes sent or -1 if the socket
/// is non-blocking and the frame cannot be sent at this time.

void shutdown(Poco::UInt16 statusCode, const std::string& statusMessage = "");
int shutdown(Poco::UInt16 statusCode, const std::string& statusMessage = "");
/// Sends a Close control frame to the server end of
/// the connection to initiate an orderly shutdown
/// of the connection.
///
/// Returns the number of bytes sent or -1 if the socket
/// is non-blocking and the frame cannot be sent at this time.

int sendFrame(const void* buffer, int length, int flags = FRAME_TEXT);
/// Sends the contents of the given buffer through
Expand Down
43 changes: 36 additions & 7 deletions Net/include/Poco/Net/WebSocketImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,16 @@ class Net_API WebSocketImpl: public StreamSocketImpl
virtual void sendUrgent(unsigned char data);
virtual int available();
virtual bool secure() const;
virtual void setSendBufferSize(int size);
virtual int getSendBufferSize();
virtual void setReceiveBufferSize(int size);
virtual int getReceiveBufferSize();
virtual void setSendTimeout(const Poco::Timespan& timeout);
virtual Poco::Timespan getSendTimeout();
virtual void setReceiveTimeout(const Poco::Timespan& timeout);
virtual Poco::Timespan getReceiveTimeout();
virtual void setBlocking(bool flag);
virtual bool getBlocking() const;

// Internal
int frameFlags() const;
Expand All @@ -93,13 +99,35 @@ class Net_API WebSocketImpl: public StreamSocketImpl
enum
{
FRAME_FLAG_MASK = 0x80,
MAX_HEADER_LENGTH = 14
MAX_HEADER_LENGTH = 14,
MASK_LENGTH = 4
};

int receiveHeader(char mask[4], bool& useMask);
int receivePayload(char *buffer, int payloadLength, char mask[4], bool useMask);
int receiveNBytes(void* buffer, int bytes);
int receiveSomeBytes(char* buffer, int bytes);
struct ReceiveState
{
int frameFlags = 0;
bool useMask = false;
char mask[MASK_LENGTH];
int headerLength = 0;
int payloadLength = 0;
int remainingPayloadLength = 0;
Poco::Buffer<char> payload{0};
};

struct SendState
{
int length = 0;
int remainingPayloadOffset = 0;
int remainingPayloadLength = 0;
Poco::Buffer<char> payload{0};
};

int peekHeader(ReceiveState& receiveState);
void skipHeader(int headerLength);
int receivePayload(char *buffer, int payloadLength, char mask[MASK_LENGTH], bool useMask);
int receiveNBytes(void* buffer, int length);
int receiveSomeBytes(char* buffer, int length);
int peekSomeBytes(char* buffer, int length);
virtual ~WebSocketImpl();

private:
Expand All @@ -109,8 +137,9 @@ class Net_API WebSocketImpl: public StreamSocketImpl
int _maxPayloadSize;
Poco::Buffer<char> _buffer;
int _bufferOffset;
int _frameFlags;
bool _mustMaskPayload;
ReceiveState _receiveState;
SendState _sendState;
Poco::Random _rnd;
};

Expand All @@ -120,7 +149,7 @@ class Net_API WebSocketImpl: public StreamSocketImpl
//
inline int WebSocketImpl::frameFlags() const
{
return _frameFlags;
return _receiveState.frameFlags;
}


Expand Down
36 changes: 4 additions & 32 deletions Net/src/WebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,48 +107,20 @@ WebSocket& WebSocket::operator = (const Socket& socket)
}


#ifdef POCO_NEW_STATE_ON_MOVE

WebSocket& WebSocket::operator = (Socket&& socket)
{
if (dynamic_cast<WebSocketImpl*>(socket.impl()))
Socket::operator = (std::move(socket));
else
throw InvalidArgumentException("Cannot assign incompatible socket");
return *this;
}


WebSocket& WebSocket::operator = (WebSocket&& socket)
{
Socket::operator = (std::move(socket));
return *this;
}

#endif // POCO_NEW_STATE_ON_MOVE


WebSocket& WebSocket::operator = (const WebSocket& socket)
{
Socket::operator = (socket);
return *this;
}


void WebSocket::shutdown()
int WebSocket::shutdown()
{
shutdown(WS_NORMAL_CLOSE);
return shutdown(WS_NORMAL_CLOSE);
}


void WebSocket::shutdown(Poco::UInt16 statusCode, const std::string& statusMessage)
int WebSocket::shutdown(Poco::UInt16 statusCode, const std::string& statusMessage)
{
Poco::Buffer<char> buffer(statusMessage.size() + 2);
Poco::MemoryOutputStream ostr(buffer.begin(), buffer.size());
Poco::BinaryWriter writer(ostr, Poco::BinaryWriter::NETWORK_BYTE_ORDER);
writer << statusCode;
writer.writeRaw(statusMessage);
sendFrame(buffer.begin(), static_cast<int>(ostr.charsWritten()), FRAME_FLAG_FIN | FRAME_OP_CLOSE);
return sendFrame(buffer.begin(), static_cast<int>(ostr.charsWritten()), FRAME_FLAG_FIN | FRAME_OP_CLOSE);
}


Expand Down
Loading

0 comments on commit cedd086

Please sign in to comment.