用Zookeeper搭建一个服务注册中心:不断有ServiceProvider上来注册自己的进程ID或者与注册中心断开,ServiceConsumer则捕捉这些变化。
1.创建一个根结点,所有注册信息都将挂这个结点的下面
public class RegistryStartup { public static void main(String[] args) throws Exception { ZooKeeper zk = new ZooKeeper("localhost", 1000, null); while (zk.getState() != States.CONNECTED) { ;//wait } Stat rootExists = zk.exists("/serviceProviderRoot", false); if (rootExists == null) { zk.create("/serviceProviderRoot", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } System.out.println("The node '/serviceProviderRoot' has been created"); } }
2.ServiceProvider注册自己的进程号
<!--pom.xml, 引入一个库,以帮助获取当前进程的ID--> <dependency> <groupId>net.java.dev.jna</groupId> <artifactId>jna</artifactId> <version>3.5.1</version> </dependency>
public class ServiceProvider { public static void main(String[] args) throws Exception { String pid = String.valueOf(CLibrary.INSTANCE.getpid()); //自己的进程号 ZooKeeper zk = new ZooKeeper("localhost", 1000, null); while (zk.getState() != States.CONNECTED) { ;//wait } System.out.println("Process " + pid + " connected to the Config Server as Service Provider "); byte[] pidData = pid.getBytes(); //注册自己的进程号,注意要使用EPHEMERAL类型的结点,这样当自己下线时相应的注册信息也会自动消失 //把自己当作某个根结点的子结点来注册 String path = zk.create("/serviceProviderRoot/child", pidData, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("Process " + pid + " registered as znoed in path: " + path); //永远睡眠,以保持本进程在线 Thread.sleep(Long.MAX_VALUE); } private static interface CLibrary extends Library { CLibrary INSTANCE = (CLibrary) Native.loadLibrary("c", CLibrary.class); int getpid(); } }
3.ServiceConsumer监听Provider的变化
public class ServiceConsumer implements Watcher {//要实现Watcher接口 public static void main(String[] args) throws Exception { //连一下 ZooKeeper zk = new ZooKeeper("localhost", 1000, null); while (zk.getState() != States.CONNECTED) { ;//wait } System.out.println("Connected to the Config Server as Service Consumer "); //创建本类的一个对象,用作一个Watcher ServiceConsumer consumer = new ServiceConsumer(zk); //先处理一下线上现有的provider consumer.dealWithAllProviders(); //防止主线程终止;剩下的活都由Watcher的监视线程来干 Thread.sleep(Long.MAX_VALUE); } private final ZooKeeper zk; ServiceConsumer(ZooKeeper zk) { super(); this.zk = zk; } //处理当前现有的provider private void dealWithAllProviders() { try { //找出当前所有的provider结点(znode) //另外注意这里的第二个参数"this", 它的意思是找出所有结点后又立即注册一个watch行为,以监视新的变化 List<String> pathList = zk.getChildren("/serviceProviderRoot", this); if (pathList.isEmpty()) { return; } //打印所有的provider进程号 List<String> processList = new ArrayList<String>(); for (String path : pathList) { byte[] data = zk.getData("/serviceProviderRoot/" + path, false, null); processList.add(new String(data)); } System.out.println("Providers are: " + processList); //跟其中一个provider交互一下:通过kill -3 ”恐吓“ 它一下 Runtime.getRuntime().exec("kill -3 " + processList.get(processList.size() / 2)); } catch (Exception e) { e.printStackTrace(); } } //本watcher的监视方法 @Override public void process(WatchedEvent event) { if (event.getType() == EventType.NodeChildrenChanged) {//遇子结点(provider)变化,则处理一下所有子结点 dealWithAllProviders(); } } }
4.运行
1. RegistryStartup要先跑一下
2. ServiceProvider和ServiceConumer随便谁先运行
3. 不时地启动新的ServiceProvider或终止运行中的ServiceProvider,观察一下ServiceConumer的控制台输出和运行中的所有的ServiceProvider的输出(当provider列表变动时,总有一个进程会打印线程栈)