当前位置: 首页 > news >正文

统一门户网站全网vip视频网站建设

统一门户网站,全网vip视频网站建设,软件正版化情况及网站建设情况,好听的域名取名本文基于Istio 1.18.0版本进行源码学习 5、xDS的异步分发 DiscoveryService主要包含下述逻辑#xff1a; 启动GRPC Server并接收来自Envoy端的连接请求接收Envoy端的xDS请求#xff0c;从ConfigController和ServiceController中获取配置和服务信息#xff0c;生成响应消息… 本文基于Istio 1.18.0版本进行源码学习 5、xDS的异步分发 DiscoveryService主要包含下述逻辑 启动GRPC Server并接收来自Envoy端的连接请求接收Envoy端的xDS请求从ConfigController和ServiceController中获取配置和服务信息生成响应消息发送给Envoy监听来自ConfigController的配置变化消息和ServiceController的服务变化消息并将配置和服务变化内容通过xDS接口推送到Envoy 1、DiscoveryService初始化 DiscoveryService初始化流程如下图 1xds.NewDiscoveryServer DiscoveryServer是通过调用xds.NewDiscoveryServer()方法初始化的返回的是一个DiscoveryServer实例。DiscoveryServer定义如下 // pilot/pkg/xds/discovery.go type DiscoveryServer struct {// Env is the model environment.Env *model.Environment// ConfigGenerator is responsible for generating data plane configuration using Istio networking// APIs and service registry info// xds配置的生成器ConfigGenerator core.ConfigGenerator// Generators allow customizing the generated config, based on the client metadata.// Key is the generator type - will match the Generator metadata to set the per-connection// default generator, or the combination of Generator metadata and TypeUrl to select a// different generator for a type.// Normal istio clients use the default generator - will not be impacted by this.// 针对不同配置类型的生成器Generators map[string]model.XdsResourceGenerator// ProxyNeedsPush is a function that determines whether a push can be completely skipped. Individual generators// may also choose to not send any updates.// 判断本次推送是否涉及该envoyProxyNeedsPush func(proxy *model.Proxy, req *model.PushRequest) bool// concurrentPushLimit is a semaphore that limits the amount of concurrent XDS pushes.// 控制推送并发数concurrentPushLimit chan struct{}// RequestRateLimit limits the number of new XDS requests allowed. This helps prevent thundering hurd of incoming requests.RequestRateLimit *rate.Limiter// InboundUpdates describes the number of configuration updates the discovery server has receivedInboundUpdates *atomic.Int64// CommittedUpdates describes the number of configuration updates the discovery server has// received, process, and stored in the push context. If this number is less than InboundUpdates,// there are updates we have not yet processed.// Note: This does not mean that all proxies have received these configurations; it is strictly// the push context, which means that the next push to a proxy will receive this configuration.CommittedUpdates *atomic.Int64// pushChannel is the buffer used for debouncing.// after debouncing the pushRequest will be sent to pushQueue// 统一接收其他组件发来的pushRequest的channelpushChannel chan *model.PushRequest// mutex used for protecting Environment.PushContextupdateMutex sync.RWMutex// pushQueue is the buffer that used after debounce and before the real xds push.// 主要是在真正xds推送前做防抖缓存pushQueue *PushQueue// debugHandlers is the list of all the supported debug handlers.debugHandlers map[string]string// adsClients reflect active gRPC channels, for both ADS and EDS.// ads和eds的grpc连接adsClients map[string]*ConnectionadsClientsMutex sync.RWMutexStatusReporter DistributionStatusCache// Authenticators for XDS requests. Should be same/subset of the CA authenticators.Authenticators []security.Authenticator// StatusGen is notified of connect/disconnect/nack on all connectionsStatusGen *StatusGenWorkloadEntryController *autoregistration.Controller// serverReady indicates caches have been synced up and server is ready to process requests.serverReady atomic.BooldebounceOptions debounceOptionsinstanceID stringclusterID cluster.ID// Cache for XDS resources// xds资源缓存Cache model.XdsCache// JwtKeyResolver holds a reference to the JWT key resolver instance.JwtKeyResolver *model.JwksResolver// ListRemoteClusters collects debug information about other clusters this istiod reads from.ListRemoteClusters func() []cluster.DebugInfo// ClusterAliases are aliase names for cluster. When a proxy connects with a cluster ID// and if it has a different alias we should use that a cluster ID for proxy.ClusterAliases map[cluster.ID]cluster.ID }2initGrpcServer 在初始化GRPC Server时调用了XDSServer.Register()方法向GRPC Server注册服务 // pilot/pkg/xds/discovery.go func (s *DiscoveryServer) Register(rpcs *grpc.Server) {// Register v3 serverdiscovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s) }DiscoveryServer实现了AggregatedDiscoveryServiceServer接口 // envoy/service/discovery/v3/ads.pb.go type AggregatedDiscoveryServiceServer interface {// This is a gRPC-only API.// 全量ads stream接口StreamAggregatedResources(AggregatedDiscoveryService_StreamAggregatedResourcesServer) error// 增量ads stream接口DeltaAggregatedResources(AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error }StreamAggregatedResources接收DiscoveryRequest返回DiscoveryResponse流包含全量的xDS数据 调用流程如下图 3initDiscoveryService initDiscoveryService()方法中将DiscoveryServer启动函数添加到Server的startFuncs队列中会在初始化完毕之后调用 // pilot/pkg/bootstrap/server.go func (s *Server) initDiscoveryService() {log.Infof(starting discovery service)// Implement EnvoyXdsServer grace shutdowns.addStartFunc(func(stop -chan struct{}) error {log.Infof(Starting ADS server)s.XDSServer.Start(stop)return nil}) }2、DiscoveryService启动 DiscoveryServer的Start()方法主要分别启动了四个协程 // pilot/pkg/xds/discovery.go func (s *DiscoveryServer) Start(stopCh -chan struct{}) {go s.WorkloadEntryController.Run(stopCh)go s.handleUpdates(stopCh)go s.periodicRefreshMetrics(stopCh)go s.sendPushes(stopCh)go s.Cache.Run(stopCh) }比较重要的是handleUpdates和sendPushes Config、Service、Endpoint对资源的处理最后都会调用ConfigUpdate()方法向DiscoveryServer的pushChannel队列发送PushRequest实现的处理流程如下 DiscoveryServer首先通过handleUpdates协程阻塞式地接收并处理更新请求并将PushRequest发送到DiscoveryServer的pushQueue中然后由sendPushes协程并发地将PushRequest发送给每一条连接的pushChannel最后由DiscoveryServer的流处理接口处理分发请求 3、handleUpdates // pilot/pkg/xds/discovery.go func (s *DiscoveryServer) handleUpdates(stopCh -chan struct{}) {debounce(s.pushChannel, stopCh, s.debounceOptions, s.Push, s.CommittedUpdates) }func debounce(ch chan *model.PushRequest, stopCh -chan struct{}, opts debounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {var timeChan -chan time.Timevar startDebounce time.Timevar lastConfigUpdateTime time.TimepushCounter : 0debouncedEvents : 0// Keeps track of the push requests. If updates are debounce they will be merged.var req *model.PushRequestfree : truefreeCh : make(chan struct{}, 1)push : func(req *model.PushRequest, debouncedEvents int, startDebounce time.Time) {pushFn(req)updateSent.Add(int64(debouncedEvents))debounceTime.Record(time.Since(startDebounce).Seconds())freeCh - struct{}{}}pushWorker : func() {eventDelay : time.Since(startDebounce)quietTime : time.Since(lastConfigUpdateTime)// it has been too long or quiet enough// 当以下两个条件满足任意一个时,进行更新事件处理// 距离本轮第1次更新时间超过最大延迟时间(debounceMax默认10s)// 距离上次更新时间超过最大静默时间(debounceAfter默认100ms)if eventDelay opts.debounceMax || quietTime opts.debounceAfter {if req ! nil {pushCounterif req.ConfigsUpdated nil {log.Infof(Push debounce stable[%d] %d for reason %s: %v since last change, %v since last push, full%v,pushCounter, debouncedEvents, reasonsUpdated(req),quietTime, eventDelay, req.Full)} else {log.Infof(Push debounce stable[%d] %d for config %s: %v since last change, %v since last push, full%v,pushCounter, debouncedEvents, configsUpdated(req),quietTime, eventDelay, req.Full)}free falsego push(req, debouncedEvents, startDebounce)req nildebouncedEvents 0}} else {timeChan time.After(opts.debounceAfter - quietTime)}}for {select {case -freeCh:free truepushWorker()case r : -ch:// If reason is not set, record it as an unknown reasonif len(r.Reason) 0 {r.Reason []model.TriggerReason{model.UnknownTrigger}}if !opts.enableEDSDebounce !r.Full {// 立即触发eds推送// trigger push now, just for EDSgo func(req *model.PushRequest) {pushFn(req)updateSent.Inc()}(r)continue}lastConfigUpdateTime time.Now()if debouncedEvents 0 {// 启动新一轮的配置下发定时器,定时长度为最小静默时间timeChan time.After(opts.debounceAfter)// 记录第1次事件更新的时间startDebounce lastConfigUpdateTime}debouncedEvents// 合并pushRequestreq req.Merge(r)case -timeChan:if free {pushWorker()}case -stopCh:return}} }debounce()方法中通过最小静默时间debounceAfter合并更新时间通过最大延迟时间debounceMax控制xDS配置下发的时延。由于DiscoveryServer.Push需要初始化PushContext会消耗大量内存所以为了避免OOMdebounce()方法中控制DiscoveryServer.Push串行执行 DiscoveryServer.Push方法会一直往下调用直到把数据推入到DiscoveryServer的pushQueue管道中代码调用逻辑如下 3、PushContext的初始化 PushContext是xDS生成中最重要的结构对象几乎包含所有网格资源信息PushContext结构体定义如下 // pilot/pkg/model/push_context.go type PushContext struct {proxyStatusMutex sync.RWMutex// ProxyStatus is keyed by the error code, and holds a map keyed// by the ID.ProxyStatus map[string]map[string]ProxyPushStatus// Synthesized from env.MeshexportToDefaults exportToDefaults// ServiceIndex is the index of services by various fields.// service缓存ServiceIndex serviceIndex// serviceAccounts contains a map of hostname and port to service accounts.serviceAccounts map[serviceAccountKey][]string// virtualServiceIndex is the index of virtual services by various fields.// virtualService缓存virtualServiceIndex virtualServiceIndex// destinationRuleIndex is the index of destination rules by various fields.// destinationRule缓存destinationRuleIndex destinationRuleIndex// gatewayIndex is the index of gateways.gatewayIndex gatewayIndex// clusterLocalHosts extracted from the MeshConfigclusterLocalHosts ClusterLocalHosts// sidecarIndex stores sidecar resources// sidecar缓存sidecarIndex sidecarIndex// envoy filters for each namespace including global config namespace// envoyFilter缓存envoyFiltersByNamespace map[string][]*EnvoyFilterWrapper// wasm plugins for each namespace including global config namespacewasmPluginsByNamespace map[string][]*WasmPluginWrapper// AuthnPolicies contains Authn policies by namespace.AuthnPolicies *AuthenticationPolicies json:-// AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there// are no authorization policies in the cluster.AuthzPolicies *AuthorizationPolicies json:-// Telemetry stores the existing Telemetry resources for the cluster.Telemetry *Telemetries json:-// ProxyConfig stores the existing ProxyConfig resources for the cluster.ProxyConfigs *ProxyConfigs json:-// The following data is either a global index or used in the inbound path.// Namespace specific views do not apply here.// Mesh configuration for the mesh.Mesh *meshconfig.MeshConfig json:-// PushVersion describes the push version this push context was computed forPushVersion string// LedgerVersion is the version of the configuration ledgerLedgerVersion string// JwtKeyResolver holds a reference to the JWT key resolver instance.JwtKeyResolver *JwksResolver// GatewayAPIController holds a reference to the gateway API controller.GatewayAPIController GatewayController// cache gateways addresses for each network// this is mainly used for kubernetes multi-cluster scenarionetworkMgr *NetworkManagerNetworks *meshconfig.MeshNetworksInitDone atomic.BoolinitializeMutex sync.MutexambientIndex AmbientIndexes }PushContext对象的缓存为后续xDS配置的生成提供了快捷的资源索引 5、sendPushes // pilot/pkg/xds/discovery.go func (s *DiscoveryServer) sendPushes(stopCh -chan struct{}) {doSendPushes(stopCh, s.concurrentPushLimit, s.pushQueue) }func doSendPushes(stopCh -chan struct{}, semaphore chan struct{}, queue *PushQueue) {for {select {case -stopCh:returndefault:// We can send to it until it is full, then it will block until a pushes finishes and reads from it.// This limits the number of pushes that can happen concurrently// semaphore默认容量100,用来控制推送并发数semaphore - struct{}{}// Get the next proxy to push. This will block if there are no updates required.// 从pushQueue出队一个xdsConnectionclient, push, shuttingdown : queue.Dequeue()if shuttingdown {return}recordPushTriggers(push.Reason...)// Signals that a push is done by reading from the semaphore, allowing another send on it.doneFunc : func() {queue.MarkDone(client)-semaphore}proxiesQueueTime.Record(time.Since(push.Start).Seconds())var closed -chan struct{}if client.stream ! nil {closed client.stream.Context().Done()} else {closed client.deltaStream.Context().Done()}go func() {// 初始化pushEventpushEv : Event{pushRequest: push,done: doneFunc,}select {// pushEvent添加到xdsConnection的pushChannel中case client.pushChannel - pushEv:returncase -closed: // grpc stream was closeddoneFunc()log.Infof(Client closed connection %v, client.conID)}}()}} }doSendPushes()方法内启动了一个无限循环在default代码块中实现了主要的功能逻辑 push事件面向所有xDS客户端使用semaphore来控制推送并发数当semaphore满了之后会阻塞如果semaphore允许为每个客户端都启动一个发送协程尝试发送pushEvent到客户端队列pushChannel中 向pushChannel发送的pushEvent会在StreamAggregatedResources()方法中被处理 // pilot/pkg/xds/ads.go func (s *DiscoveryServer) StreamAggregatedResources(stream DiscoveryStream) error {return s.Stream(stream) }func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {if knativeEnv ! firstRequest.Load() {// How scaling works in knative is the first request is the loading request. During// loading request, concurrency1. Once that request is done, concurrency is enabled.// However, the XDS stream is long lived, so the first request would block all others. As a// result, we should exit the first request immediately; clients will retry.firstRequest.Store(false)return status.Error(codes.Unavailable, server warmup not complete; try again)}// Check if server is ready to accept clients and process new requests.// Currently ready means caches have been synced and hence can build// clusters correctly. Without this check, InitContext() call below would// initialize with empty config, leading to reconnected Envoys loosing// configuration. This is an additional safety check inaddition to adding// cachesSynced logic to readiness probe to handle cases where kube-proxy// ip tables update latencies.// See https://github.com/istio/istio/issues/25495.if !s.IsServerReady() {return status.Error(codes.Unavailable, server is not ready to serve discovery information)}ctx : stream.Context()peerAddr : 0.0.0.0if peerInfo, ok : peer.FromContext(ctx); ok {peerAddr peerInfo.Addr.String()}if err : s.WaitForRequestLimit(stream.Context()); err ! nil {log.Warnf(ADS: %q exceeded rate limit: %v, peerAddr, err)return status.Errorf(codes.ResourceExhausted, request rate limit exceeded: %v, err)}ids, err : s.authenticate(ctx)if err ! nil {return status.Error(codes.Unauthenticated, err.Error())}if ids ! nil {log.Debugf(Authenticated XDS: %v with identity %v, peerAddr, ids)} else {log.Debugf(Unauthenticated XDS: %s, peerAddr)}// InitContext returns immediately if the context was already initialized.if err s.globalPushContext().InitContext(s.Env, nil, nil); err ! nil {// Error accessing the data - log and close, maybe a different pilot replica// has more lucklog.Warnf(Error reading config %v, err)return status.Error(codes.Unavailable, error reading config)}con : newConnection(peerAddr, stream)// Do not call: defer close(con.pushChannel). The push channel will be garbage collected// when the connection is no longer used. Closing the channel can cause subtle race conditions// with push. According to the spec: Its only necessary to close a channel when it is important// to tell the receiving goroutines that all data have been sent.// Block until either a request is received or a push is triggered.// We need 2 go routines because read blocks in Recv().go s.receive(con, ids)// Wait for the proxy to be fully initialized before we start serving traffic. Because// initialization doesnt have dependencies that will block, there is no need to add any timeout// here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to// reqChannel and the connection not being enqueued for pushes to pushChannel until the// initialization is complete.-con.initializedfor {// Go select{} statements are not ordered; the same channel can be chosen many times.// For requests, these are higher priority (client may be blocked on startup until these are done)// and often very cheap to handle (simple ACK), so we check it first.select {case req, ok : -con.reqChan:if ok {if err : s.processRequest(req, con); err ! nil {return err}} else {// Remote side closed connection or error processing the request.return -con.errorChan}case -con.stop:return nildefault:}// If there wasnt already a request, poll for requests and pushes. Note: if we have a huge// amount of incoming requests, we may still send some pushes, as we do not continue above;// however, requests will be handled ~2x as much as pushes. This ensures a wave of requests// cannot completely starve pushes. However, this scenario is unlikely.select {case req, ok : -con.reqChan:if ok {if err : s.processRequest(req, con); err ! nil {return err}} else {// Remote side closed connection or error processing the request.return -con.errorChan}case pushEv : -con.pushChannel:// 调用pushConnection推送xds配置err : s.pushConnection(con, pushEv)// 通知semaphore,本次推送完成pushEv.done()if err ! nil {return err}case -con.stop:return nil}} }每个客户端在通过pushConnection将本次xDS推送完后都会调用pushEv.done()方法通知semaphore 从pushQueue到最终推送xDS配置流程如下图 6、xDS配置的生成与分发 pushConnection()方法负责向Envoy发送xDS配置代码如下 // pilot/pkg/xds/ads.go func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {pushRequest : pushEv.pushRequestif pushRequest.Full {// Update Proxy with current information.s.computeProxyState(con.proxy, pushRequest)}// 根据资源的变化情况,判断是否需要为proxy更新xdsif !s.ProxyNeedsPush(con.proxy, pushRequest) {log.Debugf(Skipping push to %v, no updates required, con.conID)if pushRequest.Full {// Only report for full versions, incremental pushes do not have a new version.reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, nil)}return nil}// Send pushes to all generators// Each Generator is responsible for determining if the push event requires a pushwrl, ignoreEvents : con.pushDetails()// 遍历proxy监听的资源类型for _, w : range wrl {// 根据订阅的资源类型生成xds配置并发送到客户端if err : s.pushXds(con, w, pushRequest); err ! nil {return err}}if pushRequest.Full {// Report all events for unwatched resources. Watched resources will be reported in pushXds or on ack.reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, ignoreEvents)}proxiesConvergeDelay.Record(time.Since(pushRequest.Start).Seconds())return nil }pushConnection()方法核心逻辑如下 根据资源的变化情况判断是否需要为该Envoy代理更新xDS如果不需要更新直接返回遍历该Envoy代理监听的资源类型根据订阅的资源类型生成xds配置并发送到客户端 1ProxyNeedsPush ProxyNeedsPush函数根据资源的变化情况判断是否需要为该Envoy代理更新xDS默认实现为DefaultProxyNeedsPush函数最主要使用的是根据SidecarProxy判断是否依赖对应资源核心方法如下 // pilot/pkg/model/sidecar.go var (sidecarScopeKnownConfigTypes map[kind.Kind]struct{}{kind.ServiceEntry: {},kind.VirtualService: {},kind.DestinationRule: {},kind.Sidecar: {},}// clusterScopedConfigTypes includes configs when they are in root namespace,// they will be applied to all namespaces within the cluster.clusterScopedConfigTypes map[kind.Kind]struct{}{kind.EnvoyFilter: {},kind.AuthorizationPolicy: {},kind.RequestAuthentication: {},kind.WasmPlugin: {},} )func (sc *SidecarScope) DependsOnConfig(config ConfigKey) bool {if sc nil {return true}// This kind of config will trigger a change if made in the root namespace or the same namespace// clusterScopedConfigTypes类型配置,如果变更的配置在root namespace或相同namespace需要xds推送if _, f : clusterScopedConfigTypes[config.Kind]; f {return config.Namespace sc.RootNamespace || config.Namespace sc.Namespace}// This kind of config is unknown to sidecarScope.if _, f : sidecarScopeKnownConfigTypes[config.Kind]; !f {return true}// 如果sidecarScope包含对应配置需要进行xds推送_, exists : sc.configDependencies[config.HashCode()]return exists }DependsOnConfig()方法核心逻辑如下 如果变更的clusterScopedConfigTypes类型配置在root namespace或相同namespace需要xDS推送如果SidecarScope包含对应配置需要进行xDS推送 2pushXds // pilot/pkg/xds/xdsgen.go func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req *model.PushRequest) error {if w nil {return nil}// 获取xds生成器gen : s.findGenerator(w.TypeUrl, con)if gen nil {return nil}t0 : time.Now()// If delta is set, client is requesting new resources or removing old ones. We should just generate the// new resources it needs, rather than the entire set of known resources.// Note: we do not need to account for unsubscribed resources as these are handled by parent removal;// See https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#deleting-resources.// This means if there are only removals, we will not respond.var logFiltered stringif !req.Delta.IsEmpty() features.PartialFullPushes !con.proxy.IsProxylessGrpc() {logFiltered filtered: strconv.Itoa(len(w.ResourceNames)-len(req.Delta.Subscribed))w model.WatchedResource{TypeUrl: w.TypeUrl,ResourceNames: req.Delta.Subscribed.UnsortedList(),}}// xds生成器生成xds配置res, logdata, err : gen.Generate(con.proxy, w, req)info : if len(logdata.AdditionalInfo) 0 {info logdata.AdditionalInfo}if len(logFiltered) 0 {info logFiltered}if err ! nil || res nil {// If we have nothing to send, report that we got an ACK for this version.if s.StatusReporter ! nil {s.StatusReporter.RegisterEvent(con.conID, w.TypeUrl, req.Push.LedgerVersion)}if log.DebugEnabled() {log.Debugf(%s: SKIP%s for node:%s%s, v3.GetShortType(w.TypeUrl), req.PushReason(), con.proxy.ID, info)}// If we are sending a request, we must respond or we can get Envoy stuck. Assert we do.// One exception is if Envoy is simply unsubscribing from some resources, in which case we can skip.isUnsubscribe : features.PartialFullPushes !req.Delta.IsEmpty() req.Delta.Subscribed.IsEmpty()if features.EnableUnsafeAssertions err nil res nil req.IsRequest() !isUnsubscribe {log.Fatalf(%s: SKIPPED%s for node:%s%s but expected a response for request, v3.GetShortType(w.TypeUrl), req.PushReason(), con.proxy.ID, info)}return err}defer func() { recordPushTime(w.TypeUrl, time.Since(t0)) }()resp : discovery.DiscoveryResponse{ControlPlane: ControlPlane(),TypeUrl: w.TypeUrl,// TODO: send different version for incremental edsVersionInfo: req.Push.PushVersion,Nonce: nonce(req.Push.LedgerVersion),Resources: model.ResourcesToAny(res),}configSize : ResourceSize(res)configSizeBytes.With(typeTag.Value(w.TypeUrl)).Record(float64(configSize))ptype : PUSHif logdata.Incremental {ptype PUSH INC}// 调用send方法将discoveryResponse发送出去if err : con.send(resp); err ! nil {if recordSendError(w.TypeUrl, err) {log.Warnf(%s: Send failure for node:%s resources:%d size:%s%s: %v,v3.GetShortType(w.TypeUrl), con.proxy.ID, len(res), util.ByteCount(configSize), info, err)}return err}switch {case !req.Full:if log.DebugEnabled() {log.Debugf(%s: %s%s for node:%s resources:%d size:%s%s,v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res), util.ByteCount(configSize), info)}default:debug : if log.DebugEnabled() {// Add additional information to logs when debug mode enabled.debug nonce: resp.Nonce version: resp.VersionInfo}log.Infof(%s: %s%s for node:%s resources:%d size:%v%s%s, v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res),util.ByteCount(ResourceSize(res)), info, debug)}return nil }pushXds()方法首先根据订阅的资源类型找到对应的xDS生成器然后通过生成器相应的xDS配置最后通过send接口发送出去 3XdsResourceGenerator.Generate Pilot主要负责6种xDS配置资源CDS、EDS、LDS、RDS、ECDS、NDS的生成及下发。接来下以CDS生成器为例看看DiscoveryServer是如何根据代理的属性及PushContext缓存生成原始的Cluster配置的 CDS配置通过ConfigGenerator.BuildClusters()方法生成代码如下 // pilot/pkg/networking/core/v1alpha3/cluster.go func (configgen *ConfigGeneratorImpl) BuildClusters(proxy *model.Proxy, req *model.PushRequest) ([]*discovery.Resource, model.XdsLogDetails) {// In Sotw, we care about all services.var services []*model.Serviceif features.FilterGatewayClusterConfig proxy.Type model.Router {services req.Push.GatewayServices(proxy)} else {services proxy.SidecarScope.Services()}return configgen.buildClusters(proxy, req, services) }func (configgen *ConfigGeneratorImpl) buildClusters(proxy *model.Proxy, req *model.PushRequest,services []*model.Service, ) ([]*discovery.Resource, model.XdsLogDetails) {clusters : make([]*cluster.Cluster, 0)resources : model.Resources{}envoyFilterPatches : req.Push.EnvoyFilters(proxy)// 创建cluster生成器cb : NewClusterBuilder(proxy, req, configgen.Cache)instances : proxy.ServiceInstancescacheStats : cacheStats{}switch proxy.Type {// 生成sidecarProxy clustercase model.SidecarProxy:// Setup outbound clustersoutboundPatcher : clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_OUTBOUND}// 构建outbound clusterob, cs : configgen.buildOutboundClusters(cb, proxy, outboundPatcher, services)cacheStats cacheStats.merge(cs)resources append(resources, ob...)// Add a blackhole and passthrough cluster for catching traffic to unresolved routes// 添加blackhole和passthrough cluster为默认的路由转发流量clusters outboundPatcher.conditionallyAppend(clusters, nil, cb.buildBlackHoleCluster(), cb.buildDefaultPassthroughCluster())clusters append(clusters, outboundPatcher.insertedClusters()...)// Setup inbound clustersinboundPatcher : clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_INBOUND}// 构建inbound clusterclusters append(clusters, configgen.buildInboundClusters(cb, proxy, instances, inboundPatcher)...)if proxy.EnableHBONE() {clusters append(clusters, configgen.buildInboundHBONEClusters())}// Pass through clusters for inbound traffic. These cluster bind loopback-ish src address to access node local service.// 添加passthrough cluster为默认的路由转发流量clusters inboundPatcher.conditionallyAppend(clusters, nil, cb.buildInboundPassthroughClusters()...)clusters append(clusters, inboundPatcher.insertedClusters()...)case model.Waypoint:svcs : findWaypointServices(proxy, req.Push)// Waypoint proxies do not need outbound clusters in most cases, unless we have a route pointing to somethingoutboundPatcher : clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_OUTBOUND}ob, cs : configgen.buildOutboundClusters(cb, proxy, outboundPatcher, filterWaypointOutboundServices(req.Push.ServicesAttachedToMesh(), svcs, services))cacheStats cacheStats.merge(cs)resources append(resources, ob...)// Setup inbound clustersinboundPatcher : clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_INBOUND}clusters append(clusters, configgen.buildWaypointInboundClusters(cb, proxy, req.Push, svcs)...)clusters append(clusters, inboundPatcher.insertedClusters()...)default: // Gatewayspatcher : clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_GATEWAY}ob, cs : configgen.buildOutboundClusters(cb, proxy, patcher, services)cacheStats cacheStats.merge(cs)resources append(resources, ob...)// Gateways do not require the default passthrough cluster as they do not have original dst listeners.clusters patcher.conditionallyAppend(clusters, nil, cb.buildBlackHoleCluster())if proxy.Type model.Router proxy.MergedGateway ! nil proxy.MergedGateway.ContainsAutoPassthroughGateways {clusters append(clusters, configgen.buildOutboundSniDnatClusters(proxy, req, patcher)...)}clusters append(clusters, patcher.insertedClusters()...)}// OutboundTunnel cluster is needed for sidecar and gateway.if proxy.EnableHBONE() {clusters append(clusters, cb.buildConnectOriginate(proxy, req.Push, nil))}// if credential socket exists, create a cluster for itif proxy.Metadata ! nil proxy.Metadata.Raw[security.CredentialMetaDataName] true {clusters append(clusters, cb.buildExternalSDSCluster(security.CredentialNameSocketPath))}for _, c : range clusters {resources append(resources, discovery.Resource{Name: c.Name, Resource: protoconv.MessageToAny(c)})}resources cb.normalizeClusters(resources)if cacheStats.empty() {return resources, model.DefaultXdsLogDetails}return resources, model.XdsLogDetails{AdditionalInfo: fmt.Sprintf(cached:%v/%v, cacheStats.hits, cacheStats.hitscacheStats.miss)} }7、响应Envoy主动发起的xDS请求 StreamAggregatedResources()方法中开启receive协程接收Envoy的xDS请求代码如下 // pilot/pkg/xds/ads.go func (s *DiscoveryServer) receive(con *Connection, identities []string) {defer func() {close(con.errorChan)close(con.reqChan)// Close the initialized channel, if its not already closed, to prevent blocking the stream.select {case -con.initialized:default:close(con.initialized)}}()firstRequest : truefor {// 接收discoveryRequestreq, err : con.stream.Recv()if err ! nil {if istiogrpc.IsExpectedGRPCError(err) {log.Infof(ADS: %q %s terminated, con.peerAddr, con.conID)return}con.errorChan - errlog.Errorf(ADS: %q %s terminated with error: %v, con.peerAddr, con.conID, err)totalXDSInternalErrors.Increment()return}// This should be only set for the first request. The node id may not be set - for example malicious clients.if firstRequest {// probe happens before envoy sends first xDS requestif req.TypeUrl v3.HealthInfoType {log.Warnf(ADS: %q %s send health check probe before normal xDS request, con.peerAddr, con.conID)continue}firstRequest falseif req.Node nil || req.Node.Id {con.errorChan - status.New(codes.InvalidArgument, missing node information).Err()return}if err : s.initConnection(req.Node, con, identities); err ! nil {con.errorChan - errreturn}defer s.closeConnection(con)log.Infof(ADS: new connection for node:%s, con.conID)}select {// 将discoveryRequest添加到reqChancase con.reqChan - req:case -con.stream.Context().Done():log.Infof(ADS: %q %s terminated with stream closed, con.peerAddr, con.conID)return}} }receive()方法中接收Envoy的DiscoveryRequest然后将DiscoveryRequest添加到reqChan中 StreamAggregatedResources()方法中从reqChan中获取DiscoveryRequest然后调用processRequest()方法 // pilot/pkg/xds/ads.go func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error {stype : v3.GetShortType(req.TypeUrl)log.Debugf(ADS:%s: REQ %s resources:%d nonce:%s version:%s , stype,con.conID, len(req.ResourceNames), req.ResponseNonce, req.VersionInfo)if req.TypeUrl v3.HealthInfoType {s.handleWorkloadHealthcheck(con.proxy, req)return nil}// For now, dont let xDS piggyback debug requests start watchers.if strings.HasPrefix(req.TypeUrl, v3.DebugType) {return s.pushXds(con,model.WatchedResource{TypeUrl: req.TypeUrl, ResourceNames: req.ResourceNames},model.PushRequest{Full: true, Push: con.proxy.LastPushContext})}if s.StatusReporter ! nil {s.StatusReporter.RegisterEvent(con.conID, req.TypeUrl, req.ResponseNonce)}shouldRespond, delta : s.shouldRespond(con, req)if !shouldRespond {return nil}request : model.PushRequest{Full: true,Push: con.proxy.LastPushContext,Reason: []model.TriggerReason{model.ProxyRequest},// The usage of LastPushTime (rather than time.Now()), is critical here for correctness; This time// is used by the XDS cache to determine if a entry is stale. If we use Now() with an old push context,// we may end up overriding active cache entries with stale ones.Start: con.proxy.LastPushTime,Delta: delta,}// SidecarScope for the proxy may not have been updated based on this pushContext.// It can happen when processRequest comes after push context has been updated(s.initPushContext),// but proxys SidecarScope has been updated(s.computeProxyState - SetSidecarScope) due to optimizations that skip sidecar scope// computation.if con.proxy.SidecarScope ! nil con.proxy.SidecarScope.Version ! request.Push.PushVersion {s.computeProxyState(con.proxy, request)}// 推送xds配置return s.pushXds(con, con.Watched(req.TypeUrl), request) }processRequest()方法中根据DiscoveryRequest信息推送xDS配置 8、小结 配置变化后向Envoy推送xDS时序 响应Envoy主动发起的xDS时序 参考 《Istio权威指南 下》 4.深入Istio源码Pilot的Discovery Server如何执行xDS异步分发 Istio Pilot代码深度解析
http://www.sadfv.cn/news/64608/

