Data Push with GDS Gravity and Tomcat

I’ve been working on a project that uses the very nice application stack of mysql-hibernate-graniteds-flex and recently decided I needed to push data from the server to the client. Since I was already using GraniteDS for AMF remoting, I thought I’d take advantage of the Gravity package which does a Comet-like data push. The GDS documentation has some of the information I needed. Since I was running on OS X, I also needed these instructions for getting APR installed with tomcat (which gravity requires for native I/O). BTW, from what I read, APR is a must for any production use of tomcat since it increases I/O performance through native calls to the OS.

After getting this all set up, I was able to get my channel configured and subscribe to it from the flex client. In my tomcat log, I could see how the comet request timeouts happened at regular intervals (as you’d expect). I wasn’t, however, getting messages sent to the client. Since that is the whole purpose of this exercise, getting this working was very important!

After digging through the gravity code, it seems that the thing that was missing is that the sub_topic header needs to be set on the message. Once I set that to be the same as the client was expecting, it worked great! Here is the code for a servlet I extend for some REST web services which need to send a message to my flex app.

import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;

import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.Message;
import org.granite.gravity.AbstractChannel;
import org.granite.gravity.Gravity;
import org.granite.gravity.tomcat.TomcatChannelFactory;

public class GravityServlet extends HttpServlet {
	private Gravity gravity;
	private AbstractChannel pubChannel;

	public void init(ServletConfig config) throws ServletException {
		gravity = (Gravity)config.getServletContext().getAttribute("org.granite.gravity.Gravity");
		pubChannel = new AbstractChannel(gravity) {
				@Override protected void clearQueue() { }
				@Override public void deliver(
						AbstractChannel from,
						Message message,
						String subscriptionId
					) { }
   			 };
		gravity.registerChannel(pubChannel);
	}

	protected void sendMessage(String msg) {
		AsyncMessage message = new AsyncMessage();
		message.setBody(msg);
		message.setHeader(AsyncMessage.SUBTOPIC_HEADER, "discussion");
		message.setDestination("etlprocess");
		gravity.publishMessage(pubChannel, message);
	}
}

Here is the code that registers to receive the messages;

	// this code sets up listener for server push of EDL update status
	consumer = new Consumer();
	consumer.destination = "etlprocess";
	consumer.topic = "discussion";
	consumer.subscribe();
	consumer.addEventListener(MessageEvent.MESSAGE, handleETL);

The web.xml looks like this;

<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN" "http://java.sun.com
/dtd/web-app_2_3.dtd">
<web-app>
    <!-- read services-config.xml file at web application startup -->
    <listener>
        <listener-class>org.granite.config.GraniteConfigListener</listener-class>
    </listener>

    <!-- handle AMF requests ([de]serialization) -->
    <filter>
        <filter-name>AMFMessageFilter</filter-name>
        <filter-class>org.granite.messaging.webapp.AMFMessageFilter</filter-class>
    </filter>
    <filter-mapping>
        <filter-name>AMFMessageFilter</filter-name>
        <url-pattern>/graniteamf/*</url-pattern>
    </filter-mapping>

    <!-- handle AMF requests (execution) -->
    <servlet>
        <servlet-name>AMFMessageServlet</servlet-name>
        <servlet-class>org.granite.messaging.webapp.AMFMessageServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet>
        <servlet-name>GravityServlet</servlet-name>
        <servlet-class>org.granite.gravity.tomcat.GravityTomcatServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>AMFMessageServlet</servlet-name>
        <url-pattern>/graniteamf/*</url-pattern>
    </servlet-mapping>
    <servlet-mapping>
        <servlet-name>GravityServlet</servlet-name>
        <url-pattern>/gravity/*</url-pattern>
    </servlet-mapping>
</web-app>

One more piece that I seem to have left out. In the services-config.xml which configures graniteDS, I had to add a service def that defined my destination (in this case, “etlprocess”).

        <service id="messaging-service"
            class="flex.messaging.services.MessagingService"
            messageTypes="flex.messaging.messages.AsyncMessage">
            <adapters>
                <adapter-definition
                    id="default"
                    class="org.granite.gravity.adapters.SimpleServiceAdapter"
                    default="true"/>
            </adapters>

            <destination id="etlprocess">
                <channels>
                    <channel ref="my-gravityamf"/>
                </channels>
            </destination>
        </service>

Flash Socket Code and crossdomain Policy Serving

I’ve just spent the past day trying to get my flash app talking to another device on my network via socket 23. I found some sample telnet code (which operates on port 23) and allowed me to “talk” to the RFID reader. It worked fine as a new project in Flex Builder and being served from a local file. The moment I served the application from a web server (tomcat) on my laptop, I get crossdomain issues. Flash won’t open a socket that is different from the one that served your application unless that socket authorizes it. I will spare you the details that took many hours of my day. If you’re trying to talk to another web server on a different port, no problem.. just put the crossdomain.xml file on that server that authorizes the connection. In this case, I was trying to connect to another host and another port (which runs telnet, not http). The RFID reader can’t be modified to serve up a crossdomain.xml file, so I had to get creative.

My solution was to run a TCP proxy on my web server machine that proxied requests to the RFID reader. I made it listen on port 8023 and forward requests to 23 on the RFID reader. This was the start because I still got errors about that localhost:8023  not being authorized. It turns out that when you try the connection, flash connects to the socket and sends 23 bytes which contain “<policy-file-request/>”. Flash expects whatever is running at that port to respond with the policy string (that would have been in the crossdomain.xml file). So, I modified this little proxy class I got off the internet to recognize the proxy request and respond with a proxy string (null terminated.. that is very important!). Once I had this set up right, I was able to communicate from my flash app to my RFID reader. Not the most elegant solution, but this is something temporary for a demo.

 

To run the code below, compile with javac and invoke “java -classpath <class.file.location> ProxyThread 8023 192.168.1.39 23”. Those options are what I used to talk to my RFID reader, but you’ll likely use different values.

import java.net.*;
import java.io.*;
 
/*
  Java Transparent Proxy
  Copyright (C) 1999 by Didier Frick (http://www.dfr.ch/)
  This software is provided under the GNU general public license (http://www.gnu.org/copyleft/gpl.html).
*/

