javaweb
javaweb copied to clipboard
dubbo源码解析(九) Filter & Listener
Filter解析
通过Filter接口我们可以轻松地实现服务提供方和消费方的拦截
/**
 * Interception point for invocations; marked with {@code @SPI}.
 */
@SPI
public interface Filter {
/**
 * Intercepts one invocation.
 * <p>
 * A typical implementation wraps its own logic around the delegated call:
 * <pre><code>
 * // before filter
 * Result result = invoker.invoke(invocation);
 * // after filter
 * return result;
 * </code></pre>
 *
 * @param invoker    the invoker the call is delegated to
 * @param invocation the invocation being intercepted
 * @return the result of the invocation
 * @throws RpcException declared by the signature (NOTE(review): presumably propagated from the
 *                      delegated {@code invoke} call; confirm against implementations)
 * @see com.alibaba.dubbo.rpc.Invoker#invoke(Invocation)
 */
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
}
我们注意到 ReferenceConfig 和 ServiceConfig 的共同父类 AbstractInterfaceConfig 中定义了 filter 属性,其 getter 上的 @Parameter 注解指定了该属性写入 URL 时使用的 key:
// Configured filter names, kept as the raw string handed to setFilter (e.g. "xxx,yyy").
protected String filter;
// Returns the configured filter string; the @Parameter key is reference.filter.
// NOTE(review): append = true presumably accumulates values instead of overriding them; confirm.
@Parameter(key = Constants.REFERENCE_FILTER_KEY, append = true)
public String getFilter() {
return filter;
}
// Override that only changes the annotation: @Parameter makes the filter value get parsed
// and placed into the URL under the key service.filter. The value itself comes from super.
@Parameter(key = Constants.SERVICE_FILTER_KEY, append = true)
public String getFilter() {
return super.getFilter();
}
// Accepts a comma-separated list of filter names, e.g. "xxx,yyy".
// checkMultiExtension runs against Filter.class before the value is stored
// (presumably rejecting names that are not valid Filter extensions; verify in checkMultiExtension).
public void setFilter(String filter) {
checkMultiExtension(Filter.class, "filter", filter);
this.filter = filter;
}
自定义调用拦截使用方法
服务提供方和服务消费方调用过程拦截,Dubbo 本身的大多功能均基于此扩展点实现,每次远程方法执行,该拦截都会被执行,请注意对性能的影响。
约定:
- 用户自定义 filter 默认在内置 filter 之后。
- 特殊值 `default`,表示缺省扩展点插入的位置。比如:`filter="xxx,default,yyy"`,表示 `xxx` 在缺省 filter 之前,`yyy` 在缺省 filter 之后。
- 特殊符号 `-`,表示剔除。比如:`filter="-foo1"`,剔除添加缺省扩展点 `foo1`。比如:`filter="-default"`,剔除添加所有缺省扩展点。
- provider 和 service 同时配置了 filter 时,累加所有 filter,而不是覆盖。比如:`<dubbo:provider filter="xxx,yyy"/>` 和 `<dubbo:service filter="aaa,bbb" />`,则 `xxx`、`yyy`、`aaa`、`bbb` 均会生效。如果要覆盖,需配置:`<dubbo:service filter="-xxx,-yyy,aaa,bbb" />`
扩展接口
com.alibaba.dubbo.rpc.Filter
扩展配置
<!-- 消费方调用过程拦截 -->
<dubbo:reference filter="xxx,yyy" />
<!-- 消费方调用过程缺省拦截器,将拦截所有reference -->
<dubbo:consumer filter="xxx,yyy"/>
<!-- 提供方调用过程拦截 -->
<dubbo:service filter="xxx,yyy" />
<!-- 提供方调用过程缺省拦截器,将拦截所有service -->
<dubbo:provider filter="xxx,yyy"/>
已知扩展
- com.alibaba.dubbo.rpc.filter.EchoFilter
- com.alibaba.dubbo.rpc.filter.GenericFilter
- com.alibaba.dubbo.rpc.filter.GenericImplFilter
- com.alibaba.dubbo.rpc.filter.TokenFilter
- com.alibaba.dubbo.rpc.filter.AccessLogFilter
- com.alibaba.dubbo.rpc.filter.CountFilter
- com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
- com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
- com.alibaba.dubbo.rpc.filter.ContextFilter
- com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
- com.alibaba.dubbo.rpc.filter.ExceptionFilter
- com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
- com.alibaba.dubbo.rpc.filter.DeprecatedFilter
扩展示例
Maven 项目结构:
src
 |-main
    |-java
        |-com
            |-xxx
                |-XxxFilter.java (实现Filter接口)
    |-resources
        |-META-INF
            |-dubbo
                |-com.alibaba.dubbo.rpc.Filter (纯文本文件,内容为:xxx=com.xxx.XxxFilter)
XxxFilter.java:
package com.xxx;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
/**
 * Minimal sample {@link Filter}: delegates the call unchanged and marks where custom
 * logic can be placed around it.
 */
public class XxxFilter implements Filter {

    /**
     * Forwards the invocation to {@code invoker} and hands back whatever it produced.
     *
     * @param invoker    the invoker the call is delegated to
     * @param invocation the invocation being intercepted
     * @return the result produced by {@code invoker}
     * @throws RpcException declared by the {@link Filter} contract
     */
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // Logic placed here runs before the delegated call.
        final Result outcome = invoker.invoke(invocation);
        // Logic placed here runs after the delegated call has returned.
        return outcome;
    }
}
META-INF/dubbo/com.alibaba.dubbo.rpc.Filter:
xxx=com.xxx.XxxFilter
具体是如何调用的呢?
ProtocolFilterWrapper
/**
 * {@link Protocol} decorator that surrounds exported and referred invokers with the
 * {@link Filter} extensions activated for their URL.
 * <p>
 * URLs whose protocol equals {@code Constants.REGISTRY_PROTOCOL} are passed straight to the
 * wrapped protocol without any filter chain.
 */
public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;

    /**
     * @param protocol the protocol to decorate
     * @throws IllegalArgumentException if {@code protocol} is {@code null}
     */
    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    /**
     * Wraps {@code invoker} in one chain node per activated filter.
     * <p>
     * The filter list is walked from its last element to its first, so the first filter in
     * the list becomes the outermost node and is the first one to see an invocation.
     *
     * @param invoker the invoker at the end of the chain
     * @param key     URL parameter key naming the configured filters
     * @param group   activation group passed to the extension loader
     * @return the head of the chain, or {@code invoker} itself when no filter is activated
     */
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        // Resolve every filter activated for this URL from the configuration.
        final List<Filter> activated = ExtensionLoader.getExtensionLoader(Filter.class)
                .getActivateExtension(invoker.getUrl(), key, group);
        Invoker<T> head = invoker;
        int index = activated.size();
        while (index > 0) {
            index--;
            head = link(activated.get(index), head, invoker);
        }
        return head;
    }

    /**
     * Creates one chain node: {@code invoke} goes through {@code filter} towards {@code next},
     * every other method is answered by the original invoker.
     */
    private static <T> Invoker<T> link(final Filter filter, final Invoker<T> next, final Invoker<T> origin) {
        return new Invoker<T>() {

            public Result invoke(Invocation invocation) throws RpcException {
                return filter.invoke(next, invocation);
            }

            public Class<T> getInterface() {
                return origin.getInterface();
            }

            public URL getUrl() {
                return origin.getUrl();
            }

            public boolean isAvailable() {
                return origin.isAvailable();
            }

            public void destroy() {
                origin.destroy();
            }

            @Override
            public String toString() {
                return origin.toString();
            }
        };
    }

    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }

    /**
     * Service export: builds the filter chain from the service.filter configuration
     * (provider group) unless the invoker URL uses the registry protocol.
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        final boolean registry = Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol());
        final Invoker<T> target = registry
                ? invoker
                : buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER);
        return protocol.export(target);
    }

    /**
     * Service reference: builds the filter chain from the reference.filter configuration
     * (consumer group) unless the URL uses the registry protocol.
     */
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        final boolean registry = Constants.REGISTRY_PROTOCOL.equals(url.getProtocol());
        final Invoker<T> referred = protocol.refer(type, url);
        if (registry) {
            return referred;
        }
        return buildInvokerChain(referred, Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }

    public void destroy() {
        protocol.destroy();
    }
}
Listener解析
/**
 * Listener for invoker events: one callback for a referred invoker, one for a destroyed
 * invoker. Marked with {@code @SPI}.
 */
