Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to manage who is consuming from specific queue? #69

Open
achmadns opened this issue Jun 20, 2016 · 3 comments
Open

How to manage who is consuming from specific queue? #69

achmadns opened this issue Jun 20, 2016 · 3 comments

Comments

@achmadns
Copy link

Dear @rstoyanchev , I created something like "How not to use SockJS". Now I need to create 'Authorization' feature so that user A can only subscribed into '/amq/queue/user.a', and user B can only subscribed into '/amq/queue/user.b' identified by 'access_token' upon handshake.

I tried to play around with ApplicationListener<SessionSubscribeEvent> and tried to implement what StompSubProtocolHandler.afterSessionEnded() did yet still no luck. It is based on stupid guess actually. Could you please give me pointer?

Executing this code:

final StompSubProtocolHandler handler = (StompSubProtocolHandler) event.getSource();
        event.getMessage().getHeaders().get("simpSessionId");
        publisher.publishEvent(new SessionDisconnectEvent(handler, new GenericMessage<byte[]>("".getBytes()),
                (String) event.getMessage().getHeaders().get("simpSessionId"), CloseStatus.POLICY_VIOLATION));

Got NPE:

java.lang.NullPointerException: null
    at org.springframework.web.socket.messaging.DefaultSimpUserRegistry.onApplicationEvent(DefaultSimpUserRegistry.java:76) ~[spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.messaging.simp.user.MultiServerUserRegistry.onApplicationEvent(MultiServerUserRegistry.java:107) ~[spring-messaging-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:166) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:138) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:382) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:336) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at com.firstwap.stream.server.listener.SessionSubscribeEventListener.onApplicationEvent(SessionSubscribeEventListener.java:31) ~[classes/:na]
    at com.firstwap.stream.server.listener.SessionSubscribeEventListener.onApplicationEvent(SessionSubscribeEventListener.java:16) ~[classes/:na]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:166) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:138) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:382) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:336) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.web.socket.messaging.StompSubProtocolHandler.publishEvent(StompSubProtocolHandler.java:365) [spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.web.socket.messaging.StompSubProtocolHandler.handleMessageFromClient(StompSubProtocolHandler.java:287) [spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.web.socket.messaging.SubProtocolWebSocketHandler.handleMessage(SubProtocolWebSocketHandler.java:307) [spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.web.socket.handler.WebSocketHandlerDecorator.handleMessage(WebSocketHandlerDecorator.java:75) [spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.web.socket.handler.LoggingWebSocketHandlerDecorator.handleMessage(LoggingWebSocketHandlerDecorator.java:56) [spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator.handleMessage(ExceptionWebSocketHandlerDecorator.java:58) [spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter.onWebSocketText(JettyWebSocketHandlerAdapter.java:83) [spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at sun.reflect.GeneratedMethodAccessor59.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_92]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_92]
    at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:71) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:72) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onTextMessage(JettyAnnotatedEventDriver.java:234) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.message.SimpleTextMessage.messageComplete(SimpleTextMessage.java:69) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.appendMessage(AbstractEventDriver.java:66) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onTextFrame(JettyAnnotatedEventDriver.java:226) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:162) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:367) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:176) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:105) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.extensions.compress.CompressExtension.forwardIncoming(CompressExtension.java:142) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:85) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:220) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:220) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:256) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.readParse(AbstractWebSocketConnection.java:675) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:505) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273) [jetty-io-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95) [jetty-io-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93) [jetty-io-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303) [jetty-util-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148) [jetty-util-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136) [jetty-util-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671) [jetty-util-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589) [jetty-util-9.3.9.v20160517.jar:9.3.9.v20160517]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]

Thanks in advance.

@rstoyanchev
Copy link
Owner

And you are aware that Spring Security already has this feautre? http://docs.spring.io/spring-security/site/docs/current/reference/html/websocket.html

@achmadns
Copy link
Author

Thank you for your quick response. I just get to know that feature. Unfortunately I still can't find my solution. After a subscription is filtered only for any principal that has role 'USER', I need to add additional step, to make sure one to one relationship between queue and user . Let's take an example, subscription is success if the destination contains the username that is being subscribed to achieve specific queue per user and allow only specific user accordingly. Here is the illustration:

@Override
    protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
        messages
                .nullDestMatcher().authenticated()
                .simpDestMatchers("/queueu/**").hasRole("USER").addInterceptor(
                new MessageSubscriptionInterceptor(){
                    public void intercept(SessionSubscribeEvent subscription, WebSocketHandler wsHandler, Principal principal, Map<String, Object> attributes){
                        final String destination = (String) subscription.getMessage().getHeaders().get("destination");
                        if(!destination.contains(principal.getName().toLowerCase())){
                            subscription.cancel("UNAUTHORIZED ACCESS!");
                        }
                    }
                }
        )
                .simpTypeMatchers(MESSAGE, SUBSCRIBE).denyAll()
                .anyMessage().denyAll();
    }

Anyway, thank you. Spring team makes developer's life easier.

@ghost
Copy link

ghost commented Oct 21, 2016

Regarding your need "additional step, to make sure one to one relationship between queue and user", I originally thought our product needed to enforce one to one, in order to avoid
collisions, however I learned that session IDs can handle that. See "User Destinations" section of the Websockets user guide. http://docs.spring.io/spring/docs/current/spring-framework-reference/html/websocket.html

     For example, a client might subscribe to the destination "/user/queue/position-updates". This destination will be handled by the UserDestinationMessageHandler and transformed into a destination unique to the user session, e.g. "/queue/position-updates-user123". This provides the convenience of subscribing to a generically named destination while at the same time ensuring no collisions with other users subscribing to the same destination so that each user can receive unique stock position updates.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants