The team decided to go all-in on Go and gRPC for the new project, which is a good thing in itself. The tricky part is that the new services don't exist in a vacuum: they need to talk to a core data-processing cluster that has been running stably for years and is meticulously managed by Puppet. That cluster was never designed with cloud native in mind. There is no service registry, no Consul, no etcd. Every node's deployment and configuration changes strictly follow Puppet manifests and Hiera data. The only "service registry" is PuppetDB.
The initial approach was crude: hardcode the cluster's address list in the Go service's configuration file. That got us through development, but the problems surfaced as soon as we hit pre-production. Cluster nodes go offline for maintenance and are scaled in and out with load. Every change meant manually updating the config file and redeploying the Go service, a pure operational nightmare.
We cannot touch the data-processing cluster itself; it is a core company asset where any change ripples everywhere. The only way out is to teach the Go services to fetch service addresses dynamically from PuppetDB, the single source of truth. That means implementing, in the gRPC world, a custom service discovery mechanism that can talk to PuppetDB.
The grpc-go library offers a powerful extension point: the resolver interface. It lets us replace the default DNS resolution logic and plug in any custom service discovery system. Our goal is to implement a resolver for a puppetdb scheme, so the client can dial like this while all the underlying complexity stays fully encapsulated:
// The call we would ideally like to make
conn, err := grpc.Dial(
"puppetdb:///data-processor.service/production",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
)
The puppetdb:///data-processor.service/production target is where the magic happens. data-processor.service is a logical service identifier we define in Puppet (it could be a tag or a custom fact), and production is the corresponding Puppet environment.
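In recent grpc-go versions, the Builder receives this target already parsed as a resolver.Target whose URL field is a standard url.URL. A quick illustrative sketch of how the pieces break down, using net/url directly rather than anything gRPC-specific:
package main
import (
	"fmt"
	"net/url"
)
func main() {
	// grpc-go performs equivalent parsing internally and exposes the result
	// to the Builder as resolver.Target.URL.
	u, err := url.Parse("puppetdb:///data-processor.service/production")
	if err != nil {
		panic(err)
	}
	fmt.Println(u.Scheme) // "puppetdb": selects the Builder registered for this scheme
	fmt.Println(u.Host)   // "": the authority; we will later use it to carry the port
	fmt.Println(u.Path)   // "/data-processor.service/production": service tag + environment
}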
Dissecting the gRPC Resolver Interface
To implement a custom resolver, we first need to understand the two core interfaces in grpc-go's resolver package: Builder and Resolver.
resolver.Builder acts as a factory. When grpc.Dial is called, gRPC looks up the Builder registered for the scheme of the target address (for example dns, unix, or our custom puppetdb). The Builder's job is to parse the target and create a Resolver instance.
// resolver.Builder interface definition
type Builder interface {
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	Scheme() string
}
resolver.Resolver is where the actual service discovery logic lives. It talks to the discovery system (PuppetDB in our case), fetches the address list, and hands the addresses to the gRPC client connection via ClientConn.UpdateState().
// resolver.Resolver interface definition
type Resolver interface {
	ResolveNow(ResolveNowOptions)
	Close()
}
ResolveNow is triggered internally by gRPC to request an immediate resolution. Close is called to clean up resources when the connection shuts down.
The call sequence for the whole flow looks like this:
sequenceDiagram
participant Client
participant gRPC Dial
participant Resolver Builder
participant Resolver
participant PuppetDB
Client->>gRPC Dial: grpc.Dial("puppetdb:///...")
gRPC Dial->>Resolver Builder: Find builder for "puppetdb" scheme
gRPC Dial->>Resolver Builder: Build(target, cc, opts)
Resolver Builder-->>gRPC Dial: return resolver instance
gRPC Dial->>Resolver: (in background) starts resolving
activate Resolver
Resolver->>PuppetDB: Query for service addresses
PuppetDB-->>Resolver: Return JSON with node info
Resolver->>gRPC Dial: cc.UpdateState({Addresses: [...]})
deactivate Resolver
gRPC Dial->>Client: Return connection (or block until first resolve)
loop On-demand or Re-resolution
gRPC Dial->>Resolver: ResolveNow()
activate Resolver
Resolver->>PuppetDB: Query for service addresses
PuppetDB-->>Resolver: Return JSON with node info
Resolver->>gRPC Dial: cc.UpdateState({Addresses: [...]})
deactivate Resolver
end
Implementing the PuppetDB API Client
Before building the resolver, we need a Go client that can talk to the PuppetDB v4 API. PuppetDB offers PQL (Puppet Query Language) for querying node information. Our goal is to fetch the certname and the ipaddress fact of every node that carries a specific tag (for example data_processor_service) and lives in the production environment.
A typical PQL query looks like this:
{
"query": "nodes[certname, facts.ipaddress] { resources { type = 'Class' and title = 'Profile::Data_processor' } and facts.environment = 'production' }"
}
In a real project we would use a more flexible tag instead of a hardcoded class name. For now, let's build a client that can execute this kind of query.
puppetdb/client.go
package puppetdb
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"
)
// NodeInfo holds the essential information for a discovered node.
type NodeInfo struct {
Certname string `json:"certname"`
IPAddress string `json:"ipaddress"`
}
// Client is a client for the PuppetDB v4 API.
type Client struct {
baseURL *url.URL
httpClient *http.Client
}
// NewClient creates a new PuppetDB client.
// It requires the base URL of the PuppetDB instance, e.g., "http://puppetdb.local:8080".
func NewClient(rawBaseURL string, timeout time.Duration) (*Client, error) {
baseURL, err := url.Parse(rawBaseURL)
if err != nil {
return nil, fmt.Errorf("failed to parse puppetdb base url: %w", err)
}
return &Client{
baseURL: baseURL,
httpClient: &http.Client{
Timeout: timeout,
},
}, nil
}
// PQLQuery represents a Puppet Query Language query.
type PQLQuery struct {
Query string `json:"query"`
}
// QueryNodesByTag queries PuppetDB for nodes that have a specific tag in a given environment.
// The serviceTag corresponds to a Puppet tag resource.
func (c *Client) QueryNodesByTag(ctx context.Context, serviceTag, environment string) ([]NodeInfo, error) {
// A more robust PQL query that finds nodes associated with a specific tag.
// This assumes you're using tags to identify services.
pql := fmt.Sprintf(
`nodes[certname, facts.ipaddress] { tag = "%s" and facts.environment = "%s" }`,
serviceTag,
environment,
)
query := PQLQuery{Query: pql}
queryBytes, err := json.Marshal(query)
if err != nil {
return nil, fmt.Errorf("failed to marshal PQL query: %w", err)
}
apiURL := c.baseURL.JoinPath("/pdb/query/v4")
req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiURL.String(), bytes.NewReader(queryBytes))
if err != nil {
return nil, fmt.Errorf("failed to create http request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("http request to puppetdb failed: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("puppetdb returned non-200 status: %d", resp.StatusCode)
}
var results []struct {
Certname string `json:"certname"`
Facts struct {
IPAddress string `json:"ipaddress"`
} `json:"facts"`
}
if err := json.NewDecoder(resp.Body).Decode(&results); err != nil {
return nil, fmt.Errorf("failed to decode puppetdb response: %w", err)
}
// This transformation is tricky. The PQL result is nested.
// We must flatten it for easier use.
nodes := make([]NodeInfo, 0, len(results))
for _, res := range results {
// A common mistake is to assume the fact is always present. Production code must handle this.
if res.Facts.IPAddress == "" {
// Log this event. A node matching the service tag is missing a critical fact.
continue
}
nodes = append(nodes, NodeInfo{
Certname: res.Certname,
IPAddress: res.Facts.IPAddress,
})
}
return nodes, nil
}
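Before we build on top of this flattening logic, it is cheap to pin down its behavior against a stubbed PuppetDB. A minimal test sketch using net/http/httptest; the canned JSON mirrors the shape this client expects rather than claiming to be a verbatim PuppetDB response:
puppetdb/client_test.go
package puppetdb
import (
	"context"
	"net/http"
	"net/http/httptest"
	"testing"
	"time"
)
// TestQueryNodesByTag verifies that nested results are flattened and that
// nodes missing the ipaddress fact are skipped.
func TestQueryNodesByTag(t *testing.T) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.Write([]byte(`[
			{"certname": "node1.example.com", "facts": {"ipaddress": "10.0.0.11"}},
			{"certname": "node2.example.com", "facts": {"ipaddress": ""}}
		]`))
	}))
	defer srv.Close()
	client, err := NewClient(srv.URL, 2*time.Second)
	if err != nil {
		t.Fatalf("NewClient: %v", err)
	}
	nodes, err := client.QueryNodesByTag(context.Background(), "data_processor_service", "production")
	if err != nil {
		t.Fatalf("QueryNodesByTag: %v", err)
	}
	// node2 is dropped because its ipaddress fact is empty.
	if len(nodes) != 1 || nodes[0].IPAddress != "10.0.0.11" {
		t.Fatalf("unexpected nodes: %+v", nodes)
	}
}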
This client is the foundation. In production it would also need mTLS authentication (PuppetDB typically requires client certificates), which is beyond the scope of this article. The core point here is that it encapsulates the PQL query logic.
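Although that stays out of scope here, a rough sketch of what the mTLS wiring could look like, for orientation. The constructor name and certificate-path parameters are placeholders, and the function needs crypto/tls, crypto/x509, and os added to the import block:
// NewTLSClient is a hypothetical variant of NewClient that presents a client
// certificate to PuppetDB and verifies the server against the Puppet CA.
func NewTLSClient(rawBaseURL, certFile, keyFile, caFile string, timeout time.Duration) (*Client, error) {
	baseURL, err := url.Parse(rawBaseURL)
	if err != nil {
		return nil, fmt.Errorf("failed to parse puppetdb base url: %w", err)
	}
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("failed to load client certificate: %w", err)
	}
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("failed to read CA bundle: %w", err)
	}
	caPool := x509.NewCertPool()
	if !caPool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("no valid CA certificates found in %s", caFile)
	}
	return &Client{
		baseURL: baseURL,
		httpClient: &http.Client{
			Timeout: timeout,
			Transport: &http.Transport{
				TLSClientConfig: &tls.Config{
					Certificates: []tls.Certificate{cert},
					RootCAs:      caPool,
				},
			},
		},
	}, nil
}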
Implementing the Resolver Builder
The Builder implementation is relatively straightforward. It parses the target URL, extracts the information it needs (service tag, environment, PuppetDB address, and so on), and uses that to initialize our Resolver.
resolver/builder.go
package resolver
import (
"fmt"
"strings"
"sync"
"time"
"google.golang.org/grpc/resolver"
"your_project/internal/puppetdb" // Import the client we just created
)
const (
Scheme = "puppetdb"
defaultPuppetDBURL = "http://localhost:8080" // Should be configurable
defaultQueryTimeout = 5 * time.Second
)
// puppetDBBuilder implements the resolver.Builder interface.
type puppetDBBuilder struct {
puppetDBClient *puppetdb.Client
// In a real application, the client should be shared.
// A simple sync.Once ensures it's initialized only once.
initOnce sync.Once
initErr error
}
// NewBuilder creates and registers a new PuppetDB resolver builder.
func NewBuilder() *puppetDBBuilder {
return &puppetDBBuilder{}
}
func (b *puppetDBBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
// Lazy initialization of the shared PuppetDB client.
b.initOnce.Do(func() {
// In a production system, this URL should come from env vars or a config file,
// not hardcoded.
b.puppetDBClient, b.initErr = puppetdb.NewClient(defaultPuppetDBURL, defaultQueryTimeout)
})
if b.initErr != nil {
return nil, fmt.Errorf("failed to initialize puppetdb client: %w", b.initErr)
}
// The target.URL.Path will be something like "/data-processor.service/production"
// We need to parse this to get the service tag and environment.
pathParts := strings.Split(strings.Trim(target.URL.Path, "/"), "/")
if len(pathParts) != 2 {
return nil, fmt.Errorf("invalid puppetdb target path: expected format '/<service_tag>/<environment>', got '%s'", target.URL.Path)
}
serviceTag := pathParts[0]
environment := pathParts[1]
// The port for the discovered services. This is a critical piece of information.
// A robust solution might get this from PuppetDB facts as well.
// For now, we'll extract it from the target Authority/Endpoint.
// e.g., puppetdb://authority:1234/service/env -> port is 1234
port := target.URL.Port()
if port == "" {
return nil, fmt.Errorf("port must be specified in the target authority, e.g., puppetdb://ignored:8000/...")
}
r := &puppetDBResolver{
client: b.puppetDBClient,
cc: cc,
serviceTag: serviceTag,
environment: environment,
port: port,
disablePolling: false, // For testing, we might want to disable periodic polling
}
// The context needs a cancel function to properly close the background goroutine.
r.ctx, r.cancel = context.WithCancel(context.Background())
// The first resolution must be done synchronously to ensure the client
// doesn't try to connect before any addresses are available.
r.ResolveNow(resolver.ResolveNowOptions{})
// Start a background polling goroutine.
// A common pitfall is to forget this, making the discovery static.
go r.watch()
return r, nil
}
func (b *puppetDBBuilder) Scheme() string {
return Scheme
}
// init function to register the builder with gRPC.
func init() {
resolver.Register(NewBuilder())
}
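The comments in Build already flag that defaultPuppetDBURL is only a placeholder. One low-friction way to make it configurable is an environment lookup in the same package; the variable name PUPPETDB_URL is our own assumption, not an established convention:
// puppetDBURLFromEnv returns the PuppetDB base URL from the environment,
// falling back to the package default. Requires "os" in the import block.
func puppetDBURLFromEnv() string {
	if v := os.Getenv("PUPPETDB_URL"); v != "" {
		return v
	}
	return defaultPuppetDBURL
}
Inside initOnce.Do, Build would then call puppetdb.NewClient(puppetDBURLFromEnv(), defaultQueryTimeout).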
Implementing the Core Resolver Logic
puppetDBResolver is where all the work happens. It has to handle background polling, errors, caching, and the interaction with the gRPC ClientConn. A naive implementation would only query once inside ResolveNow, but a production-grade resolver must be dynamic and resilient.
resolver/resolver.go
package resolver
import (
"context"
"fmt"
"log"
"sync"
"time"
"google.golang.org/grpc/resolver"
"your_project/internal/puppetdb"
)
const (
pollingInterval = 30 * time.Second
)
// puppetDBResolver implements the resolver.Resolver interface.
type puppetDBResolver struct {
client *puppetdb.Client
cc resolver.ClientConn
serviceTag string
environment string
port string
disablePolling bool
// For managing the background polling goroutine
ctx context.Context
cancel context.CancelFunc
// Caching last known good state
lastKnownState resolver.State
stateMutex sync.RWMutex
}
// ResolveNow is called by gRPC to trigger a resolution.
func (r *puppetDBResolver) ResolveNow(_ resolver.ResolveNowOptions) {
nodes, err := r.client.QueryNodesByTag(r.ctx, r.serviceTag, r.environment)
if err != nil {
// The key here is not to fail the connection immediately.
// We report the error, but gRPC will continue to use the last known good addresses.
// This makes the system resilient to transient PuppetDB failures.
log.Printf("ERROR: PuppetDB resolver failed to query nodes for %s: %v", r.serviceTag, err)
r.cc.ReportError(err)
return
}
newAddrs := make([]resolver.Address, 0, len(nodes))
for _, node := range nodes {
addr := fmt.Sprintf("%s:%s", node.IPAddress, r.port)
newAddrs = append(newAddrs, resolver.Address{Addr: addr})
}
if len(newAddrs) == 0 {
// If no nodes are found, this is a valid state but potentially problematic.
// We should log this clearly.
log.Printf("WARN: PuppetDB resolver found no active nodes for service tag %s in environment %s", r.serviceTag, r.environment)
}
newState := resolver.State{Addresses: newAddrs}
// Update the gRPC client connection with the new state.
if err := r.cc.UpdateState(newState); err != nil {
log.Printf("ERROR: PuppetDB resolver failed to update gRPC client connection state: %v", err)
}
// Cache the last known good state.
r.stateMutex.Lock()
r.lastKnownState = newState
r.stateMutex.Unlock()
}
// watch is the background polling loop.
func (r *puppetDBResolver) watch() {
if r.disablePolling {
return
}
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for {
select {
case <-r.ctx.Done():
// Context was cancelled, so exit the goroutine.
return
case <-ticker.C:
r.ResolveNow(resolver.ResolveNowOptions{})
}
}
}
// Close is called when the gRPC connection is closed.
func (r *puppetDBResolver) Close() {
// This is crucial to prevent goroutine leaks.
r.cancel()
}
This code embodies a few key design decisions:
- Background polling: the watch() method uses a time.Ticker to call ResolveNow periodically, keeping the address list up to date.
- Resilient error handling: when QueryNodesByTag fails, we do not clear the address list. We only report the error via r.cc.ReportError, and gRPC keeps using the addresses from the last successful resolution. This significantly improves availability and lets the system tolerate transient PuppetDB outages.
- Context management: creating a cancellable context with context.WithCancel and calling cancel() in Close is the standard way to stop the background goroutine gracefully and avoid resource leaks.
- Port handling: we take the service port from the Authority part of the target. This is a simplification, but a practical one. For example, grpc.Dial("puppetdb://ignored:9090/...") makes the resolver use port 9090 for every discovered IP; a more robust variant could read the port from a PuppetDB fact instead (see the sketch after this list).
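For reference, the PQL projection used in QueryNodesByTag could be extended to also return such a port fact. Here facts.data_processor_port is a hypothetical custom fact, not something defined anywhere in this article:
{
"query": "nodes[certname, facts.ipaddress, facts.data_processor_port] { tag = 'data_processor_service' and facts.environment = 'production' }"
}
Each discovered node could then carry its own port instead of sharing the single port taken from the target authority.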
Integration and Usage
All the pieces are now in place. In our Go client application, we only need a blank import of the resolver package to trigger its init() function, which registers our puppetDBBuilder.
main.go
package main
import (
"context"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
// This blank import is critical. It executes the init() function in our resolver package,
// which registers the "puppetdb" scheme with gRPC.
_ "your_project/internal/resolver"
// Assume this is our protobuf generated code
pb "your_project/api/v1"
)
func main() {
// The target URI now uses our custom scheme.
// Format: puppetdb://<ignored_authority>:<port>/<service_tag>/<environment>
// The authority is ignored but the port is used for all discovered nodes.
target := "puppetdb://puppet-infra:8000/data-processor.service/production"
log.Printf("Dialing gRPC service at: %s", target)
// We must use a load balancing policy like "round_robin" for service discovery to work effectively.
// Otherwise, gRPC might stick to the first address in the list.
conn, err := grpc.Dial(
target,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
grpc.WithBlock(), // Block until the connection is established (and first resolve is done)
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
client := pb.NewDataProcessorClient(conn)
// Now we can make calls as usual. gRPC will handle load balancing
// across the dynamically discovered nodes from PuppetDB.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
resp, err := client.Process(ctx, &pb.ProcessRequest{Data: "some payload"})
if err != nil {
log.Fatalf("could not process data: %v", err)
}
log.Printf("Processing result: %s", resp.GetResultId())
}
Limitations and Future Iteration Paths
This approach solved our immediate problem and bridges the old and new worlds seamlessly, but it is not without limitations.
First, it is a polling-based model. Between a Puppet agent updating its facts, PuppetDB ingesting the data, and the next polling cycle of our resolver, there is minutes-level latency. For a background-processing cluster whose addresses rarely change, this is perfectly acceptable. For frontend services that need to react quickly to topology changes, the delay could be a problem.
Second, it makes every gRPC client depend directly on PuppetDB. Even with the error handling we added, PuppetDB's own availability becomes a critical link in the service discovery chain. If PuppetDB is down for an extended period, newly started service instances cannot discover any backends and will fail to start.
A more evolved architecture could introduce an intermediate layer: a standalone service that watches PuppetDB for changes (perhaps by subscribing to a PuppetDB webhook or a message queue) and pushes the latest address lists into a highly available KV store such as Consul or etcd. Our gRPC resolver would then subscribe to that KV store. This decouples gRPC clients from PuppetDB and enables near-real-time updates through the watch mechanisms of Consul/etcd instead of polling. It also adds undeniable complexity, so the benefit has to be weighed against the cost.