A sequence of elements supporting sequential and parallel aggregate operations.
A stream pipeline consists of a source, zero or more intermediate operations, and a terminal operation
Streams are lazy
了解了上述的概念,要分析 parallel stream 作用原理,就要从terminal operation 源码入手。
/**
* Stream 定义的接口可以作为源码阅读的切入口
*
* Returns whether this stream, if a terminal operation were to be executed,
* would execute in parallel. 注意:terminal operation executed
* @see java.util.stream.BaseStream#isParallel
*/
boolean isParallel();
java.util.stream.AbstractPipeline the core implementations of the Stream interface
java.util.stream.TerminalOp
java.util.stream.ForEachOps.ForEachOp
java.util.stream.ReduceOps.ReduceOp
/**
* 通过TerminalOp 发生真正的调用,是否并行,在此处确定
* Evaluate the pipeline with a terminal operation to produce a result.
*
* @see java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
*/
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
return isParallel() ? terminalOp.evaluateParallel(...) : terminalOp.evaluateSequential(...);
}
// 这样的Terminal Operation
persons.parallelStream().map(person -> person.name).forEach(System.out::println)
如何实现并发调用,只需要关注evaluateParallel
方法。
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
class ForEachTask<S, T> extends ForkJoinTask<T>
至此,我们就知道并发是依靠ForkJoinTask 实现的。
近期,在使用SpringBoot Test 单测验证service 逻辑过程中,发现service “注入”的mapper 竟然是null, 导致业务方法NPE。
之前的单测一直是这样的,是因为SpringBoot 版本问题,导致注入失败么?
@Slf4j
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class FooServiceProcessTest {
/**
* 诡异的问题就在这里,serviceProcess 注入并没有问题。
* serviceProcess 的属性mapper 是null
*/
@Autowired
private FooServiceProcess serviceProcess;
/**
* 测试更新关联关系操作
* cascadeUpdateFoo 方法中,是通过this.fooMapper.update() 方式编写的。
* 需要测试的方法是private ,为了方便,直接用反射调用的。就是因为这个非常规使用,导致个诡异问题出现
*/
@Test
void cascadeUpdateFoo() throws InvocationTargetException, IllegalAccessException {
Method method = ReflectionUtils.findMethod(serviceProcess.getClass(), "cascadeUpdateFoo", FooParam.class);
ReflectionUtils.makeAccessible(method);
FooParam param = new FooParam();
param.setId(180L);
param.setName("foo");
param.setUpdateUser("foo");
method.invoke(serviceProcess, param);
}
}
从报错日志和直观的判断,mapper 为null,可能的问题是Mybatis 或者Mybatis 插件扫描,加载出了问题。通过查看ApplicationContext, mapper 是存在的,况且注入使用的是@Autowired,如果容器没有mapper,启动就会报错。
然后就是从spring-test 去排查,是否会存在父子容器注入替换的问题。之前在排查SpringMVC 就遇到过这种问题。
参考上述代码,FooServiceProcess
可以正常注入,追查容器的初始化或者注入线索的意义就不太大了。那就从单测代码上追查。
通过debug, 发现真正的问题。serviceProcess 实例并非原始的对象,是经过cglib 动态代理过的proxy。如果是调用private 方法,调用的就是这个proxy 的方法,自然会出现NPE 的问题。
通过上述的排查和分析,这个问题和SpringBoot 并没有关系。
问题出在cglib 动态代理类的调用方式和继承机制上,通过编写测试类HotPot,再按照上述的反射调用private 方法,即可复现。
/**
* private, cglib can't enhance...<br/>
* this 是指proxy, getMaterial() 则调用了目标instance.getMaterial(), 是可以正常返回的。<br/>
* 打印结果:<br/>
* x.y.simple.HotPot$$EnhancerByCGLIB$$23f73db0
* intercept method--class x.y.simple.HotPot.getMaterial
* this material is china., need heat
*/
private void prepare() {
System.out.println(this.getClass().getName());
System.out.println("this material is " + this.getMaterial() + ", need heat");
}
/**
* 同样是无法代理的private, this 是proxy, material 是proxy 实例的属性,所以是null<br/>
* 打印结果:<br/>
* the material null need wash, after used
*/
private void finish () {
System.out.println("the material " + this.material + " need wash, after used");
}
/**
* public 方法,proxy.toString, 拦截后,变为目标instance.toString()<br/>
* 打印结果:<br/>
* x.y.simple.HotPot
* HotPot{temperature=99.9, material='china.'}
*/
@Override
public String toString() {
System.out.println(this.getClass().getName());
return "HotPot{temperature=" + this.temperature + ", material='" + this.material + '\'' + '}';
}
代码库: https://github.com/MrRobot5/sample-base-more
不要使用非常规的写法,不常用的功能或者api,遇到问题的概率会大很多。
这次的问题深入追查后,是cglib 代理的问题,是java 继承理论知识的充分表现。
越是离奇诡异的问题,原因越是简单和直白。但是排查弯路总是要走的,经验就是这么来的。
一次上线完成后,观察线上日志,发现很多死锁异常(org.springframework.dao.DeadlockLoserDataAccessException)。
mysql 提示信息:Deadlock found when trying to get lock; try restarting transaction。
这是一次并发插入/更新引发的死锁案例
事务1 | 事务2 | 备注 |
---|---|---|
INSERT INTO country (countryname, countrycode) VALUES (‘Angola’,’AO’) |
||
INSERT INTO country (countryname, countrycode) VALUES (‘Brazil’,’BR’) |
正常执行,相安无事 | |
UPDATE country SET countryname = ‘Angola’ WHERE countryname = ‘AO’ |
阻塞。事务1等待事务2 释放锁 | |
UPDATE country SET countryname = ‘Brazil’ WHERE countryname = ‘BR’ |
DEADLOCK。事务2 等待事务1 释放锁 |
-- 查询死锁日志
SHOW ENGINE innodb STATUS
------------------------
LATEST DETECTED DEADLOCK
------------------------
2022-06-02 19:37:26 9ec
*** (1) TRANSACTION:
TRANSACTION 11187331, ACTIVE 5 sec fetching rows
mysql tables in use 1, locked 1
LOCK WAIT 4 lock struct(s), heap size 312, 21 row lock(s), undo log entries 1
UPDATE country SET countryname = 'Angola' WHERE countryname = 'AO'
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 515 page no 3 n bits 88 index `PRIMARY` of table `test`.`country` trx id 11187331 lock_mode X waiting
*** (2) TRANSACTION:
TRANSACTION 11187330, ACTIVE 5 sec starting index read
mysql tables in use 1, locked 1
UPDATE country SET countryname = 'Brazil' WHERE countryname = 'BR'
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 515 page no 3 n bits 88 index `PRIMARY` of table `test`.`country` trx id 11187330 lock_mode X locks rec but not gap
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 515 page no 3 n bits 88 index `PRIMARY` of table `test`.`country` trx id 11187330 lock_mode X waiting
*** WE ROLL BACK TRANSACTION (2)
日志怎么看?
上述的 innodb 日志已经精简过了。
其中列出了死锁相关的两个事务*** (1) TRANSACTION
。
引发死锁的事务会列出持有的锁 HOLDS THE LOCK(S)
lock_mode X locks rec but not gap。 这对应的就是insert 语句的加锁。
最后,WE ROLL BACK TRANSACTION (2)
使用Mysql 客户端,通过两个客户端执行SQL 来模拟并发。
-- 查询事务隔离级别 (Transaction Isolation Levels)
SELECT @@GLOBAL.tx_isolation, @@tx_isolation;
client1 | client2 | 备注 |
---|---|---|
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE; | ||
START TRANSACTION; |
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE; START TRANSACTION; |
事务开始前才能修改事务隔离级别 |
UPDATE country SET countrycode =’foo’ WHERE Id =13; |
||
UPDATE country SET countrycode =’foo’ WHERE Id =13; |
/* SQL错误(1205):Lock wait timeout exceeded; try restarting transaction */ | |
SELECT * FROM country WHERE Id =13; |
/* SQL错误(1205):Lock wait timeout exceeded; try restarting transaction */ |
注意: 示例中是根据主键,mysql 是行锁,如果update 不是同一行数据,不会发生锁冲突。如果非索引更新,那么就是表锁,表事务串行。
client1 | client2 | 备注 |
---|---|---|
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; | ||
START TRANSACTION; |
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; | |
START TRANSACTION; |
||
UPDATE country SET countrycode =’foo-2’ WHERE Id =14; |
||
SELECT * FROM country WHERE Id =14; |
读写不阻塞。读取的是旧值。 | |
COMMIT; | ||
SELECT * FROM country WHERE Id =14; |
client2 读取的countrycode 仍然是旧的。 |
client1 | client2 | 备注 |
---|---|---|
SET TRANSACTION ISOLATION LEVEL READ COMMITTED; | ||
START TRANSACTION; |
SET TRANSACTION ISOLATION LEVEL READ COMMITTED; | |
START TRANSACTION; |
||
UPDATE country SET countrycode =’foo-3’ WHERE Id =14; |
||
SELECT * FROM country WHERE Id =14; |
读取的是旧值。 | |
COMMIT; | ||
SELECT * FROM country WHERE Id =14; |
读取的是client1更新的值。 |
client1 | client2 | 备注 |
---|---|---|
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; | ||
START TRANSACTION; |
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; | |
START TRANSACTION; |
||
UPDATE country SET countrycode =’foo-4’ WHERE Id =14; |
||
SELECT * FROM country WHERE Id =14; |
读取的是client1更新的值。 | |
ROLLBACK; | ||
SELECT * FROM country WHERE Id =14; |
读取的是client1更新前的值。 |
事务的隔离级别(access mode)是声明事务间数据可见级别的。
无论什么样的级别,对于同一条数据更新,写锁,肯定是互斥的。
These characteristics set the transaction isolation level or access mode.
The isolation level is used for operations on InnoDB tables. The access mode may be specified as of MySQL 5.6.5 and indicates whether transactions operate in read/write or read-only mode.
最近读源码,发现包装模式的写法案例。虽然不是严格的包装设计模式,但是能达到封装和适配的效果,简化使用方式和功能扩展。
这种编程模式非常实用,实际开发过程中可以借鉴。
在 Spring 封装javamail 的实现中,有个MimeMessageHelper 类。
通过包装MimeMessage 对象,提供简便的操作API。同时,镜像操作MimeMessage对象,对MimeMessage 直接赋值的操作。
/**
* 虽然名为Helper, 并不是常用的static Helper。Helper 提供的简便API调用,会直接映射到mimeMessage 对象上
* 使用方式:new MimeMessageHelper(mimeMessage);
* @see org.springframework.mail.javamail.MimeMessageHelper
*/
public MimeMessageHelper(MimeMessage mimeMessage) {
this(mimeMessage, null);
}
借鉴: 对于复杂的三方插件或者类库封装,可以参考spring 对javamail 的适配。