Atmosphere per Session Broadcaster

Atmosphere is a WebSocket/Comet framework for Java (and other JVM languages) that transparently supports WebSockets, Server-Sent Events (SSE), long polling, HTTP streaming (forever frame) and JSONP. The Atmosphere framework is portable and can be deployed on any web server that supports the Servlet Specification 2.3.

This post demonstrates how to create a “per session” Atmosphere Broadcaster: one that is uniquely associated with a client’s HTTP session and thereby forms a durable, unique “communication channel” between the server and the client.

To start a project, the following Maven dependency is required in your pom.xml:

<dependency>
   <groupId>org.atmosphere</groupId>
   <artifactId>atmosphere-runtime</artifactId>
   <version>1.0.0</version>
</dependency>

Next, we have to create an HttpSessionListener implementation that instantiates a new Atmosphere broadcaster with a unique name each time a new HTTP session is created, and removes it once the container destroys that session.

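package net.javaforge.blog.atmosphere;

import javax.servlet.http.HttpSession;
import javax.servlet.http.HttpSessionEvent;
import javax.servlet.http.HttpSessionListener;

import org.atmosphere.cpr.BroadcasterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BroadcasterCreater implements HttpSessionListener {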

	private static final Logger LOG = LoggerFactory.getLogger(BroadcasterCreater.class);

	public static final String BROADCASTER_ID_KEY = "net.javaforge.blog.atmosphere.broadcasterId";

	@Override
	public void sessionCreated(HttpSessionEvent se) {
		HttpSession session = se.getSession();
		String broadcasterId = "/broadcaster/" + session.getId();

		LOG.info("Creating broadcaster: {}", broadcasterId);
		BroadcasterFactory.getDefault().lookup(broadcasterId, true);
		session.setAttribute(BROADCASTER_ID_KEY, broadcasterId);
	}

	@Override
	public void sessionDestroyed(HttpSessionEvent se) {
		HttpSession session = se.getSession();
		String broadcasterId = (String) session.getAttribute(BROADCASTER_ID_KEY);

		LOG.info("Removing broadcaster: {}", broadcasterId);
		BroadcasterFactory.getDefault().remove(broadcasterId);
	}

}

Once the server creates a new HTTP session, a new Atmosphere broadcaster is instantiated by calling the #lookup(..) method on the default BroadcasterFactory, with the second argument set to true so that the broadcaster is created if it does not exist yet. Furthermore, the identifier of the created broadcaster is stored in the HTTP session as an attribute under the BROADCASTER_ID_KEY key.
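
To make the lifecycle easier to follow in isolation, here is a minimal sketch that models it with plain JDK classes. SessionChannelRegistry and Channel are hypothetical stand-ins invented for this illustration; they are not Atmosphere classes and only mimic the create-if-missing lookup and the removal described above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a broadcaster registry; NOT the Atmosphere BroadcasterFactory. */
final class SessionChannelRegistry {

    /** Hypothetical stand-in for a broadcaster; it only carries its id. */
    static final class Channel {
        final String id;

        Channel(String id) {
            this.id = id;
        }
    }

    private final Map<String, Channel> channels = new ConcurrentHashMap<>();

    /** Same naming scheme as the listener: "/broadcaster/" + session id. */
    static String idFor(String sessionId) {
        return "/broadcaster/" + sessionId;
    }

    /** Returns the channel for the id, creating it first when requested and missing. */
    Channel lookup(String id, boolean createIfMissing) {
        return createIfMissing
                ? channels.computeIfAbsent(id, Channel::new)
                : channels.get(id);
    }

    /** Removes the channel; returns true if one was registered under the id. */
    boolean remove(String id) {
        return channels.remove(id) != null;
    }
}
```

In this model, sessionCreated corresponds to lookup(idFor(sessionId), true) and sessionDestroyed to remove(idFor(sessionId)); every later lookup with the same id returns the same instance until it is removed.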

The next step is to implement an Atmosphere handler that opens our “communication channel” and responds to incoming requests or pushes messages to the client. In this example we’ll create just a “dummy” handler that responds to the client with the same message it receives.

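package net.javaforge.blog.atmosphere;

import java.io.IOException;

import org.atmosphere.config.service.AtmosphereHandlerService;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResponse;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterFactory;
import org.atmosphere.cpr.HeaderConfig;
import org.atmosphere.cpr.MetaBroadcaster;
import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
import org.atmosphere.websocket.WebSocketEventListenerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AtmosphereHandlerService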
public class UserSessionAwareAtmosphereHandler extends AbstractReflectorAtmosphereHandler {

	private static final Logger LOG = LoggerFactory.getLogger(UserSessionAwareAtmosphereHandler.class);

	@Override
	public void onRequest(AtmosphereResource resource) throws IOException {

		suspendAtmosphereResourceIfNecessary(resource);

		if ("POST".equalsIgnoreCase(resource.getRequest().getMethod())) {
			doBroadcast(resource);
		}
	}

	private void doBroadcast(AtmosphereResource resource) throws IOException {

		AtmosphereRequest req = resource.getRequest();

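		String incomingMessage = req.getReader().readLine();
		LOG.info("Incoming message: '{}'", incomingMessage);
		if (incomingMessage == null) {
			return; // empty POST body, nothing to broadcast
		}

		Broadcaster broadcaster = null;
		if (incomingMessage.startsWith("@/broadcaster/")) {
			// Expected format: "@/broadcaster/<id> <text>".
			// Without a space, the whole message is treated as the address.
			int end = incomingMessage.indexOf(' ');
			String broadcasterId = incomingMessage.substring(1, end < 0 ? incomingMessage.length() : end);
			broadcaster = lookupBroadcaster(broadcasterId);
		} else {
			broadcaster = lookupBroadcaster(req);
		}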

		LOG.info("Broadcasting message with broadcaster  = {}", broadcaster.getID());
		broadcaster.broadcast("ACK! Sent message was: <strong>" + incomingMessage + "</strong>");
	}

	private void suspendAtmosphereResourceIfNecessary(
			AtmosphereResource resource) {

		AtmosphereRequest req = resource.getRequest();
		AtmosphereResponse resp = resource.getResponse();
		String method = req.getMethod();

		if ("GET".equalsIgnoreCase(method)) {

			LOG.info("GET request detected, suspending broadcaster...");

			// Log all events on the console, including WebSocket events.
			resource.addEventListener(new WebSocketEventListenerAdapter());

			resp.setContentType("text/html;charset=ISO-8859-1");

			Broadcaster b = lookupBroadcaster(req);
			resource.setBroadcaster(b);

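			// Compare from the constant side so a missing transport header cannot cause a NullPointerException.
			if (HeaderConfig.LONG_POLLING_TRANSPORT
					.equalsIgnoreCase(req.getHeader(HeaderConfig.X_ATMOSPHERE_TRANSPORT))) {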
				req.setAttribute(ApplicationConfig.RESUME_ON_BROADCAST, Boolean.TRUE);
				resource.suspend(-1, false);
			} else {
				resource.suspend(-1);
			}

			LOG.info("Broadcasting notification message to all connected users...");
			MetaBroadcaster.getDefault().broadcastTo(
					"/broadcaster/*",
					"new broadcaster connected: <strong>" + b.getID() + "</strong>");
		}

	}

	@Override
	public void destroy() {
	}

	private Broadcaster lookupBroadcaster(AtmosphereRequest req) {
		String broadcasterId = (String) req.getSession().getAttribute(BroadcasterCreater.BROADCASTER_ID_KEY);
		return lookupBroadcaster(broadcasterId);
	}

	private Broadcaster lookupBroadcaster(String broadcasterId) {
		LOG.info("Looking up for broadcaster: {}", broadcasterId);
		Broadcaster broadcaster = BroadcasterFactory.getDefault().lookup(broadcasterId);

		LOG.info("Broadcaster found : {}", broadcaster.getID());
		return broadcaster;
	}
}

Some explanations:

  • The lookupBroadcaster(..) method asks the BroadcasterFactory for the broadcaster instance with the requested identifier. The broadcaster belonging to the current session already exists at this point, because it was created by our HTTP session listener one step earlier.
  • The suspendAtmosphereResourceIfNecessary(..) method implements the typical Atmosphere way of handling the first incoming GET request: it suspends the resource and thereby opens a durable communication channel. Also note the use of Atmosphere’s MetaBroadcaster, which sends a notification to all connected broadcasters matching the “/broadcaster/*” pattern.
  • The doBroadcast(..) method inspects the incoming message. If it starts with the special “@/broadcaster/” prefix, the handler responds to the broadcaster explicitly named in the message (this way you can send messages to other connected broadcasters/HTTP sessions). Otherwise it responds to the broadcaster associated with the current HTTP session.
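
The addressing rule used by doBroadcast(..) can be sketched on its own, without any Atmosphere types. MessageTarget and its resolve method are hypothetical names introduced only for this illustration; the fallback to the session's own broadcaster for a missing or empty address is an assumption of the sketch, not necessarily what the original example does.

```java
/** Hypothetical helper, not part of Atmosphere or the original example. */
final class MessageTarget {

    static final String PREFIX = "@/broadcaster/";

    private MessageTarget() {
    }

    /**
     * Returns the broadcaster id addressed by the message, or the
     * session's own id when the message carries no usable "@" address.
     */
    static String resolve(String message, String sessionBroadcasterId) {
        if (message == null || !message.startsWith(PREFIX)) {
            return sessionBroadcasterId;
        }
        int space = message.indexOf(' ');
        // Drop the leading '@'; without a space the whole message is the address.
        String id = (space < 0) ? message.substring(1) : message.substring(1, space);
        // "@/broadcaster/ hello" names no session, so fall back to the own id.
        boolean hasSessionPart = id.length() > PREFIX.length() - 1;
        return hasSessionPart ? id : sessionBroadcasterId;
    }
}
```

For example, resolve("@/broadcaster/abc hello", "/broadcaster/me") yields "/broadcaster/abc", while a plain "hello" yields "/broadcaster/me".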

Note! Atmosphere is not limited to responding to incoming messages. Thanks to the durable communication channel between server and client, you can push messages to the client at any time using the Broadcaster.broadcast(..) method.

The next step is to connect all these things together by specifying them in web.xml:
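
As a rough illustration of such a server-initiated push, the sketch below sends a message at a fixed interval using only JDK classes. PeriodicPusher is a hypothetical class, and the Consumer<String> is a stand-in for the channel; in a real application it could be a lambda that calls broadcast(..) on the session's broadcaster, which is an assumption about the wiring rather than code from the example project.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper that pushes a message to a channel at a fixed rate. */
final class PeriodicPusher implements AutoCloseable {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /**
     * Starts pushing immediately and then every periodMillis milliseconds.
     * Note: if the channel throws, the executor cancels further runs.
     */
    void start(Consumer<String> channel, long periodMillis) {
        scheduler.scheduleAtFixedRate(
                () -> channel.accept("server time: " + System.currentTimeMillis()),
                0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops the background thread; call this when the session ends. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The pusher should be closed together with the broadcaster it feeds, for instance when the HTTP session is destroyed, so that no scheduler thread outlives its session.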

<?xml version="1.0" encoding="UTF-8"?>
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee"
	xmlns:j2ee="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://java.sun.com/xml/ns/javaee    http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">

	<listener>
		<listener-class>net.javaforge.blog.atmosphere.BroadcasterCreater</listener-class>
	</listener>

	<servlet>
		<description>AtmosphereServlet</description>
		<servlet-name>AtmosphereServlet</servlet-name>
		<servlet-class>org.atmosphere.cpr.AtmosphereServlet</servlet-class>
		<init-param>
			<param-name>org.atmosphere.cpr.AtmosphereHandler</param-name>
			<param-value>net.javaforge.blog.atmosphere.UserSessionAwareAtmosphereHandler
			</param-value>
		</init-param>
		<init-param>
			<param-name>org.atmosphere.useNative</param-name>
			<param-value>true</param-value>
		</init-param>
		<init-param>
			<param-name>org.atmosphere.cpr.sessionSupport</param-name>
			<param-value>true</param-value>
		</init-param>
		<load-on-startup>0</load-on-startup>
	</servlet>

	<servlet-mapping>
		<servlet-name>AtmosphereServlet</servlet-name>
		<url-pattern>/atmosphere/*</url-pattern>
	</servlet-mapping>

	<welcome-file-list>
		<welcome-file>index.jsp</welcome-file>
	</welcome-file-list>

</web-app> 

The only thing left to demonstrate is the message processing on the client side. This can be done by adding the jQuery and jquery.atmosphere.js libraries to your client-side code and writing some JavaScript that handles the communication.

The JavaScript snippet below briefly demonstrates how the client-side code can be implemented:

$(document).ready(
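	function() {
		var socket = $.atmosphere; // entry point provided by jquery.atmosphere.js
		var subSocket;             // active subscription, set in subscribe()
		var detectedTransport;     // transport reported by the last response

		function subscribe() {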
			var request = {
				url : document.location.toString() + 'atmosphere/<%=broadcasterId%>',
				transport : 'websocket'
			};

			request.onMessage = function(response) { // handle incoming message
				detectedTransport = response.transport;
				if (response.status == 200) {
					var data = response.responseBody;
					if (data.length > 0) {
						// do something  with the data....
					}
				}
			};
			subSocket = socket.subscribe(request);
		}

		function unsubscribe() {
			socket.unsubscribe();
		}

		function connect() {
			unsubscribe();
			subscribe();
		}

		function sendMessage(){
			var msg = .... ; // message to send
			subSocket.push({ data : msg ? msg : "<null>" });
		}
});

Now you are able to support a durable per-session connection in your web application.

Please consult GitHub for the complete working example: https://github.com/bigpuritz/javaforge-blog/tree/master/atmosphere-per-session-broadcaster

Check out the sources and execute mvn clean install jetty:run in your console. This builds the project and starts an embedded Jetty web server instance.

 