多线程和Reactor模式下解决数据丢失
到了此章节,相信你对 link-flow 的整体流程有了大致的掌握了,但在整个流程中,对于线程方面有很多的细节,稍微处理不好就容易丢失,所以此章节将会讲清楚这些细节问题。
多线程的切换
首先来看这四个部分,在这四个部分中都进行了线程名的输出
第1处:服务过滤执行的开始
public Mono<Response<ServiceInstance>> choose(Request request) {
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
System.out.println("第一处执行的线程;"+Thread.currentThread().getName());
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}
第2处:获取到所有服务列表 第3处:服务的路由过滤
/**
* 当要调用服务的时候,会调用此方法,此方法会返回所有的服务列表。路由过滤器会在此处执行
* */
@Override
public Flux<List<ServiceInstance>> get() {
//获取所有的服务列表
Flux<List<ServiceInstance>> listFlux = super.get();
//从ThreadLocal获取参数
Map<String, Object> parameterMap = BaseParameterHolder.getParameterMap();
Map<String,Object> newMap = new HashMap<>(BaseParameterHolder.getParameterMap().size());
newMap.putAll(parameterMap);
System.out.println("第2处执行的线程;"+Thread.currentThread().getName());
listFlux = listFlux.map(serviceInstances -> {
System.out.println("第3处执行的线程;"+Thread.currentThread().getName());
//到这里线程已经发生变化了,所以要把之前线程的将参数放入到ThreadLocal中,这样才能在后续的过滤器中获取到参数
BaseParameterHolder.setParameterMap(newMap);
List<ServiceInstance> allServers = new ArrayList<>();
Optional.ofNullable(serviceInstances).ifPresent(allServers::addAll);
//执行过滤器
linkFlowFilterLoadBalance.selectServer(allServers);
return allServers;
});
//返回结果
return listFlux;
}
第4处:服务版本权重的执行
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + serviceId);
}
return new EmptyResponse();
}
System.out.println("第四处执行的线程;"+Thread.currentThread().getName());
//经过权重过滤选择后,肯定是一个服务实例了
WeightInfoWrapper weightInfoWrapper = linkFlowWeight.parseWeightInfo();
if (linkFlowWeight.isServiceWeight(instances,weightInfoWrapper)) {
ServiceInstance serviceInstance = linkFlowWeight.selectServiceInstance(instances, weightInfoWrapper);
instances.clear();
instances.add(serviceInstance);
}
// Do not move position when there is only 1 instance, especially some suppliers
// have already filtered instances
if (instances.size() == 1) {
return new DefaultResponse(instances.get(0));
}
// Ignore the sign bit, this allows pos to loop sequentially from 0 to
// Integer.MAX_VALUE
int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
ServiceInstance instance = instances.get(pos % instances.size());
return new DefaultResponse(instance);
}
付费内容提示
该文档的全部内容仅对「JavaUp项目实战&技术讲解」知识星球用户开放
加入星球后,你可以获得:
- 超级八股文:100万+字的全栈技术知识库,涵盖技术核心、数据库、中间件、分布式等深度剖析的讲解
- 讲解文档:黑马点评Plus、大麦、大麦pro、大麦AI、流量切换、数据中台的从0到1的550+详细文档
- 讲解视频:黑马点评Plus、大麦、大麦pro、大麦AI、流量切换、数据中台的核心业务详细讲解
- 1 对 1 解答:可以对我进行1对1的问题提问,而不仅仅只限于项目
- 针对性服务:有没理解的地方,文档或者视频还没有讲到可以提出,本人会补充
- 面试与简历指导:提供面试回答技巧,项目怎样写才能在简历中具有独特的亮点
- 中间件环境:对于项目中需要使用的中间件,可直接替换成我提供的云环境
- 面试后复盘:小伙伴去面试后,如果哪里被面试官问住了,可以再找我解答
- 远程的解决:如果在启动项目遇到问题,本人可以帮你远程解决
进入星球后,即可享受上述所有服务,保证不会再有其他隐藏费用。