@SPI
public interface InvokerListener {
/**
 * Callback for an invoker that has been referred.
 *
 * @param invoker the referred invoker
 * @throws RpcException declared by the signature; the conditions are up to the implementation
 * @see com.alibaba.dubbo.rpc.Protocol#refer(Class, URL)
 */
void referred(Invoker<?> invoker) throws RpcException;
/**
 * Callback for an invoker that has been destroyed.
 *
 * @param invoker the destroyed invoker
 * @see com.alibaba.dubbo.rpc.Invoker#destroy()
 */
void destroyed(Invoker<?> invoker);
}
/**
 * Listener for exporter events: one callback for an exported exporter, one for an
 * unexported exporter. Marked with {@code @SPI}.
 */
@SPI
public interface ExporterListener {
/**
 * Callback for an exporter that has been exported.
 *
 * @param exporter the exported exporter
 * @throws RpcException declared by the signature; the conditions are up to the implementation
 * @see com.alibaba.dubbo.rpc.Protocol#export(Invoker)
 */
void exported(Exporter<?> exporter) throws RpcException;
/**
 * Callback for an exporter that has been unexported.
 * Unlike {@link #exported(Exporter)}, this method declares no exception.
 *
 * @param exporter the unexported exporter
 * @see com.alibaba.dubbo.rpc.Exporter#unexport()
 */
void unexported(Exporter<?> exporter);
}
// Configured listener names, kept as the raw string handed to setListener.
protected String listener;
// Returns the configured listener string; the @Parameter key is invoker.listener.
// checkMultiExtension runs against InvokerListener.class on every read, not in the setter.
// NOTE(review): presumably it rejects names that are not valid extensions; verify.
@Parameter(key = Constants.INVOKER_LISTENER_KEY, append = true)
public String getListener() {
checkMultiExtension(InvokerListener.class, "listener", listener);
return listener;
}
// Override that only changes the annotation: the @Parameter key is exporter.listener.
// The value itself comes from super.getListener().
@Parameter(key = Constants.EXPORTER_LISTENER_KEY, append = true)
public String getListener() {
return super.getListener();
}
// Stores the listener string as given; no check is performed here.
public void setListener(String listener) {
this.listener = listener;
}
引用监听扩展
扩展说明
当有服务引用时,触发该事件。
扩展接口
com.alibaba.dubbo.rpc.InvokerListener
扩展配置
<!-- 引用服务监听 -->
<dubbo:reference listener="xxx,yyy" />
<!-- 引用服务缺省监听器 -->
<dubbo:consumer listener="xxx,yyy" />
已知扩展
com.alibaba.dubbo.rpc.listener.DeprecatedInvokerListener
扩展示例
Maven 项目结构:
src
 |-main
    |-java
        |-com
            |-xxx
                |-XxxInvokerListener.java (实现InvokerListener接口)
    |-resources
        |-META-INF
            |-dubbo
                |-com.alibaba.dubbo.rpc.InvokerListener (纯文本文件,内容为:xxx=com.xxx.XxxInvokerListener)
XxxInvokerListener.java:
package com.xxx;
import com.alibaba.dubbo.rpc.InvokerListener;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcException;
/**
 * Skeleton InvokerListener implementation; both callbacks are left empty for custom logic.
 */
