Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubeclient
import (
"context"
"encoding/json"
"fmt"
"os"
"time"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"ascend-common/api"
"ascend-common/common-utils/hwlog"
"nodeD/pkg/common"
)
// retryTime is the number of attempts made by the patch helpers in this
// package (AddAnnotation, UpdatePodAnnotation) before they give up.
const retryTime = 3

var (
	// k8sClient is the client most recently built by NewClientK8s; it stays
	// nil until NewClientK8s has succeeded once.
	k8sClient *ClientK8s
	// localNode holds the node object fetched by GetNodeWithCache so that
	// later calls can be answered without contacting the API server.
	localNode *v1.Node
)
// ClientK8s bundles a Kubernetes clientset with the identity of the node
// this process runs on.
type ClientK8s struct {
// ClientSet is the Kubernetes API client used by every method of ClientK8s.
ClientSet kubernetes.Interface
// NodeName is the node name; NewClientK8s fills it from the environment.
NodeName string
// NodeInfoName is common.NodeInfoCMNamePrefix followed by NodeName, as set by
// NewClientK8s. NOTE(review): presumably the name of the per-node info
// ConfigMap - confirm against callers.
NodeInfoName string
}
// NewClientK8s builds a Kubernetes clientset, resolves the local node name
// from the environment and returns a ClientK8s combining both.
//
// The client configuration is obtained from clientcmd.BuildConfigFromFlags
// with an empty master URL and an empty kubeconfig path. On success the new
// client is also stored in the package-level k8sClient, so it can later be
// retrieved through GetK8sClient. Any failure is returned unchanged and
// leaves k8sClient untouched.
func NewClientK8s() (*ClientK8s, error) {
	restCfg, cfgErr := clientcmd.BuildConfigFromFlags("", "")
	if cfgErr != nil {
		hwlog.RunLog.Errorf("build client config err: %v", cfgErr)
		return nil, cfgErr
	}

	clientSet, clientErr := kubernetes.NewForConfig(restCfg)
	if clientErr != nil {
		hwlog.RunLog.Errorf("get client err: %v", clientErr)
		return nil, clientErr
	}

	name, nameErr := GetNodeNameFromEnv()
	if nameErr != nil {
		return nil, nameErr
	}

	created := &ClientK8s{
		ClientSet:    clientSet,
		NodeName:     name,
		NodeInfoName: common.NodeInfoCMNamePrefix + name,
	}
	// publish the instance for GetK8sClient
	k8sClient = created
	return created, nil
}
// GetK8sClient returns the package-level client stored by NewClientK8s.
// The result is nil if NewClientK8s has not succeeded yet.
func GetK8sClient() *ClientK8s {
return k8sClient
}
// GetNodeNameFromEnv reads the node name from the environment variable named
// by api.NodeNameEnv and validates it with checkNodeName.
//
// It returns the name when validation passes; otherwise it returns an empty
// string and an error describing why the name was rejected.
func GetNodeNameFromEnv() (string, error) {
	name := os.Getenv(api.NodeNameEnv)
	checkErr := checkNodeName(name)
	if checkErr == nil {
		return name, nil
	}
	return "", fmt.Errorf("check node name failed, err is %v", checkErr)
}
// checkNodeName validates a node name: it must be non-empty, no longer than
// common.KubeEnvMaxLength bytes, and match the pattern registered under
// common.RegexNodeNameKey. It returns nil when every check passes.
func checkNodeName(nodeName string) error {
	nameLen := len(nodeName)
	switch {
	case nameLen == 0:
		return fmt.Errorf("the env of 'NODE_NAME' must be set")
	case nameLen > common.KubeEnvMaxLength:
		return fmt.Errorf("node name length %d is bigger than k8s env max length %d",
			nameLen, common.KubeEnvMaxLength)
	}

	nameRegex := common.GetPattern()[common.RegexNodeNameKey]
	if !nameRegex.MatchString(nodeName) {
		return fmt.Errorf("node name %s is illegal", nodeName)
	}
	return nil
}
// CreateConfigMap creates cm in the namespace recorded in its metadata and
// returns the object reported back by the API server.
//
// A nil cm is rejected with an error before any request is made. When the
// create call fails, the returned ConfigMap is nil and the client error is
// returned unchanged.
func (ck *ClientK8s) CreateConfigMap(cm *v1.ConfigMap) (*v1.ConfigMap, error) {
	if cm == nil {
		return nil, fmt.Errorf("param cm is nil")
	}
	cmClient := ck.ClientSet.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace)
	created, createErr := cmClient.Create(context.TODO(), cm, metav1.CreateOptions{})
	if createErr == nil {
		return created, nil
	}
	return nil, createErr
}
// GetConfigMap fetches the ConfigMap cmName from namespace cmNameSpace.
//
// The request carries ResourceVersion "0", which per the Kubernetes API
// conventions allows the API server to answer with data at any resource
// version (possibly from its cache), so the result may not be the latest.
// On failure the returned ConfigMap is nil and the client error is returned
// unchanged.
func (ck *ClientK8s) GetConfigMap(cmName, cmNameSpace string) (*v1.ConfigMap, error) {
	getOpts := metav1.GetOptions{ResourceVersion: "0"}
	found, getErr := ck.ClientSet.CoreV1().ConfigMaps(cmNameSpace).Get(context.TODO(), cmName, getOpts)
	if getErr == nil {
		return found, nil
	}
	return nil, getErr
}
// UpdateConfigMap sends cm as an update to the namespace recorded in its
// metadata and returns the object reported back by the API server.
//
// A nil cm is rejected with an error before any request is made. When the
// update call fails, the returned ConfigMap is nil and the client error is
// returned unchanged.
func (ck *ClientK8s) UpdateConfigMap(cm *v1.ConfigMap) (*v1.ConfigMap, error) {
	if cm == nil {
		return nil, fmt.Errorf("param cm is nil")
	}
	cmClient := ck.ClientSet.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace)
	updated, updateErr := cmClient.Update(context.TODO(), cm, metav1.UpdateOptions{})
	if updateErr == nil {
		return updated, nil
	}
	return nil, updateErr
}
// DeleteConfigMap deletes the ConfigMap cmName from namespace cmNamespace with
// default delete options and returns the client error, if any, unchanged.
// Note the argument order: namespace first, then name (GetConfigMap takes the
// name first).
func (ck *ClientK8s) DeleteConfigMap(cmNamespace, cmName string) error {
return ck.ClientSet.CoreV1().ConfigMaps(cmNamespace).Delete(context.TODO(), cmName, metav1.DeleteOptions{})
}
// CreateOrUpdateConfigMap writes cm to the cluster, updating it first and
// falling back to creation only when the update reports "not found".
//
// Any other update failure, or a failure of the fallback create, is logged
// and returned as a new error that embeds the original message.
func (ck *ClientK8s) CreateOrUpdateConfigMap(cm *v1.ConfigMap) error {
	_, updateErr := ck.UpdateConfigMap(cm)
	switch {
	case updateErr == nil:
		return nil
	case !errors.IsNotFound(updateErr):
		hwlog.RunLog.Errorf("update configmap err: %v", updateErr)
		return fmt.Errorf("update config map failed, err is %v", updateErr)
	}

	// the config map does not exist yet, so create it instead
	_, createErr := ck.CreateConfigMap(cm)
	if createErr != nil {
		hwlog.RunLog.Errorf("create configmap err: %v", createErr)
		return fmt.Errorf("can not create config map, err is %v", createErr)
	}
	return nil
}
// AddAnnotation sets the annotation key to value on the node ck.NodeName.
//
// The change is sent as a JSON Patch (RFC 6902) holding a single "add"
// operation. For an object member, "add" creates the member when it is absent
// and overwrites it when present, whereas "replace" is only defined for a
// member that already exists. Per the RFC the parent object
// (/metadata/annotations) must still exist on the node.
//
// key is appended to the JSON pointer path verbatim, so any "/" or "~" it
// contains must already be escaped by the caller as "~1" / "~0" (RFC 6901).
//
// The patch is attempted up to retryTime times with a one second pause between
// attempts. It returns nil on the first success, the marshal error if the
// patch could not be encoded, or the error of the last failed attempt.
func (ck *ClientK8s) AddAnnotation(key, value string) error {
	patchOp := map[string]string{
		"op":    "add",
		"path":  "/metadata/annotations/" + key,
		"value": value,
	}
	// a JSON Patch document is an array of operations
	patchBytes, err := json.Marshal([]interface{}{patchOp})
	if err != nil {
		hwlog.RunLog.Errorf("marshal patchMap failed, err is %v", err)
		return err
	}
	for attempt := 1; attempt <= retryTime; attempt++ {
		_, err = ck.ClientSet.CoreV1().Nodes().Patch(context.TODO(), ck.NodeName,
			types.JSONPatchType, patchBytes, metav1.PatchOptions{})
		if err == nil {
			return nil
		}
		hwlog.RunLog.Errorf("patch node annotation failed, err is %v", err)
		if attempt < retryTime {
			// only wait when another attempt follows
			time.Sleep(time.Second)
		}
	}
	return err
}
// UpdatePodAnnotation sets the annotation key to value on pod using a
// strategic merge patch addressed by pod.Namespace and pod.Name.
//
// A nil pod is rejected with an error. The patch is attempted up to retryTime
// times, sleeping one second after each failed attempt. A "not found" error
// stops the retries immediately and is returned as is; once all attempts have
// failed a generic "exceeded max number of retries" error is returned.
func (ck *ClientK8s) UpdatePodAnnotation(key, value string, pod *v1.Pod) error {
	if pod == nil {
		return fmt.Errorf("param pod is nil")
	}

	metaPatch := common.PodMetaData{
		common.MetaData: common.Data{Annotation: map[string]string{key: value}},
	}
	patchBytes, marshalErr := json.Marshal(metaPatch)
	if marshalErr != nil {
		hwlog.RunLog.Errorf("failed to marshal the pod meta data, error is %v", marshalErr)
		return marshalErr
	}

	for attempt := 0; attempt < retryTime; attempt++ {
		podClient := ck.ClientSet.CoreV1().Pods(pod.Namespace)
		_, patchErr := podClient.Patch(context.Background(), pod.Name,
			types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
		if patchErr == nil {
			return nil
		}
		// the pod no longer exists, so retrying cannot help
		if errors.IsNotFound(patchErr) {
			return patchErr
		}
		hwlog.RunLog.Warnf("patch pod annotation failed: %v, try again", patchErr)
		time.Sleep(time.Second)
	}
	return fmt.Errorf("patch pod annotation failed, exceeded max number of retries")
}
// GetNodeWithCache returns the node object for ck.NodeName.
//
// The first successful lookup is stored in the package-level localNode and
// every later call returns that stored pointer without issuing a request;
// this function never refreshes or clears it, and no locking is performed
// here. The lookup itself uses ResourceVersion "0". When the lookup fails,
// nothing is cached and the client's result is passed through together with
// the error.
func (ck *ClientK8s) GetNodeWithCache() (*v1.Node, error) {
	if cached := localNode; cached != nil {
		return cached, nil
	}

	getOpts := metav1.GetOptions{ResourceVersion: "0"}
	node, getErr := ck.ClientSet.CoreV1().Nodes().Get(context.Background(), ck.NodeName, getOpts)
	if getErr != nil {
		return node, getErr
	}
	localNode = node
	return node, nil
}
// GetDaemonSet fetches the DaemonSet name from namespace with default get
// options. On failure the returned DaemonSet is nil and the client error is
// returned unchanged.
func (ck *ClientK8s) GetDaemonSet(name, namespace string) (*appsv1.DaemonSet, error) {
	dsClient := ck.ClientSet.AppsV1().DaemonSets(namespace)
	daemonSet, getErr := dsClient.Get(context.TODO(), name, metav1.GetOptions{})
	if getErr == nil {
		return daemonSet, nil
	}
	return nil, getErr
}