k8s的定制化开发
k8s的定制化开发主要有三种:webhook开发、operator开发、核心组件的定制化开发
webhook开发
webhook 指的是在apiserver将请求数据写入到etcd的链路中,拦截请求做定制化处理,本质上也是 api-server 的一个 webhook 调用。主要有mutating webhook
和admission Webhook
两种,一般是mutating webhook
在前,admission Webhook
在后。
1 2 3
| apiserver --------------------------|--------------------------|-------------------------------->> etcd | | mutating webhook关卡 admission Webhook关卡
|
mutate
在 API Server 接收到请求、执行授权检查后触发,拦截请求并做修改,之后扔回原链路。
一般有以下应用场景
- 自动注入sidecar容器
- 自动配置资源限制
- 注入配置或者标签等。
实战案例:为所有新创建的pod打上标签:environment: production
基于flask框架开发一个web程序webhook.py
,对外提供api接口
需要先安装python3解释器,为解释器环境安装flask代码包pip3 install flask
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| from flask import Flask, request, jsonify import json import ssl import base64
app = Flask(__name__) def create_patch(metadata): """ 创建 JSON Patch 以添加 'mutate' 注释。 如果 metadata.annotations 不存在,则首先创建该路径。 """ if 'labels' in metadata: dic = metadata['labels'] else: dic = {}
patch = [ {'op': 'add', 'path': '/metadata/labels', 'value': dic}, {'op': 'add', 'path': '/metadata/labels/environment', 'value': 'production'} ]
patch_json = json.dumps(patch) patch_base64 = base64.b64encode(patch_json.encode('utf-8')).decode('utf-8') return patch_base64
@app.route('/mutate', methods=['POST']) def mutate(): """ 处理 Mutating Webhook 的请求,对 Pod 对象应用 JSON Patch。 """ admission_review = request.get_json()
if 'request' not in admission_review or 'object' not in admission_review['request']: return jsonify({ 'kind': 'AdmissionReview', 'apiVersion': 'admission.k8s.io/v1', 'response': { 'allowed': False, 'status': {'message': 'Invalid AdmissionReview format'} } })
req = admission_review['request'] print('--->',req) metata = req['object']['metadata'] patch_json = create_patch(metata)
admission_response = { 'kind': 'AdmissionReview', 'apiVersion': 'admission.k8s.io/v1', 'response': { 'uid': req['uid'], 'allowed': True, 'patchType': 'JSONPatch', 'patch': patch_json } }
print(admission_response) return jsonify(admission_response)
if __name__ == '__main__': context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) context.load_cert_chain('/certs/tls.crt', '/certs/tls.key')
app.run(host='0.0.0.0', port=443, ssl_context=context)
|
制作镜像(包含webhook.py的运行环境,依赖python3解释器、依赖flask框架)
Dockerfile文件如下:
1 2 3 4 5 6 7 8 9 10
| FROM python:3.9-slim
WORKDIR /app
COPY webhook.py .
RUN pip install Flask
CMD ["python", "webhook.py"]
|
配置 Webhook 的 Secret
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| openssl genrsa -out ca.key 2048
openssl req -x509 -new -nodes -key ca.key -subj "/CN=webhook-service.default.svc" -days 36500 -out ca.crt
cat > webhook-openssl.cnf << 'EOF' [req] default_bits = 2048 prompt = no default_md = sha256 req_extensions = req_ext distinguished_name = dn
[ dn ] C = CN ST = Shanghai L = Shanghai O = egonlin OU = egonlin CN = webhook-service.default.svc
[ req_ext ] subjectAltName = @alt_names
[alt_names] DNS.1 = webhook-service DNS.2 = webhook-service.default DNS.3 = webhook-service.default.svc DNS.4 = webhook-service.default.svc.cluster.local
[req_distinguished_name] CN = webhook-service.default.svc
[v3_req] keyUsage = critical, digitalSignature, keyEncipherment extendedKeyUsage = serverAuth subjectAltName = @alt_names
[ v3_ext ] authorityKeyIdentifier=keyid,issuer:always basicConstraints=CA:FALSE keyUsage=keyEncipherment,dataEncipherment extendedKeyUsage=serverAuth,clientAuth subjectAltName=@alt_names
EOF
openssl genrsa -out webhook.key 2048
openssl req -new -key webhook.key -out webhook.csr -config webhook-openssl.cnf
openssl x509 -req -in webhook.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out webhook.crt -days 36500 -extensions v3_ext -extfile webhook-openssl.cnf
kubectl create secret tls webhook-certs --cert=webhook.crt --key=webhook.key --namespace=default --dry-run=client -o yaml | kubectl apply -f -
|
创建deployment来部署webhook服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| apiVersion: apps/v1 kind: Deployment metadata: name: webhook-deployment namespace: default spec: replicas: 1 selector: matchLabels: app: webhook template: metadata: labels: app: webhook spec: containers: - name: webhook image: registry.cn-shanghai.aliyuncs.com/dengjinjun-mute-webhook:v1.0 volumeMounts: - name: webhook-certs mountPath: /certs readOnly: true volumes: - name: webhook-certs secret: secretName: webhook-certs --- apiVersion: v1 kind: Service metadata: name: webhook-service namespace: default spec: ports: - port: 443 targetPort: 443 selector: app: webhook
|
创建mutatingwebhookconfiguration 资源,声明把请求转给的目标webhook程序的api地址
基于脚本来生成yaml文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| #!/bin/bash
base64 -w 0 ca.crt > ca.crt.base64
ca_base64_file="ca.crt.base64" yaml_file="m-w-c.yaml"
ca_base64_content=$(cat "$ca_base64_file" | tr -d '\n')
cat <<EOF > "$yaml_file" apiVersion: admissionregistration.k8s.io/v1 kind: MutatingWebhookConfiguration metadata: name: example-mutating-webhook webhooks: - name: example.webhook.com clientConfig: service: name: webhook-service namespace: default path: "/mutate" # 替换为 cat ca.crt.base64的内容 caBundle: "$ca_base64_content" rules: - operations: ["CREATE"] apiGroups: [""] apiVersions: ["v1"] resources: ["pods"] admissionReviewVersions: ["v1"] sideEffects: None EOF
echo "YAML 文件已更新。"
|
部署完上述服务后,可以创建一个Pod进行测试,观察是否自动添加了environment: production
标签
1 2 3 4 5 6 7 8
| apiVersion: v1 kind: Pod metadata: name: test-pod spec: containers: - name: nginx image: nginx:1.18
|
validate
请求持久化到 etcd 之前触发,对请求的数据做校验,不符合规定的直接扔掉。一般是先对资源进行修改,然后再进行验证。
一般有以下应用场景:
- 强制标签和注解策略
- 必须用私有仓库的镜像
- 安全策略审计:例如不允许使用特权模式
实战案例:仅允许创建具有environment标签的Pod
其他步骤基本相同,此处仅给出webhook.py
的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| from flask import Flask, request, jsonify import ssl import logging
app = Flask(__name__) logging.basicConfig(level=logging.INFO)
@app.route('', methods=['POST']) def validate(): admission_review = request.get_json() if 'request' not in admission_review or 'object' not in admission_review['request']: return jsonify({ 'kind': 'AdmissionReview', 'apiVersion': 'admission.k8s.io/v1', 'response': { 'allowed': False, 'status': {'message': 'Invalid AdmissionReview format'} } })
req = admission_review['request'] if req['kind']['kind'] == 'Pod': pod = req['object'] labels = pod.get('metadata', {}).get('labels', {})
if 'environment' not in labels: return jsonify({ 'kind': 'AdmissionReview', 'apiVersion': 'admission.k8s.io/v1', 'response': { 'uid': req['uid'], 'allowed': False, 'status': { 'metadata': {}, 'code': 400, 'message': 'Pod must have an "environment" label' } } })
return jsonify({ 'kind': 'AdmissionReview', 'apiVersion': 'admission.k8s.io/v1', 'response': { 'uid': req['uid'], 'allowed': True, 'status': { 'metadata': {}, 'code': 200 } } })
return jsonify({ 'kind': 'AdmissionReview', 'apiVersion': 'admission.k8s.io/v1', 'response': { 'allowed': True, 'status': { 'metadata': {}, 'code': 200 } } }) if __name__ == '__main__': context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) context.load_cert_chain('/certs/tls.crt', '/certs/tls.key') app.run(host='0.0.0.0', port=443, ssl_context=context)
|
webhook流程总结
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| 1. 用户提交请求:kubectl apply -f test.yaml | | 2. API Server 接收请求:API Server 解析并检查请求是否匹配 Mutating Webhook 的规则 | | 3. 触发 Mutating Webhook:如果请求匹配 Webhook 规则,API Server 中断正常流程,准备将请求发送给 Webhook 服务 | | 4. 转换请求为 AdmissionReview:API Server 将原始请求转换为 AdmissionReview 格式 | | 5. 通过 HTTPS 发送请求:API Server 通过 HTTPS 将请求转发到 clientConfig 中指定的 URL | | 6. Webhook Service 接收请求:Kubernetes Service 监听端口(如 443),并将请求路由到 Webhook Pod | | 7. Webhook Pod 处理请求:Webhook Pod 内的应用(例如 Flask 服务器)接收到 AdmissionReview 请求,执行相应的逻辑 | | 8. Webhook 返回 AdmissionResponse:Webhook 处理完成后,生成 AdmissionResponse,并通过 HTTPS 返回给 API Server | | 9. API Server 处理 AdmissionResponse:API Server 接收到 Webhook 的响应,根据响应内容决定是否允许操作或进行其他变更
|
operator开发
CRD和CR
当k8s中内置的资源类型无法满足我的业务需求,那就需要定制自己的新的资源类型,这时需要用到CRD机制。
CRD全称Custom Resource Define
,是k8s中用来自定义新的资源类型的机制,新建CRD,即声明一个关于CRD的yaml清单,相当于在etcd中定义了一张表,就可以得到一个新的resource type。
CR,全称Custom Resource
,通过声明式yaml文件得到一种自定义的资源,kind指定为上面创建的CRD,相当于在表中插入了记录。
自定义控制器
虽然我们可以自定义资源类型,并创建这种自定义资源,但是每种资源类型背后必须有控制器来负责维护其状态,这就要求我们为这种资源类型开发专门的控制器逻辑,也称为自定义控制器。
自定义控制器主要有以下几种开发框架:
Client-Go
最底层的库,提供了对Kubernetes API的原生支持,适合需要高度自定义控制器的开发者。
Controller-Runtime
在 Client-Go 之上提供了更高层次的抽象,简化了控制器的开发过程,适合大多数控制器开发需求。
Kubebuilder
最高层次的框架,基于 Controller-Runtime,提供了完整的控制器开发工具链,非常适合快速开发和部署。
Operator
Operator是一种开发模型,可以理解为 Operator
= 自定义资源(CRD)
+ 自定义的控制逻辑(controller)
。
Operator框架指的是实现开发模型的具体框架,基于该框架可以非常方便的自定义资源和控制逻辑。不用去记忆CRD每个字段的含义,直接按照框架规定的标准去定义一些字段即可,框架可以自动生成CRD.yaml文件。对于控制逻辑,只需要去固定函数中填充代码即可。
Operator的开发步骤:
按照框架依赖的环境,下载好框架(就是一个代码文件夹)。
去指定文件中按照规定填写好新资源类型应该有的字段,框架会帮你自动生成CRD.yaml。
去指定的文件中填写你的控制器业务逻辑。
1 2 3 4 5 6
| for { desired := getDesiredState() current := getCurrentState() makeChanges(desired, current) }
|
裸跑控制器不方便管理,通常会容器化之后以pod+deployment的形式跑在k8s中。
基于CRD来创建CR,创建成功之后,控制器就会从apiserver中watch到该数据或者变更,完成调谐,确保其始终处于预期状态。
核心组件的定制开发
对k8s中的核心组件,比如apiserver,etcd等,进行定制开发。一般是top大厂才有能力组建k8s核心组件的定制化开发团队。