相关文章:

  • 百度商桥怎样绑定网站群英云服务器
  • 上海哪家做公司网站网站建设背景是什么
  • 广州越秀建网站的公司怎么使用dw做一个网站
  • 如何做网站优化并快速提高权重做网站的图片要求大小
  • 装修网站模板下载网站目录结构图
  • 国外网站国内备案最新行业动态
  • 湛江做网站seo的iis网站开发教程
  • 网站平台免费中国企业网聚焦中原
  • 支付网站开发怎么做账沈阳网站制作机构
  • 哪些大学网站做的比较好织梦 电影网站 模板
  • 网站导航栏代码网贷之家网站建设
  • 任县企业做网站建设旅游网站数据库设计
  • 网站开发 外文文献网站建设与维护 唐清安
  • 网站名字重复如何在已建设好的网站做修改
  • 电子商务网站规划 分析 设计阿里云主机安装wordpress
  • 网站是否备案怎么查询贵阳官网seo诊断
  • 深圳网站建设选哪家关于国家对网站建设
  • 糟糕的网站设计济南有做五合一网站公司
  • 汕头教育学会网站建设涿州网站建设推广
  • 山西 网站制作百度做网站要多久
  • 绵阳网站关键词佛山网站外包
  • app推广代理去哪里找安阳seo公司
  • 盐城网站建设要多少钱岳阳商城网站建设
  • 湖南天辰建设责任公司网站音乐介绍网站怎么做
  • 网站建设和seo的工作好不好网站的建设有什么好处
  • 网站备案公告久久建筑网资料全吗
  • 诚信网站认证必需做吗北京建筑设计公司排行榜
  • 免费网站建设推广服务wordpress read more
  • 网站评价公司网址怎么注册
  • 亳州蒙城网站建设免费小程序制作网站