Hopefully Blogmonitor, a new project of Interdings, will open to the public on Wednesday. Over the last days and hours I have been working on the API components: an XML-RPC interface, a REST service and, as a nice goodie, a multithreaded TCP server where you can watch all the incoming pings as they occur. The concept is the following: when a ping comes in, the encapsulated class Blogmonitor::Pingservice::Ping sends a notification to the TCP component, which waits for incoming connections on a custom port and serves the snippet to all connected clients. This is intended as a really simple interface and has nothing to do with more standardized technologies like »Publish & Subscribe«, although the logic is similar.
Ruby ships with a multithreaded server implementation called GServer. You create a child class of it, override the serve() method and are happy. That’s a really good start, and the concept of having patterns seems to succeed again: don’t code things twice.
<pre>
require 'gserver'
class MyServer < GServer
  def serve( io )              # called in its own thread for each new connection
    io.puts( "Hello world" )   # io is the socket of the connected client
  end
end

s = MyServer.new 1234          # listen on TCP port 1234
s.start
s.join                         # block until the server shuts down
</pre>
The code should be more or less self-explanatory: load the library, extend the class GServer and override the method serve(), which is handed an IO object. The puts() method of the IO object writes »Hello world« to the connected client.
Then just create an instance, start() the server and join() it.
Connect with telnet to localhost and port 1234:
<pre>
telnet localhost 1234
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
<strong>Hello world</strong>
Connection closed by foreign host.
</pre>
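If telnet is not at hand, the same check can be done from Ruby. The following is only a minimal sketch and not part of Blogmonitor: read_greeting is a made-up helper name, and it assumes a server like the one above is listening on the given host and port.

```ruby
require 'socket'

# Hypothetical helper: connects to host:port, reads a single line and
# returns it without the trailing newline. Returns nil if the server
# closes the connection before sending anything.
def read_greeting( host, port )
  TCPSocket.open( host, port ) do |socket|
    line = socket.gets
    line && line.chomp
  end
end
```

With the example server running, `puts read_greeting( "localhost", 1234 )` should show the same greeting that telnet displays.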
Now you have a simple TCP server. But we are missing a few things: the backend notification is not implemented, and the connection is not persistent. A client that connects in order to receive a stream of incoming weblogs should not be disconnected right away. First of all, implement the streaming thingy. Have a look at lines 3-5 of the first example: the serve() method is executed for each new connection, and the client is disconnected as soon as the method returns. But we want the connection to be persistent. A dirty solution is to put the IO code in an endless loop, which works pretty well.
<pre>
require 'gserver'
class MyServer < GServer
  def serve( io )
    loop do                      # never return, so the connection stays open
      io.puts( "Hello World" )   # writes to the client as fast as it can
    end
  end
end

s = MyServer.new 1234
s.start
s.join
</pre>
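The endless loop never notices when a client goes away, and it writes as fast as it can. A slightly less dirty variant could look like the sketch below. It rests on assumptions of mine: stream_until_disconnect is a made-up helper name, the one-second pause is arbitrary, and the rescued error classes are simply the ones a broken connection typically raises in Ruby.

```ruby
# Hypothetical helper: writes message to io every interval seconds until
# writing fails, which usually means the client has disconnected.
# Returns the number of lines that were sent successfully.
def stream_until_disconnect( io, message, interval = 1.0 )
  sent = 0
  loop do
    io.puts( message )
    sent += 1
    sleep( interval )
  end
rescue Errno::EPIPE, Errno::ECONNRESET, IOError
  sent
end
```

Inside serve() this would be called as `stream_until_disconnect( io, "Hello World" )`, so the method returns, and the connection is cleaned up, once the client is gone.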
Let’s implement the backend thingy. During my implementation I ran into a number of problems. I had a working example on my Linux box, but after putting it on our development system, a Solaris-driven T1000 from Sun, I saw that for some strange reason the implementation of sockets in Ruby does not seem to work on Solaris. That is pretty annoying, but in the end I’m happy it did not work with sockets. The next idea was to use plain TCP with non-blocking connections to send packets from the ping interface to the live-monitor component, but sadly Ruby on Solaris does not support non-blocking connections either. What I ended up with was UDP. Good old UDP, but I think it is perfectly fine for this uncritical task. To send a message, there is a class called UDPSocket that does the job:
<pre>
# Receiver: waits for one datagram on port 1357 and prints it
require 'socket'
u = UDPSocket.new
u.bind( "127.0.0.1", 1357 )
message = u.recvfrom( 512 )[0]   # recvfrom returns [data, sender_address]
puts message
</pre>
<pre>
# Sender: fires one datagram at the receiver
require 'socket'
s = UDPSocket.new
s.connect( "localhost", 1357 )
s.send( "Hello world", 0 )       # 0 = no special flags
s.close
</pre>
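To try both halves without opening two terminals, they can be combined in a single process. This is a sketch under assumptions: udp_roundtrip is a made-up name, port 0 lets the operating system pick a free port instead of the fixed 1357, and the one-second timeout is an arbitrary safety net, because UDP gives no delivery guarantee.

```ruby
require 'socket'

# Hypothetical helper: sends message over UDP on the loopback interface
# and returns what the receiving socket got, or nil if nothing arrived
# within one second.
def udp_roundtrip( message )
  receiver = UDPSocket.new
  receiver.bind( "127.0.0.1", 0 )   # port 0: let the OS choose a free port
  port = receiver.addr[1]

  sender = UDPSocket.new
  sender.send( message, 0, "127.0.0.1", port )

  # Wait up to one second for the datagram instead of blocking forever.
  return nil unless IO.select( [receiver], nil, nil, 1 )
  payload, _sender_address = receiver.recvfrom( 512 )
  payload
ensure
  sender.close if sender
  receiver.close if receiver
end
```

Calling `udp_roundtrip( "Hello world" )` exercises the same UDPSocket calls as the two snippets above, just with sender and receiver living in one script.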
The next problem is to make sure all listening TCP clients get notified. So we need something like an IO heap, where we store all the existing IO objects and iterate through them to notify everyone. We modify the serve() method as follows:
<pre>
def serve( io )
  @io_heap << io          # remember the client so the notifier can write to it
  loop do; sleep 1; end   # keep the connection open without burning CPU
end
</pre>
But who sends the snippet to all the objects? We also override the constructor of our MyServer class to create the heap (@io_heap = []) and to start a notification thread, which does the following tasks:
<ul><li>Start the UDP server</li><li>Iterate through our heap of IO objects to notify them all</li></ul>
<h3>Example V, the complete component</h3>
<pre>
require 'gserver'
require 'socket'

class MyServer < GServer
  def initialize( port, *args )
    @io_heap = []                             # all connected clients
    udp = Thread.fork do                      # notification thread
      udp_socket = UDPSocket.new
      udp_socket.bind( "localhost", 1357 )
      loop do
        payload = udp_socket.recvfrom( 512 )[0]
        unless payload.empty?
          @io_heap.dup.each do |io|           # iterate over a copy, we may delete below
            begin
              io.puts payload
            rescue Errno::EPIPE => error      # the client has gone away
              @io_heap.delete io
            end
          end
        end
      end
    end
    super(port, *args)
  end

  def serve( io )
    @io_heap << io
    loop do; sleep 1; end                     # keep the connection open
  end
end

server = MyServer.new( 1234, "localhost" )
server.start
server.join
</pre>
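On current Ruby versions GServer is no longer bundled with the standard library (it was moved into the gserver gem), so here is a rough sketch of the same idea built on plain TCPServer. It is not the Blogmonitor code: the class name LiveMonitor and its methods are made up for illustration, the Mutex is my own addition because two threads touch the client list, and port 0 means "let the operating system pick a free port".

```ruby
require 'socket'

# Hypothetical stand-in for MyServer: accepts TCP clients and relays every
# UDP datagram it receives to all of them.
class LiveMonitor
  attr_reader :tcp_port, :udp_port

  def initialize( tcp_port = 0, udp_port = 0, host = "127.0.0.1" )
    @clients = []          # plays the role of @io_heap
    @lock    = Mutex.new   # guards @clients
    @tcp     = TCPServer.new( host, tcp_port )
    @udp     = UDPSocket.new
    @udp.bind( host, udp_port )
    @tcp_port = @tcp.addr[1]
    @udp_port = @udp.addr[1]
  end

  # Starts the acceptor and notifier threads and returns immediately.
  def start
    @acceptor = Thread.new { loop { register( @tcp.accept ) } }
    @notifier = Thread.new { loop { broadcast( @udp.recvfrom( 512 )[0] ) } }
    self
  end

  def register( io )
    @lock.synchronize { @clients << io }
  end

  # Writes payload to every client and forgets clients whose write fails.
  def broadcast( payload )
    return if payload.empty?
    @lock.synchronize do
      @clients.delete_if do |io|
        begin
          io.puts( payload )
          false                          # keep this client
        rescue SystemCallError, IOError  # e.g. Errno::EPIPE
          io.close rescue nil
          true                           # drop this client
        end
      end
    end
  end

  def client_count
    @lock.synchronize { @clients.size }
  end

  # Stops both threads and closes all sockets.
  def stop
    [@acceptor, @notifier].compact.each { |thread| thread.kill; thread.join }
    @lock.synchronize do
      @clients.each { |io| io.close rescue nil }
      @clients.clear
    end
    @tcp.close
    @udp.close
  end
end
```

A monitor created with `LiveMonitor.new( 1234, 1357, "localhost" ).start` would listen on the same ports as the example above. Since start returns immediately, the calling script has to keep itself alive, for instance with `sleep`.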
Isn’t it really, really short?