以下代码代表一个加法。客户端提交加数,服务端回送“和”。
服务端
package player.kent.chen.temp.learnnetty.raw; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; /** * 报文格式:request=a+b, response=c * * @author 陈坚 2013年6月18日下午2:47:04 */ public class PlayNettyAddServer { //server关闭时会用到这个变量 static final ChannelGroup allChannels = new DefaultChannelGroup(); //真正处理报文的 private static final class AddServerHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { //拿到request报文 ChannelBuffer inputBuf = (ChannelBuffer) e.getMessage(); String requestStr = new String(inputBuf.array()); //解析request报文并计算 String[] params = requestStr.split("\\+"); if (params.length != 2) { return; } int a = Integer.parseInt(params[0].trim()); int b = Integer.parseInt(params[1].trim()); String result = String.valueOf(a + b); //回送response报文 Channel channel = e.getChannel(); ChannelBuffer outputBuf = ChannelBuffers.buffer(result.length()); outputBuf.writeBytes(result.getBytes()); channel.write(outputBuf); } @Override //异常处理 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { e.getCause().printStackTrace(); e.getChannel().close(); } } //启动入口 public static void main(String[] args) throws InterruptedException { ExecutorService bossThreadPool = Executors.newCachedThreadPool(); //侦听线程池 ExecutorService workerThreadPool = Executors.newCachedThreadPool(); //工作线程池 ChannelFactory channelFactory = new NioServerSocketChannelFactory(bossThreadPool, workerThreadPool); //channelFactory用来产生和管理channel ServerBootstrap bootstrap = new ServerBootstrap(channelFactory); //a helper class that sets up a server bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { //pipeline可以理解为handler的集合,一个接一个地处理 return Channels.pipeline(new AddServerHandler()); } }); //设定一些通信参数 bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); //在某端口启动 Channel channel = bootstrap.bind(new InetSocketAddress(18318)); System.out.println("服务器已启动,并将在20秒后关闭"); allChannels.add(channel); Thread.sleep(20 * 1000); //开始释放资源 System.out.println("服务器开始关闭..."); System.out.println("关闭所有channel"); ChannelGroupFuture closeFuture = allChannels.close(); closeFuture.awaitUninterruptibly(); System.out.println("释放外部资源"); channelFactory.releaseExternalResources(); } }
客服端
package player.kent.chen.temp.learnnetty.raw; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; public class PlayNettyClient { private static final class AddClientHandler extends SimpleChannelHandler { @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channel连通"); } @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channel关闭"); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); String result = new String(buf.array()); System.out.println("The add result is: " + result); } @Override //异常处理 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { e.getCause().printStackTrace(); e.getChannel().close(); } } //client入口 public static void main(String[] args) throws InterruptedException { ExecutorService bossThreadPool = Executors.newCachedThreadPool(); //主线程池 ExecutorService workerThreadPool = Executors.newCachedThreadPool(); //从线程池 ChannelFactory channelFactory = new NioClientSocketChannelFactory(bossThreadPool, workerThreadPool); ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new AddClientHandler()); } }); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); //连上服务器 ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress(18318)); //发出一个请求 Channel channel = connectFuture.awaitUninterruptibly().getChannel(); ChannelBuffer requestBuf = ChannelBuffers.dynamicBuffer(); requestBuf.writeBytes(" 82 + 28 ".getBytes()); ChannelFuture writeFuture = channel.write(requestBuf); writeFuture.awaitUninterruptibly(); //等待断开 ChannelFuture closeFuture = connectFuture.getChannel().getCloseFuture(); closeFuture.awaitUninterruptibly(); System.out.println("连接被断开"); //释放资源 channelFactory.releaseExternalResources(); } }