Dubbo网络通信组件架构
网络通信组件通用设计方法和架构

架构方面通常使用加一层解决问题。
请求响应层:完成网络层的调度,完成请求处理过程(心跳,时效性处理,复杂业务耦合处理)
Dubbo网络通信架构
- Protocol层 业务层
- Exchange层 信息交换层,用来封装请求-响应模式(心跳处理)
- Transport层 网络传输层,抽象Netty等框架作为统一的接口
- Serialize层 序列化层,主要完成数据的序列化和反序列化过
Protocol
export 服务端对外暴露服务
refer 客户端对远程服务进行引用
1 2 3 4 5 6 7 8 9 10 11 12
| @SPI("dubbo") public interface Protocol { int getDefaultPort();
@Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy(); }
|
Exchanger
1 2 3 4 5 6 7 8 9
| @SPI(HeaderExchanger.NAME) public interface Exchanger {
@Adaptive({Constants.EXCHANGER_KEY}) ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
@Adaptive({Constants.EXCHANGER_KEY}) ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException; }
|
Transporter
1 2 3 4 5 6 7 8 9
| @SPI("netty") public interface Transporter {
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) Server bind(URL url, ChannelHandler handler) throws RemotingException;
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) Client connect(URL url, ChannelHandler handler) throws RemotingException; }
|
Dubbo服务端通信机制
Dubbo服务端集成Netty服务,启动服务监听
典型的多层架构

DubboProtocol创建ExchangeServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| // 根据传入的服务请求URL来创建ExchangeServer private void openServer(URL url) { String key = url.getAddress(); boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } else { server.reset(url); } } }
// 通过Exchanger接口创建ExchangeServer private ExchangeServer createServer(URL url) { ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } return server; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class HeaderExchanger implements Exchanger { public static final String NAME = "header";
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } }
|
心跳
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| //心跳检测功能 private void startHeatbeatTimer() { stopHeartbeatTimer(); if (heartbeat > 0) { heatbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask(new HeartBeatTask.ChannelProvider() { public Collection<Channel> getChannels() { return Collections.unmodifiableCollection( HeaderExchangeServer.this.getChannels()); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } }
|
NettyServer
1 2 3 4 5 6 7 8 9 10 11
| public class NettyTransporter implements Transporter { public static final String NAME = "netty4";
public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); }
public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| protected void doOpen() throws Throwable { ... bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); channel = bootstrap.bind(getBindAddress()); }
public void send(Object message, boolean sent) throws RemotingException { Collection<Channel> channels = getChannels(); for (Channel channel : channels) { if (channel.isConnected()) { channel.send(message, sent); } } }
|
Dubbo客户端通信机制

DubboProtocol
Exchangers.connect 获取ExchangeClient
1 2 3 4 5 6 7 8 9 10 11 12
| private ExchangeClient initClient(URL url) { ExchangeClient client; try { if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { client = new LazyConnectExchangeClient(url, requestHandler); } else { client = Exchangers.connect(url, requestHandler); } }
return client; }
|
问题
分层架构,单一职责,扩展性,
兼容性、维护性