QueryTool 1.1, even more to like!


A number of you have requested some new features, and some of them have made it into this new release. You can now add and delete domains from within the tool. Also, clicking on results lets you copy the cell or row to the clipboard. This latter feature can be really handy for putting item names into queries. So, check it out! Once you download the jar, you run it with this command:

java -jar QueryTool1.1.jar <accessId> <secretKey>

Amazon CloudWatch with Java/typica

Recently, Amazon announced that its CloudWatch service went into public beta. I’ve been involved with the private beta of this and of the Elastic Load Balancing and Auto Scaling services. I’ve just completed testing of the CloudWatch monitoring service APIs in typica and thought I’d share some of what has been added.

First of all, the Jec2 class has two new methods, monitorInstances(..) and unmonitorInstances(..). They do exactly what you’d expect, turning monitoring on or off for one or more instances. What I think more people will use is the new flag on LaunchConfiguration to enable monitoring when you launch an instance. Also, if you describe instances, you’ll now get the monitoring status back.

The real CloudWatch APIs are in their own package. I did this because, while they were initially released for EC2, they seem to be written to allow monitoring of other services as well (hence the namespace parameter). The new API has only two methods. The first lets you list the metrics you can query with the second. To do this, you can use some code like this:

Monitoring mon = new Monitoring(accessId, secretKey);
List<Metric> metrix = mon.listMetrics();
for (Metric m : metrix) {
	System.out.println("name = "+m.getName()+":"+m.getNamespace());
	for (Dimension dim : m.getDimensions()) {
		System.out.println("   "+dim.getName()+": "+dim.getValue());
	}
}
Here is some of the output (truncated because there is a lot more):
     [java] name = NetworkIn:AWS/EC2
     [java] name = NetworkOut:AWS/EC2
     [java]    ImageId: ami-85d037ec
     [java] name = NetworkOut:AWS/EC2
     [java] name = DiskWriteBytes:AWS/EC2
     [java]    InstanceType: m1.small
     [java] name = CPUUtilization:AWS/EC2
     [java]    InstanceType: m1.large
     [java] name = DiskWriteBytes:AWS/EC2
     [java]    InstanceType: m1.large
     [java] name = DiskReadOps:AWS/EC2
     [java]    InstanceId: i-1de3a674
     [java] name = DiskWriteOps:AWS/EC2
     [java]    InstanceType: m1.small
     [java] name = DiskReadOps:AWS/EC2
     [java]    ImageId: ami-24fa86b
     [java] name = DiskReadOps:AWS/EC2
     [java]    InstanceId: i-51423838

Once you have an instance or an image you’d like to monitor, you can use some code like this to fetch the data:

List<Statistics> stats = new ArrayList<Statistics>();
stats.add(Statistics.AVERAGE);

Map<String, String> dimensions = new HashMap<String, String>();
// can be InstanceId, InstanceType, ImageId
dimensions.put("ImageId", "ami-85d037ec");

Date end = new Date();	// that means now
end = new Date(end.getTime() + 3600000*5); // need to adjust for GMT
Date start = new Date(end.getTime() - 3600000*24);	// 1 day ago
MetricStatisticsResult result = mon.getMetricStatistics(
				60,	// must be multiple of 60
				stats,	// see above
				"AWS/EC2",
				dimensions,
				start,	// start of interval
				end,	// end of interval
				// can be NetworkIn, NetworkOut, DiskReadOps,
				// DiskWriteOps, DiskReadBytes, DiskWriteBytes,
				// CPUUtilization
				"CPUUtilization",
				StandardUnit.PERCENT,
				null);
System.out.println("metrics label = "+result.getLabel());
for (Datapoint dp : result.getDatapoints()) {
	System.out.println(dp.getTimestamp().getTime().toString()+
			" samples:"+dp.getSamples()+" "+dp.getAverage()+" "+dp.getUnit());
}
It can be useful to monitor by ImageId when you’re running a pool of servers (like with the auto scaling service). I’ve tried to include comments within the code that indicate appropriate values, because it can get complicated. Here is some of the output:
     [java] metrics label = CPUUtilization
     [java] Fri May 22 10:56:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 11:42:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 12:55:00 EDT 2009 samples:1.0 1.54 Percent
     [java] Fri May 22 12:41:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 13:10:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 10:09:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 12:51:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 12:40:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 10:07:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 13:41:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 10:34:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 12:01:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 10:17:00 EDT 2009 samples:1.0 0.39 Percent
     [java] Fri May 22 11:39:00 EDT 2009 samples:1.0 1.15 Percent
     [java] Fri May 22 10:06:00 EDT 2009 samples:1.0 0.38 Percent
     [java] Fri May 22 12:10:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 12:09:00 EDT 2009 samples:1.0 0.76 Percent
     [java] Fri May 22 13:46:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 10:39:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 12:11:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 12:03:00 EDT 2009 samples:1.0 1.15 Percent
     [java] Fri May 22 11:32:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 10:44:00 EDT 2009 samples:1.0 0.0 Percent
     [java] Fri May 22 12:45:00 EDT 2009 samples:1.0 0.0 Percent
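
The five-hour shift in the snippet above is hard-coded for US Eastern standard time, yet the sample output is stamped EDT, which is only four hours behind GMT. If the shift is needed at all with your version of typica (worth checking, since a java.util.Date already stores an instant that does not depend on the time zone), it could be derived from the JVM's time zone rather than typed in by hand. Here is a minimal sketch of that idea; the class and method names are made up for illustration and are not part of typica:

```java
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper, not part of typica.
class GmtWindow {
    // Milliseconds to add to a local wall-clock time to reach GMT at the given instant.
    // TimeZone.getOffset returns local-minus-GMT (daylight saving included), so negate it.
    static long gmtAdjustment(TimeZone zone, long instantMillis) {
        return -zone.getOffset(instantMillis);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Same shape as the snippet above, minus the hard-coded 3600000*5.
        Date end = new Date(now + gmtAdjustment(TimeZone.getDefault(), now));
        Date start = new Date(end.getTime() - 24L * 3600000L); // 1 day earlier
        System.out.println("start = " + start + ", end = " + end);
    }
}
```

The resulting start and end values would be passed to getMetricStatistics exactly as before.
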
This code is available in typica SVN as of r265. Look for typica release 1.6 which will contain CloudWatch, ElasticLoadBalancing and AutoScaling once a little more testing has been completed.

Query Tool for Amazon SimpleDB

Although these projects are never really finished, I can say I’ve completed version 1 of the Amazon SimpleDB Query Tool. This is built on top of a new SimpleDB API that will be part of an upcoming typica release. The code currently resides in a branch, but will hopefully get merged into trunk in the next few weeks.

OK, more about the tool. It was built to provide a convenient way to test queries. That’s it. Towards that end, there is a list of features I included that really met a need for me.

  • flexible query workspace (scratch pad)
  • run query on the line where the cursor is
  • allow domain selection via the UI
  • display domain metadata
  • show results from several queries at once
  • show box usage and other stats
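
As an illustration of the "run query on the line where the cursor is" idea, here is a minimal sketch of picking the current line out of a scratch pad. It is not taken from the QueryTool source: it assumes the pad's text and the caret offset are available as a String and an int (as a Swing text component can provide), and the class name, method name and sample queries are made up.

```java
// Hypothetical helper, not taken from the QueryTool source.
class CurrentLine {
    // Returns the line of text containing the caret offset (0..text.length()).
    static String lineAt(String text, int caret) {
        if (caret < 0 || caret > text.length()) {
            throw new IllegalArgumentException("caret out of range: " + caret);
        }
        // Look backwards for the newline that ends the previous line.
        int start = text.lastIndexOf('\n', caret - 1) + 1;
        // Look forwards for the newline that ends this line.
        int end = text.indexOf('\n', caret);
        if (end < 0) {
            end = text.length();
        }
        return text.substring(start, end);
    }

    public static void main(String[] args) {
        // Two made-up queries in the scratch pad; the caret sits inside the second one.
        String pad = "select * from users\nselect * from orders where total > '100'\n";
        System.out.println(lineAt(pad, 25));
    }
}
```

Working on the plain text and an offset keeps the lookup independent of the UI toolkit, so only the string returned for the current line has to be handed to the query call.
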

Now, the moment you’ve all been waiting for, a screenshot!

[Screenshot: querytool]

Right now, the code is still in SVN, so if you’d like to run it, you’ll need to check out the branch and build it. If you get that far, you can run it with this command: ant test.main -Dclass=QueryTool -Dargs="<access id> <secret key>"

For an official release, I’ll make an executable jar, so you’d run “java -jar QueryTool.jar <access id> <secret key>”

Data Push with GDS Gravity and Tomcat

I’ve been working on a project that uses the very nice application stack of mysql-hibernate-graniteds-flex and recently decided I needed to push data from the server to the client. Since I was already using GraniteDS for AMF remoting, I thought I’d take advantage of the Gravity package which does a Comet-like data push. The GDS documentation has some of the information I needed. Since I was running on OS X, I also needed these instructions for getting APR installed with tomcat (which gravity requires for native I/O). BTW, from what I read, APR is a must for any production use of tomcat since it increases I/O performance through native calls to the OS.

After getting this all set up, I was able to get my channel configured and subscribe to it from the flex client. In my tomcat log, I could see how the comet request timeouts happened at regular intervals (as you’d expect). I wasn’t, however, getting messages sent to the client. Since that is the whole purpose of this exercise, getting this working was very important!

After digging through the gravity code, it seems that the thing that was missing is that the sub_topic header needs to be set on the message. Once I set that to be the same as the client was expecting, it worked great! Here is the code for a servlet I extend for some REST web services which need to send a message to my flex app.

import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;

import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.Message;
import org.granite.gravity.AbstractChannel;
import org.granite.gravity.Gravity;
import org.granite.gravity.tomcat.TomcatChannelFactory;

public class GravityServlet extends HttpServlet {
	private Gravity gravity;
	private AbstractChannel pubChannel;

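	@Override
	public void init(ServletConfig config) throws ServletException {
		// let GenericServlet store the config so getServletConfig() keeps working in subclasses
		super.init(config);
		gravity = (Gravity)config.getServletContext().getAttribute("org.granite.gravity.Gravity");
		// a do-nothing channel that exists only so this servlet has something to publish from
		pubChannel = new AbstractChannel(gravity) {
				@Override protected void clearQueue() { }
				@Override public void deliver(
						AbstractChannel from,
						Message message,
						String subscriptionId
					) { }
			};
		gravity.registerChannel(pubChannel);
	}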

	protected void sendMessage(String msg) {
		AsyncMessage message = new AsyncMessage();
		message.setBody(msg);
		message.setHeader(AsyncMessage.SUBTOPIC_HEADER, "discussion");
		message.setDestination("etlprocess");
		gravity.publishMessage(pubChannel, message);
	}
}

Here is the code that registers to receive the messages:

	// this code sets up a listener for server push of ETL update status
	consumer = new Consumer();
	consumer.destination = "etlprocess";
	consumer.topic = "discussion";
	consumer.subscribe();
	consumer.addEventListener(MessageEvent.MESSAGE, handleETL);

The web.xml looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN" "http://java.sun.com/dtd/web-app_2_3.dtd">
<web-app>
    <!-- read services-config.xml file at web application startup -->
    <listener>
        <listener-class>org.granite.config.GraniteConfigListener</listener-class>
    </listener>

    <!-- handle AMF requests ([de]serialization) -->
    <filter>
        <filter-name>AMFMessageFilter</filter-name>
        <filter-class>org.granite.messaging.webapp.AMFMessageFilter</filter-class>
    </filter>
    <filter-mapping>
        <filter-name>AMFMessageFilter</filter-name>
        <url-pattern>/graniteamf/*</url-pattern>
    </filter-mapping>

    <!-- handle AMF requests (execution) -->
    <servlet>
        <servlet-name>AMFMessageServlet</servlet-name>
        <servlet-class>org.granite.messaging.webapp.AMFMessageServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet>
        <servlet-name>GravityServlet</servlet-name>
        <servlet-class>org.granite.gravity.tomcat.GravityTomcatServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>AMFMessageServlet</servlet-name>
        <url-pattern>/graniteamf/*</url-pattern>
    </servlet-mapping>
    <servlet-mapping>
        <servlet-name>GravityServlet</servlet-name>
        <url-pattern>/gravity/*</url-pattern>
    </servlet-mapping>
</web-app>

One more piece that I seem to have left out. In the services-config.xml which configures graniteDS, I had to add a service def that defined my destination (in this case, “etlprocess”).

        <service id="messaging-service"
            class="flex.messaging.services.MessagingService"
            messageTypes="flex.messaging.messages.AsyncMessage">
            <adapters>
                <adapter-definition
                    id="default"
                    class="org.granite.gravity.adapters.SimpleServiceAdapter"
                    default="true"/>
            </adapters>

            <destination id="etlprocess">
                <channels>
                    <channel ref="my-gravityamf"/>
                </channels>
            </destination>
        </service>

Flash Socket Code and crossdomain Policy Serving

I’ve just spent the past day trying to get my flash app talking to another device on my network over port 23. I found some sample telnet code (telnet operates on port 23) that allowed me to “talk” to the RFID reader. It worked fine as a new project in Flex Builder, served from a local file. The moment I served the application from a web server (tomcat) on my laptop, I got crossdomain errors. Flash won’t open a socket to a host or port other than the one that served your application unless the target authorizes it. I will spare you the details that took many hours of my day. If you’re trying to talk to another web server on a different port, no problem: just put a crossdomain.xml file on that server that authorizes the connection. In this case, I was trying to connect to another host and another port (which runs telnet, not http). The RFID reader can’t be modified to serve up a crossdomain.xml file, so I had to get creative.

My solution was to run a TCP proxy on my web server machine that proxied requests to the RFID reader. I made it listen on port 8023 and forward requests to port 23 on the RFID reader. This was only the start, because I still got errors about localhost:8023 not being authorized. It turns out that when you try the connection, flash connects to the socket and sends 23 bytes: the string “<policy-file-request/>” followed by a null byte. Flash expects whatever is running at that port to respond with the policy string (that would have been in the crossdomain.xml file). So, I modified this little proxy class I got off the internet to recognize the policy request and respond with a policy string (null terminated.. that is very important!). Once I had this set up right, I was able to communicate from my flash app to my RFID reader. Not the most elegant solution, but this is something temporary for a demo.
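
The handshake described above can be sketched on its own, separate from the proxy. This is a minimal illustration, not the code from the proxy class below; the class and method names are made up, and it assumes the whole request arrives in a single read, which TCP does not guarantee.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the handshake; the names are made up.
class PolicyHandshake {
    static final String REQUEST = "<policy-file-request/>";

    // True if the bytes read are the policy request: 22 characters plus a trailing NUL.
    static boolean isPolicyRequest(byte[] buf, int count) {
        if (count != REQUEST.length() + 1 || buf[count - 1] != 0) {
            return false;
        }
        String text = new String(buf, 0, count - 1, StandardCharsets.US_ASCII);
        return text.equals(REQUEST);
    }

    // Builds a policy reply for one port, ending with the NUL byte that terminates it.
    static byte[] policyResponse(int port) {
        String policy = "<cross-domain-policy>"
                + "<allow-access-from domain=\"*\" to-ports=\"" + port + "\"/>"
                + "</cross-domain-policy>\0";
        return policy.getBytes(StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        byte[] request = (REQUEST + "\0").getBytes(StandardCharsets.US_ASCII);
        System.out.println("policy request? " + isPolicyRequest(request, request.length));
        System.out.println("reply length: " + policyResponse(8023).length);
    }
}
```

A proxy would check each chunk it reads from the flash side with isPolicyRequest and, on a match, write policyResponse back to that same socket instead of forwarding the bytes.
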

 

To run the code below, compile with javac and invoke “java -classpath <class.file.location> ProxyThread 8023 192.168.1.39 23”. Those options are what I used to talk to my RFID reader, but you’ll likely use different values.

import java.net.*;
import java.io.*;
 
/*
  Java Transparent Proxy
  Copyright (C) 1999 by Didier Frick (http://www.dfr.ch/)
  This software is provided under the GNU general public license (http://www.gnu.org/copyleft/gpl.html).
*/

public class ProxyThread extends Thread {
     protected class StreamCopyThread extends Thread {
	private Socket inSock;
	private Socket outSock;
	private boolean done=false;
	private StreamCopyThread peer;
	private boolean inFromLocal;	// in from local port
	private OutputStream out;
 
	// the trailing \0 matters: the policy string must be null terminated
	private String policy = "<cross-domain-policy>\n<allow-access-from domain=\"*\" to-ports=\"8023\"/>\n</cross-domain-policy>\0";
 
	public StreamCopyThread(Socket inSock, Socket outSock, boolean in) {
	    this.inSock=inSock;
	    this.outSock=outSock;
	    this.inFromLocal = in;
	}
 
	public void sendPolicy() {
		try {
			out.write(policy.getBytes());
			System.err.println("Sent policy");
		} catch (IOException ex) {
			System.err.println("Error sending policy file");
		}
	}
 
	public void run() {
	    byte[] buf=new byte[bufSize];
	    int count=-1;
	    try {
		InputStream in=inSock.getInputStream();
		out=outSock.getOutputStream();
		try {
		    while(((count=in.read(buf))>0)&&!isInterrupted()) {
		    	if (inFromLocal && count==23 && new String(buf).startsWith("<policy-file-request/>")) {
				// send policy file back.. don't forward this to other port
				System.err.println("Got policy request");
				peer.sendPolicy();
			}
			else {
				out.write(buf,0,count);
				//System.err.println(count+" bytes "+(inFromLocal?"sent":"received"));
			}
		    }
		} catch(Exception xc) {
		    if(debug) {
			// FIXME
			// It's very difficult to sort out between "normal"
			// exceptions (occurring when one end closes the connection
			// normally), and "exceptional" exceptions (when something
			// really goes wrong)
			// Therefore we only log exceptions occurring here if the debug flag
			// is true, in order to avoid cluttering up the log.
			err.println(header+":"+xc);
			xc.printStackTrace();
		    }
		} finally {
		    // The input and output streams will be closed when the sockets themselves
		    // are closed.
		    out.flush();
		}
	    } catch(Exception xc) {
		err.println(header+":"+xc);
		xc.printStackTrace();
	    }
	    synchronized(lock) {
		done=true;
		try {
		    if((peer==null)||peer.isDone()) {
			// Cleanup if there is only one peer OR
			// if _both_ peers are done
			inSock.close();
			outSock.close();
		    }
		    else 
			// Signal the peer (if any) that we're done on this side of the connection
			peer.interrupt();
		} catch(Exception xc) {
		    err.println(header+":"+xc);
		    xc.printStackTrace();
		} finally {
		    connections.removeElement(this);
		}
	    }
	}
 
	public boolean isDone() {
	    return done;
	}
    
	public void setPeer(StreamCopyThread peer) {
	    this.peer=peer;
	}
     }

    // Holds all the currently active StreamCopyThreads
    private java.util.Vector connections=new java.util.Vector();
    // Used to synchronize the connection-handling threads with this thread
    private Object lock=new Object();
    // The address to forward connections to
    private InetAddress dstAddr;
    // The port to forward connections to
    private int dstPort;
    // Backlog parameter used when creating the ServerSocket
    protected static final int backLog=100;
    // Timeout waiting for a StreamCopyThread to finish
    public static final int threadTimeout=2000; //ms
    // Linger time
    public static final int lingerTime=180; //seconds (?)
    // Size of receive buffer
    public static final int bufSize=2048;
    // Header to prepend to log messages
    private String header;
    // This proxy's server socket
    private ServerSocket srvSock;
    // Debug flag
    private boolean debug=false;
 
    // Log streams for output and error messages
    private PrintStream out;
    private PrintStream err;
 
    private static final String 
	argsMessage="Arguments: ( [source_address] source_port dest_address dest_port ) | config_file";
    private static final String 
	propertyPrefix="proxy";

 
    public ProxyThread(InetAddress srcAddr,int srcPort,
		       InetAddress dstAddr,int dstPort, PrintStream out, PrintStream err) 
	throws IOException {
	this.out=out;
	this.err=err;
	this.srvSock=(srcAddr==null) ? new ServerSocket(srcPort,backLog) :  
	    new ServerSocket(srcPort,backLog,srcAddr);
	this.dstAddr=dstAddr;
	this.dstPort=dstPort;
	this.header=(srcAddr==null ? "" : srcAddr.toString())+":"+srcPort+" <-> "+dstAddr+":"+dstPort;
	start();
    }
 
    public void run() {
	out.println(header+" : starting");
	try {
	    while(!isInterrupted()) {
		Socket serverSocket=srvSock.accept();
		try {
		    serverSocket.setSoLinger(true,lingerTime);
		    Socket clientSocket=new Socket(dstAddr,dstPort);
		    clientSocket.setSoLinger(true,lingerTime);
		    StreamCopyThread sToC=new StreamCopyThread(serverSocket,clientSocket, true);
		    StreamCopyThread cToS=new StreamCopyThread(clientSocket,serverSocket, false);
		    sToC.setPeer(cToS);
		    cToS.setPeer(sToC);
		    synchronized(lock) {
			connections.addElement(cToS);
			connections.addElement(sToC);
			sToC.start();
			cToS.start();
		    }
		} catch(Exception xc) {
		    err.println(header+":"+xc);
		    if(debug)
			xc.printStackTrace();
		}
	    }
	    srvSock.close();
	} catch(IOException xc) {
	    err.println(header+":"+xc);
	    if(debug)
		xc.printStackTrace();
	} finally {
	    cleanup();
	    out.println(header+" : stopped");
	}
    }
 
     private void cleanup() {
	synchronized(lock) {
	    try {
		while(connections.size()>0) {
		    StreamCopyThread sct=(StreamCopyThread)connections.elementAt(0);
		    sct.interrupt();
		    sct.join(threadTimeout);
		}
	    } catch(InterruptedException xc) {
	    }
	}
    }
 
    private static ProxyThread addProxy(String src,String srcPort, String dst, String dstPort,
					PrintStream out, PrintStream err) throws
					UnknownHostException, IOException
    {
	InetAddress srcAddr=(src==null) ? null : InetAddress.getByName(src);
	return new ProxyThread(srcAddr,Integer.parseInt(srcPort),
			       InetAddress.getByName(dst),Integer.parseInt(dstPort),out,err);
    }
 
    private static java.util.Vector parseConfigFile(String fileName,PrintStream out,PrintStream err) throws 
        FileNotFoundException, IOException, UnknownHostException
    {
	java.util.Vector result=new java.util.Vector();
	FileInputStream in=new FileInputStream(fileName);
	java.util.Properties props= new java.util.Properties();
	props.load(in);
	in.close();
	for(int i=0;;i++) {
	    String srcAddr=props.getProperty(propertyPrefix+"."+i+".sourceAddr");
	    String srcPort=props.getProperty(propertyPrefix+"."+i+".sourcePort");
	    if(srcPort==null)
		break;
	    String dstAddr=props.getProperty(propertyPrefix+"."+i+".destAddr");
	    String dstPort=props.getProperty(propertyPrefix+"."+i+".destPort");
	    if(dstAddr==null) {
		throw new IllegalArgumentException("Missing destination address for proxy "+i);
	    }
	    if(dstPort==null) {
		throw new IllegalArgumentException("Missing destination port for proxy "+i);
	    }
	    result.addElement(addProxy(srcAddr,srcPort,dstAddr,dstPort,out,err));
	}
	return result;
    }
 
    static java.util.Vector parseArguments(String[] argv,PrintStream out,PrintStream err) throws
        FileNotFoundException, IOException, UnknownHostException
    {
	java.util.Vector result=null;
	int argBase=0;
	String src=null;
	if(argv.length>1) {
	    if(argv.length>3) {
		argBase=1;
		src=argv[0];
	    }
	    result=new java.util.Vector();
	    result.addElement(addProxy(src,argv[argBase++],argv[argBase++],argv[argBase++],out,err));
	} else if(argv.length==1) {
	    result=parseConfigFile(argv[0],out,err);
	} else {
	    throw new IllegalArgumentException(argsMessage);
	}
	return result;
    }
 
    public static void main(String[] argv) throws Exception {
	System.out.println("Java Transparent Proxy");
	System.out.println("Copyright (C) 1999 by Didier Frick (http://www.dfr.ch/)");
	System.out.println("This software is provided under the GNU general public license"+
			   " (http://www.gnu.org/copyleft/gpl.html)");
	try {
	    parseArguments(argv,System.out,System.err);
	} catch(IllegalArgumentException xc) {
	    System.err.println(xc.getMessage());
	    System.exit(1);
	}
    }
}
The initial ProxyThread code came from here: http://www.dfr.ch/en/proxy.html