相对于单例数据库的查询操作,分布式数据查询会有很多技术难题。
本文记录 Mysql 分库分表 和 Elasticsearch Join 查询的实现思路,学习分布式场景数据处理的设计思路。
分库分表场景下,查询语句如何分发,数据如何组织。相较于NoSQL 数据库,Mysql 在SQL 规范的范围内,相对比较容易适配分布式场景。
基于 sharding-jdbc 中间件的方案,了解整个设计思路。
sharding-jdbc 代理了原始的 datasource, 实现 jdbc 规范来完成分库分表的分发和组装,应用层无感知。
执行流程:SQL解析 => 执行器优化 => SQL路由 => SQL改写 => SQL执行 => 结果归并 io.shardingsphere.core.executor.ExecutorEngine#execute
Join 语句的解析,决定了要分发 SQL 到哪些实例节点上。对应SQL路由。
SQL 改写就是要把原始(逻辑)表名,改为实际分片的表名。
复杂情况下,Join 查询分发的最多执行的次数 = 数据库实例 × 表A分片数 × 表B分片数
示例代码工程:git@github.com:cluoHeadon/sharding-jdbc-demo.git
/**
* 执行查询 SQL 切入点,从这里可以完整 debug 执行流程
* @see ShardingPreparedStatement#execute()
* @see ParsingSQLRouter#route(String, List, SQLStatement) Join 查询实际涉及哪些表,就是在路由规则里匹配得出来的。
*/
public boolean execute() throws SQLException {
try {
// 根据参数(决定分片)和具体的SQL 来匹配相关的实际 Table。
Collection<PreparedStatementUnit> preparedStatementUnits = route();
// 使用线程池,分发执行和结果归并。
return new PreparedStatementExecutor(getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits).execute();
} finally {
JDBCShardingRefreshHandler.build(routeResult, connection).execute();
clearBatch();
}
}
启用 sql 打印,直观看到实际分发执行的 SQL
# 打印的代码,就是在上述route 得出 ExecutionUnits 后,打印的
sharding.jdbc.config.sharding.props.sql.show=true
sharding-jdbc 根据不同的SQL 语句,会有不同的路由策略。我们关注的 Join 查询,实际相关就是以下两种策略。
StandardRoutingEngine binding-tables 模式
ComplexRoutingEngine 最复杂的情况,笛卡尔组合关联关系。
-- 参数不明,不能定位分片的情况
select * from order o inner join order_item oi on o.order_id = oi.order_id
-- 路由结果
-- Actual SQL: db1 ::: select * from order_1 o inner join order_item_1 oi on o.order_id = oi.order_id
-- Actual SQL: db1 ::: select * from order_1 o inner join order_item_0 oi on o.order_id = oi.order_id
-- Actual SQL: db1 ::: select * from order_0 o inner join order_item_1 oi on o.order_id = oi.order_id
-- Actual SQL: db1 ::: select * from order_0 o inner join order_item_0 oi on o.order_id = oi.order_id
-- Actual SQL: db0 ::: select * from order_1 o inner join order_item_1 oi on o.order_id = oi.order_id
-- Actual SQL: db0 ::: select * from order_1 o inner join order_item_0 oi on o.order_id = oi.order_id
-- Actual SQL: db0 ::: select * from order_0 o inner join order_item_1 oi on o.order_id = oi.order_id
-- Actual SQL: db0 ::: select * from order_0 o inner join order_item_0 oi on o.order_id = oi.order_id
首先,对于 NoSQL 数据库,要求 Join 查询,可以考虑是不是使用场景和用法有问题。
然后,不可避免的,有些场景需要这个功能。Join 查询的实现更贴近SQL 引擎。
基于 elasticsearch-sql 组件的方案,了解大概的解决思路。
这是个elasticsearch 插件,通过提供http 服务实现类 SQL 查询的功能,高版本的elasticsearch 已经具备该功能
因为 elasticsearch 没有 Join 查询的特性,所以实现 SQL Join 功能,需要提供更加底层的功能,涉及到 Join 算法。
源码地址:git@github.com:NLPchina/elasticsearch-sql.git
/**
* Execute the ActionRequest and returns the REST response using the channel.
* @see ElasticDefaultRestExecutor#execute
* @see ESJoinQueryActionFactory#createJoinAction Join 算法选择
*/
@Override
public void execute(Client client, Map<String, String> params, QueryAction queryAction, RestChannel channel) throws Exception{
// sql parse
SqlElasticRequestBuilder requestBuilder = queryAction.explain();
// join 查询
if(requestBuilder instanceof JoinRequestBuilder){
// join 算法选择。包括:HashJoinElasticExecutor、NestedLoopsElasticExecutor
// 如果关联条件为等值(Condition.OPEAR.EQ),则使用 HashJoinElasticExecutor
ElasticJoinExecutor executor = ElasticJoinExecutor.createJoinExecutor(client,requestBuilder);
executor.run();
executor.sendResponse(channel);
}
// 其他类型查询 ...
}
三种 Join 算法:Nested Loop Join,Hash Join、 Merge Join
MySQL 只支持 NLJ 或其变种,8.0.18 版本后支持 Hash Join
NLJ 相当于两个嵌套循环,用第一张表做 Outter Loop,第二张表做 Inner Loop,Outter Loop 的每一条记录跟 Inner Loop 的记录作比较,最终符合条件的就将该数据记录。
Hash Join 分为两个阶段; build
构建阶段和 probe
探测阶段。
可以使用Explain 查看使用哪种 Join 算法。
EXPLAIN FORMAT=JSON
SELECT * FROM
sale_line_info u
JOIN sale_line_manager o ON u.sale_line_code = o.sale_line_code;
// 使用JQuery 发送 json 数据
$.ajax({
url: "some.php",
type: "POST",
dataType:"json",
contentType:"application/json",
data: JSON.stringify({ name: "John", location: "Boston" }),
success: function (msg) {
alert( "Data Saved: " + msg );
}
});
// 默认类型 from ajaxSettings
// 遵从业界规范,如果使用 curl --data,默认也是 Content-Type: application/x-www-form-urlencoded
contentType: "application/x-www-form-urlencoded; charset=UTF-8",
数据(object)默认按照表单传参格式(key/value 键值对)来序列化数据。
如果后台接收 json 格式数据,需要调用 JSON.stringify() 处理为 string。
object 如果不处理为string,会按照 object.toString() 来处理。发送的数据是:[object Object]
Get 请求,参数会追加到URL。Post 请求,数据通过请求体 (body) 发送到服务端。
// Convert data if not already a string
// processData 默认 true
// jQuery.param 处理的数据会调用 encodeURIComponent encoded
if ( s.data && s.processData && typeof s.data !== "string" ) {
s.data = jQuery.param( s.data, s.traditional );
}
// 序列化结果: a=bc&d=e%2Cf
$.param({ a: "bc", d: "e,f" })
// 序列化结果: a%5B%5D=1&a%5B%5D=2, 也就是 a[]=1&a[]=2
$.param({ a: [1,2] })
XMLHttpRequest
(XHR) objects are used to interact with servers.You can retrieve data from a URL without having to do a full page refresh. This enables a Web page to update just part of a page without disrupting what the user is doing.
// 使用 jqXHR 包装真正的 xhr, 把xhr 的事件包装为一些事件和接口,方便开发
jQuery.ajaxSettings.xhr = function() {
try {
return new window.XMLHttpRequest();
} catch ( e ) {}
};
Jquery 对于指定的 dataType, 会尝试进行反序列化的转换。
支持的类型包括:html json xml script
数据类型转换在
function ajaxConvert
完成。
// Data converters. Http 请求返回的文本,可以根据 dataType 和支持,进行转换
// 配置格式约定:Keys separate source (or catchall "*") and destination types with a single space
converters: {
// Convert anything to text
"* text": String,
// Text to html (true = no transformation)
"text html": true,
// Evaluate text as a json expression。 常用的 json ,就是这样处理的。🎈
"text json": JSON.parse,
// Parse text as xml
"text xml": jQuery.parseXML,
"text script": function( text ) {
jQuery.globalEval( text );
return text;
}
},
// Timeout 配置,通过定时器实现
if ( s.async && s.timeout > 0 ) {
// 发起请求前,添加一个定时器,到期发起取消操作
timeoutTimer = window.setTimeout( function() {
// jqXHR 作为包装类,真正的XHR 命名为 transport。包装类可以提供更加丰富的接口,流程可控。
jqXHR.abort( "timeout" );
}, s.timeout );
}
// 无论请求结果怎样,都会调用 done(), 会移除定时器
// Clear timeout if it exists
if ( timeoutTimer ) {
window.clearTimeout( timeoutTimer );
}
关于Jquery 数据传递,浏览器数据传输的疑问,可以通过 Curl 发送请求,查看具体的HTTP 协议数据格式来确认。
# 描述: use the curl command with –data and –data-raw options to send text over a POST request:
$ website="https://webhook.site/5610141b-bd31-4940-9a83-1db44ff2283e"
# used the –trace-ascii option to trace the requests and capture the trace logs in the website-data.log and website-data-raw.log files.
$ curl --data "simple_body" --trace-ascii website-data.log "$website"
$ curl --data-raw "simple_body" --trace-ascii website-data-raw.log "$website"
# 结果:website-data-raw.log:0083: Content-Type: application/x-www-form-urlencoded
$ grep --max-count=1 Content-Type website-data-raw.log website-data.log
参考:[How to Post Raw Body Data With cURL | Baeldung](https://www.baeldung.com/curl-post-raw-body-data) |
记录 Commons DBCP testOnBorrow 的作用机制,从一点去分析数据库连接池获取的过程以及架构分层设计。
笔记内容会按照每层的作用,贯穿分析整个调用流程。
The indication of whether objects will be validated before being borrowed from the pool.
If the object fails to validate, it will be dropped from the pool, and we will attempt to borrow another.
testOnBorrow 不是 dbcp 定义的,是commons-pool 定义的。commons-pool 详细的定义了资源池使用的一套规范和运行流程。
/**
* Borrow an object from the pool. get object from 资源池
* @see org.apache.commons.pool2.impl.GenericObjectPool#borrowObject(long)
*/
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
PooledObject<T> p = null;
// if validation fails, the instance is destroyed and the next available instance is examined.
// This continues until either a valid instance is returned or there are no more idle instances available.
while (p == null) {
// If there is one or more idle instance available in the pool,
// then an idle instance will be selected based on the value of getLifo(), activated and returned.
p = idleObjects.pollFirst();
if (p != null) {
// 设置 testOnBorrow 就会进行可用性校验
if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
boolean validate = false;
Throwable validationThrowable = null;
try {
// 具体的校验实现由实现类完成。
// see org.apache.commons.dbcp2.PoolableConnectionFactory
validate = factory.validateObject(p);
} catch (final Throwable t) {
PoolUtils.checkRethrow(t);
validationThrowable = t;
}
if (!validate) {
try {
// 如果校验异常,会销毁该资源。
// obj is not valid and should be dropped from the pool
destroy(p);
destroyedByBorrowValidationCount.incrementAndGet();
} catch (final Exception e) {
// Ignore - validation failure is more important
}
p = null;
}
}
}
}
return p.getObject();
}
dbcp 是特定于管理数据库连接的资源池。
PoolableConnectionFactory is a PooledObjectFactory
PoolableConnection is a PooledObject
/**
* @see PoolableConnectionFactory#validateObject(PooledObject)
*/
@Override
public boolean validateObject(final PooledObject<PoolableConnection> p) {
try {
/**
* 检测资源池对象的创建时间,是否超过生存时间
* 如果超过 maxConnLifetimeMillis, 不再委托数据库连接进行校验,直接废弃改资源
* @see PoolableConnectionFactory#setMaxConnLifetimeMillis(long)
*/
validateLifetime(p);
// 委托数据库连接进行自我校验
validateConnection(p.getObject());
return true;
} catch (final Exception e) {
return false;
}
}
/**
* 数据库连接层的校验。具体到是否已关闭、是否与 server 连接可用
* @see Connection#isValid(int)
*/
public void validateConnection(final PoolableConnection conn) throws SQLException {
if(conn.isClosed()) {
throw new SQLException("validateConnection: connection closed");
}
conn.validate(_validationQuery, _validationQueryTimeout);
}
Spring 基于 Java 配置方式,在Bean 有依赖配置的情况下,可以直接写成方法调用。框架背后的原理(magic🎭)是怎样的?
Injecting Inter-bean Dependencies 有哪些使用误区?
@Configuration
public class AppConfig {
@Bean
public BeanOne beanOne() {
// dependency is as simple as having one bean method call another
// 表面上是方法的直接调用,实际上是 Spring constructor injection
// beanTwo 方法的调用,背后是 Spring Bean 创建和初始化的过程。
return new BeanOne(beanTwo());
}
@Bean
public BeanTwo beanTwo() {
// 虽然是new instance, 但是多次调用 beanTwo 方法,得到的是同一个 instance
// singleton scope by default
return new BeanTwo();
}
}
有的开发者不理解 Inter-bean injection 的原理,理解为方法直接调用。会人工调用诸如 afterPropertiesSet 这样bean 初始化的方法,这样是没有必要的。
只有在 @Configuration 配置的类里,Inter-bean injection 才生效。
As of Spring 3.2, CGLIB classes have been repackaged under org.springframework.cglib
。 这些代码没有注释,需要去 CGLIB 查看。
@Configuration 类里的方法不能为private 或者 final,CGLIB 生成的继承类的规则限制。防止出现不生效的情况,Spring 会强制校验。
Configuration problem: @Bean method 'beanTwo' must not be private or final; change the method's modifiers to continue
容器启动时,对所有 @Configuration 注解的类进行动态代理(增强)。拦截类中的方法,对于 @Bean 注解的方法,会作为 factory-bean 方式对待,方法直接调用转化为 bean 获取的过程(get or create_and_get)。
动态代理使用 CGLIB 实现。
org.springframework.context.annotation.ConfigurationClassEnhancer 动态代理 @Configuration 类。
org.springframework.context.annotation.ConfigurationClassPostProcessor 发起代理(增强)的入口。postProcessBeanFactory
经过上述的分析,源码查看侧重在 net.sf.cglib.proxy.MethodInterceptor。
/**
* 拦截 @Bean 注解的方法,替换为 Bean 相关的操作(scoping and AOP proxying)。
* @see ConfigurationClassEnhancer
* @from org.springframework.context.annotation.ConfigurationClassEnhancer.BeanMethodInterceptor
*/
private static class BeanMethodInterceptor implements MethodInterceptor, ConditionalCallback {
@Override
@Nullable
public Object intercept(Object enhancedConfigInstance, Method beanMethod, Object[] beanMethodArgs,
MethodProxy cglibMethodProxy) throws Throwable {
// 从proxy 获取BeanFactory。代理类有个属性 $$beanFactory 持有 BeanFactory 实例。
ConfigurableBeanFactory beanFactory = getBeanFactory(enhancedConfigInstance);
String beanName = BeanAnnotationHelper.determineBeanNameFor(beanMethod);
// Determine whether this bean is a scoped-proxy
if (BeanAnnotationHelper.isScopedProxy(beanMethod)) {
String scopedBeanName = ScopedProxyCreator.getTargetBeanName(beanName);
if (beanFactory.isCurrentlyInCreation(scopedBeanName)) {
beanName = scopedBeanName;
}
}
// To handle the case of an inter-bean method reference, we must explicitly check the
// container for already cached instances.
// 常规获取 Bean, beanFactory.getBean(beanName)
return resolveBeanReference(beanMethod, beanMethodArgs, beanFactory, beanName);
}
}
bean 构造和初始化,使用 method 定义的方法来实现。
/**
* Read the given BeanMethod, registering bean definitions
* with the BeanDefinitionRegistry based on its contents.
* @from org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader
*/
private void loadBeanDefinitionsForBeanMethod(BeanMethod beanMethod) {
ConfigurationClass configClass = beanMethod.getConfigurationClass();
MethodMetadata metadata = beanMethod.getMetadata();
String methodName = metadata.getMethodName();
// 获取 method @Bean 注解配置
AnnotationAttributes bean = AnnotationConfigUtils.attributesFor(metadata, Bean.class);
Assert.state(bean != null, "No @Bean annotation attributes");
// Consider name and Register aliases...
ConfigurationClassBeanDefinition beanDef = new ConfigurationClassBeanDefinition(configClass, metadata);
beanDef.setResource(configClass.getResource());
beanDef.setSource(this.sourceExtractor.extractSource(metadata, configClass.getResource()));
if (metadata.isStatic()) {
// static @Bean method 不依赖 configClass instance, 可以直接初始化为bean
if (configClass.getMetadata() instanceof StandardAnnotationMetadata) {
beanDef.setBeanClass(((StandardAnnotationMetadata) configClass.getMetadata()).getIntrospectedClass());
}
else {
beanDef.setBeanClassName(configClass.getMetadata().getClassName());
}
beanDef.setUniqueFactoryMethodName(methodName);
}
else {
// instance @Bean method 🎈🎈🎈
beanDef.setFactoryBeanName(configClass.getBeanName());
beanDef.setUniqueFactoryMethodName(methodName);
}
// beanDef.setAttribute...
this.registry.registerBeanDefinition(beanName, beanDefToRegister);
}
Spring 框架为了使用的方便,尽可能的隐藏了实现细节。让开发更加方便。
因为隐藏了技术细节,对于诸如上述的 Inter-bean dependency 配置方式,开发者可能会误解,会显示调用框架的接口。
这次分析AOP 的使用场景,又一次加深了动态代理的理解,眼前一亮。
通过AOP 方式对Bean-Method 代理,可以用 cache 使用的角度去理解。如果存在,从 beanFactory cache 获取并返回;如果不存在,则根据 Bean-Method 去创建bean, 并put 到beanFactory cache, 再返回。✨
CPU 使用高一般原因是出现代码死循环。
寻找到死循环的代码,就需要找对应的 stack。
匹配JVM stack,就需要查找CPU 使用率高的进程/线程。
遵循上述思路,总结排查步骤。
查找java 进程id
# 获取java 进程信息。 eg: 进程id = 12309
jps -v
查找 java 进程的线程 CPU 使用率
# -p 用于指定进程,-H 用于获取每个线程的信息
top -p 12309 -H
获取java 进程的 stack
jstack -l 12309 > stack.log