package app
import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"github.com/barnettZQG/gotty/server"
"github.com/barnettZQG/gotty/webtty"
k8sutil "github.com/goodrain/rainbond/util/k8s"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
restclient "k8s.io/client-go/rest"
)
var ExecuteCommandTotal float64
var ExecuteCommandFailed float64
type App struct {
upgrader *websocket.Upgrader
restClient *restclient.RESTClient
coreClient kubernetes.Interface
config *restclient.Config
}
type Options struct {
Address string `hcl:"address"`
Port string `hcl:"port"`
PermitWrite bool `hcl:"permit_write"`
IndexFile string `hcl:"index_file"`
TitleFormat string `hcl:"title_format"`
EnableReconnect bool `hcl:"enable_reconnect"`
ReconnectTime int `hcl:"reconnect_time"`
PermitArguments bool `hcl:"permit_arguments"`
CloseSignal int `hcl:"close_signal"`
RawPreferences map[string]interface{} `hcl:"preferences"`
SessionKey string `hcl:"session_key"`
K8SConfPath string
}
var DefaultOptions = Options{
Address: "",
Port: "8080",
PermitWrite: true,
IndexFile: "",
TitleFormat: "GRTTY Command",
EnableReconnect: true,
ReconnectTime: 10,
CloseSignal: 1,
SessionKey: "_auth_user_id",
}
type InitMessage struct {
TenantID string `json:"T_id"`
ServiceID string `json:"S_id"`
PodName string `json:"C_id"`
ContainerName string `json:"containerName"`
Md5 string `json:"Md5"`
Namespace string `json:"namespace"`
}
func (app *App) SetUpgrader(u *websocket.Upgrader) {
app.upgrader = u
}
func (app *App) HandleWS(w http.ResponseWriter, r *http.Request) {
logrus.Printf("New client connected: %s", r.RemoteAddr)
if r.Method != "GET" {
http.Error(w, "Method not allowed", 405)
return
}
conn, err := app.upgrader.Upgrade(w, r, nil)
if err != nil {
logrus.Print("Failed to upgrade connection: " + err.Error())
return
}
_, stream, err := conn.ReadMessage()
if err != nil {
logrus.Print("Failed to authenticate websocket connection " + err.Error())
conn.Close()
return
}
message := string(stream)
logrus.Print("message=", message)
var init InitMessage
err = json.Unmarshal(stream, &init)
if init.PodName == "" {
logrus.Print("Parameter is error, pod name is empty")
conn.WriteMessage(websocket.TextMessage, []byte("pod name can not be empty"))
conn.Close()
return
}
key := init.TenantID + "_" + init.ServiceID + "_" + init.PodName
md5 := md5Func(key)
if md5 != init.Md5 {
logrus.Print("Auth is not allowed !")
conn.WriteMessage(websocket.TextMessage, []byte("Auth is not allowed!"))
conn.Close()
return
}
if init.Namespace == "" {
init.Namespace = init.TenantID
}
var containerName, ip string
var args []string
maxRetries := 30
retryInterval := time.Second
for i := 0; i < maxRetries; i++ {
containerName, ip, args, err = app.GetContainerArgs(init.Namespace, init.PodName, init.ContainerName)
if err == nil {
break
}
errMsg := err.Error()
if strings.Contains(errMsg, "not running yet") ||
strings.Contains(errMsg, "not ready yet") ||
strings.Contains(errMsg, "status not found") {
logrus.Infof("waiting for container to be ready (attempt %d/%d): %s", i+1, maxRetries, errMsg)
time.Sleep(retryInterval)
continue
}
logrus.Errorf("get default container failure %s", err.Error())
conn.WriteMessage(websocket.TextMessage, []byte("Get default container name failure!"))
ExecuteCommandFailed++
return
}
if err != nil {
logrus.Errorf("container not ready after %d retries: %s", maxRetries, err.Error())
conn.WriteMessage(websocket.TextMessage, []byte("Container is not ready, please try again later!"))
ExecuteCommandFailed++
return
}
request := app.NewRequest(init.PodName, init.Namespace, containerName, args)
var slave server.Slave
slave, err = NewExecContext(request, app.config)
if err != nil {
logrus.Errorf("open exec context failure %s", err.Error())
conn.WriteMessage(websocket.TextMessage, []byte("open tty failure!"))
ExecuteCommandFailed++
return
}
defer slave.Close()
opts := []webtty.Option{
webtty.WithWindowTitle([]byte(ip)),
webtty.WithReconnect(60),
webtty.WithPermitWrite(),
}
tty, err := webtty.New(&WsWrapper{conn}, slave, opts...)
if err != nil {
logrus.Errorf("open web tty context failure %s", err.Error())
conn.WriteMessage(websocket.TextMessage, []byte("open tty failure!"))
ExecuteCommandFailed++
return
}
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
err = tty.Run(ctx)
if err != nil {
if strings.Contains(err.Error(), "master closed") {
logrus.Infof("client close connection")
return
}
logrus.Errorf("run web tty failure %s", err.Error())
conn.WriteMessage(websocket.TextMessage, []byte("run tty failure!"))
ExecuteCommandFailed++
return
}
}
func (app *App) CreateKubeClient() error {
config, err := k8sutil.NewRestConfig("")
if err != nil {
return err
}
config.UserAgent = "rainbond/webcli"
coreAPI, err := kubernetes.NewForConfig(config)
if err != nil {
return err
}
SetConfigDefaults(config)
app.config = config
restClient, err := restclient.RESTClientFor(config)
if err != nil {
return err
}
app.restClient = restClient
app.coreClient = coreAPI
return nil
}
func SetConfigDefaults(config *rest.Config) error {
if config.APIPath == "" {
config.APIPath = "/api"
}
config.GroupVersion = &schema.GroupVersion{Group: "", Version: "v1"}
config.NegotiatedSerializer = serializer.NewCodecFactory(runtime.NewScheme())
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
return nil
}
func (app *App) GetContainerArgs(namespace, podname, containerName string) (string, string, []string, error) {
var args = []string{"/bin/sh"}
pod, err := app.coreClient.CoreV1().Pods(namespace).Get(context.Background(), podname, metav1.GetOptions{})
if err != nil {
return "", "", args, err
}
if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed {
return "", "", args, fmt.Errorf("cannot exec into a container in a completed pod; current phase is %s", pod.Status.Phase)
}
if pod.Status.Phase != api.PodRunning {
return "", "", args, fmt.Errorf("pod is not running yet; current phase is %s", pod.Status.Phase)
}
var targetContainerName string
for i, container := range pod.Spec.Containers {
if container.Name == containerName || (containerName == "" && i == 0) {
targetContainerName = container.Name
for _, env := range container.Env {
if env.Name == "ES_DEFAULT_EXEC_ARGS" {
args = strings.Split(env.Value, " ")
}
}
break
}
}
if targetContainerName == "" {
return "", "", args, fmt.Errorf("not have container in pod %s/%s", namespace, podname)
}
containerReady := false
for _, status := range pod.Status.ContainerStatuses {
if status.Name == targetContainerName {
if !status.Ready || status.State.Running == nil {
return "", "", args, fmt.Errorf("container %s is not ready yet", targetContainerName)
}
containerReady = true
break
}
}
if !containerReady {
return "", "", args, fmt.Errorf("container %s status not found", targetContainerName)
}
return targetContainerName, pod.Status.PodIP, args, nil
}
func (app *App) NewRequest(podName, namespace, containerName string, command []string) *restclient.Request {
req := app.restClient.Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec").
Param("container", containerName).
Param("stdin", "true").
Param("stdout", "true").
Param("stderr", "false").
Param("tty", "true")
for _, c := range command {
req.Param("command", c)
}
return req
}
func md5Func(str string) string {
h := md5.New()
h.Write([]byte(str))
cipherStr := h.Sum(nil)
return hex.EncodeToString(cipherStr)
}