Skip to content

Commit e8ae803

Browse files
baranowbfl4via
authored andcommitted
[UNDERTOW-1934] invoke onClose in case of network failure
1 parent 6f620cd commit e8ae803

File tree

4 files changed

+66
-1
lines changed

4 files changed

+66
-1
lines changed

core/src/main/java/io/undertow/websockets/core/WebSocketChannel.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.undertow.websockets.extensions.ExtensionFunction;
2525
import org.xnio.ChannelExceptionHandler;
2626
import org.xnio.ChannelListener;
27+
import org.xnio.ChannelListener.SimpleSetter;
2728
import org.xnio.ChannelListeners;
2829
import org.xnio.IoUtils;
2930
import org.xnio.OptionMap;
@@ -82,6 +83,7 @@ public abstract class WebSocketChannel extends AbstractFramedChannel<WebSocketCh
8283
*/
8384
private final Set<WebSocketChannel> peerConnections;
8485

86+
private static final CloseMessage CLOSE_MSG = new CloseMessage(CloseMessage.GOING_AWAY, WebSocketMessages.MESSAGES.messageCloseWebSocket());
8587
/**
8688
* Create a new {@link WebSocketChannel}
8789
* 8
@@ -158,6 +160,15 @@ protected void lastDataRead() {
158160
} catch (IOException e) {
159161
IoUtils.safeClose(this);
160162
}
163+
final ChannelListener<?> listener = ((SimpleSetter<WebSocketChannel>)getReceiveSetter()).get();
164+
if(listener instanceof AbstractReceiveListener) {
165+
final AbstractReceiveListener abstractReceiveListener = (AbstractReceiveListener) listener;
166+
try {
167+
abstractReceiveListener.onCloseMessage(CLOSE_MSG, this);
168+
} catch(Exception e) {
169+
e.printStackTrace();
170+
}
171+
}
161172
}
162173
}
163174

core/src/main/java/io/undertow/websockets/core/WebSocketMessages.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,4 +171,7 @@ public interface WebSocketMessages {
171171

172172
@Message(id = 2045, value = "Unable to send on newly created channel!")
173173
IllegalStateException unableToSendOnNewChannel();
174+
175+
@Message(id = 2046, value = "Closing WebSocket, peer went away.")
176+
String messageCloseWebSocket();
174177
}

core/src/test/java/io/undertow/websockets/core/protocol/AbstractWebSocketServerTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@
2626
import io.undertow.websockets.core.AbstractReceiveListener;
2727
import io.undertow.websockets.core.BufferedBinaryMessage;
2828
import io.undertow.websockets.core.BufferedTextMessage;
29+
import io.undertow.websockets.core.CloseMessage;
2930
import io.undertow.websockets.core.WebSocketCallback;
3031
import io.undertow.websockets.core.WebSocketChannel;
32+
import io.undertow.websockets.core.WebSocketMessages;
3133
import io.undertow.websockets.core.WebSockets;
3234
import io.undertow.websockets.spi.WebSocketHttpExchange;
3335
import io.undertow.websockets.utils.FrameChecker;
@@ -46,6 +48,7 @@
4648
import java.io.IOException;
4749
import java.net.URI;
4850
import java.nio.ByteBuffer;
51+
import java.util.concurrent.TimeUnit;
4952
import java.util.concurrent.atomic.AtomicBoolean;
5053

5154
/**
@@ -167,6 +170,50 @@ protected void onFullCloseMessage(WebSocketChannel channel, BufferedBinaryMessag
167170
client.destroy();
168171
}
169172

173+
@Test
174+
public void testCloseOnPeerGone() throws Exception {
175+
if (getVersion() == WebSocketVersion.V00) {
176+
// ignore 00 tests for now
177+
return;
178+
}
179+
final AtomicBoolean connected = new AtomicBoolean(false);
180+
final FutureResult<CloseMessage> latch = new FutureResult();
181+
DefaultServer.setRootHandler(new WebSocketProtocolHandshakeHandler(new WebSocketConnectionCallback() {
182+
@Override
183+
public void onConnect(final WebSocketHttpExchange exchange, final WebSocketChannel channel) {
184+
connected.set(true);
185+
channel.getReceiveSetter().set(new AbstractReceiveListener() {
186+
187+
@Override
188+
protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) {
189+
Assert.fail();
190+
}
191+
192+
@Override
193+
protected void onCloseMessage(CloseMessage msg, WebSocketChannel channel) {
194+
latch.setResult(msg);
195+
}
196+
197+
@Override
198+
protected void onError(WebSocketChannel channel, Throwable t) {
199+
Assert.fail();
200+
}
201+
});
202+
channel.resumeReceives();
203+
}
204+
}));
205+
206+
WebSocketTestClient client = new WebSocketTestClient(getVersion(),
207+
new URI("ws://" + NetworkUtils.formatPossibleIpv6Address(DefaultServer.getHostAddress("default")) + ":"
208+
+ DefaultServer.getHostPort("default") + "/"));
209+
client.connect();
210+
client.destroy(true);
211+
latch.getIoFuture().await(5000, TimeUnit.MILLISECONDS);
212+
final CloseMessage msg = latch.getIoFuture().get();
213+
Assert.assertNotNull(msg);
214+
Assert.assertEquals(WebSocketMessages.MESSAGES.messageCloseWebSocket(), msg.getReason());
215+
}
216+
170217
protected WebSocketVersion getVersion() {
171218
return WebSocketVersion.V00;
172219
}

core/src/test/java/io/undertow/websockets/utils/WebSocketTestClient.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,11 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
137137
* Destroy the client and also close open connections if any exist
138138
*/
139139
public void destroy() {
140-
if (!closed) {
140+
this.destroy(false);
141+
}
142+
143+
public void destroy(boolean dirty) {
144+
if (!closed && !dirty) {
141145
final CountDownLatch latch = new CountDownLatch(1);
142146
send(new CloseWebSocketFrame(), new FrameListener() {
143147
@Override

0 commit comments

Comments
 (0)