`
jbm3072
  • 浏览: 209052 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

自定义的RPC的Java实现

    博客分类:
  • Java
阅读更多

在看hadoop的源代码的时候,看到hadoop实现了一个自定义的RPC,于是有了自己写代码实现RPC的想法。

RPC的全名Remote Process Call,即远程过程调用。使用RPC,可以像使用本地的程序一样使用远程服务器上的程序。下面是一个简单的RPC 调用实例,从中可以看到RPC如何使用以及好处:

 

public class MainClient {
	public static void main(String[] args) {
		Echo echo = RPC.getProxy(Echo.class, "127.0.0.1", 20382);	
		System.out.println(echo.echo("hello,hello"));
	}
}

 

public interface Echo {
	public String echo(String string);
}
 

 

 

 

使用RPC.getProxy生成接口Echo的代理实现类。然后就可以像使用本地的程序一样来调用Echo中的echo方法。

使用RPC的好处是简化了远程服务访问。提高了开发效率。在分发代码时,只需要将接口分发给客户端使用,在客户端看来只有接口,没有具体类实现。这样保证了代码的可扩展性和安全性。

 

在看了RPCClient如何使用,我们再来定义一个RPC服务器的接口,看看服务器都提供什么操作:

 

 

public interface Server {
	public void stop();
	public void start();
	public void register(Class interfaceDefiner,Class impl);
	public void call(Invocation invo);
	public boolean isRunning();
	public int getPort();
}

 

 服务器提供了start和stop方法。使用register注册一个接口和对应的实现类。call方法用于执行Invocation指定的接口的方法名。isRunning返回了服务器的状态,getPort()则返回了服务器使用的端口。

 

来看看Invocation的定义:

 

 

public class Invocation implements Serializable{
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	
	private Class interfaces;
	private Method method;
	private Object[] params;
	private Object result;
	
	
	/**
	 * @return the result
	 */
	public Object getResult() {
		return result;
	}
	/**
	 * @param result the result to set
	 */
	public void setResult(Object result) {
		this.result = result;
	}
	/**
	 * @return the interfaces
	 */
	public Class getInterfaces() {
		return interfaces;
	}
	/**
	 * @param interfaces the interfaces to set
	 */
	public void setInterfaces(Class interfaces) {
		this.interfaces = interfaces;
	}
	/**
	 * @return the method
	 */
	public Method getMethod() {
		return method;
	}
	/**
	 * @param method the method to set
	 */
	public void setMethod(Method method) {
		this.method = method;
	}
	/**
	 * @return the params
	 */
	public Object[] getParams() {
		return params;
	}
	/**
	 * @param params the params to set
	 */
	public void setParams(Object[] params) {
		this.params = params;
	}
	@Override
	public String toString() {
		return interfaces.getName()+"."+method.getMethodName()+"("+Arrays.toString(params)+")";
	}
	
}
 

 

 

     具体服务器实现类中的call方法是这样使用Invocation的:

 

 

 

@Override
		public void call(Invocation invo) {
			Object obj = serviceEngine.get(invo.getInterfaces().getName()); //根据接口名,找到对应的处理类
			if(obj!=null) {
				try {
					Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());
					Object result = m.invoke(obj, invo.getParams());
					invo.setResult(result);
				} catch (Throwable th) {
					th.printStackTrace();
				}
			} else {
				throw new IllegalArgumentException("has no these class");
			}
		}
 

 

  下面来看服务器接收连接并处理连接请求的核心代码:

 

 

 

public class Listener extends Thread {
	private ServerSocket socket;
	private Server server;

	public Listener(Server server) {
		this.server = server;
	}

	@Override
	public void run() {

		System.out.println("启动服务器中,打开端口" + server.getPort());
		try {
			socket = new ServerSocket(server.getPort());
		} catch (IOException e1) {
			e1.printStackTrace();
			return;
		}
		while (server.isRunning()) {
			try {
				
				Socket client = socket.accept();
				ObjectInputStream ois = new ObjectInputStream(client.getInputStream());
				Invocation invo = (Invocation) ois.readObject();
				server.call(invo);
				ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());
				oos.writeObject(invo);
				oos.flush();
				oos.close();
				ois.close();
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}

		}

		try {
			if (socket != null && !socket.isClosed())
				socket.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}
 

 

RPC具体的Server类是这样来使用Listener的:

 

public static class RPCServer implements Server{
		private int port = 20382;
		private Listener listener; 
		private boolean isRuning = true;
		
		/**
		 * @param isRuning the isRuning to set
		 */
		public void setRuning(boolean isRuning) {
			this.isRuning = isRuning;
		}

		/**
		 * @return the port
		 */
		public int getPort() {
			return port;
		}

		/**
		 * @param port the port to set
		 */
		public void setPort(int port) {
			this.port = port;
		}

		private Map<String ,Object> serviceEngine = new HashMap<String, Object>();
		
		
		@Override
		public void call(Invocation invo) {
			System.out.println(invo.getClass().getName());
			Object obj = serviceEngine.get(invo.getInterfaces().getName());
			if(obj!=null) {
				try {
					Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());
					Object result = m.invoke(obj, invo.getParams());
					invo.setResult(result);
				} catch (Throwable th) {
					th.printStackTrace();
				}
			} else {
				throw new IllegalArgumentException("has no these class");
			}
		}

		@Override
		public void register(Class interfaceDefiner, Class impl) {
			try {
				this.serviceEngine.put(interfaceDefiner.getName(), impl.newInstance());
				System.out.println(serviceEngine);
			} catch (Throwable e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} 
		}

		@Override
		public void start() {
			System.out.println("启动服务器");
			listener = new Listener(this);
			this.isRuning = true;
			listener.start();
		}

		@Override
		public void stop() {
			this.setRuning(false);
		}

		@Override
		public boolean isRunning() {
			return isRuning;
		}
		
	}
 

    服务器端代码搞定后,来看看客户端的代码,先看看我们刚开始使用RPC.getProxy方法:

 

public static <T> T getProxy(final Class<T> clazz,String host,int port) {
		
		final Client client = new Client(host,port);
		InvocationHandler handler = new InvocationHandler() {
			
			@Override
			public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				Invocation invo = new Invocation();
				invo.setInterfaces(clazz);
				invo.setMethod(new org.jy.rpc.protocal.Method(method.getName(),method.getParameterTypes()));
				invo.setParams(args);
				client.invoke(invo);
				return invo.getResult();
			}
		};
		T t = (T) Proxy.newProxyInstance(RPC.class.getClassLoader(), new Class[] {clazz}, handler);
		return t;
	}
 

Client类的代码如下:

 

public class Client {
	private String host;
	private int port;
	private Socket socket;
	private ObjectOutputStream oos;
	private ObjectInputStream ois;

	public String getHost() {
		return host;
	}


	public void setHost(String host) {
		this.host = host;
	}

	public int getPort() {
		return port;
	}
	public void setPort(int port) {
		this.port = port;
	}

	public Client(String host, int port) {
		this.host = host;
		this.port = port;
	}

	public void init() throws UnknownHostException, IOException {
		socket = new Socket(host, port);
		oos = new ObjectOutputStream(socket.getOutputStream());
	}

	public void invoke(Invocation invo) throws UnknownHostException, IOException, ClassNotFoundException {
		init();
		System.out.println("写入数据");
		oos.writeObject(invo);
		oos.flush();
		ois = new ObjectInputStream(socket.getInputStream());
		Invocation result = (Invocation) ois.readObject();
		invo.setResult(result.getResult());
	}

}
 

    至此,RPC的客户端和服务器端代码完成,启动服务器的代码如下:

 

public class Main {
	public static void main(String[] args) {
		Server server = new RPC.RPCServer();
		server.register(Echo.class, RemoteEcho.class);
		server.start();
	}

}
 

   现在先运行服务器端代码,再运行客户端代码,就可以成功运行。

   详细的代码,参考附件的源代码。

 

    在写这个RPC时,没有想太多。在数据串行化上,使用了java的标准io序列化机制,虽然不能跨平台,但是做DEMO还是不错的;另外在处理客户端请求上,使用了ServerSocket,而没有使用ServerSocketChannel这个java nio中的新特性;在动态生成接口的实现类上,使用了java.lang.reflet中的Proxy类。他可以动态创建接口的实现类。

 

 

 

  • Rpc.rar (17.1 KB)
  • 下载次数: 1335
12
4
分享到:
评论
11 楼 sky88088 2015-09-30  
感谢分享~
10 楼 a2773945 2015-07-15  
   好东西,谢谢大哥
9 楼 beer2008cn 2015-03-31  
写的很棒
8 楼 shuangjue 2014-12-08  
不能跨平台,和RMI相比有何优势呢?
能否提供个跨平台的方案,比如android与J2SE双向通行
7 楼 god333666 2014-10-23  
写得很不错 受启发
6 楼 Cockadoodledoo 2014-03-07  
[flash=200,200][img][list]
[*]
引用
[color=cyan][/color]

[/list][/img][/flash]      
5 楼 hngmduyi 2014-03-03  
4 楼 charles751 2013-08-14  
请教,你这个实现和java RMI相比的优势是什么?
3 楼 peter-wang-2003 2012-10-20  
谢谢大哥,看了这个有很大的启发,可以改成NIO模式,性能比较高
2 楼 xin_jmail 2012-09-10  
很好的教程,谢谢啦~我qq530422429,希望能与楼主交流…
1 楼 kimmking 2011-06-14  

我几年前也做过一个,考虑了跨平台。

http://code.google.com/p/rpcfx/

相关推荐

Global site tag (gtag.js) - Google Analytics