跟代码过程中发现的问题记录下 ServiceFactory是一切的基础,它是通过静态代码块初始化的 static { try { ProviderBootStrap.init(); String appname = ConfigManagerLoader.getConfigManager().getAppName(); if (StringUtils.isBlank(appname) || "NULL".equalsIgnoreCase(appname)) { throw new RuntimeException("appname is not assigned"); } } catch (Throwable var1) { logger.error("error while initializing service factory:", var1); System.exit(1); } } 实际调用的是 ProviderBootStrap ServiceFactory ServiceFactory.addService(ProviderConfig<T> providerConfig)
public static <T> void addService(ProviderConfig<T> providerConfig) throws RpcException { if (StringUtils.isBlank(providerConfig.getUrl())) { providerConfig.setUrl(getServiceUrl(providerConfig)); } try { ServicePublisher.addService(providerConfig); ServerConfig serverConfig = ProviderBootStrap.startup(providerConfig); providerConfig.setServerConfig(serverConfig); ServicePublisher.publishService(providerConfig, false); } catch (RegistryException var2) { throw new RpcException("error while adding service:" + providerConfig, var2); } catch (Throwable var3) { throw new RpcException("error while adding service:" + providerConfig, var3); } }
public static ServerConfig startup(ProviderConfig<?> providerConfig) { ServerConfig serverConfig = providerConfig.getServerConfig(); if (serverConfig == null) { throw new IllegalArgumentException("server config is required"); } // 根据protocol+port判断当前服务器列表中是否存在相关的服务器实例 Server server = serversMap.get(serverConfig.getProtocol() + serverConfig.getPort()); if (server != null) { // 存在直接返回 server.addService(providerConfig); return server.getServerConfig(); } else { // 不存在先创建一个 synchronized (ProviderBootStrap.class) { List<Server> servers = ExtensionLoader.newExtensionList(Server.class); for (Server s : servers) { if (!s.isStarted()) { if (s.support(serverConfig)) { s.start(serverConfig); // 添加服务 s.addService(providerConfig); serversMap.put(s.getProtocol() + serverConfig.getPort(), s); logger.warn("pigeon " + s + "[version:" + VersionUtils.VERSION + "] has been started"); break; } } } server = serversMap.get(serverConfig.getProtocol() + serverConfig.getPort()); // 预启动内部的请求处理器核心线程 if (server != null) { server.getRequestProcessor().getRequestProcessThreadPool().prestartAllCoreThreads(); return server.getServerConfig(); } return null; } } }
public <T> void addService(ProviderConfig<T> providerConfig) { this.requestProcessor.addService(providerConfig); this.doAddService(providerConfig); List<ServiceChangeListener> listeners = ServiceChangeListenerContainer.getListeners(); Iterator i$ = listeners.iterator(); while(i$.hasNext()) { ServiceChangeListener listener = (ServiceChangeListener)i$.next(); listener.notifyServiceAdded(providerConfig); } } RequestThreadPoolProcessor.addService(ProviderConfig<T> providerConfig) 负责创建线程池并缓存起来 public synchronized <T> void addService(ProviderConfig<T> providerConfig) { String url = providerConfig.getUrl(); Map<String, ProviderMethodConfig> methodConfigs = providerConfig.getMethods(); ServiceMethodCache methodCache = ServiceMethodFactory.getServiceMethodCache(url); Set<String> methodNames = methodCache.getMethodMap().keySet(); if (this.needStandalonePool(providerConfig)) { if (this.methodThreadPools == null) { this.methodThreadPools = new ConcurrentHashMap(); } if (this.serviceThreadPools == null) { this.serviceThreadPools = new ConcurrentHashMap(); } if (providerConfig.getActives() > 0 && CollectionUtils.isEmpty(methodConfigs)) { DynamicThreadPool pool = (DynamicThreadPool)this.serviceThreadPools.get(url); if (pool == null) { int actives = providerConfig.getActives(); int coreSize = (int)((float)actives / DEFAULT_POOL_RATIO_CORE) > 0 ? (int)((float)actives / DEFAULT_POOL_RATIO_CORE) : actives; pool = new DynamicThreadPool("Pigeon-Server-Request-Processor-service", coreSize, actives, actives); this.serviceThreadPools.putIfAbsent(url, pool); } } if (!CollectionUtils.isEmpty(methodConfigs)) { Iterator i$ = methodNames.iterator(); while(i$.hasNext()) { String name = (String)i$.next(); if (methodConfigs.containsKey(name)) { String key = url + "#" + name; DynamicThreadPool pool = (DynamicThreadPool)this.methodThreadPools.get(key); if (pool == null) { int actives = DEFAULT_POOL_ACTIVES; ProviderMethodConfig methodConfig = (ProviderMethodConfig)methodConfigs.get(name); if (methodConfig != null && methodConfig.getActives() > 0) { actives = methodConfig.getActives(); } int coreSize = (int)((float)actives / DEFAULT_POOL_RATIO_CORE) > 0 ? (int)((float)actives / DEFAULT_POOL_RATIO_CORE) : actives; pool = new DynamicThreadPool("Pigeon-Server-Request-Processor-method", coreSize, actives, actives); this.methodThreadPools.putIfAbsent(key, pool); } } } } } } 二 服务的发布
1 调用链路 ServiceFactory.addService(ProviderConfig<T> providerConfig) -> ServicePublisher.addService(providerConfig); 这里调用主要是缓存服务信息 -> RequestThreadPoolProcessor.addService(ProviderConfig<T> providerConfig) 不知道怎么就走到这里了,以后还得多跟代码 -> AbstractServer.doAddService AbstractServer的实现类默认的是NettyServer -> ServerConfig serverConfig = ProviderBootStrap.startup(providerConfig); -> ServicePublisher.publishService String url, String registryUrl, int port, String group url = http://service./cargo-detail/DriverVisitDetailFacade_1.0.0 registryUrl = @HTTP@http://service./cargo-detail/DriverVisitDetailFacade_1.0.0 port = 4080 group = hfd33 2 写zk节点 RegistryManager.getInstance().registerSupportNewProtocol(serverAddress, registryUrl, false); serverAddress = 10.190.20.66:4080 registryUrl = @HTTP@http://service./cargo-detail/DriverVisitDetailFacade_1.0.0 通过跟代码解析出来的zk地址是 10.13.65.186:2181,10.13.65.187:2181,10.13.65.188:2181
CuratorRegistry.setSupportNewProtocol
上面的是protocol在zk上的格式,还有另一个更重要的zk节点 registerPersistentNode
该方法不止会写一个zk节点,分别如下 1 this.client.set(weightPath, "" + weight); /DP/WEIGHT/10.190.20.66:4080 1 2 this.client.create(servicePath, serviceAddress); /DP/SERVER/@HTTP@http:^^service.^cargo-detail^DriverVisitDetailFacade_1.0.0/hfd33 10.190.20.66:4080 3 public void setServerApp(String serverAddress, String app) {
4 RegistryManager.getInstance().setServerVersion(serverAddress, "2.7.8"); // serverAddress = 10.190.20.66:4080 最后写入zk的path是 /DP/VERSION/10.190.20.66:4080
|
|