https://www.rabbitmq.com/getstarted.html
The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead
we schedule the task to be done later
.We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.
This concept is especially useful in web applications where it’s impossible to handle a complex task during a short HTTP request window
. 同步异步化思想。
One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and that way, scale easily.
MQ ACK机制(Message acknowledgment) 半吊子英语害死人
You may wonder what happens if one of the consumers starts a long task and dies with it only partly done
. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.
An acknowledgement is sent back by the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it. make sure a message is never lost
.
提出问题,解决问题。
But our tasks will still be lost if RabbitMQ server stops.
When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren’t lost: we need to mark both the queue and messages as durable
.
最近项目中需要引入工作流引擎,实现业务和流程设计、流转的解耦。
工作流流引擎选用的是snaker,轻量、易上手、可定制。访问数据库用的是JdbcTemplate 。
项目中原有的持久层框架是Mybatis。
这样就带来一个问题:怎样保证业务逻辑(Mybatis) 和工作流流引擎(JdbcTemplate )处于事务控制中
,避免数据异常。
比如:业务单据创建成功后,审批流程启动。如果单据创建成功,审批流程启动异常,事务就应该回滚,否则就成僵尸单据了。
<!-- 按照正常的Spring事务管理配置即可,黑魔法在框架的扩展上体现。 -->
<tx:annotation-driven proxy-target-class="true" />
<bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
首先,事务控制的原理是通过AOP进行事务增强,实现数据连接事务的开启、提交或者回滚。
然后,事务控制的核心是开启事务后,会把对应的Connection 关联到当前的线程中,所有关联的数据库操作,使用的都是这个Connection,才能保证逻辑事务的完整性。
所以,Mybatis 和JdbcTempate 混合事务控制,保证数据一致性,关键就是保证不同框架获取到的是事务开启后的同一个Connection 对象
。剩下的就是AOP 拦截器实现提交或者回滚即可。
关键类:
org.mybatis.spring.transaction.SpringManagedTransactionFactory
org.mybatis.spring.SqlSessionTemplate.SqlSessionInterceptor
工作原理
/**
* 动态代理 SqlSession,拦截对应的方法,事务相关的操作托管给Spring's Transaction Manager
* 通过这个InvocationHandler, 把所有的Mybatis 数据库操作的getSqlSession 都委托给Spring 处理,偷天换日。
* 核心实现:
* 给当前线程绑定一个 SqlSession对象,这样就能够保证后续的持久化操作获取的都是这个 SqlSession对象。
*/
private class SqlSessionInterceptor implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 从Spring事务管理器获取 sqlSession,或者在需要时创建一个新sqlSession。
// Retrieve a sqlSession for the given key that is bound to the current thread.
// @see SqlSessionUtils#getSqlSession
SqlSession sqlSession = getSqlSession(...);
try {
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
// force commit even on non-dirty sessions because some databases require
// a commit/rollback before calling close()
sqlSession.commit(true);
}
return result;
} catch (Throwable t) {
// ...
}
}
}
/**
* 管理从Spring's transaction manager 获取的 connection
* 如果事务管理是启用状态,那么commit/rollback/close 操作统一交给事务管理器去完成。
*/
public class SpringManagedTransaction implements Transaction {
@Override
public Connection getConnection() throws SQLException {
if (this.connection == null) {
openConnection();
}
return this.connection;
}
/**
* Gets a connection from Spring transaction manager and discovers if this
* {@code Transaction} should manage connection or let it to Spring.
* <p>
* It also reads autocommit setting because when using Spring Transaction MyBatis
* thinks that autocommit is always false and will always call commit/rollback
* so we need to no-op that calls.
*/
private void openConnection() throws SQLException {
// 获取的Connection 操作,会检测是否启用了事务管理器。如果有事务管理器,会给当前线程绑定或者获取一个 Connection对象。 (Will bind a Connection to the thread if transaction synchronization is active)
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
}
}
public <T> T execute(CallableStatementCreator csc, CallableStatementCallback<T> action) {
// 和Mybatis SpringManagedTransaction 获取Connection 调用同样的方法。
Connection con = DataSourceUtils.getConnection(getDataSource());
CallableStatement cs = null;
try {
cs = csc.createCallableStatement(conToUse);
applyStatementSettings(cs);
T result = action.doInCallableStatement(csToUse);
handleWarnings(cs);
return result;
}
catch (SQLException ex) {
// ...
}
finally {
JdbcUtils.closeStatement(cs);
DataSourceUtils.releaseConnection(con, getDataSource());
}
}
Mybatis JdbcTemplate 获取Connection 的方式殊途同归
,保证获取的是同一个 Connection。当前线程的ThreadLocal 是否有初始化
。DataSourceUtils.getConnection
获取连接。现有项目使用Spring3 框架,想要对应用所有的请求进行统一监控。
一种方案是配置全局的
HandlerInterceptor
,实现对请求的监控。一种方案是基于AOP,拦截@RequestMapping,实现对Controller 的方法进行监控。项目中采用的是这种方案,主要目的是收集方法粒度的性能、可用率的数据。
编写完自定义的Aspect 监控类后,发现切面不生效。
在spring-config.xml 中配置启用了@AspectJ 特性。
<!--Enables the use of the @AspectJ style of Spring AOP.-->
<aop:aspectj-autoproxy proxy-target-class="true" />
使用的是Spring3 的框架,web.xml 中集成Spring 配置如下所示
<!-- spring3 常见的配置方式,分为Spring容器配置 和 SpringMVC 配置,两个配置文件中各自扫描对应的包路径 -->
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-config.xml</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<servlet>
<servlet-name>springmvc</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-config-mvc.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
首先确认自定义的AOP Bean 是正常加载后,剩下的问题就是探究为什么<aop:aspectj-autoproxy/>
没有生效。
aspectj-autoproxy 配置的工作原理参考 Insight aop:aspectj-autoproxy 解析。
通过该配置,会给Spring 容器注册AnnotationAwareAspectJAutoProxyCreator, 属于BeanPostProcessor
组件,这样的话,就会在生成bean 的以后,根据Bean的特征,对bean 生成代理类。
作用:checking for marker interfaces or wrapping them with proxies.
对应到本案例,就是检查是否存在 @RequestMapping 注解,并对符合条件的bean 进行代理,实现监控功能的增强。
通过检查AspectJ 语法和对应的路径,发现也正常,那么问题就可能出现在Spring 容器中。
通过上述的web.xml 配置,可以分析出存在两个spring 容器,容器是父子的关系。
// listener配置的父容器
// Bootstrap listener to start up and shut down Spring's root WebApplicationContext
// org.springframework.web.context.ContextLoader#initWebApplicationContext
servletContext.setAttribute(WebApplicationContext.ROOT_WEB_APPLICATION_CONTEXT_ATTRIBUTE, this.context);
// servlet 配置的子容器
// Instantiate the WebApplicationContext for this servlet
ConfigurableWebApplicationContext wac = BeanUtils.instantiateClass(XmlWebApplicationContext.class);
wac.setEnvironment(getEnvironment());
// set root WebApplicationContext
wac.setParent(parent);
按照正常的思路来看,子容器应该能集成父容器的特性和注册的解析器、处理器等。
事实证明,父容器注册AnnotationAwareAspectJAutoProxyCreator,子容器不会查找和使用。这也是AOP代理不生效的原因
。
/**
* 根据指定的类型,返回唯一的bean
* 如果在当前的BeanFactory 找不到,会委托给父BeanFactory 查找,从而实现递归查找的策略
*/
public <T> T getBean(Class<T> requiredType) throws BeansException {
Assert.notNull(requiredType, "Required type must not be null");
String[] beanNames = getBeanNamesForType(requiredType);
if (beanNames.length > 1) {
// ......
}
if (beanNames.length == 1) {
return getBean(beanNames[0], requiredType);
}
else if (beanNames.length == 0 && getParentBeanFactory() != null) {
// 从父容器中查找指定的Bean
return getParentBeanFactory().getBean(requiredType);
}
else {
throw new NoSuchBeanDefinitionException(requiredType, "expected single bean but found " +
beanNames.length + ": " + StringUtils.arrayToCommaDelimitedString(beanNames));
}
}
需要在子容器的配置文件中添加:<aop:aspectj-autoproxy proxy-target-class="true" />
或者合并容器的配置,统一由servlet 启动加载。
从父容器中获取Bean 的方法:
org.springframework.beans.factory.BeanFactoryUtils#beansOfTypeIncludingAncestors()
AOP - Aspect Oriented Programming
/**
* 只需要测试类添加@Transactional, 保证所有的测试方式都有事务控制,并且执行完成后自动回滚。
* 参考 https://docs.spring.io/spring-framework/docs/4.3.13.RELEASE/spring-framework-reference/htmlsingle/#testcontext-tx-enabling-transactions
*/
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TestConfig.class)
@Transactional
public class HibernateUserRepositoryTests {
@Test
public void createUser() {
// track initial state in test database:
final int count = countRowsInTable("user");
User user = new User(...);
repository.save(user);
// Manual flush is required to avoid false positive in test
sessionFactory.getCurrentSession().flush();
assertNumUsers(count + 1);
// 方法执行完成后,insert user 会自动回滚,不会在数据库中添加脏数据
}
}
Spring 采用容器注入的方式,在Junit 测试框架的基础上
,引入TestExecutionListener 的概念,方便拓展自定义的事件。
事务控制就是基于这样的设计,通过实现TestExecutionListener 接口,达到事务特性增强的目的
。
/**
* 支持Spring Test 事务控制实现的TestExecutionListener
*
* @see org.springframework.test.context.transaction.TransactionalTestExecutionListener
*/
public class TransactionalTestExecutionListener extends AbstractTestExecutionListener {
/**
* 如果是以事务的形式运行,那么会在测试方法执行前,开启事务
* 前提条件:有@Transactional 注解,并且容器中配置了TransactionManager。SpringBoot 不用考虑,使用Spring 框架的需要检查下是否有这个配置 <tx:annotation-driven transaction-manager="transactionManager"/>
*/
public void beforeTestMethod(TestContext testContext) throws Exception {
final Method testMethod = testContext.getTestMethod();
// 如果是@NotTransactional,直接跳过
if (testMethod.isAnnotationPresent(NotTransactional.class)) {
return;
}
// 检测当前的方法或者类是否需要事务、是否有配置TransactionManager
PlatformTransactionManager tm = null;
if (transactionAttribute != null) {
tm = getTransactionManager(testContext, transactionAttribute.getQualifier());
}
// 获取到tm,开启事务。按照惯例,缓存事务对象,方便afterTestMethod 进行事务继续操作。
if (tm != null) {
TransactionContext txContext = new TransactionContext(tm, transactionAttribute);
runBeforeTransactionMethods(testContext);
// 复习下开启事务的过程:Check settings、check propagation、Acquire Connection、setAutoCommit(false)、Bind the session holder to the thread
startNewTransaction(testContext, txContext);
this.transactionContextCache.put(testMethod, txContext);
}
}
public void afterTestMethod(TestContext testContext) throws Exception {
Method testMethod = testContext.getTestMethod();
// If the transaction is still active...
TransactionContext txContext = this.transactionContextCache.remove(testMethod);
if (txContext != null && !txContext.transactionStatus.isCompleted()) {
try {
// rollback or commit
endTransaction(testContext, txContext);
}
finally {
runAfterTransactionMethods(testContext);
}
}
}
}
在测试容器启动过程中,会检测当前的测试类是否有指定 TestExecutionListeners。如果没有就采用默认的TestExecutionListeners。
/**
* Determine the default TestExecutionListener classes.
* 默认的TestExecutionListeners:
* ServletTestExecutionListener
* DependencyInjectionTestExecutionListener
* TransactionalTestExecutionListener 就是本人啦!
* @see org.springframework.test.context.TestContextManager
*/
protected Set<Class<? extends TestExecutionListener>> getDefaultTestExecutionListenerClasses() {
Set<Class<? extends TestExecutionListener>> defaultListenerClasses = new LinkedHashSet<>();
for (String className : DEFAULT_TEST_EXECUTION_LISTENER_CLASS_NAMES) {
try {
defaultListenerClasses.add((Class<? extends TestExecutionListener>) getClass().getClassLoader().loadClass(className));
} catch (Throwable t) {
}
}
return defaultListenerClasses;
}
方法注解,不受事务控制
。自定义标识测试的方法是否要回滚
,用来避免默认情况下回滚的操作。在使用Mybatis 查询过程中,会有如下日志打印:
DEBUG com.foo.dao.FooMapper.selectFooList - <== Total: 276
我们知道,Mybatis 只有接口,并不存在日志中的这个类和对应的方法,那么Mybatis 执行日志是怎么打印的?
logger 的name即mappedStatementId,也就是接口名 + 方法名
Mybatis 会在对 Connection、PreparedStatement、ResultSet 进行动态代理
。这样,就会分别打印出需要执行的SQL、SQL入参、结果集等。ConnectionLogger、PreparedStatementLogger、ResultSetLogger
,都是实现了InvocationHandler。既然一个类对应一个代理实现类,那么能不能用静态代理去实现呢?
如果是全部的方法增强,那么可以静态代理,如果是个别方法去增强,那么还是动态代理更加方便和灵活。打印日志只需要对个别的方法进行拦截,在不侵入原有数据库从操作逻辑的前提下,还是动态代理更加合适。/**
* 获取DB Connection
* 如果DEBUG日志级别,会对获取的connection 进行日志增强代理。
* @param statementLog 根据 MappedStatement.getStatementLog() 获得。MappedStatement 初始化过程中,会针对每个Mapper 接口的方法,初始对应的logger,即statementLog = LogFactory.getLog(logId)
*
* @from org.apache.ibatis.executor.BaseExecutor#getConnection
*/
protected Connection getConnection(Log statementLog) throws SQLException {
Connection connection = transaction.getConnection();
if (statementLog.isDebugEnabled()) {
// 针对日志DEBUG级别,会对当前的connection 进行代理,通过statementLog 打印日志和传递Logger
return ConnectionLogger.newInstance(connection, statementLog, queryStack);
} else {
return connection;
}
}
/**
* JDBC 查询结果集的日志增强实现。
* ResultSetLogger 实现InvocationHandler 接口,拦截 java.sql.ResultSet#next, 对查询结果进行统计和打印
*/
public final class ResultSetLogger extends BaseJdbcLogger implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] params) throws Throwable {
try {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, params);
}
Object o = method.invoke(rs, params);
// 拦截next() 方法,进行row count统计和结果数据打印。
if ("next".equals(method.getName())) {
if (((Boolean) o)) {
rows++;
// 如果应用的日志级别为TRACE, Mybatis 会详细的打印出ResultSet 的所有返回数据。
if (isTraceEnabled()) {
ResultSetMetaData rsmd = rs.getMetaData();
final int columnCount = rsmd.getColumnCount();
if (first) {
first = false;
printColumnHeaders(rsmd, columnCount);
}
printColumnValues(columnCount);
}
} else {
// 结果集遍历完后,打印 Total 信息,解答本文的疑问。
debug(" Total: " + rows, false);
}
}
clearColumnInfo();
return o;
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<properties>
<property name="patternlayout">%d{HH:mm:ss} [%t] %p %c - %m%n</property>
</properties>
<Appenders>
<Console name="Console" target="SYSTEM_OUT" follow="true">
<PatternLayout pattern="${patternlayout}"/>
</Console>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
<!-- name对应Mapper 所在的包路径下 -->
<Logger name="com.foo.dao" level="TRACE" additivity="true"/>
</Loggers>
</Configuration>