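Providing development teams with consistent, repeatable application environments is unstructured, messy work. Ticket queues, manual clicks in a cloud console, and Terraform scripts scattered across repositories are not only inefficient; they also invite configuration drift, and the resulting differences between development, test, and production environments eventually surface as production incidents. What we need is a platform that lets developers request the combination of resources they need in a self-service, declarative way through one stable API, without caring whether AWS, GCP, or a private data center sits underneath.
The core challenge is building a robust control plane that turns a simple business intent ("I need a backend environment with a PostgreSQL database and a Redis cache") into a series of concrete, idempotent infrastructure operations. We will build a prototype of this control plane by combining several seemingly unrelated technologies: Crossplane as the unified declarative API layer for infrastructure, a Go service as the core controller that handles business logic and API translation, a high-performance Rust service for real-time status monitoring and event notification, and a Headless UI component concept for the interaction layer.
The overall architecture looks like this: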
```mermaid
graph TD
    subgraph "Developer interface (React + Headless UI)"
        A[Form: Create new environment] -->|POST /api/v1/environments| B(Go API Controller)
        C[UI state: Provisioning...] -->|WebSocket| D{Rust Status Watcher}
    end
    subgraph "Kubernetes control plane"
        B -- 1. Create --> E(Custom Resource: ApplicationEnvironment)
        E -- 2. Crossplane reconciles --> F[Composition: aws-standard-env]
        F -- 3. Instantiate --> G((Managed Resource: AWS RDS))
        F -- 3. Instantiate --> H((Managed Resource: AWS ElastiCache))
        D -- 4. Watch --> E
    end
    subgraph "Cloud provider (AWS)"
        G -- API call --> I[RDS instance]
        H -- API call --> J[ElastiCache cluster]
    end
    subgraph "External notifications"
        D -- 5. Push status updates --> C
    end
```
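Step 1: Defining a unified infrastructure abstraction with Crossplane
Everything starts with abstraction. We do not want to expose any vendor-specific resource details such as RDSInstance or CloudSQL. We need a higher-level abstraction, which we call ApplicationEnvironment, and we implement it with a Crossplane CompositeResourceDefinition (XRD).
This XRD defines the platform's "API contract". It declares which parameters an ApplicationEnvironment object carries, such as the database size, the number of cache nodes, and the owning team.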
```yaml
# xrds/environment.xrd.yaml
apiVersion: apiextensions.crossplane.io/v1
kind: CompositeResourceDefinition
metadata:
  name: applicationenvironments.platform.acme.io
spec:
  group: platform.acme.io
  names:
    kind: ApplicationEnvironment
    plural: applicationenvironments
    singular: applicationenvironment
  claimNames:
    kind: AppEnv
    plural: appenvs
  versions:
    - name: v1alpha1
      served: true
      referenceable: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                parameters:
                  type: object
                  properties:
                    # Parameters that developers need to care about
                    team:
                      type: string
                      description: "Owner of this environment."
                    dbSize:
                      type: string
                      description: "Size of the database instance."
                      enum: ["small", "medium", "large"]
                      default: "small"
                    cacheNodes:
                      type: integer
                      description: "Number of cache nodes."
                      minimum: 1
                      maximum: 5
                      default: 1
                  required:
                    - team
                    - dbSize
                    - cacheNodes
              required:
                - parameters
            status:
              type: object
              properties:
                # Runtime state, populated from the underlying resources
                dbEndpoint:
                  type: string
                cacheEndpoint:
                  type: string
                phase:
                  type: string
```
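The contract in the schema above (a required team, an enum for dbSize, a 1 to 5 range for cacheNodes, and two defaults) can also be mirrored in application code, so that bad input is rejected before it ever reaches the Kubernetes API server. The following is a minimal sketch, not part of the original project: the `EnvParams` type and the `applyDefaults` and `validate` helpers are hypothetical names introduced here for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// EnvParams is a hypothetical Go mirror of spec.parameters in the XRD.
type EnvParams struct {
	Team       string
	DBSize     string
	CacheNodes int
}

// applyDefaults fills in the defaults declared in the schema
// (dbSize: "small", cacheNodes: 1) when a field was left empty.
func applyDefaults(p EnvParams) EnvParams {
	if p.DBSize == "" {
		p.DBSize = "small"
	}
	if p.CacheNodes == 0 {
		p.CacheNodes = 1
	}
	return p
}

// validate enforces the same constraints as the openAPIV3Schema:
// team is required, dbSize is an enum, cacheNodes must be in [1, 5].
func validate(p EnvParams) error {
	if p.Team == "" {
		return errors.New("team is required")
	}
	switch p.DBSize {
	case "small", "medium", "large":
		// allowed value
	default:
		return fmt.Errorf("dbSize %q must be one of small, medium, large", p.DBSize)
	}
	if p.CacheNodes < 1 || p.CacheNodes > 5 {
		return fmt.Errorf("cacheNodes %d must be between 1 and 5", p.CacheNodes)
	}
	return nil
}

func main() {
	// Only the team is given; the defaults make the request valid.
	p := applyDefaults(EnvParams{Team: "payments"})
	fmt.Println(p, validate(p))

	// An unknown size is rejected, as the API server would also do.
	fmt.Println(validate(EnvParams{Team: "payments", DBSize: "huge", CacheNodes: 1}))
}
```

The API server remains the source of truth for validation; a client-side mirror like this only shortens the feedback loop and has to be kept in sync with the XRD by hand.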
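With the definition in place, we still need a Composition that tells Crossplane how to instantiate the abstract ApplicationEnvironment as a set of concrete cloud resources. We use AWS as the example here.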
```yaml
# compositions/aws-environment.composition.yaml
apiVersion: apiextensions.crossplane.io/v1
kind: Composition
metadata:
  name: aws.standard.environment
  labels:
    provider: aws
spec:
  compositeTypeRef:
    apiVersion: platform.acme.io/v1alpha1
    kind: ApplicationEnvironment
  # Patches map parameters from the high-level abstraction onto the underlying resources
  patchSets:
    - name: metadata
      patches:
        - fromFieldPath: "spec.parameters.team"
          toFieldPath: "metadata.labels.team"
  resources:
    - name: rds-database
      base:
        # In the Upbound AWS provider, RDS instances are kind Instance in the
        # rds.aws.upbound.io group. Field names differ between provider
        # versions, so check them against the CRDs installed in your cluster.
        apiVersion: rds.aws.upbound.io/v1beta1
        kind: Instance
        spec:
          forProvider:
            region: us-west-2
            engine: postgres
            engineVersion: "14.6"
            publiclyAccessible: false
            skipFinalSnapshot: true
            instanceClass: db.t3.micro # default
            username: myadmin
            # Secret management matters: the password is read from a Kubernetes Secret
            passwordSecretRef:
              namespace: crossplane-system
              name: db-password
              key: password
      patches:
        # Apply the shared team label defined in patchSets above
        - type: PatchSet
          patchSetName: metadata
        - fromFieldPath: "metadata.name"
          toFieldPath: "metadata.annotations[crossplane.io/external-name]"
          transforms:
            - type: string
              string:
                fmt: "%s-db"
        - fromFieldPath: "spec.parameters.dbSize"
          toFieldPath: "spec.forProvider.instanceClass"
          transforms:
            - type: map
              map:
                small: db.t3.micro
                medium: db.t3.medium
                large: db.m5.large
        # Write the database endpoint back to the ApplicationEnvironment status.
        # Without an explicit type a patch defaults to FromCompositeFieldPath,
        # which would copy in the wrong direction.
        - type: ToCompositeFieldPath
          fromFieldPath: "status.atProvider.endpoint"
          toFieldPath: "status.dbEndpoint"
    - name: redis-cache
      base:
        # ElastiCache resources live in the elasticache.aws.upbound.io group
        apiVersion: elasticache.aws.upbound.io/v1beta1
        kind: ReplicationGroup
        spec:
          forProvider:
            region: us-west-2
            replicationGroupDescription: "Managed by Crossplane"
            engine: redis
            engineVersion: "6.2"
            nodeType: cache.t3.small # default
            atRestEncryptionEnabled: true
            transitEncryptionEnabled: true
      patches:
        - type: PatchSet
          patchSetName: metadata
        - fromFieldPath: "metadata.name"
          toFieldPath: "metadata.annotations[crossplane.io/external-name]"
          transforms:
            - type: string
              string:
                fmt: "%s-cache"
        - fromFieldPath: "spec.parameters.cacheNodes"
          toFieldPath: "spec.forProvider.numCacheClusters"
        # Write the cache endpoint back to the ApplicationEnvironment status
        - type: ToCompositeFieldPath
          fromFieldPath: "status.atProvider.primaryEndpointAddress"
          toFieldPath: "status.cacheEndpoint"
```
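At this point the infrastructure layer is ready. Whenever anyone creates an ApplicationEnvironment object in the cluster, Crossplane automatically creates the corresponding RDS instance and ElastiCache replication group in AWS.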
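To make the patch semantics concrete, the transforms declared in the Composition can be mimicked in a few lines of plain Go. This is only an illustration of what the `map` and `string` (format) transforms compute, not how Crossplane implements them; `ManagedInputs` and `resolve` are hypothetical names, and returning an error for an unmapped size is an assumption made for this sketch.

```go
package main

import "fmt"

// instanceClassBySize mirrors the `map` transform applied to spec.parameters.dbSize.
var instanceClassBySize = map[string]string{
	"small":  "db.t3.micro",
	"medium": "db.t3.medium",
	"large":  "db.m5.large",
}

// ManagedInputs is a hypothetical holder for the values the patches
// end up writing into the two managed resources.
type ManagedInputs struct {
	DBExternalName    string
	CacheExternalName string
	InstanceClass     string
	NumCacheClusters  int
}

// resolve applies the same mappings the Composition declares:
// a "%s-db" / "%s-cache" format on the composite's name, a lookup
// table for the instance class, and a straight copy of cacheNodes.
func resolve(xrName, dbSize string, cacheNodes int) (ManagedInputs, error) {
	class, ok := instanceClassBySize[dbSize]
	if !ok {
		// Assumption for this sketch: an unmapped key is an error.
		return ManagedInputs{}, fmt.Errorf("no instance class mapped for dbSize %q", dbSize)
	}
	return ManagedInputs{
		DBExternalName:    fmt.Sprintf("%s-db", xrName),
		CacheExternalName: fmt.Sprintf("%s-cache", xrName),
		InstanceClass:     class,
		NumCacheClusters:  cacheNodes,
	}, nil
}

func main() {
	in, err := resolve("payments-1a2b3c4d", "medium", 2)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", in)
}
```

Seen this way, the Composition is a declarative lookup table: adding a new size or a second cloud provider changes the table, not the API that developers call.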
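Step 2: The Go controller, a bridge between business logic and the declarative API
Developers should not have to write or submit YAML directly. What they need is a simple, intuitive RESTful API, and our Go controller implements it. Its job is to accept a simple JSON request and convert it into an ApplicationEnvironment Kubernetes object that conforms to our XRD.
In a real project we would use a framework such as Gin or Echo, but to stay focused on the core logic we use the standard net/http package. The key part is the interaction with the Kubernetes API, for which client-go is the standard choice.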
```go
// main.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/google/uuid"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// CreateEnvironmentRequest is the API request body.
type CreateEnvironmentRequest struct {
	Team       string `json:"team"`
	DBSize     string `json:"dbSize"`
	CacheNodes int    `json:"cacheNodes"`
}

// appEnvGVR is the GroupVersionResource of ApplicationEnvironment.
var appEnvGVR = schema.GroupVersionResource{
	Group:    "platform.acme.io",
	Version:  "v1alpha1",
	Resource: "applicationenvironments",
}

type APIHandler struct {
	kubeClient dynamic.Interface
}

func NewAPIHandler() (*APIHandler, error) {
	config, err := rest.InClusterConfig()
	if err != nil {
		// Not running inside a cluster: fall back to the local kubeconfig file.
		log.Println("Not in cluster, trying kubeconfig")
		kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, fmt.Errorf("failed to build kubeconfig: %w", err)
		}
	}

	client, err := dynamic.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create dynamic client: %w", err)
	}
	return &APIHandler{kubeClient: client}, nil
}

// handleCreateEnvironment contains the core request-handling logic.
func (h *APIHandler) handleCreateEnvironment(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
		return
	}

	var req CreateEnvironmentRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		log.Printf("ERROR: Failed to decode request body: %v", err)
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	// Parameter validation
	if req.Team == "" {
		http.Error(w, "Team name is required", http.StatusBadRequest)
		return
	}
	// ... other validation logic

	// Generate a unique resource name
	resourceName := fmt.Sprintf("%s-%s", strings.ToLower(req.Team), uuid.New().String()[:8])
	log.Printf("INFO: Creating ApplicationEnvironment '%s' for team '%s'", resourceName, req.Team)

	// Build an Unstructured object, the standard way to work with custom resources.
	// Composite resources defined by a v1 XRD are cluster-scoped, so no namespace
	// is set. To isolate teams by namespace, create the namespaced AppEnv claim instead.
	appEnv := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "platform.acme.io/v1alpha1",
			"kind":       "ApplicationEnvironment",
			"metadata": map[string]interface{}{
				"name": resourceName,
			},
			"spec": map[string]interface{}{
				"compositionSelector": map[string]interface{}{
					"matchLabels": map[string]interface{}{
						"provider": "aws", // hard-coded to aws; could be chosen dynamically
					},
				},
				"parameters": map[string]interface{}{
					"team":   req.Team,
					"dbSize": req.DBSize,
					// Unstructured content must use JSON-compatible types,
					// so the int is stored as int64.
					"cacheNodes": int64(req.CacheNodes),
				},
			},
		},
	}

	// Create the resource with the dynamic client
	ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
	defer cancel()

	createdResource, err := h.kubeClient.Resource(appEnvGVR).Create(ctx, appEnv, metav1.CreateOptions{})
	if err != nil {
		log.Printf("ERROR: Failed to create ApplicationEnvironment resource: %v", err)
		http.Error(w, "Failed to provision environment", http.StatusInternalServerError)
		return
	}

	log.Printf("INFO: Successfully created ApplicationEnvironment '%s'", createdResource.GetName())

	// Headers must be set before WriteHeader is called.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	json.NewEncoder(w).Encode(map[string]string{
		"message":      "Environment provisioning started.",
		"resourceName": createdResource.GetName(),
	})
}

func main() {
	handler, err := NewAPIHandler()
	if err != nil {
		log.Fatalf("FATAL: Failed to initialize API handler: %v", err)
	}

	http.HandleFunc("/api/v1/environments", handler.handleCreateEnvironment)

	log.Println("INFO: Starting controller API server on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatalf("FATAL: Server failed: %v", err)
	}
}
```
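This Go service is deliberately minimal: it is a stateless translation layer. It knows nothing about AWS or RDS; it knows only the ApplicationEnvironment API that we defined ourselves. That makes it easy to test and maintain. A unit test can simply verify that a given HTTP request produces the correct unstructured.Unstructured object.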
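One way to get that testability is to pull the object construction out of the HTTP handler into a pure function. The sketch below assumes such a refactoring: `buildAppEnv` is a hypothetical helper that is not in the handler code, it returns a plain `map[string]interface{}` (the shape that `unstructured.Unstructured` wraps) so the example needs only the standard library, and the metadata is reduced to the name for brevity.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// CreateEnvironmentRequest matches the request body used by the handler.
type CreateEnvironmentRequest struct {
	Team       string `json:"team"`
	DBSize     string `json:"dbSize"`
	CacheNodes int    `json:"cacheNodes"`
}

// buildAppEnv is a hypothetical pure helper: given a decoded request and a
// resource name, it returns the content of the ApplicationEnvironment object.
// It performs no I/O, so it can be tested without a cluster.
func buildAppEnv(req CreateEnvironmentRequest, name string) map[string]interface{} {
	return map[string]interface{}{
		"apiVersion": "platform.acme.io/v1alpha1",
		"kind":       "ApplicationEnvironment",
		"metadata":   map[string]interface{}{"name": name},
		"spec": map[string]interface{}{
			"compositionSelector": map[string]interface{}{
				"matchLabels": map[string]interface{}{"provider": "aws"},
			},
			"parameters": map[string]interface{}{
				"team":       req.Team,
				"dbSize":     req.DBSize,
				"cacheNodes": int64(req.CacheNodes),
			},
		},
	}
}

func main() {
	// Decode a request body the same way the handler does.
	body := `{"team":"payments","dbSize":"medium","cacheNodes":2}`
	var req CreateEnvironmentRequest
	if err := json.NewDecoder(strings.NewReader(body)).Decode(&req); err != nil {
		panic(err)
	}

	obj := buildAppEnv(req, "payments-test")
	spec := obj["spec"].(map[string]interface{})
	params := spec["parameters"].(map[string]interface{})
	fmt.Println(obj["kind"], params["team"], params["dbSize"], params["cacheNodes"])
}
```

With this split, the handler shrinks to decoding, validation, and the `Create` call, and the mapping from request to resource can be covered by table-driven tests.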
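Step 3: The Rust status watcher, for high-performance real-time feedback
Once the Go controller has created the ApplicationEnvironment resource, the HTTP request is finished. For the developer, though, the wait has only just begun: provisioning can take several minutes or longer, and they need to know the current state in real time. Is it Provisioning, Ready, or Failed?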
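Crossplane reports the state of a composite resource through entries in `status.conditions`, notably the `Ready` and `Synced` condition types. A minimal sketch of how those conditions could be folded into the three phases named above follows. The `Condition` type and `derivePhase` function are hypothetical, and treating `Synced=False` as Failed is an assumption of this sketch: Crossplane keeps retrying, so such a state can still recover.

```go
package main

import "fmt"

// Condition is a trimmed-down view of one entry in status.conditions.
type Condition struct {
	Type   string
	Status string // "True", "False", or "Unknown"
}

// derivePhase maps the condition list onto a single UI-friendly phase.
// Assumption: a failed sync is reported as "Failed", a true Ready condition
// as "Ready", and everything else (including no conditions yet) as "Provisioning".
func derivePhase(conds []Condition) string {
	synced, ready := "", ""
	for _, c := range conds {
		switch c.Type {
		case "Synced":
			synced = c.Status
		case "Ready":
			ready = c.Status
		}
	}
	switch {
	case synced == "False":
		return "Failed"
	case ready == "True":
		return "Ready"
	default:
		return "Provisioning"
	}
}

func main() {
	fmt.Println(derivePhase(nil))
	fmt.Println(derivePhase([]Condition{{"Synced", "True"}, {"Ready", "False"}}))
	fmt.Println(derivePhase([]Condition{{"Synced", "True"}, {"Ready", "True"}}))
	fmt.Println(derivePhase([]Condition{{"Synced", "False"}, {"Ready", "False"}}))
}
```

Whichever service computes the phase, keeping the mapping in one small function makes it easy to adjust once real failure modes are observed.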
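Polling the API is inefficient. A better approach is the Kubernetes watch mechanism: we build a service that continuously watches ApplicationEnvironment resources and pushes changes to the frontend as they happen. Because this service has to run for long periods, hold many concurrent connections (WebSockets, for example), and meet strict requirements on resource usage and stability, Rust is a good fit. The Rocket framework provides the web-serving side, and the kube-rs library makes it convenient to talk to the Kubernetes API.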
```rust
// main.rs
#[macro_use]
extern crate rocket;

use futures::{StreamExt, TryStreamExt};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::{
    api::{Api, DynamicObject, ListParams},
    runtime::{watcher, WatchStreamExt},
    Client,
};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

// Spec and status structures for the ApplicationEnvironment fields we care about.
// They allow a DynamicObject to be parsed in a strongly typed way.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct AppEnvParameters {
    team: String,
    #[serde(rename = "dbSize")]
    db_size: String,
    #[serde(rename = "cacheNodes")]
    cache_nodes: i64,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct AppEnvSpec {
    parameters: AppEnvParameters,
}

#[derive(Deserialize, Serialize, Clone, Debug, Default)]
pub struct AppEnvStatus {
    #[serde(rename = "dbEndpoint", default)]
    db_endpoint: String,
    #[serde(rename = "cacheEndpoint", default)]
    cache_endpoint: String,
    #[serde(default)]
    phase: String,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct ApplicationEnvironment {
    pub metadata: ObjectMeta,
    pub spec: AppEnvSpec,
    #[serde(default)]
    pub status: Option<AppEnvStatus>,
}

// Core watch logic.
// The error type is Send + Sync so the future can be handed to tokio::spawn.
async fn watch_environments() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let client = Client::try_default().await?;

    let gvk = kube::api::GroupVersionKind::gvk(
        "platform.acme.io",
        "v1alpha1",
        "ApplicationEnvironment",
    );
    // Api::all_with expects an ApiResource, so derive one from the GVK.
    let ar = kube::api::ApiResource::from_gvk_with_plural(&gvk, "applicationenvironments");
    let api: Api<DynamicObject> = Api::all_with(client, &ar);
    // Note: recent kube-rs releases take watcher::Config::default() here instead of ListParams.
    let lp = ListParams::default();

    println!("Starting to watch ApplicationEnvironments...");
    let mut stream = watcher(api, lp).applied_objects().boxed();

    // Keep consuming the event stream
    while let Some(res) = stream.next().await {
        match res {
            Ok(obj) => {
                // We only log here. In production the event would be pushed to a
                // message queue or broadcast directly to the frontend over WebSocket.
                let name = obj.metadata.name.as_deref().unwrap_or("unknown");

                // Extract status from the DynamicObject
                let status_map = obj.data.get("status").and_then(|s| s.as_object());

                if let Some(status) = status_map {
                    // Copy the status fields into a BTreeMap for lookups by key
                    let status_btreemap: BTreeMap<String, serde_json::Value> =
                        status.iter().map(|(k, v)| (k.clone(), v.clone())).collect();

                    // Crossplane reports the actual resource state in conditions
                    if let Some(conditions) = status_btreemap.get("conditions").and_then(|c| c.as_array()) {
                        let ready = conditions.iter().any(|cond| {
                            cond.get("type").and_then(|t| t.as_str()) == Some("Ready")
                                && cond.get("status").and_then(|s| s.as_str()) == Some("True")
                        });

                        if ready {
                            let db_endpoint = status_btreemap.get("dbEndpoint").and_then(|v| v.as_str()).unwrap_or("");
                            let cache_endpoint = status_btreemap.get("cacheEndpoint").and_then(|v| v.as_str()).unwrap_or("");
                            println!(
                                "EVENT: Environment '{}' is READY. DB: {}, Cache: {}",
                                name, db_endpoint, cache_endpoint
                            );
                        } else {
                            println!("EVENT: Environment '{}' is still provisioning...", name);
                        }
                    } else {
                        println!("EVENT: Environment '{}' status changed, but no conditions yet.", name);
                    }
                } else {
                    println!("EVENT: Environment '{}' created, waiting for status...", name);
                }
            }
            Err(e) => {
                eprintln!("Watcher error: {}", e);
            }
        }
    }
    Ok(())
}

#[get("/health")]
fn health() -> &'static str {
    "OK"
}

#[launch]
fn rocket() -> _ {
    // Start the Kubernetes watcher in the background
    tokio::spawn(async {
        if let Err(e) = watch_environments().await {
            eprintln!("Watcher task failed: {:?}", e);
        }
    });
rocket::build().mount("/", routes