public class ProxyThread extends Thread {
     protected class StreamCopyThread extends Thread {
	private Socket inSock;
	private Socket outSock;
	private boolean done=false;
	private StreamCopyThread peer;
	private boolean inFromLocal;	// in from local port
	private OutputStream out;
 
	private String policy = "<cross-domain-policy>\n<allow-access-from domain=\"*\" to-ports=\"8023\"/>\n</cross-domain-policy>";
 
	public StreamCopyThread(Socket inSock, Socket outSock, boolean in) {
	    this.inSock=inSock;
	    this.outSock=outSock;
	    this.inFromLocal = in;
	}
 
	public void sendPolicy() {
		try {
			out.write(policy.getBytes());
			System.err.println("Sent policy");
		} catch (IOException ex) {
			System.err.println("Error sending policy file");
		}
	}
 
	public void run() {
	    byte[] buf=new byte[bufSize];
	    int count=-1;
	    try {
		InputStream in=inSock.getInputStream();
		out=outSock.getOutputStream();
		try {
		    while(((count=in.read(buf))>0)&&!isInterrupted()) {
		    	if (inFromLocal && count==23 && new String(buf).startsWith("<policy-file-request/>")) {
				// send policy file back.. don't forward this to other port
				System.err.println("Got policy request");
				peer.sendPolicy();
			}
			else {
				out.write(buf,0,count);
				//System.err.println(count+" bytes "+(inFromLocal?"sent":"received"));
			}
		    }
		} catch(Exception xc) {
		    if(debug) {
			// FIXME
			// It's very difficult to sort out between "normal"
			// exceptions (occuring when one end closes the connection
			// normally), and "exceptional" exceptions (when something
			// really goes wrong)
			// Therefore we only log exceptions occuring here if the debug flag
			// is true, in order to avoid cluttering up the log.
			err.println(header+":"+xc);
			xc.printStackTrace();
		    }
		} finally {
		    // The input and output streams will be closed when the sockets themselves
		    // are closed.
		    out.flush();
		}
	    } catch(Exception xc) {
		err.println(header+":"+xc);
		xc.printStackTrace();
	    }
	    synchronized(lock) {
		done=true;
		try {
		    if((peer==null)||peer.isDone()) {
			// Cleanup if there is only one peer OR
			// if _both_ peers are done
			inSock.close();
			outSock.close();
		    }
		    else 
			// Signal the peer (if any) that we're done on this side of the connection
			peer.interrupt();
		} catch(Exception xc) {
		    err.println(header+":"+xc);
		    xc.printStackTrace();
		} finally {
		    connections.removeElement(this);
		}
	    }
	}
 
	public boolean isDone() {
	    return done;
	}
    
	public void setPeer(StreamCopyThread peer) {
	    this.peer=peer;
	}
     }

    // Holds all the currently active StreamCopyThreads
    private java.util.Vector connections=new java.util.Vector();
    // Used to synchronize the connection-handling threads with this thread
    private Object lock=new Object();
    // The address to forward connections to
    private InetAddress dstAddr;
    // The port to forward connections to
    private int dstPort;
    // Backlog parameter used when creating the ServerSocket
    protected static final int backLog=100;
    // Timeout waiting for a StreamCopyThread to finish
    public static final int threadTimeout=2000; //ms
    // Linger time
    public static final int lingerTime=180; //seconds (?)
    // Size of receive buffer
    public static final int bufSize=2048;
    // Header to prepend to log messages
    private String header;
    // This proxy's server socket
    private ServerSocket srvSock;
    // Debug flag
    private boolean debug=false;
 
    // Log streams for output and error messages
    private PrintStream out;
    private PrintStream err;
 
