Building a Dynamic Service Discovery Resolver for gRPC-Go Backed by PuppetDB


The team decided to go all in on Go and gRPC for the new project, which is a good thing in itself. The thorny part is that the new service does not exist in a vacuum: it must communicate with a core data processing cluster that has run stably for years under Puppet's meticulous management. That cluster was never built with cloud native in mind. There is no service registry, no Consul, no etcd. Every node's deployment and configuration change strictly follows Puppet manifests and Hiera data. The only "service registry" is PuppetDB.

The initial approach was crude: hardcode the cluster's address list in the Go service's configuration file. That worked in development but fell apart the moment we hit pre-production. Cluster nodes go offline for maintenance and scale in and out with load. Every change forced us to hand-edit the config file and redeploy the Go service, an operational nightmare.

We cannot rework the data processing cluster; it is a core company asset where touching one part affects everything else. The only way forward is to teach the Go service to fetch service addresses dynamically from PuppetDB, the single source of truth. That means implementing, in the gRPC world, a custom service discovery mechanism that can talk to PuppetDB.

The grpc-go library offers a powerful extension point: the resolver interface. It lets us swap out the default DNS resolution logic and plug in any custom service discovery system. Our goal is a resolver for a puppetdb scheme, so that a client can dial like this while every bit of the underlying complexity stays encapsulated:

// The ideal way to dial
conn, err := grpc.Dial(
	"puppetdb://puppet-infra:8000/data-processor.service/production",
	grpc.WithTransportCredentials(insecure.NewCredentials()),
	grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
)

The puppetdb://puppet-infra:8000/data-processor.service/production target is where the magic happens. data-processor.service is a logical service identifier we define in Puppet (it could be a tag or a custom fact), production is the corresponding Puppet environment, and the port in the authority (8000) is what the resolver will attach to every discovered node.
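
To make the anatomy concrete, here is a minimal sketch, standard library only, of how that URI decomposes. gRPC performs the same parse and hands the result to the Builder as a resolver.Target wrapping a url.URL:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	u, _ := url.Parse("puppetdb://puppet-infra:8000/data-processor.service/production")
	fmt.Println(u.Scheme) // "puppetdb": selects the registered Builder
	fmt.Println(u.Host)   // "puppet-infra:8000": the authority; only the port matters to us
	fmt.Println(u.Port()) // "8000": the port applied to every discovered node
	fmt.Println(u.Path)   // "/data-processor.service/production": service tag and environment
}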

Dissecting the gRPC Resolver Interfaces

To implement a custom resolver, we first have to understand the two core interfaces in the grpc-go resolver package: Builder and Resolver.

  • resolver.Builder: it plays the role of a factory. The gRPC Dial function looks up the Builder registered for the scheme of the target address (for example dns, unix, or our custom puppetdb). The Builder's job is to parse the target address and create a Resolver instance.

    // resolver.Builder interface definition
    type Builder interface {
        Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
        Scheme() string
    }
  • resolver.Resolver: this is where the actual service discovery logic runs. It connects to the discovery system (PuppetDB in our case), fetches the address list, and pushes it to the gRPC client connection via ClientConn.UpdateState().

    // resolver.Resolver interface definition
    type Resolver interface {
        ResolveNow(ResolveNowOptions)
        Close()
    }

ResolveNow is invoked by gRPC internals to request an immediate resolution pass. Close cleans up resources when the connection shuts down.

The call sequence of the whole flow looks like this:

sequenceDiagram
    participant Client
    participant gRPC Dial
    participant Resolver Builder
    participant Resolver
    participant PuppetDB

    Client->>gRPC Dial: grpc.Dial("puppetdb:///...")
    gRPC Dial->>Resolver Builder: Find builder for "puppetdb" scheme
    gRPC Dial->>Resolver Builder: Build(target, cc, opts)
    Resolver Builder-->>gRPC Dial: return resolver instance
    
    gRPC Dial->>Resolver: (in background) starts resolving
    activate Resolver
    Resolver->>PuppetDB: Query for service addresses
    PuppetDB-->>Resolver: Return JSON with node info
    Resolver->>gRPC Dial: cc.UpdateState({Addresses: [...]})
    deactivate Resolver
    
    gRPC Dial->>Client: Return connection (or block until first resolve)
    
    loop On-demand or Re-resolution
        gRPC Dial->>Resolver: ResolveNow()
        activate Resolver
        Resolver->>PuppetDB: Query for service addresses
        PuppetDB-->>Resolver: Return JSON with node info
        Resolver->>gRPC Dial: cc.UpdateState({Addresses: [...]})
        deactivate Resolver
    end

Implementing the PuppetDB API Client

Before building the resolver, we need a Go client that speaks the PuppetDB v4 API. PuppetDB provides the powerful PQL (Puppet Query Language) for querying node information. Our goal is to retrieve each node's certname and its ipaddress fact for every node that carries a specific tag (for example data_processor_service) and lives in the production environment.

A typical PQL query looks like this:

{
  "query": "nodes[certname, facts.ipaddress] { resources { type = 'Class' and title = 'Profile::Data_processor' } and facts.environment = 'production' }"
}

In a real project we would use a more flexible tag rather than a hardcoded class name. For now, let's build a client that can execute this kind of query.

puppetdb/client.go

package puppetdb

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"time"
)

// NodeInfo holds the essential information for a discovered node.
type NodeInfo struct {
	Certname  string `json:"certname"`
	IPAddress string `json:"ipaddress"`
}

// Client is a client for the PuppetDB v4 API.
type Client struct {
	baseURL    *url.URL
	httpClient *http.Client
}

// NewClient creates a new PuppetDB client.
// It requires the base URL of the PuppetDB instance, e.g., "http://puppetdb.local:8080".
func NewClient(rawBaseURL string, timeout time.Duration) (*Client, error) {
	baseURL, err := url.Parse(rawBaseURL)
	if err != nil {
		return nil, fmt.Errorf("failed to parse puppetdb base url: %w", err)
	}

	return &Client{
		baseURL: baseURL,
		httpClient: &http.Client{
			Timeout: timeout,
		},
	}, nil
}

// PQLQuery represents a Puppet Query Language query.
type PQLQuery struct {
	Query string `json:"query"`
}

// QueryNodesByTag queries PuppetDB for nodes that have a specific tag in a given environment.
// The serviceTag corresponds to a Puppet tag resource.
func (c *Client) QueryNodesByTag(ctx context.Context, serviceTag, environment string) ([]NodeInfo, error) {
	// A more robust PQL query that finds nodes associated with a specific tag.
	// This assumes you're using tags to identify services.
	pql := fmt.Sprintf(
		`nodes[certname, facts.ipaddress] { tag = "%s" and facts.environment = "%s" }`,
		serviceTag,
		environment,
	)

	query := PQLQuery{Query: pql}
	queryBytes, err := json.Marshal(query)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal PQL query: %w", err)
	}

	apiURL := c.baseURL.JoinPath("/pdb/query/v4")
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiURL.String(), bytes.NewReader(queryBytes))
	if err != nil {
		return nil, fmt.Errorf("failed to create http request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "application/json")

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("http request to puppetdb failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("puppetdb returned non-200 status: %d", resp.StatusCode)
	}

	var results []struct {
		Certname string `json:"certname"`
		Facts    struct {
			IPAddress string `json:"ipaddress"`
		} `json:"facts"`
	}

	if err := json.NewDecoder(resp.Body).Decode(&results); err != nil {
		return nil, fmt.Errorf("failed to decode puppetdb response: %w", err)
	}

	// This transformation is tricky. The PQL result is nested.
	// We must flatten it for easier use.
	nodes := make([]NodeInfo, 0, len(results))
	for _, res := range results {
		// A common mistake is to assume the fact is always present. Production code must handle this.
		if res.Facts.IPAddress == "" {
			// Log this event. A node matching the service tag is missing a critical fact.
			continue
		}
		nodes = append(nodes, NodeInfo{
			Certname:  res.Certname,
			IPAddress: res.Facts.IPAddress,
		})
	}

	return nodes, nil
}

This client is the foundation. In production it would also need mTLS authentication (PuppetDB usually requires a client certificate), which is beyond the scope of this article. The core here is the encapsulation of the PQL query logic.
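
That said, here is a minimal sketch of what the mTLS wiring could look like, using only the standard library. The certificate paths are illustrative assumptions following Puppet's conventional SSL directory layout, not something this article's code prescribes:

package puppetdb

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"net/http"
	"os"
	"time"
)

// newMTLSHTTPClient builds an http.Client that presents a client certificate
// and trusts the Puppet CA. All paths are assumptions for illustration.
func newMTLSHTTPClient(certFile, keyFile, caFile string, timeout time.Duration) (*http.Client, error) {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("failed to load client key pair: %w", err)
	}

	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("failed to read CA bundle: %w", err)
	}
	caPool := x509.NewCertPool()
	if !caPool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("no valid CA certificates found in %s", caFile)
	}

	return &http.Client{
		Timeout: timeout,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				Certificates: []tls.Certificate{cert},
				RootCAs:      caPool,
			},
		},
	}, nil
}

With this in place, the remaining change would be pointing the base URL at PuppetDB's TLS port (8081 by default) instead of the cleartext 8080.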

Implementing the Resolver Builder

The Builder implementation is fairly direct. It parses the target URL, extracts the information it needs (service tag, environment, PuppetDB address, and so on), and initializes our Resolver with it.

resolver/builder.go

package resolver

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"google.golang.org/grpc/resolver"

	"your_project/internal/puppetdb" // Import the client we just created
)

const (
	Scheme              = "puppetdb"
	defaultPuppetDBURL  = "http://localhost:8080" // Should be configurable
	defaultQueryTimeout = 5 * time.Second
)

// puppetDBBuilder implements the resolver.Builder interface.
type puppetDBBuilder struct {
	puppetDBClient *puppetdb.Client
	// In a real application, the client should be shared.
	// A simple sync.Once ensures it's initialized only once.
	initOnce sync.Once
	initErr  error
}

// NewBuilder creates and registers a new PuppetDB resolver builder.
func NewBuilder() *puppetDBBuilder {
	return &puppetDBBuilder{}
}

func (b *puppetDBBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	// Lazy initialization of the shared PuppetDB client.
	b.initOnce.Do(func() {
		// In a production system, this URL should come from env vars or a config file,
		// not hardcoded.
		b.puppetDBClient, b.initErr = puppetdb.NewClient(defaultPuppetDBURL, defaultQueryTimeout)
	})
	if b.initErr != nil {
		return nil, fmt.Errorf("failed to initialize puppetdb client: %w", b.initErr)
	}

	// The target.URL.Path will be something like "/data-processor.service/production"
	// We need to parse this to get the service tag and environment.
	pathParts := strings.Split(strings.Trim(target.URL.Path, "/"), "/")
	if len(pathParts) != 2 {
		return nil, fmt.Errorf("invalid puppetdb target path: expected format '/<service_tag>/<environment>', got '%s'", target.URL.Path)
	}
	serviceTag := pathParts[0]
	environment := pathParts[1]
	
	// The port for the discovered services. This is a critical piece of information.
	// A robust solution might get this from PuppetDB facts as well.
	// For now, we'll extract it from the target Authority/Endpoint.
	// e.g., puppetdb://authority:1234/service/env -> port is 1234
	port := target.URL.Port()
	if port == "" {
		return nil, fmt.Errorf("port must be specified in the target authority, e.g., puppetdb://ignored:8000/...")
	}

	// The context carries a cancel function so Close() can stop the background goroutine.
	ctx, cancel := context.WithCancel(context.Background())

	r := &puppetDBResolver{
		client:         b.puppetDBClient,
		cc:             cc,
		serviceTag:     serviceTag,
		environment:    environment,
		port:           port,
		disablePolling: false, // For testing, we might want to disable periodic polling
		ctx:            ctx,
		cancel:         cancel,
	}

	// The first resolution is done synchronously so the client doesn't try to
	// connect before any addresses are available.
	r.ResolveNow(resolver.ResolveNowOptions{})

	// Start a background polling goroutine.
	// A common pitfall is to forget this, making the discovery static.
	go r.watch()

	return r, nil
}

func (b *puppetDBBuilder) Scheme() string {
	return Scheme
}

// init function to register the builder with gRPC.
func init() {
	resolver.Register(NewBuilder())
}
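
Global registration via init() is convenient, but it is not the only option: grpc-go also supports scoping a resolver to a single client through the grpc.WithResolvers dial option, which sidesteps import-order surprises and keeps tests isolated. A brief sketch, with pdbresolver as an assumed import alias for our resolver package:

// pdbresolver "your_project/internal/resolver"
conn, err := grpc.Dial(
	"puppetdb://puppet-infra:8000/data-processor.service/production",
	grpc.WithResolvers(pdbresolver.NewBuilder()), // registered for this ClientConn only
	grpc.WithTransportCredentials(insecure.NewCredentials()),
)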

Implementing the Core Resolver Logic

puppetDBResolver is where the real work happens. It has to deal with background polling, errors, caching, and the interaction with the gRPC ClientConn. A bare-bones implementation would query once inside ResolveNow; a production-grade resolver has to be dynamic and resilient.

resolver/resolver.go

package resolver

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"google.golang.org/grpc/resolver"
	
	"your_project/internal/puppetdb"
)

const (
	pollingInterval = 30 * time.Second
)

// puppetDBResolver implements the resolver.Resolver interface.
type puppetDBResolver struct {
	client         *puppetdb.Client
	cc             resolver.ClientConn
	serviceTag     string
	environment    string
	port           string
	disablePolling bool

	// For managing the background polling goroutine
	ctx    context.Context
	cancel context.CancelFunc

	// Caching the last known good state
	lastKnownState resolver.State
	stateMutex     sync.RWMutex
}

// ResolveNow is called by gRPC to trigger a resolution.
func (r *puppetDBResolver) ResolveNow(_ resolver.ResolveNowOptions) {
	nodes, err := r.client.QueryNodesByTag(r.ctx, r.serviceTag, r.environment)
	if err != nil {
		// The key here is not to fail the connection immediately.
		// We report the error, but gRPC will continue to use the last known good addresses.
		// This makes the system resilient to transient PuppetDB failures.
		log.Printf("ERROR: PuppetDB resolver failed to query nodes for %s: %v", r.serviceTag, err)
		r.cc.ReportError(err)
		return
	}

	newAddrs := make([]resolver.Address, 0, len(nodes))
	for _, node := range nodes {
		addr := fmt.Sprintf("%s:%s", node.IPAddress, r.port)
		newAddrs = append(newAddrs, resolver.Address{Addr: addr})
	}

	if len(newAddrs) == 0 {
		// If no nodes are found, this is a valid state but potentially problematic.
		// We should log this clearly.
		log.Printf("WARN: PuppetDB resolver found no active nodes for service tag %s in environment %s", r.serviceTag, r.environment)
	}

	newState := resolver.State{Addresses: newAddrs}
	
	// Update the gRPC client connection with the new state.
	if err := r.cc.UpdateState(newState); err != nil {
		log.Printf("ERROR: PuppetDB resolver failed to update gRPC client connection state: %v", err)
	}

	// Cache the last known good state.
	r.stateMutex.Lock()
	r.lastKnownState = newState
	r.stateMutex.Unlock()
}

// watch is the background polling loop.
func (r *puppetDBResolver) watch() {
	if r.disablePolling {
		return
	}
	ticker := time.NewTicker(pollingInterval)
	defer ticker.Stop()

	for {
		select {
		case <-r.ctx.Done():
			// Context was cancelled, so exit the goroutine.
			return
		case <-ticker.C:
			r.ResolveNow(resolver.ResolveNowOptions{})
		}
	}
}

// Close is called when the gRPC connection is closed.
func (r *puppetDBResolver) Close() {
	// This is crucial to prevent goroutine leaks.
	r.cancel()
}

A few key design decisions are baked into this code:

  1. Background polling: watch() uses a time.Ticker to invoke ResolveNow periodically, keeping the service list fresh.
  2. Resilient error handling: when QueryNodesByTag fails we do not wipe the address list; we only report the error via r.cc.ReportError. gRPC keeps using the addresses from the last successful resolution, which greatly improves availability and lets the system ride out transient PuppetDB outages.
  3. Context management: context.WithCancel creates a cancellable context, and Close calls cancel(). This is the standard pattern for stopping a background goroutine gracefully and avoiding leaks.
  4. Port handling: we take the service port from the Authority part of the target. It is a simplification, but a practical one. For example, grpc.Dial("puppetdb://ignored:9090/...") makes the resolver use port 9090 for every discovered IP.
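
Before wiring everything together, a quick test against a fake PuppetDB helps pin down the client behavior that decision 2 relies on, in particular that nodes missing the ipaddress fact are dropped rather than surfaced as empty addresses. This is a sketch: the payload mirrors the nested shape the client decodes, and the exact JSON a real PuppetDB returns for dotted projections should be verified against your instance.

puppetdb/client_test.go

package puppetdb

import (
	"context"
	"net/http"
	"net/http/httptest"
	"testing"
	"time"
)

func TestQueryNodesByTagDropsNodesWithoutIPAddress(t *testing.T) {
	// A fake PuppetDB: one healthy node, one node missing the ipaddress fact.
	fake := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte(`[
			{"certname": "node1.example.com", "facts": {"ipaddress": "10.0.0.1"}},
			{"certname": "node2.example.com", "facts": {"ipaddress": ""}}
		]`))
	}))
	defer fake.Close()

	client, err := NewClient(fake.URL, 2*time.Second)
	if err != nil {
		t.Fatalf("NewClient: %v", err)
	}

	nodes, err := client.QueryNodesByTag(context.Background(), "data_processor_service", "production")
	if err != nil {
		t.Fatalf("QueryNodesByTag: %v", err)
	}

	// The node with no ipaddress fact must be filtered out, not returned empty.
	if len(nodes) != 1 || nodes[0].IPAddress != "10.0.0.1" {
		t.Fatalf("unexpected nodes: %+v", nodes)
	}
}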

Integration and Usage

All the pieces are now in place. In our Go client application we only need a blank import of the resolver package, which triggers its init() function and registers our puppetDBBuilder.

main.go

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	// This blank import is critical. It executes the init() function in our resolver package,
	// which registers the "puppetdb" scheme with gRPC.
	_ "your_project/internal/resolver"
	
	// Assume this is our protobuf generated code
	pb "your_project/api/v1"
)

func main() {
	// The target URI now uses our custom scheme.
	// Format: puppetdb://<ignored_authority>:<port>/<service_tag>/<environment>
	// The authority is ignored but the port is used for all discovered nodes.
	target := "puppetdb://puppet-infra:8000/data-processor.service/production"

	log.Printf("Dialing gRPC service at: %s", target)

	// We must use a load balancing policy like "round_robin" for service discovery to work effectively.
	// Otherwise, gRPC might stick to the first address in the list.
	conn, err := grpc.Dial(
		target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
		grpc.WithBlock(), // Block until the connection is established (and first resolve is done)
	)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewDataProcessorClient(conn)

	// Now we can make calls as usual. gRPC will handle load balancing
	// across the dynamically discovered nodes from PuppetDB.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	resp, err := client.Process(ctx, &pb.ProcessRequest{Data: "some payload"})
	if err != nil {
		log.Fatalf("could not process data: %v", err)
	}

	log.Printf("Processing result: %s", resp.GetResultId())
}

Limitations and Future Iteration Paths

This solution put out the fire and bridges the old and new worlds seamlessly, but it is not without limitations.

First, it is a polling-based model. Between a Puppet agent updating its facts, PuppetDB ingesting the data, and our resolver's next polling hit, there is latency on the order of minutes. For a backend task cluster whose addresses change infrequently, that is perfectly acceptable. For frontend services that must react quickly to topology changes, this lag can be a problem.

Second, it makes every gRPC client depend directly on PuppetDB. Despite our fault tolerance, PuppetDB's own availability becomes a critical link in the discovery chain. If PuppetDB is down for an extended period, newly started service instances will discover no backends at all and will fail to start.

A more evolved architecture would introduce an intermediate layer: a standalone service that watches PuppetDB for changes (perhaps via a PuppetDB webhook or a message queue) and pushes the latest address list into a highly available KV store such as Consul or etcd. Our gRPC resolver would then subscribe to that KV store. This decouples gRPC clients from PuppetDB and enables near-real-time updates through the watch mechanisms of Consul/etcd instead of polling. It undeniably adds system complexity, though, so the benefits must be weighed against the cost.
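
To give a feel for the resolver side of that design, here is a minimal sketch of an etcd-backed watch loop using go.etcd.io/etcd/client/v3 (imported as clientv3). The key prefix and the one-address-per-key layout are assumptions for illustration, not a finished design:

// Assumed layout: each key under the prefix (e.g. /services/data-processor/)
// holds a single "ip:port" value maintained by the PuppetDB-watching service.
func (r *puppetDBResolver) watchEtcd(cli *clientv3.Client, prefix string) {
	rch := cli.Watch(r.ctx, prefix, clientv3.WithPrefix())
	for wresp := range rch {
		if err := wresp.Err(); err != nil {
			r.cc.ReportError(err)
			continue
		}
		// Any event invalidates our view; re-fetch the full list.
		resp, err := cli.Get(r.ctx, prefix, clientv3.WithPrefix())
		if err != nil {
			r.cc.ReportError(err)
			continue
		}
		addrs := make([]resolver.Address, 0, len(resp.Kvs))
		for _, kv := range resp.Kvs {
			addrs = append(addrs, resolver.Address{Addr: string(kv.Value)})
		}
		if err := r.cc.UpdateState(resolver.State{Addresses: addrs}); err != nil {
			log.Printf("ERROR: failed to push etcd-derived state: %v", err)
		}
	}
}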

