When using OkHttp to fetch data, a Read timed out exception occurs occasionally. The usual remedy is to add a retry mechanism.
While reading the documentation, I noticed that OkHttp registers RetryAndFollowUpInterceptor by default, and judging by its name, it supports retries.
So why is a timed out exception not retried, and what does RetryAndFollowUpInterceptor actually do?
This interceptor recovers from failures and follows redirects as necessary.
add authentication headers
// For unauthorized responses (HTTP Status-Code 401: Unauthorized), call authenticate() and continue the request
client.authenticator().authenticate(route, userResponse);
follow redirects
/**
* 针对重定向的异常
* HTTP Status-Code 301: Moved Permanently.
* HTTP Status-Code 302: Temporary Redirect.
* HTTP Status-Code 303: See Other.
* Builds a new request from the Location header, so the redirect is followed automatically
*/
String location = userResponse.header("Location");
HttpUrl url = userResponse.request().url().resolve(location);
handle a client request timeout (rare in practice)
case HTTP_CLIENT_TIMEOUT:
// 408's are rare in practice, but some servers like HAProxy use this response code. The
// spec says that we may repeat the request without modifications. Modern browsers also
// repeat the request (even non-idempotent ones.)
// Note: this timeout is an HTTP 408 response from the server, not the SocketTimeoutException discussed above.
Reference: okhttp3.internal.http.RetryAndFollowUpInterceptor#followUpRequest
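For the following exceptions, OkHttp reports that "the failure is permanent" and does not retry: ProtocolException, InterruptedIOException, and an SSLHandshakeException caused by a CertificateException. InterruptedIOException has one carve-out: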
if (e instanceof InterruptedIOException) {
return e instanceof SocketTimeoutException && !requestSendStarted;
}
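Reference: okhttp3.internal.http.RetryAndFollowUpInterceptor#recover (which calls isRecoverable). A read timeout fires while waiting for the response, after the request has already been written, so requestSendStarted is true and the exception is not recovered. Only a SocketTimeoutException raised before the request is sent, such as a connect timeout, is retried on the next route, if there is one.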
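To make the read-timeout case concrete, the exception rules can be restated as a small JDK-only function. RecoveryRules is a hypothetical name, and this is a simplified sketch rather than OkHttp's code. The real recover() also checks retryOnConnectionFailure, whether the request body can be replayed, and whether another route exists.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ProtocolException;
import java.net.SocketTimeoutException;
import java.security.cert.CertificateException;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLPeerUnverifiedException;

/** Simplified restatement of the exception rules; hypothetical helper, not OkHttp code. */
final class RecoveryRules {
    private RecoveryRules() {}

    static boolean isRecoverable(IOException e, boolean requestSendStarted) {
        // A protocol violation will not go away on retry.
        if (e instanceof ProtocolException) return false;
        // Interruptions are final; a timeout is retried only if nothing was sent yet (e.g. connect timeout).
        if (e instanceof InterruptedIOException) {
            return e instanceof SocketTimeoutException && !requestSendStarted;
        }
        // A certificate problem during the TLS handshake is permanent.
        if (e instanceof SSLHandshakeException && e.getCause() instanceof CertificateException) {
            return false;
        }
        // Peer verification failures (e.g. certificate pinning) are permanent.
        if (e instanceof SSLPeerUnverifiedException) return false;
        // Anything else is treated as a transient route problem.
        return true;
    }
}
```

Passing a SocketTimeoutException with requestSendStarted = true models the read-timeout case, and the method returns false. That is why the Read timed out exception reaches the caller.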
Adjust the timeout settings as appropriate:
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(10, TimeUnit.SECONDS)
.writeTimeout(5, TimeUnit.SECONDS)
.readTimeout(10, TimeUnit.SECONDS)
.build();
Add a retry mechanism to tolerate network fluctuations:
implement the Interceptor interface, catch SocketTimeoutException, and retry.
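Here is a minimal sketch of the catch-and-retry idea. It uses only the JDK so it runs on its own. TimeoutRetry.call is a hypothetical helper that re-invokes a Callable when it throws SocketTimeoutException, up to a fixed number of attempts. Inside an OkHttp Interceptor registered with OkHttpClient.Builder.addInterceptor, the Callable body would be chain.proceed(chain.request()). Only retry requests that are safe to repeat, such as idempotent GETs, because the server may already have processed a request whose response timed out.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;

/** Hypothetical retry helper: retries only on SocketTimeoutException. */
final class TimeoutRetry {
    private TimeoutRetry() {}

    static <T> T call(Callable<T> action, int maxAttempts) throws Exception {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        SocketTimeoutException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (SocketTimeoutException e) {
                // Remember the timeout and try again; any other exception propagates immediately.
                last = e;
            }
        }
        throw last; // every attempt timed out
    }
}
```

Only SocketTimeoutException is caught. Other failures, like a refused connection, still surface immediately, which keeps the retry scoped to the problem described above.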
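From the analysis above, RetryAndFollowUpInterceptor handles retries at the HTTP application layer, while read timed out is a problem at the transport (socket) layer. Dealing with timeouts is outside RetryAndFollowUpInterceptor's responsibility.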
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
// Application interceptors (client.interceptors()) run first
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
// Performs the actual network request
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// Start the chain
return chain.proceed(originalRequest);
}
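The ordering above explains why a custom interceptor wraps retryAndFollowUpInterceptor. The following JDK-only model strips down the chain-of-responsibility pattern behind RealInterceptorChain.proceed. The Interceptor and Chain types are simplified stand-ins that use strings instead of requests and responses; they are not OkHttp's interfaces.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of an interceptor chain; these types are stand-ins, not OkHttp's API. */
final class ChainModel {
    interface Interceptor {
        String intercept(Chain chain);
    }

    /** Each Chain instance points at the next interceptor to run. */
    static final class Chain {
        private final List<Interceptor> interceptors;
        private final int index;
        private final String request;

        Chain(List<Interceptor> interceptors, int index, String request) {
            this.interceptors = interceptors;
            this.index = index;
            this.request = request;
        }

        String request() { return request; }

        /** Hands the request to interceptors[index], giving it a chain positioned at index + 1. */
        String proceed(String request) {
            if (index >= interceptors.size()) throw new IllegalStateException("no terminal interceptor");
            Chain next = new Chain(interceptors, index + 1, request);
            return interceptors.get(index).intercept(next);
        }
    }

    /** Runs the interceptors in list order; the last one must answer without calling proceed. */
    static String execute(List<Interceptor> interceptors, String request) {
        return new Chain(interceptors, 0, request).proceed(request);
    }

    /** Interceptor that records its name before and after delegating down the chain. */
    static Interceptor logging(String name, List<String> log) {
        return chain -> {
            log.add(name + ">");
            String response = chain.proceed(chain.request());
            log.add("<" + name);
            return response;
        };
    }
}
```

An interceptor placed first in the list sees the whole downstream chain. So when a retry interceptor calls chain.proceed again, everything after it runs again, including the network call.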
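Question: while going through a business project's code, I found the same beanName configured twice, once in XML and once via an annotation, with different classes. Yet the application started and deployed normally. In theory a beanName is unique, so what is going on?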
Analysis (Spring version: 3.2.0.RELEASE)
A Spring bean's unique identifier in the container is its beanName. In XML this is the id of the bean element; in annotations it is the default attribute, value.
Within one XML file, several beans with the same id are not allowed. The IDE flags it, and startup fails with SAXParseException: There are multiple occurrences of ID value 'xxx'.
For annotation-based bean definitions, several beans with the same value are not allowed either. During component scanning, startup fails with ConflictingBeanDefinitionException: Annotation-specified bean name 'xxx' for bean class [com.Foo] conflicts with existing, non-compatible bean definition of same name and class [com.Too]
Bean registration is an operation at the BeanFactory level. Simply put, definitions are stored in a Map:
/** Map of bean definition objects, keyed by bean name */
private final Map<String, BeanDefinition> beanDefinitionMap = new ConcurrentHashMap<String, BeanDefinition>(64);
/**
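* Parses an XML bean element, creates a BeanDefinition, and registers it with the BeanFactory.
* The source shows there is no beanName uniqueness check between parsing and registration; whether registration succeeds depends entirely on the registry.
*
* Source: DefaultBeanDefinitionDocumentReader#processBeanDefinition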
*/
protected void processBeanDefinition(Element ele, BeanDefinitionParserDelegate delegate) {
BeanDefinitionHolder bdHolder = delegate.parseBeanDefinitionElement(ele);
if (bdHolder != null) {
bdHolder = delegate.decorateBeanDefinitionIfRequired(ele, bdHolder);
try {
// Register the given bean definition with the given bean factory. Called directly, with no check.
BeanDefinitionReaderUtils.registerBeanDefinition(bdHolder, getReaderContext().getRegistry());
}
catch (BeanDefinitionStoreException ex) {
// ...
}
}
}
/**
* Scans the given base packages, creates bean definitions, and registers them with the BeanFactory.
* Note: checkCandidate checks beanName uniqueness and bean compatibility. If a compatible BeanDefinition already exists, the candidate is not registered.
*
* @see ClassPathScanningCandidateComponentProvider#findCandidateComponents
* Source: org.springframework.context.annotation.ClassPathBeanDefinitionScanner#doScan
*/
protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
Set<BeanDefinitionHolder> beanDefinitions = new LinkedHashSet<BeanDefinitionHolder>();
for (String basePackage : basePackages) {
Set<BeanDefinition> candidates = findCandidateComponents(basePackage);
for (BeanDefinition candidate : candidates) {
String beanName = this.beanNameGenerator.generateBeanName(candidate, this.registry);
// ......
// What checkCandidate does: checks beanName uniqueness (the ConflictingBeanDefinitionException above is thrown here) and bean compatibility (an existing non-scanned bean definition is treated as compatible by default!).
if (checkCandidate(beanName, candidate)) {
BeanDefinitionHolder definitionHolder = new BeanDefinitionHolder(candidate, beanName);
beanDefinitions.add(definitionHolder);
registerBeanDefinition(definitionHolder, this.registry);
}
}
}
return beanDefinitions;
}
The BeanFactory has a setting, allowBeanDefinitionOverriding. It defaults to true, so registering the same beanName more than once is allowed:
/**
* Register a new bean definition with this registry.
* @throws BeanDefinitionStoreException if beanDefinition.validate() fails, or if the beanName is already registered while overriding is disallowed
*
* @see RootBeanDefinition
* @see ChildBeanDefinition
*/
public void registerBeanDefinition(String beanName, BeanDefinition beanDefinition) throws BeanDefinitionStoreException {
// ......
synchronized (this.beanDefinitionMap) {
Object oldBeanDefinition = this.beanDefinitionMap.get(beanName);
// Uniqueness check: if allowBeanDefinitionOverriding is true (the default), the new beanDefinition is registered anyway and replaces the existing one.
if (oldBeanDefinition != null) {
if (!this.allowBeanDefinitionOverriding) {
throw new BeanDefinitionStoreException(beanDefinition.getResourceDescription(), beanName,
"Cannot register bean definition [" + beanDefinition + "] for bean '" + beanName +
"': There is already [" + oldBeanDefinition + "] bound.");
}
}
this.beanDefinitionMap.put(beanName, beanDefinition);
}
resetBeanDefinition(beanName);
}
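The override rule reduces to a small JDK-only model: a map keyed by bean name plus an allowBeanDefinitionOverriding flag. BeanRegistryModel is a hypothetical illustration in which bean definitions are reduced to class-name strings. It is not Spring's DefaultListableBeanFactory.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of the override rule in registerBeanDefinition; definitions are class names. */
final class BeanRegistryModel {
    private final Map<String, String> beanDefinitionMap = new ConcurrentHashMap<>();
    private final boolean allowBeanDefinitionOverriding;

    BeanRegistryModel(boolean allowBeanDefinitionOverriding) {
        this.allowBeanDefinitionOverriding = allowBeanDefinitionOverriding;
    }

    /** synchronized mirrors the synchronized block around the get/put pair in the original. */
    synchronized void register(String beanName, String beanClass) {
        String old = beanDefinitionMap.get(beanName);
        if (old != null && !allowBeanDefinitionOverriding) {
            // Corresponds to the BeanDefinitionStoreException branch.
            throw new IllegalStateException("Cannot register bean definition [" + beanClass
                    + "] for bean '" + beanName + "': There is already [" + old + "] bound.");
        }
        // With overriding allowed, the new definition silently replaces the old one.
        beanDefinitionMap.put(beanName, beanClass);
    }

    String lookup(String beanName) {
        return beanDefinitionMap.get(beanName);
    }
}
```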
Using the two registration paths above, let's trace how an actual configuration is registered.
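<!-- case1: component scan configured first. Foo is registered first, then Woo; the bean finally exposed is Woo -->
<!-- The scanned package contains a class whose beanName is "foo" (com.service.Foo) -->
<context:component-scan base-package="com.service.*" />
<!-- Bean defined directly in XML, beanName="foo" -->
<bean id="foo" class="com.service.Woo"></bean>
<!-- case2: XML bean configured first. Woo is registered first; the scan finds a compatible bean with the same name and skips Foo; the bean finally exposed is Woo -->
<!-- Bean defined directly in XML, beanName="foo" -->
<bean id="foo" class="com.service.Woo"></bean>
<!-- The scanned package contains a class whose beanName is "foo" (com.service.Foo) -->
<context:component-scan base-package="com.service.*" />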
So an XML bean definition takes precedence over an auto-scanned bean.
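Both cases can be replayed with a toy model that combines the two paths. The XML path registers unconditionally, because overriding is on. The scan path goes through a checkCandidate-like test: it skips the candidate when a compatible definition already owns the name, and rejects two different scanned classes. RegistrationOrderModel, registerXml, and scan are hypothetical names. Spring's compatibility rule is simplified here to "the existing definition came from XML, or is the same class".

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of XML registration vs. component scanning for the same bean name. */
final class RegistrationOrderModel {
    /** A definition is reduced to its class name and whether it came from scanning. */
    private static final class Def {
        final String beanClass;
        final boolean scanned;

        Def(String beanClass, boolean scanned) {
            this.beanClass = beanClass;
            this.scanned = scanned;
        }
    }

    private final Map<String, Def> definitions = new HashMap<>();

    /** XML path: registers directly; overriding is allowed, as in Spring's default. */
    void registerXml(String beanName, String beanClass) {
        definitions.put(beanName, new Def(beanClass, false));
    }

    /** Scan path: a simplified checkCandidate decides whether to register. */
    void scan(String beanName, String beanClass) {
        Def existing = definitions.get(beanName);
        if (existing == null) {
            definitions.put(beanName, new Def(beanClass, true));
        } else if (existing.scanned && !existing.beanClass.equals(beanClass)) {
            // Two different scanned classes with one name: Spring throws ConflictingBeanDefinitionException.
            throw new IllegalStateException("Annotation-specified bean name '" + beanName
                    + "' conflicts with existing, non-compatible bean definition");
        }
        // Otherwise the existing definition counts as compatible and the scanned candidate is skipped.
    }

    String beanClassOf(String beanName) {
        Def d = definitions.get(beanName);
        return d == null ? null : d.beanClass;
    }
}
```

In both orders the XML definition ends up registered. That is the "result does not depend on configuration order" property noted below.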
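Putting it together: when the same bean is defined in several XML files, or when component scanning and XML definitions are mixed, Spring by default allows the same beanName to appear more than once. A later registration replaces any earlier BeanDefinition with the same beanName. The exception is a scanned candidate, which is simply skipped if a compatible (for example, XML-defined) BeanDefinition is already registered.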
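This analysis shows how thoroughly a mature framework handles configuration details. Consider its design for compatibility (several ways to register beans, tolerance of repeated configuration), extensibility (support for overriding), and consistency (the result does not depend on configuration order). All of these are worth learning from in everyday development.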
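The previous article described one way Spring Boot bootstraps Spring: SpringApplicationRunListener exposes the stages of Spring's startup and provides extension points for the various container initialization events.
The other way is auto-configuration. Starters list their auto-configuration classes under the EnableAutoConfiguration key of META-INF/spring.factories, and those classes register the corresponding features with the Spring container.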
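First, Spring supports declaring beans in @Configuration classes and importing and registering them with @Import. Spring Boot defines its own ImportSelector, which brings the @Configuration classes provided by the starters into the Spring container.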
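Then, building on Spring's Condition mechanism, Spring Boot extends @Conditional with richer, more specific conditions. A bean is registered only if the current classpath or the state of the Spring container satisfies them, so only valid beans end up in the Spring container.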
Next, starting from Spring Boot's entry point, let's walk through the role of each key annotation.
// EnableAutoConfiguration imports a custom ImportSelector, which discovers and imports the configuration classes
@Import(EnableAutoConfigurationImportSelector.class)
public @interface EnableAutoConfiguration {
}
Using the generic factory-loading mechanism of SpringFactoriesLoader, Spring Boot loads the configuration classes listed in the META-INF/spring.factories files on the classpath.
// @see AutoConfigurationImportSelector#selectImports
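public String[] selectImports(AnnotationMetadata annotationMetadata) {
// ... (isEnabled check and try/catch omitted)
AutoConfigurationMetadata autoConfigurationMetadata = AutoConfigurationMetadataLoader.loadMetadata(this.beanClassLoader);
AnnotationAttributes attributes = getAttributes(annotationMetadata);
// find auto configuration classes in META-INF/spring.factories
List<String> configurations = getCandidateConfigurations(annotationMetadata, attributes);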
configurations = removeDuplicates(configurations);
configurations = sort(configurations, autoConfigurationMetadata);
Set<String> exclusions = getExclusions(annotationMetadata, attributes);
configurations.removeAll(exclusions);
configurations = filter(configurations, autoConfigurationMetadata);
fireAutoConfigurationImportEvents(configurations, exclusions);
return configurations.toArray(new String[configurations.size()]);
}
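The candidate lookup and the steps in selectImports can be modeled end to end with the JDK alone. META-INF/spring.factories is a properties file whose EnableAutoConfiguration key lists class names. selectImports then deduplicates them, sorts them, removes exclusions, and filters them. AutoConfigSelectorSketch is hypothetical. The real sort honors @AutoConfigureOrder, @AutoConfigureBefore, and @AutoConfigureAfter rather than plain alphabetical order, and the predicate merely stands in for Spring Boot's condition-based filtering.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical, simplified model of AutoConfigurationImportSelector#selectImports. */
final class AutoConfigSelectorSketch {
    static final String ENABLE_AUTO_CONFIGURATION =
            "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    /** Reads the comma-separated class names listed under the EnableAutoConfiguration key. */
    static List<String> candidates(String springFactories) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(springFactories)); // handles "\" line continuations
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(ENABLE_AUTO_CONFIGURATION, "").split(",")) {
            if (!name.trim().isEmpty()) names.add(name.trim());
        }
        return names;
    }

    /** removeDuplicates, sort, remove exclusions, filter: the same order as selectImports. */
    static List<String> selectImports(List<String> candidates, Set<String> exclusions,
                                      Predicate<String> filter) {
        List<String> configurations = new ArrayList<>(new LinkedHashSet<>(candidates));
        Collections.sort(configurations);          // simplified: alphabetical only
        configurations.removeAll(exclusions);      // e.g. @EnableAutoConfiguration(exclude = ...)
        configurations.removeIf(filter.negate());  // stands in for condition-based pre-filtering
        return configurations;
    }
}
```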
The Condition mechanism works like an enhanced TypeFilter: bean definitions are evaluated and filtered programmatically.
Annotations added by Spring Boot include @ConditionalOnBean, @ConditionalOnClass, and @ConditionalOnProperty.
/**
* Determine if an item should be skipped based on @Conditional annotations.
*
* @see ConditionEvaluator#shouldSkip
*/
public boolean shouldSkip(AnnotatedTypeMetadata metadata, ConfigurationPhase phase) {
// Rule: no @Conditional annotation means the item is not skipped
if (metadata == null || !metadata.isAnnotated(Conditional.class.getName())) {
return false;
}
// Parse @Conditional and instantiate the conditions
List<Condition> conditions = new ArrayList<Condition>();
for (String[] conditionClasses : getConditionClasses(metadata)) {
for (String conditionClass : conditionClasses) {
Condition condition = getCondition(conditionClass, this.context.getClassLoader());
conditions.add(condition);
}
}
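// Skip the item as soon as an applicable condition does not match
for (Condition condition : conditions) {
ConfigurationPhase requiredPhase = null;
if (condition instanceof ConfigurationCondition) {
requiredPhase = ((ConfigurationCondition) condition).getConfigurationPhase();
}
if (requiredPhase == null || requiredPhase == phase) {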
if (!condition.matches(this.context, metadata)) {
return true;
}
}
}
return false;
}
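The skip rule can be sketched with the JDK alone, together with an "on class" condition in the spirit of @ConditionalOnClass. An item is skipped as soon as one condition fails, and an item with no conditions is never skipped. ConditionSketch, its Condition interface, and onClass are hypothetical stand-ins, not Spring's types. The class-presence check uses Class.forName(name, false, loader), so the class is loaded but not initialized.

```java
import java.util.List;

/** Hypothetical stand-ins for Spring's Condition and ConditionEvaluator. */
final class ConditionSketch {
    interface Condition {
        boolean matches(ClassLoader classLoader);
    }

    /** Matches when the named class can be loaded, similar in spirit to @ConditionalOnClass. */
    static Condition onClass(String className) {
        return loader -> {
            try {
                Class.forName(className, false, loader); // load without running static initializers
                return true;
            } catch (ClassNotFoundException e) {
                return false;
            }
        };
    }

    /** No conditions means "do not skip"; otherwise skip on the first condition that fails. */
    static boolean shouldSkip(List<Condition> conditions, ClassLoader loader) {
        if (conditions == null || conditions.isEmpty()) return false;
        for (Condition condition : conditions) {
            if (!condition.matches(loader)) return true;
        }
        return false;
    }
}
```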
Since Spring 3.x, the container can be configured in Java code.
Key annotations: @Configuration, @Bean, @Import, @ComponentScans, @PropertySources
@Configuration
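Much like Spring XML configuration, it loads configuration, BeanDefinitions, and so on, so it must be processed early in container initialization. The class that handles it is ConfigurationClassPostProcessor, which is a BeanFactoryPostProcessor.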
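Note: this class implements BeanDefinitionRegistryPostProcessor, which is invoked earlier than a standard BeanFactoryPostProcessor. It is registered when an AnnotatedBeanDefinitionReader is created: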
public AnnotatedBeanDefinitionReader(BeanDefinitionRegistry registry, Environment environment) {
this.registry = registry;
this.conditionEvaluator = new ConditionEvaluator(registry, environment, null);
// Register the annotation config processors (including ConfigurationClassPostProcessor), the core of Spring's annotation support
AnnotationConfigUtils.registerAnnotationConfigProcessors(this.registry);
}
By analogy with XML configuration, a @Configuration class corresponds to one XML file.
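The processing has two parts, parsing and bean definition loading. Each is handled by one of two core classes: ConfigurationClassParser and ConfigurationClassBeanDefinitionReader.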
// @see ConfigurationClassPostProcessor#processConfigBeanDefinitions
public void processConfigBeanDefinitions(BeanDefinitionRegistry registry) {
// ... (collect configCandidates: the registered bean definitions that are @Configuration classes)
// Parse each @Configuration class
ConfigurationClassParser parser = new ConfigurationClassParser(
this.metadataReaderFactory, this.problemReporter, this.environment,
this.resourceLoader, this.componentScanBeanNameGenerator, registry);
Set<BeanDefinitionHolder> candidates = new LinkedHashSet<BeanDefinitionHolder>(configCandidates);
Set<ConfigurationClass> alreadyParsed = new HashSet<ConfigurationClass>(configCandidates.size());
do {
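// Parse the @Configuration classes, processing the @PropertySources, @ComponentScans, @ImportResource, @Bean and @Import they declare
parser.parse(candidates);
parser.validate();
Set<ConfigurationClass> configClasses = new LinkedHashSet<ConfigurationClass>(parser.getConfigurationClasses());
configClasses.removeAll(alreadyParsed);
// Read the model and create bean definitions based on its content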
if (this.reader == null) {
this.reader = new ConfigurationClassBeanDefinitionReader(
registry, this.sourceExtractor, this.resourceLoader, this.environment,
this.importBeanNameGenerator, parser.getImportRegistry());
}
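// Register the bean definitions, evaluating @Conditional to decide what goes into the container
this.reader.loadBeanDefinitions(configClasses);
alreadyParsed.addAll(configClasses);
candidates.clear();
// ... check for newly registered @Configuration classes, add them to candidates, and repeat loading, parsing and processing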
}
while (!candidates.isEmpty());
}
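The do/while loop keeps going until no new configuration classes appear, because loading one configuration class can register bean definitions that are themselves configuration classes. Below is a JDK-only model of that fixed-point iteration. ConfigLoopSketch is illustrative, not Spring code. A hypothetical map from a configuration class to the definitions it registers stands in for the parser and reader, and names ending in "Config" are treated as configuration classes, which is an arbitrary simplification.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the parse/load loop in processConfigBeanDefinitions. */
final class ConfigLoopSketch {
    /** Returns the configuration classes in the order they were processed. */
    static List<String> process(List<String> initial, Map<String, List<String>> registers) {
        Set<String> registry = new LinkedHashSet<>(initial);      // all known bean definitions
        Set<String> alreadyParsed = new LinkedHashSet<>();
        Set<String> candidates = new LinkedHashSet<>(initial);
        do {
            // "parse" this round's candidates that have not been handled yet
            Set<String> configClasses = new LinkedHashSet<>(candidates);
            configClasses.removeAll(alreadyParsed);
            // "loadBeanDefinitions": each config class may register more definitions
            for (String config : configClasses) {
                registry.addAll(registers.getOrDefault(config, Collections.emptyList()));
            }
            alreadyParsed.addAll(configClasses);
            // newly registered configuration classes become the next round's candidates
            candidates.clear();
            for (String name : registry) {
                if (name.endsWith("Config") && !alreadyParsed.contains(name)) candidates.add(name);
            }
        } while (!candidates.isEmpty());
        return new ArrayList<>(alreadyParsed);
    }
}
```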
https://www.rabbitmq.com/getstarted.html
The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.
This concept is especially useful in web applications where it's impossible to handle a complex task during a short HTTP request window. (The idea: turn synchronous work into asynchronous work.)
One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and that way, scale easily.
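To see how tasks are shared between workers, the pattern can be imitated in-process with a BlockingQueue and a thread pool. This is only an analogy built from java.util.concurrent, not RabbitMQ: there is no broker, persistence, or acknowledgment. WorkQueueSketch is hypothetical. Scaling out here means raising the workers argument.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-process analogy of a work queue: tasks are enqueued, several workers drain them. */
final class WorkQueueSketch {
    private static final String POISON = "__stop__"; // tells one worker to exit

    /** Runs the given number of workers over the tasks; returns which worker handled each task. */
    static Map<String, String> run(List<String> tasks, int workers) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(tasks);
        for (int i = 0; i < workers; i++) queue.add(POISON); // one stop marker per worker, after all tasks
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            String workerName = "worker-" + i;
            pool.submit(() -> {
                try {
                    for (String task = queue.take(); !POISON.equals(task); task = queue.take()) {
                        handledBy.put(task, workerName); // "process" the task
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return handledBy;
    }
}
```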
MQ ACK mechanism (Message acknowledgment). A half-baked grasp of English can really trip you up here.
You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
An acknowledgement is sent back by the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it. Purpose: make sure a message is never lost.
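The acknowledgment contract can be modeled in a few lines. A delivered message stays "unacked" until the consumer acks it, and if the consumer dies first, the message is requeued instead of lost. AckQueueModel is a hypothetical single-threaded toy, not RabbitMQ's API. With the real RabbitMQ Java client, manual acknowledgment means consuming with basicConsume(queue, false, ...) and calling channel.basicAck(deliveryTag, false) once the work is done.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

/** Toy broker with manual-ack delivery and requeue on consumer death. Not RabbitMQ's API. */
final class AckQueueModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new TreeMap<>(); // ordered by delivery tag
    private long nextTag = 1;

    void publish(String message) { ready.addLast(message); }

    /** Hands out the next message and keeps it until acked; returns its delivery tag, or -1 if empty. */
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) return -1;
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    String peekUnacked(long tag) { return unacked.get(tag); }

    /** The consumer finished the work: the broker may now delete the message. */
    void ack(long tag) { unacked.remove(tag); }

    /** The consumer died without acking: every outstanding message goes back into the queue. */
    void consumerDied() {
        for (String message : unacked.values()) ready.addLast(message);
        unacked.clear();
    }

    int readyCount() { return ready.size(); }
}
```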
Pose a problem, then solve it.
But our tasks will still be lost if RabbitMQ server stops.
When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.
A recent project needed a workflow engine to decouple the business logic from process design and flow.
We chose snaker as the workflow engine because it is lightweight, easy to learn, and customizable. It accesses the database through JdbcTemplate.
The project's existing persistence framework is MyBatis.
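This raises a question: how do we keep the business logic (MyBatis) and the workflow engine (JdbcTemplate) under the same transaction, so the data cannot become inconsistent?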
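For example, once a business document is created, its approval process is started. If the document is created but starting the approval process fails, the transaction should roll back. Otherwise the document becomes a zombie.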
<!-- A standard Spring transaction management configuration is all it takes; the magic is in the frameworks' extensions. -->
<tx:annotation-driven proxy-target-class="true" />
<bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
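First, transaction control works through AOP: transactional advice opens the transaction on the database connection, then commits or rolls it back.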
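Next, the core of transaction control: once a transaction is opened, its Connection is bound to the current thread, and every database operation in that scope uses this same Connection. That is what keeps the logical transaction intact.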
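So the key to putting MyBatis and JdbcTemplate under one transaction, and keeping the data consistent, is making sure both frameworks get the same Connection object opened by the transaction. For that, both must use the same DataSource as the DataSourceTransactionManager. The rest is just the AOP interceptor committing or rolling back.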
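The AOP part can be sketched with a JDK dynamic proxy. The handler opens a transaction, invokes the target, then commits on success or rolls back on an exception. TxProxySketch and DocumentService are hypothetical, and "begin/commit/rollback" are only recorded in a list. Spring's real advice, TransactionInterceptor, goes through a PlatformTransactionManager and by default rolls back only on RuntimeException and Error, while this toy rolls back on any exception.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical transactional proxy: begin, invoke, then commit or roll back. */
final class TxProxySketch {
    /** Example business interface: create a document, then start its approval flow. */
    interface DocumentService {
        void createAndStartFlow(boolean flowFails);
    }

    /** Wraps target so every interface call runs inside a (simulated) transaction. */
    static <T> T transactional(T target, Class<T> iface, List<String> txLog) {
        InvocationHandler handler = (proxy, method, args) -> {
            txLog.add("begin");
            try {
                Object result = method.invoke(target, args);
                txLog.add("commit");
                return result;
            } catch (InvocationTargetException e) {
                txLog.add("rollback");
                throw e.getCause(); // rethrow the target's original exception
            }
        };
        return iface.cast(Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler));
    }
}
```

In the document-plus-approval example, a failure while starting the flow leads to a rollback that also undoes the document insert, because both ran inside the same proxied call.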
Key classes:
org.mybatis.spring.transaction.SpringManagedTransactionFactory
org.mybatis.spring.SqlSessionTemplate.SqlSessionInterceptor
How it works
/**
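* Dynamic proxy for SqlSession that intercepts its methods and hands transaction handling over to Spring's Transaction Manager.
* Through this InvocationHandler, the SqlSession used by every MyBatis database operation is obtained through Spring instead: a sleight of hand.
* Core idea:
* bind a SqlSession to the current thread, so every subsequent persistence operation gets this same SqlSession.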
*/
private class SqlSessionInterceptor implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// Get the SqlSession from Spring's transaction manager, or create a new one if needed.
// Retrieve a sqlSession for the given key that is bound to the current thread.
// @see SqlSessionUtils#getSqlSession
SqlSession sqlSession = getSqlSession(...);
try {
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
// force commit even on non-dirty sessions because some databases require
// a commit/rollback before calling close()
sqlSession.commit(true);
}
return result;
} catch (Throwable t) {
// ...
}
}
}
/**
* Manages the connection obtained from Spring's transaction manager.
* When transaction management is active, commit/rollback/close are left to the transaction manager.
*/
public class SpringManagedTransaction implements Transaction {
@Override
public Connection getConnection() throws SQLException {
if (this.connection == null) {
openConnection();
}
return this.connection;
}
/**
* Gets a connection from Spring transaction manager and discovers if this
* {@code Transaction} should manage connection or let it to Spring.
* <p>
* It also reads autocommit setting because when using Spring Transaction MyBatis
* thinks that autocommit is always false and will always call commit/rollback
* so we need to no-op that calls.
*/
private void openConnection() throws SQLException {
// Obtaining the Connection checks whether a transaction manager is active; if so, a Connection is bound to, or fetched from, the current thread. (Will bind a Connection to the thread if transaction synchronization is active)
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
}
}
public <T> T execute(CallableStatementCreator csc, CallableStatementCallback<T> action) {
// Same method MyBatis's SpringManagedTransaction uses to obtain the Connection (native JDBC extractor handling omitted)
Connection con = DataSourceUtils.getConnection(getDataSource());
CallableStatement cs = null;
try {
cs = csc.createCallableStatement(con);
applyStatementSettings(cs);
T result = action.doInCallableStatement(cs);
handleWarnings(cs);
return result;
}
catch (SQLException ex) {
// ...
}
finally {
JdbcUtils.closeStatement(cs);
DataSourceUtils.releaseConnection(con, getDataSource());
}
}
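MyBatis and JdbcTemplate take different routes but arrive at the same place: both call DataSourceUtils.getConnection to obtain a connection. That method first checks whether a Connection was already bound to the current thread's ThreadLocal when the transaction started, and if so returns it. This guarantees both frameworks work on the same Connection.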
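The thread-binding idea can be shown without a database. A ThreadLocal holds the connection opened when the transaction begins, and any code that asks for a connection on that thread gets the same object. ConnectionHolderSketch and FakeConnection are hypothetical stand-ins for DataSourceUtils/TransactionSynchronizationManager and java.sql.Connection; this is not Spring's code.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for DataSourceUtils plus TransactionSynchronizationManager. */
final class ConnectionHolderSketch {
    /** Minimal fake connection with an id, so identity is easy to check. */
    static final class FakeConnection {
        final int id;

        FakeConnection(int id) { this.id = id; }
    }

    private static final AtomicInteger IDS = new AtomicInteger();
    private static final ThreadLocal<FakeConnection> BOUND = new ThreadLocal<>();

    /** Transaction start: open a connection and bind it to the current thread. */
    static void beginTransaction() {
        BOUND.set(new FakeConnection(IDS.incrementAndGet()));
    }

    /** Transaction end (commit or rollback): unbind the connection from the thread. */
    static void endTransaction() {
        BOUND.remove();
    }

    /** Like DataSourceUtils.getConnection: reuse the thread-bound connection, else open a new one. */
    static FakeConnection getConnection() {
        FakeConnection bound = BOUND.get();
        return bound != null ? bound : new FakeConnection(IDS.incrementAndGet());
    }
}
```

Inside a transaction, a "MyBatis" call and a "JdbcTemplate" call on the same thread both receive the bound connection. Outside one, each call gets its own connection, which is why the same code would not be atomic without the transaction.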