public class XxxInvokerListener implements InvokerListener {
// Callback for a referred invoker.
public void referred(Invoker<?> invoker) throws RpcException {
// custom handling goes here
}
// Callback for a destroyed invoker.
public void destroyed(Invoker<?> invoker) throws RpcException {
// custom handling goes here
}
}
META-INF/dubbo/com.alibaba.dubbo.rpc.InvokerListener:
xxx=com.xxx.XxxInvokerListener
暴露监听扩展
扩展说明
当有服务暴露时,触发该事件。
扩展接口
com.alibaba.dubbo.rpc.ExporterListener
扩展配置
<!-- 暴露服务监听 -->
<dubbo:service listener="xxx,yyy" />
<!-- 暴露服务缺省监听器 -->
<dubbo:provider listener="xxx,yyy" />
已知扩展
com.alibaba.dubbo.registry.directory.RegistryExporterListener
扩展示例
Maven 项目结构:
src
|-main
|-java
|-com
|-xxx
|-XxxExporterListener.java (实现ExporterListener接口)
|-resources
|-META-INF
|-dubbo
|-com.alibaba.dubbo.rpc.ExporterListener (纯文本文件,内容为:xxx=com.xxx.XxxExporterListener)
XxxExporterListener.java:
package com.xxx;
import com.alibaba.dubbo.rpc.ExporterListener;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.RpcException;
/**
 * Skeleton ExporterListener implementation; both callbacks are left empty for custom logic.
 */
public class XxxExporterListener implements ExporterListener {
// Callback for an exported exporter.
public void exported(Exporter<?> exporter) throws RpcException {
// custom handling goes here
}
// Callback for an unexported exporter.
public void unexported(Exporter<?> exporter) throws RpcException {
// custom handling goes here
}
}
META-INF/dubbo/com.alibaba.dubbo.rpc.ExporterListener:
xxx=com.xxx.XxxExporterListener
ProtocolListenerWrapper解析
/**
 * {@link Protocol} decorator that attaches the activated {@link ExporterListener} and
 * {@link InvokerListener} extensions to exporters and invokers produced by the wrapped protocol.
 * <p>
 * URLs whose protocol equals {@code Constants.REGISTRY_PROTOCOL} are passed straight to the
 * wrapped protocol without any listener wrapper.
 */
public class ProtocolListenerWrapper implements Protocol {

    private final Protocol protocol;

    /**
     * @param protocol the protocol to decorate
     * @throws IllegalArgumentException if {@code protocol} is {@code null}
     */
    public ProtocolListenerWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }

    /**
     * Export: the exporter returned by the wrapped protocol is wrapped together with an
     * unmodifiable view of the listeners activated under the exporter.listener key.
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        final boolean registry = Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol());
        final Exporter<T> exported = protocol.export(invoker);
        if (registry) {
            return exported;
        }
        // The listeners are looked up only after the export itself has happened.
        return new ListenerExporterWrapper<T>(
                exported,
                Collections.unmodifiableList(
                        ExtensionLoader.getExtensionLoader(ExporterListener.class)
                                .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

    /**
     * Refer: the invoker returned by the wrapped protocol is wrapped together with an
     * unmodifiable view of the listeners activated under the invoker.listener key.
     */
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        final boolean registry = Constants.REGISTRY_PROTOCOL.equals(url.getProtocol());
        final Invoker<T> referred = protocol.refer(type, url);
        if (registry) {
            return referred;
        }
        // The listeners are looked up only after the reference itself has been created.
        return new ListenerInvokerWrapper<T>(
                referred,
                Collections.unmodifiableList(
                        ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
    }

    public void destroy() {
        protocol.destroy();
    }
}
ListenerExporterWrapper
/**
 * {@link Exporter} decorator that notifies {@link ExporterListener}s when it is created
 * (exported) and when it is unexported.
 * <p>
 * Every listener is notified even if an earlier one fails. Each {@link RuntimeException}
 * thrown by a listener is logged, and the last one is rethrown once all listeners have run.
 */
public class ListenerExporterWrapper<T> implements Exporter<T> {

    private static final Logger logger = LoggerFactory.getLogger(ListenerExporterWrapper.class);

    private final Exporter<T> exporter;

    private final List<ExporterListener> listeners;

