最近排查线上问题发现业务 Table 自增id 不连续,进一步发现是事务回滚造成。
-- 上述问题示例。 这样的情况就导致 main_table 自增id 并不连续/出现空挡。
START TRANSACTION;
-- 执行成功,自增id 已分配
INSERT INTO main_table VALUES (?, ?, ?);
-- 执行失败,事务 rollback
INSERT INTO sub_table VALUES (?, ?, ?);
那么,自增id 在数据库中是什么样的机制?可以从 H2 数据库实现方案获得启发。
-- h2 auto_increment 建表语句
create table test(id bigint auto_increment, name varchar(255));
insert into test(name) values('hello');
insert into test(name) values('world');
select * from test;
通过打开 H2 数据库文件,可以看到 test 建表语句是按照H2 dalect 翻译的。可以看到H2 数据库的auto_increment 是通过 SEQUENC 实现的。
CREATE CACHED TABLE PUBLIC.TEST(
ID BIGINT DEFAULT (NEXT VALUE FOR PUBLIC.SYSTEM_SEQUENCE_620D39CD) NOT NULL NULL_TO_DEFAULT SEQUENCE PUBLIC.SYSTEM_SEQUENCE_620D39CD,
NAME VARCHAR(255)
)
官方文档 create_sequence ✨
-- Sequence can produce only integer values.
CREATE SEQUENCE SEQ_ID;
CREATE SEQUENCE SEQ2 AS INTEGER START WITH 10;
Used values are never re-used, even when the transaction is rolled back.
org.h2.command.dml.Insert
org.h2.expression.SequenceValue
org.h2.schema.Sequence
creat table 实例化过程
// @see org.h2.command.ddl.CreateTable#update
for (Column c :data.columns) {
// 如果列定义为 auto_increment, 需要进行dialet 翻译。
if (c.isAutoIncrement()) {
int objId = getObjectId();
//
c.convertAutoIncrementToSequence(session, getSchema(), objId, data.temporary);
}
}
/**
* Convert the auto-increment flag to a sequence that is linked with this
* table.
*
* @see org.h2.table.Column#convertAutoIncrementToSequence
*/
public void convertAutoIncrementToSequence(Session session, Schema schema, int id, boolean temporary) {
// 生成唯一的 sequenceName
while (true) {
ValueUuid uuid = ValueUuid.getNewRandom();
String s = uuid.getString();
s = s.replace('-', '_').toUpperCase();
sequenceName = "SYSTEM_SEQUENCE_" + s;
if (schema.findSequence(sequenceName) == null) {
break;
}
}
// Sequence 实例化,实现自增生成唯一数据
Sequence seq = new Sequence(schema, id, sequenceName, start, increment);
if (temporary) {
seq.setTemporary(true);
} else {
// CREATE SEQUENCE, 数据库 Schema 持久化
session.getDatabase().addSchemaObject(session, seq);
}
setAutoIncrement(false, 0, 0);
// 把Sequence 包装为表达式片段
SequenceValue seqValue = new SequenceValue(seq);
setDefaultExpression(session, seqValue);
setSequence(seq);
}
Insert 过程
// 遍历 Table 的 columns 和 参数Expression, 执行值填充
for (int i = 0; i < columnLen; i++) {
Column c = columns[i];
int index = c.getColumnId();
// 获取 Value Expression
Expression e = expr[i];
if (e != null) {
try {
// 如果是SequenceValue, 生成自增数字
Value v = c.convert(e.getValue(session));
newRow.setValue(index, v);
} catch (DbException ex) {
throw setRow(ex, x, getSQL(expr));
}
}
}
/**
* Get the next value for this sequence.
*
* @param session the session
* @return the next value
*/
public synchronized long getNext(Session session) {
// valueWithMargin 计算, flush 过程...
long v = value;
value += increment;
return v;
}
H2 database 是java 编写的数据库,简单易懂,对于数据库实现原理是个很好参考。
通过阅读数据库的实现,对于应用开发帮助很大,可以适当的扬长避短。
对于SQL 规范,每种数据库都有对应的实现方式,dalect。
分页是 web application 开发最常见的功能。在使用不同的框架和工具过程中,发现初始行/页的定义不同,特意整理记录。
语法:[LIMIT {[offset,] row_count}]
LIMIT row_count
is equivalent to LIMIT 0, row_count
.
The offset of the initial row is 0 (not 1)
参考:MySQL :: MySQL 5.7 Reference Manual :: 13.2.9 SELECT Statement
后端分页,简单讲,就是数据库的分页。 对于mysql 来讲,就是上述 offset row_count 的计算过程。
/**
* 计算起止行号 offset
* @see com.github.pagehelper.Page#calculateStartAndEndRow
*/
private void calculateStartAndEndRow() {
// pageNum 页码,从1开始。 pageNum < 1 , 忽略计算。
this.startRow = this.pageNum > 0 ? (this.pageNum - 1) * this.pageSize : 0;
this.endRow = this.startRow + this.pageSize * (this.pageNum > 0 ? 1 : 0);
}
/**
* 计算总页数 pages/ pageCount。
*/
public void setTotal(long total) {
if (pageSize > 0) {
pages = (int) (total / pageSize + ((total % pageSize == 0) ? 0 : 1));
} else {
pages = 0;
}
}
SQL 拼接实现: com.github.pagehelper.dialect.helper.MySqlDialect
关键类:
org.springframework.data.domain.Pageable
org.springframework.data.web.PageableDefault
/**
* offset 计算,不同于pagehelper, page 页码从0 开始。 default is 0
* @see org.springframework.data.domain.AbstractPageRequest#getOffset
*/
public long getOffset() {
return (long)this.page * (long)this.size;
}
/*
* 使用 Math.ceil 实现。
* @see org.springframework.data.domain.Page#getTotalPages()
*/
@Override
public int getTotalPages() {
return getSize() == 0 ? 1 : (int) Math.ceil((double) total / (double) getSize());
}
/**
* offset 计算,不同于pagehelper, page 页码从0 开始。
* @see org.springframework.data.jdbc.core.convert.SqlGenerator#applyPagination
*/
private SelectBuilder.SelectOrdered applyPagination(Pageable pageable, SelectBuilder.SelectOrdered select) {
// 在spring-data-relation, Limit 抽象为 SelectLimitOffset
SelectBuilder.SelectLimitOffset limitable = (SelectBuilder.SelectLimitOffset) select;
// To read the first 20 rows from start use limitOffset(20, 0). to read the next 20 use limitOffset(20, 20).
SelectBuilder.SelectLimitOffset limitResult = limitable.limitOffset(pageable.getPageSize(), pageable.getOffset());
return (SelectBuilder.SelectOrdered) limitResult;
}
spring-data-commons 提供 mvc 层的分页参数处理器
/**
* Annotation to set defaults when injecting a {@link org.springframework.data.domain.Pageable} into a controller method.
*
* @see org.springframework.data.web.PageableDefault
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface PageableDefault {
/**
* The default-size the injected {@link org.springframework.data.domain.Pageable} should get if no corresponding
* parameter defined in request (default is 10).
*/
int size() default 10;
/**
* The default-pagenumber the injected {@link org.springframework.data.domain.Pageable} should get if no corresponding
* parameter defined in request (default is 0).
*/
int page() default 0;
}
MVC 参数处理器: org.springframework.data.web.PageableHandlerMethodArgumentResolver
Thymeleaf is a modern server-side Java template engine for both web and standalone environments.
<!-- spring-data-examples\web\example\src\main\resources\templates\users.html-->
<nav>
<!-- class样式 bootstrap 默认的分页用法-->
<ul class="pagination" th:with="total = ${users.totalPages}">
<li th:if="${users.hasPrevious()}">
<a th:href="@{/users(page=${users.previousPageable().pageNumber},size=${users.size})}" aria-label="Previous">
<span aria-hidden="true">«</span>
</a>
</li>
<!-- spring-data-examples 分页计算从0 开始, /users?page=0&size=10 -->
<!-- 生成页码列表 ①②③④ -->
<li th:each="page : ${#numbers.sequence(0, total - 1)}"><a th:href="@{/users(page=${page},size=${users.size})}" th:text="${page + 1}">1</a></li>
<li th:if="${users.hasNext()}">
<a th:href="@{/users(page=${users.nextPageable().pageNumber},size=${users.size})}" aria-label="Next">
<span aria-hidden="true">»</span>
</a>
</li>
</ul>
</nav>
// from node_modules\element-ui\packages\pagination\src\pagination.js
// page-count 总页数,total 和 page-count 设置任意一个就可以达到显示页码的功能;
computed: {
internalPageCount() {
if (typeof this.total === 'number') {
// 页数计算使用 Math.ceil
return Math.max(1, Math.ceil(this.total / this.internalPageSize));
} else if (typeof this.pageCount === 'number') {
return Math.max(1, this.pageCount);
}
return null;
}
},
/**
* 起始页计算。 page 页码从1 开始。
*/
getValidCurrentPage(value) {
value = parseInt(value, 10);
const havePageCount = typeof this.internalPageCount === 'number';
let resetValue;
if (!havePageCount) {
if (isNaN(value) || value < 1) resetValue = 1;
} else {
// 强制赋值起始值 1
if (value < 1) {
resetValue = 1;
} else if (value > this.internalPageCount) {
// 数据越界,强制拉回到PageCount
resetValue = this.internalPageCount;
}
}
if (resetValue === undefined && isNaN(value)) {
resetValue = 1;
} else if (resetValue === 0) {
resetValue = 1;
}
return resetValue === undefined ? value : resetValue;
},
java.sql.SQLException: Value '0000-00-00 00:00:00' can not be represented as java.sql.Timestamp
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
at com.mysql.jdbc.ResultSetRow.getTimestampFast(ResultSetRow.java:947)
at com.mysql.jdbc.ResultSetImpl.getTimestampInternal(ResultSetImpl.java:5921)
jdbcUrl 声明 zeroDateTimeBehavior=convertToNull
即可。
jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&allowMultiQueries=true&useSSL=true&zeroDateTimeBehavior=convertToNull
根据上述的异常日志,可以直接找到异常的出处。
/**
* @see com.mysql.jdbc.ResultSetRow#getTimestampFast
*/
protected Timestamp getTimestampFast(...) throws SQLException {
try {
// 标识是否 '0000-00-00 00:00:00'
boolean allZeroTimestamp = true;
// 标识是否为时间格式, 时间是允许 '00:00:00'
boolean onlyTimePresent = false;
// check 数据格式
for (int i = 0; i < length; i++) {
byte b = timestampAsBytes[offset + i];
if (b == ' ' || b == '-' || b == '/') {
onlyTimePresent = false;
}
if (b != '0' && b != ' ' && b != ':' && b != '-' && b != '/' && b != '.') {
allZeroTimestamp = false;
break;
}
}
if (!onlyTimePresent && allZeroTimestamp) {
if (ConnectionPropertiesImpl.ZERO_DATETIME_BEHAVIOR_CONVERT_TO_NULL.equals(conn.getZeroDateTimeBehavior())) {
// 如果连接指定 convertToNull,遇到 allZeroTimestamp 直接返回 null
return null;
} else if (ConnectionPropertiesImpl.ZERO_DATETIME_BEHAVIOR_EXCEPTION.equals(conn.getZeroDateTimeBehavior())) {
// 默认配置 exception, 遇到 allZeroTimestamp 就会抛出上述的异常
throw SQLError.createSQLException("Value '" + StringUtils.toString(timestampAsBytes) + "' can not be represented as java.sql.Timestamp",
SQLError.SQL_STATE_ILLEGAL_ARGUMENT, this.exceptionInterceptor);
}
// We're left with the case of 'round' to a date Java _can_ represent, which is '0001-01-01'.
return rs.fastTimestampCreate(null, 1, 1, 1, 0, 0, 0, 0);
}
}
}
连接所有的配置是以 com.mysql.jdbc.ConnectionPropertiesImpl.ConnectionProperty 体现的。包括 allowableValues defaultValue sinceVersion 关键信息。
private StringConnectionProperty zeroDateTimeBehavior = new StringConnectionProperty("zeroDateTimeBehavior", ZERO_DATETIME_BEHAVIOR_EXCEPTION,
new String[] { ZERO_DATETIME_BEHAVIOR_EXCEPTION, ZERO_DATETIME_BEHAVIOR_ROUND, ZERO_DATETIME_BEHAVIOR_CONVERT_TO_NULL },
Messages.getString("ConnectionProperties.zeroDateTimeBehavior",
new Object[] { ZERO_DATETIME_BEHAVIOR_EXCEPTION, ZERO_DATETIME_BEHAVIOR_ROUND, ZERO_DATETIME_BEHAVIOR_CONVERT_TO_NULL }),
"3.1.4", MISC_CATEGORY, Integer.MIN_VALUE);
mysql driver 对于异常日期数据的处理,可以借鉴到应用程序的开发中。
首先,需要考虑到异常数据对程序的影响,如果无法继续,需要抛出异常;
然后,针对异常数据,提供声明容错的的方案;提升系统稳定性。
@Intercepts({@Signature(
type= Executor.class,
method = "update",
args = {MappedStatement.class ,Object.class})})
public class ExamplePlugin implements Interceptor {
@Override
public Object intercept(Invocation invocation) throws Throwable {
// implement pre-processing if needed
Object returnObject = invocation.proceed();
// implement post-processing if needed
return returnObject;
}
}
Intercepts (mybatis 3.5.11 API)
支持四大组件的处理拦截,实际使用拦截器主要是 Executor。即上述的 @Intercepts.type
ParameterHandler
StatementHandler
Executor
ResultSetHandler
private void pluginElement(XNode parent) throws Exception {
if (parent != null) {
for (XNode child : parent.getChildren()) {
String interceptor = child.getStringAttribute("interceptor");
Properties properties = child.getChildrenAsProperties();
Interceptor interceptorInstance = (Interceptor) resolveClass(interceptor).getDeclaredConstructor().newInstance();
interceptorInstance.setProperties(properties);
configuration.addInterceptor(interceptorInstance);
}
}
}
/**
* Executor 初始化,同时进行插件增强。
* @see org.apache.ibatis.session.Configuration#newExecutor()
*/
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
Executor executor = new SimpleExecutor(this, transaction);
if (cacheEnabled) {
executor = new CachingExecutor(executor);
}
// 基于JDK Proxy, executor 是经过各个interceptor 层层代理后的结果
// 代理生成是通过 Plugin.wrap(Object target, Interceptor interceptor) 完成的
executor = (Executor) interceptorChain.pluginAll(executor);
return executor;
}
public class Plugin implements InvocationHandler {
public static Object wrap(Object target, Interceptor interceptor) {
// 解析@Intercepts ,得到期望拦截的组件和方法
Map<Class<?>, Set<Method>> signatureMap = getSignatureMap(interceptor);
Class<?> type = target.getClass();
Class<?>[] interfaces = getAllInterfaces(type, signatureMap);
// 如果有匹配拦截器的组件和方法,那么就进行代理。 例如: Executor.update()
if (interfaces.length > 0) {
return Proxy.newProxyInstance(
type.getClassLoader(),
interfaces,
// 关键实现,关注target 和 interceptor
new Plugin(target, interceptor, signatureMap));
}
return target;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try {
Set<Method> methods = signatureMap.get(method.getDeclaringClass());
if (methods != null && methods.contains(method)) {
// 通过 interceptor 把参数暴露出去,代码增强在interceptor 完成。
return interceptor.intercept(new Invocation(target, method, args));
}
return method.invoke(target, args);
} catch (Exception e) {
throw ExceptionUtil.unwrapThrowable(e);
}
}
}
public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h) throws IllegalArgumentException {
/*
* Look up or generate the designated proxy class.
* Return the cached copy or create the proxy class via the ProxyClassFactory
* Generate the specified proxy class via ProxyGenerator
*/
Class<?> cl = getProxyClass0(loader, intfs);
/*
* Invoke its constructor with the designated invocation handler.
* InvocationHandler 作为构造入参,生成代理实例
*/
try {
final Constructor<?> cons = cl.getConstructor(constructorParams);
return cons.newInstance(new Object[]{h});
}
}
One of the design goals of logback is to audit and debug complex distributed applications.
It lets the developer place information in a diagnostic context that can be subsequently retrieved by certain logback components.
Chapter 8: Mapped Diagnostic Context
Java 常用的日志工具,logback log4j slf4j 都支持MDC 特性。本文分析 logback MDC。
// MDC operations such as put() and get() affect only the MDC of the current thread, and the children of the current thread.
// Thus, there is no need for the developer to worry about thread-safety or synchronization when programming with the MDC because it handles these issues safely and transparently.
public class LogbackMDCAdapter implements MDCAdapter {
// The internal map is copied so as
// We wish to avoid unnecessarily copying of the map. To ensure
// efficient/timely copying, we have a variable keeping track of the last
// operation.
final ThreadLocal<Map<String, String>> copyOnThreadLocal = new ThreadLocal<Map<String, String>>();
}