December 24, 2009

Integrating websockets with appengine applications

Today I've been working on an experimental implementation of pseudo server push for appengine applications, so let me share it with you.

The problem with appengine here is that we cannot use comet-like capabilities on it because of its 30-second request limit.

In this article, I use an external websockets server to provide pseudo server push for an appengine application. Here is the diagram.



Let me explain this diagram a bit.

  1. A client requests the content.
  2. The appengine webapp returns the newest contents along with JavaScript that establishes a new websockets connection to an external websockets server.
  3. The browser opens a new websockets connection to the websockets server. This connection stays open as long as possible so that the server can notify the browser of content updates.
  4. Another browser sends a request that updates the contents (e.g. posting a new comment).
  5. The appengine webapp saves the state in the datastore and returns the newest contents to that client. At the same time, it notifies the websockets server of the update.
  6. On the websockets server, every server process receives the notification and tells its clients over the persistent websockets connection that there is an update.
  7. Now the first browser knows that there are updated contents on the server, so (in this case) it makes an ajax request to the appengine webapp and receives the newest contents.

I've implemented a simple chat with this architecture. Please visit it and try it out if you'd like. I've tested it only with Chrome 4 or higher (including Chromium).
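
To make the seven steps easier to follow, here is a rough in-process simulation of the flow in Python. It is only a sketch: there is no network, datastore, or real websocket involved, and every class and method name in it (WebApp, PushServer, Browser and so on) is hypothetical and not taken from the chat application.

```python
# In-process simulation of the seven steps; nothing here touches the network.
# All class and method names are hypothetical.


class PushServer(object):
    """Stands in for the external websockets server."""

    def __init__(self):
        self.connections = []

    def connect(self, on_message):
        # Step 3: a browser opens a persistent connection.
        self.connections.append(on_message)

    def notify_update(self):
        # Step 6: tell every connected client that something changed.
        for on_message in self.connections:
            on_message('UPDATED')


class WebApp(object):
    """Stands in for the appengine webapp and its datastore."""

    def __init__(self, push_server):
        self.comments = []
        self.push_server = push_server

    def get_contents(self):
        # Steps 1-2 and 7: return the newest contents.
        return list(self.comments)

    def post_comment(self, text):
        # Steps 4-5: save the state, then notify the push server.
        self.comments.append(text)
        self.push_server.notify_update()
        return self.get_contents()


class Browser(object):
    def __init__(self, webapp, push_server):
        self.webapp = webapp
        self.view = webapp.get_contents()      # steps 1-2
        push_server.connect(self.on_message)   # step 3

    def on_message(self, message):
        # Step 7: on 'UPDATED', fetch the newest contents again.
        if message == 'UPDATED':
            self.view = self.webapp.get_contents()


push = PushServer()
app = WebApp(push)
first = Browser(app, push)
second = Browser(app, push)
second.view = app.post_comment('hello')
print(first.view)  # the first browser now shows the new comment
```

Note that the push server only carries the 'UPDATED' signal. The contents themselves always come from the webapp, which is the same division of labour as in the diagram.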

Now, let's walk through the code. On the appengine side, when a new comment arrives, I need to notify the websockets server, so I use urlfetch for this. Here is the code:

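import time

from google.appengine.api import urlfetch

# CommentForm, Comment, redirect, url_for and render_to_response come from
# the application and its web framework; their imports are not shown here.


def index(request):
  form = CommentForm()
  if request.method == 'POST':
    if form.validate(request.form):
      # Save the comment, recording the owner when the user is logged in.
      if request.user.is_authenticated():
        form.save(owner=request.user)
      else:
        form.save()
      # Notify the websockets server. The timestamp makes each URL unique
      # so that the fetch is not answered from a cache.
      urlfetch.fetch('http://mars.shehas.net/update_webhook.cgi?'
                     + str(time.time()))
      return redirect(url_for('chat/index'))
  # GET, or a POST that failed validation: show the 20 newest comments.
  comments = Comment.all().order('-created').fetch(20)
  return render_to_response('chat/index.html',
                            {'form': form.as_widget(),
                             'comments': comments})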

The most important point is that, after a new comment is saved, the handler makes a urlfetch call to the external websockets server to notify it. It is also important to append the string representation of time.time() to the URL: without it, the appengine urlfetch service may cache the response, and the call would be useless as a webhook.
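
The cache-busting part can be pulled out into a small helper, sketched below. The function name build_webhook_url and its now parameter are my own assumptions for illustration. One detail worth knowing: on Python 2, str() of a float keeps only 12 significant digits, so two notifications within the same hundredth of a second would produce the same URL. Using repr() instead keeps the full precision.

```python
import time


def build_webhook_url(base_url, now=None):
    """Return base_url with a timestamp appended as the query string.

    The timestamp only serves to make every URL unique, so that an
    intermediate cache cannot answer the request.
    """
    if now is None:
        now = time.time()
    # repr() keeps the full float precision on both Python 2 and 3.
    return base_url + '?' + repr(now)


# Hypothetical usage with an example URL and fixed timestamps.
print(build_webhook_url('http://example.com/update_webhook.cgi', now=1.5))
print(build_webhook_url('http://example.com/update_webhook.cgi', now=2.5))
```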

On the client side, we have to create a websocket connection and set appropriate callbacks for some of its events. I wrote a new class for this.

update_check_socket.js
// Opens a websocket and calls callback() whenever the server sends 'UPDATED'.
// Status lines are prepended to statusElement.
function UpdateCheckSocket(host, port, resource, statusElement, callback) {
  this.host = host;
  this.port = port;
  this.resource = resource;
  this.statusElement = statusElement;
  this.callback = callback;
  this.ws = new WebSocket("ws://"+this.host+":"+this.port+this.resource);
  this.ws.onopen = function(e) {
    statusElement.innerHTML='Web sockets connected';
  };
  this.ws.onmessage = function(e) {
    // Decode once and reuse the result for both the log line and the check.
    var message = decodeURIComponent(e.data);
    var newDiv = document.createElement('div');
    newDiv.innerHTML = e.origin + message;
    statusElement.insertBefore(newDiv, statusElement.firstChild);
    if (message === 'UPDATED') {
      callback();
    }
  };
  this.ws.onclose = function(e) {
    var newDiv = document.createElement('div');
    newDiv.innerHTML = 'Web sockets closed';
    statusElement.insertBefore(newDiv, statusElement.firstChild);
  };
}

function UpdateCheckSocket_send(message) {
  if(typeof(message) == 'undefined' || message =='') {
    alert('no message...');
    return;
  }
  this.ws.send(encodeURIComponent(message));
}
UpdateCheckSocket.prototype.send = UpdateCheckSocket_send;

On the main HTML page, there is a callback for retrieving the newest contents. In some cases the connection gets closed unintentionally, because some network routers drop the NAT table entry when no data has flowed for a few minutes. So there is also code that avoids this by periodically sending a 'NOOP' string to the server.
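
The keepalive itself runs as JavaScript in the browser, but the timing logic is simple enough to sketch separately. The Python below shows only that logic, under my own assumptions: the class name KeepAlive, the helper tick, and the 60-second interval are all hypothetical and are not taken from the actual page.

```python
import time


class KeepAlive(object):
    """Decides when an idle connection is due for a 'NOOP' message."""

    def __init__(self, interval_seconds, clock=time.time):
        self.interval = interval_seconds
        self.clock = clock          # injectable so the logic can be tested
        self.last_sent = clock()

    def due(self):
        return self.clock() - self.last_sent >= self.interval

    def mark_sent(self):
        self.last_sent = self.clock()


def tick(keepalive, send):
    """Call this periodically; it sends 'NOOP' once the interval has elapsed."""
    if keepalive.due():
        send('NOOP')
        keepalive.mark_sent()
        return True
    return False


# Hypothetical usage with a fake clock instead of real waiting.
now = [0.0]
sent = []
ka = KeepAlive(60, clock=lambda: now[0])
tick(ka, sent.append)   # too early, nothing is sent
now[0] = 61.0
tick(ka, sent.append)   # interval elapsed, 'NOOP' is sent
print(sent)
```

The interval has to be shorter than the idle timeout of the routers in between, which is why a value well under "a few minutes" is assumed here.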

Here is the code for the main HTML (as far as I can tell, Blogger does not handle HTML well).

OK, let's move on to the websockets side. On the external websockets server, I need to 1) accept update notifications from the appengine webapp (the webhook handler), and 2) handle websockets connections and notify the clients of updates.

Here is the code for the webhook.
update_webhook.cgi
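#!/usr/bin/python
# CGI webhook: stores the time of the latest update in a shared sqlite3 file.

import sqlite3
import sys
import time

conn = sqlite3.connect("/tmp/test")
c = conn.cursor()
try:
  # First request only: create the single-row table. On later requests the
  # CREATE fails because the table already exists; the error is just logged.
  c.execute('create table updated (t real)')
  c.execute('insert into updated values (?)', (time.time(),))
except Exception as e:
  sys.stderr.write(str(e) + '\n')
# Overwrite the stored timestamp with the current time.
c.execute('update updated set t = ?', (time.time(),))
conn.commit()
c.close()
conn.close()

# CGI response: header, blank line, then the body.
print("Content-type: text/plain\n")
print("OK")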

This is a very simple script. It's an experimental implementation, so it currently doesn't check whether the request really came from a particular appengine webapp. Do not use this code as is in any production environment.
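
One possible way to add such a check, which I have not implemented in the chat, is to sign the timestamp with a secret shared by both servers and have the webhook verify the signature. The sketch below uses Python's hmac module. The secret value, the function names sign and is_authentic, and the 30-second freshness window are all assumptions for illustration, and the webapp would have to send the signature along with the timestamp (for example as an extra query parameter).

```python
import hashlib
import hmac

SHARED_SECRET = b'change-me'  # hypothetical; must be the same on both servers


def sign(timestamp, secret=SHARED_SECRET):
    """Return a hex HMAC-SHA256 signature for the timestamp string."""
    return hmac.new(secret, timestamp.encode('ascii'),
                    hashlib.sha256).hexdigest()


def is_authentic(timestamp, signature, now, max_age=30, secret=SHARED_SECRET):
    """Check the signature and reject timestamps older than max_age seconds."""
    expected = sign(timestamp, secret)
    # compare_digest avoids leaking information through timing differences.
    if not hmac.compare_digest(expected, signature):
        return False
    try:
        age = abs(now - float(timestamp))
    except ValueError:
        return False  # the timestamp was not a number
    return age <= max_age


# Hypothetical usage: the webapp signs, the webhook verifies.
ts = '1261612345.67'
sig = sign(ts)
print(is_authentic(ts, sig, now=1261612350.0))    # fresh and correctly signed
print(is_authentic(ts, 'bad', now=1261612350.0))  # wrong signature
```

The freshness check is there so that a captured URL cannot be replayed later. It assumes the clocks of the two servers are roughly in sync.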

The last piece is the websocket wsh script (I used pywebsocket here).
import sqlite3
import time

from mod_pywebsocket import msgutil

CHECK_INTERVAL = 1  # seconds between checks of the update time
_UPDATE_SIGNAL = 'UPDATED'


def web_socket_do_extra_handshake(request):
  pass  # Always accept.


def web_socket_transfer_data(request):
  # Only updates made after this connection was opened are reported.
  last_checked = time.time()
  conn = sqlite3.connect("/tmp/test")
  c = conn.cursor()
  while True:
    time.sleep(CHECK_INTERVAL)
    # The webhook keeps a single row holding the last update time.
    c.execute('select t from updated')
    r = c.fetchone()
    if r and r[0] > last_checked:
      last_checked = r[0]
      msgutil.send_message(request, _UPDATE_SIGNAL)

Here, I use sqlite3 to record the last update time. sqlite3 might not be appropriate for a production environment either; again, this is just an experimental implementation :-)
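
The polling check in the wsh script can be isolated into a small function, which makes it possible to try it without mod_pywebsocket. This is a sketch only: the function name check_for_update is my own, and the demonstration uses an in-memory database instead of /tmp/test.

```python
import sqlite3


def check_for_update(cursor, last_checked):
    """Return the stored timestamp if it is newer than last_checked, else None."""
    cursor.execute('select t from updated')
    row = cursor.fetchone()
    if row and row[0] > last_checked:
        return row[0]
    return None


# Demonstration with an in-memory database standing in for /tmp/test.
conn = sqlite3.connect(':memory:')
c = conn.cursor()
c.execute('create table updated (t real)')
c.execute('insert into updated values (?)', (100.0,))

print(check_for_update(c, 50.0))    # a newer timestamp is stored
print(check_for_update(c, 100.0))   # nothing newer than what we have seen
```

In the handler loop, a non-None result would become the new last_checked value and trigger the 'UPDATED' message.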

Well, that's about it. It actually works, but I don't think this is the best approach. The current implementation probably won't scale, and setting up all of these pieces is somewhat cumbersome. I hope I can make this code more sophisticated and general in the future, or that someone can write better code or a better architecture for a similar purpose.

Merry X'mas and happy new year :-)

4 comments:

Makoto said...

Hi, Matsuo san. Great post!! It's a shame that you have to host the WebSocket server externally to work around the 30-second limit, but I can see that it's still beneficial if you want to offload most CRUD tasks to App Engine.
I think Google will eventually work around the 30-second rule, as App Engine support for Golang is on their roadmap, and Golang has WebSocket in its standard library.

http://go.googlecode.com/hg/doc/devel/roadmap.html

rasputnik said...

On the Java front, the Atmosphere project is able to support Google App Engine for COMET/long-polling apps.

Not sure of the internals as they wrap the underlying implementation, but my guess is they detect the 30-second timeout and reconnect.

See : http://atmosphere.dev.java.net/ for details.

tmatsuo said...

Hi Makoto,
Thanks for the comment. Good to know that Golang is going to be ported to appengine. That's super cool.

Hi rasputnik,
Yes, that is definitely possible, but as the number of users of your app increases, your app will soon exhaust the limit of 30 concurrent requests. In my opinion, long-polling apps such as comet don't suit appengine well.

wsxwhx714 said...

IS VERY GOOD..............................