    /**
     * Wraps {@code exporter} and immediately fires {@code exported} on every non-null listener.
     *
     * @param exporter  the exporter to decorate
     * @param listeners listeners to notify; may be {@code null} or empty
     * @throws IllegalArgumentException if {@code exporter} is {@code null}
     * @throws RuntimeException the last exception thrown by a listener, if any.
     *         NOTE(review): in that case construction fails although {@code exporter} is
     *         already live, so the caller never receives a handle it could unexport.
     */
    public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
        if (exporter == null) {
            throw new IllegalArgumentException("exporter == null");
        }
        this.exporter = exporter;
        this.listeners = listeners;
        RuntimeException failure = notifyListeners(true);
        if (failure != null) {
            throw failure;
        }
    }

    public Invoker<T> getInvoker() {
        return exporter.getInvoker();
    }

    /**
     * Unexports the wrapped exporter, then fires {@code unexported} on every non-null listener,
     * whether or not the unexport succeeded.
     * <p>
     * A listener failure is rethrown only when the unexport itself completed normally. If the
     * unexport threw, that exception propagates and listener failures are only logged; throwing
     * from the {@code finally} block would silently replace the original exception.
     */
    public void unexport() {
        boolean unexported = false;
        try {
            exporter.unexport();
            unexported = true;
        } finally {
            RuntimeException failure = notifyListeners(false);
            if (unexported && failure != null) {
                throw failure;
            }
        }
    }

    /**
     * Notifies every non-null listener, logging each {@link RuntimeException} it throws.
     *
     * @param exported {@code true} to fire {@code exported}, {@code false} to fire {@code unexported}
     * @return the last exception thrown by a listener, or {@code null} if none failed
     */
    private RuntimeException notifyListeners(boolean exported) {
        if (listeners == null) {
            return null;
        }
        RuntimeException last = null;
        for (ExporterListener listener : listeners) {
            if (listener == null) {
                continue;
            }
            try {
                if (exported) {
                    listener.exported(this);
                } else {
                    listener.unexported(this);
                }
            } catch (RuntimeException t) {
                logger.error(t.getMessage(), t);
                last = t;
            }
        }
        return last;
    }
}
ListenerInvokerWrapper
/**
 * {@link Invoker} decorator that notifies {@link InvokerListener}s when it is created
 * (referred) and when it is destroyed.
 * <p>
 * Anything a listener throws is logged and not propagated. All other operations are
 * delegated to the wrapped invoker.
 */
public class ListenerInvokerWrapper<T> implements Invoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(ListenerInvokerWrapper.class);

    private final Invoker<T> invoker;

    private final List<InvokerListener> listeners;

    /**
     * Wraps {@code invoker} and immediately fires {@code referred} on every non-null listener.
     *
     * @param invoker   the invoker to decorate
     * @param listeners listeners to notify; may be {@code null} or empty
     * @throws IllegalArgumentException if {@code invoker} is {@code null}
     */
    public ListenerInvokerWrapper(Invoker<T> invoker, List<InvokerListener> listeners) {
        if (invoker == null) {
            throw new IllegalArgumentException("invoker == null");
        }
        this.invoker = invoker;
        this.listeners = listeners;
        fireReferred();
    }

    public Class<T> getInterface() {
        return invoker.getInterface();
    }

    public URL getUrl() {
        return invoker.getUrl();
    }

    public boolean isAvailable() {
        return invoker.isAvailable();
    }

    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

    @Override
    public String toString() {
        final String prefix = getInterface() + " -> ";
        if (getUrl() == null) {
            return prefix + " ";
        }
        return prefix + getUrl().toString();
    }

    /**
     * Destroys the wrapped invoker, then fires {@code destroyed} on every non-null listener,
     * whether or not the destroy call succeeded.
     */
    public void destroy() {
        try {
            invoker.destroy();
        } finally {
            fireDestroyed();
        }
    }

    /** Tells each non-null listener that the wrapped invoker was referred; failures are logged. */
    private void fireReferred() {
        if (listeners == null) {
            return;
        }
        for (InvokerListener each : listeners) {
            if (each == null) {
                continue;
            }
            try {
                each.referred(invoker);
            } catch (Throwable t) {
                logger.error(t.getMessage(), t);
            }
        }
    }

    /** Tells each non-null listener that the wrapped invoker was destroyed; failures are logged. */
    private void fireDestroyed() {
        if (listeners == null) {
            return;
        }
        for (InvokerListener each : listeners) {
            if (each == null) {
                continue;
            }
            try {
                each.destroyed(invoker);
            } catch (Throwable t) {
                logger.error(t.getMessage(), t);
            }
        }
    }
}