    private static final String 
	argsMessage="Arguments: ( [source_address] source_port dest_address dest_port ) | config_file";
    private static final String 
	propertyPrefix="proxy";

 
    public ProxyThread(InetAddress srcAddr,int srcPort,
		       InetAddress dstAddr,int dstPort, PrintStream out, PrintStream err) 
	throws IOException {
	this.out=out;
	this.err=err;
	this.srvSock=(srcAddr==null) ? new ServerSocket(srcPort,backLog) :  
	    new ServerSocket(srcPort,backLog,srcAddr);
	this.dstAddr=dstAddr;
	this.dstPort=dstPort;
	this.header=(srcAddr==null ? "" : srcAddr.toString())+":"+srcPort+" <-> "+dstAddr+":"+dstPort;
	start();
    }
 
    public void run() {
	out.println(header+" : starting");
	try {
	    while(!isInterrupted()) {
		Socket serverSocket=srvSock.accept();
		try {
		    serverSocket.setSoLinger(true,lingerTime);
		    Socket clientSocket=new Socket(dstAddr,dstPort);
		    clientSocket.setSoLinger(true,lingerTime);
		    StreamCopyThread sToC=new StreamCopyThread(serverSocket,clientSocket, true);
		    StreamCopyThread cToS=new StreamCopyThread(clientSocket,serverSocket, false);
		    sToC.setPeer(cToS);
		    cToS.setPeer(sToC);
		    synchronized(lock) {
			connections.addElement(cToS);
			connections.addElement(sToC);
			sToC.start();
			cToS.start();
		    }
		} catch(Exception xc) {
		    err.println(header+":"+xc);
		    if(debug)
			xc.printStackTrace();
		}
	    }
	    srvSock.close();
	} catch(IOException xc) {
	    err.println(header+":"+xc);
	    if(debug)
		xc.printStackTrace();
	} finally {
	    cleanup();
	    out.println(header+" : stopped");
	}
    }
 
     private void cleanup() {
	synchronized(lock) {
	    try {
		while(connections.size()>0) {
		    StreamCopyThread sct=(StreamCopyThread)connections.elementAt(0);
		    sct.interrupt();
		    sct.join(threadTimeout);
		}
	    } catch(InterruptedException xc) {
	    }
	}
    }
 
    private static ProxyThread addProxy(String src,String srcPort, String dst, String dstPort,
					PrintStream out, PrintStream err) throws
					UnknownHostException, IOException
    {
	InetAddress srcAddr=(src==null) ? null : InetAddress.getByName(src);
	return new ProxyThread(srcAddr,Integer.parseInt(srcPort),
			       InetAddress.getByName(dst),Integer.parseInt(dstPort),out,err);
    }
 
    private static java.util.Vector parseConfigFile(String fileName,PrintStream out,PrintStream err) throws 
        FileNotFoundException, IOException, UnknownHostException
    {
	java.util.Vector result=new java.util.Vector();
	FileInputStream in=new FileInputStream(fileName);
	java.util.Properties props= new java.util.Properties();
	props.load(in);
	in.close();
	for(int i=0;;i++) {
	    String srcAddr=props.getProperty(propertyPrefix+"."+i+".sourceAddr");
	    String srcPort=props.getProperty(propertyPrefix+"."+i+".sourcePort");
	    if(srcPort==null)
		break;
	    String dstAddr=props.getProperty(propertyPrefix+"."+i+".destAddr");
	    String dstPort=props.getProperty(propertyPrefix+"."+i+".destPort");
	    if(dstAddr==null) {
		throw new IllegalArgumentException("Missing destination address for proxy "+i);
	    }
	    if(dstPort==null) {
		throw new IllegalArgumentException("Missing destination port for proxy "+i);
	    }
	    result.addElement(addProxy(srcAddr,srcPort,dstAddr,dstPort,out,err));
	}
	return result;
    }
 
    static java.util.Vector parseArguments(String[] argv,PrintStream out,PrintStream err) throws
        FileNotFoundException, IOException, UnknownHostException
    {
	java.util.Vector result=null;
	int argBase=0;
	String src=null;
	if(argv.length>1) {
	    if(argv.length>3) {
		argBase=1;
		src=argv[0];
	    }
	    result=new java.util.Vector();
	    result.addElement(addProxy(src,argv[argBase++],argv[argBase++],argv[argBase++],out,err));
	} else if(argv.length==1) {
	    result=parseConfigFile(argv[0],out,err);
	} else {
	    throw new IllegalArgumentException(argsMessage);
	}
	return result;
    }
 
    public static void main(String[] argv) throws Exception {
	System.out.println("Java Transparent Proxy");
	System.out.println("Copyright (C) 1999 by Didier Frick (http://www.dfr.ch/)");
	System.out.println("This software is provided under the GNU general public license"+
			   " (http://www.gnu.org/copyleft/gpl.html)");
	try {
	    parseArguments(argv,System.out,System.err);
	} catch(IllegalArgumentException xc) {
	    System.err.println(xc.getMessage());
	    System.exit(1);
	}
    }
}
The initial ProxyThread code came from here: http://www.dfr.ch/en/proxy.html