package controller
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"strconv"
"strings"
"github.com/go-chi/chi"
"github.com/goodrain/rainbond-operator/util/constants"
"github.com/goodrain/rainbond/api/handler"
ctxutil "github.com/goodrain/rainbond/api/util/ctx"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/pkg/component/k8s"
"github.com/goodrain/rainbond/util"
utils "github.com/goodrain/rainbond/util"
httputil "github.com/goodrain/rainbond/util/http"
"github.com/goodrain/rainbond/worker/server"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// PodController groups the HTTP handlers that expose pod details and pod log
// streams. It carries no state of its own.
type PodController struct{}
// Pods returns the pods of the services listed in the comma-separated
// "service_ids" form value. When no service ID is supplied, it falls back to
// every service owned by the tenant stored in the request context.
//
// The response body is the list of new pods followed by old pods. A failure of
// the pod lookup itself is only logged; the handler still answers with
// whatever pods were obtained.
func Pods(w http.ResponseWriter, r *http.Request) {
	// strings.Split never returns an empty slice: an empty form value yields
	// [""]. Drop empty entries so that "no IDs given" is actually detectable.
	var serviceIDs []string
	for _, id := range strings.Split(r.FormValue("service_ids"), ",") {
		if id != "" {
			serviceIDs = append(serviceIDs, id)
		}
	}

	if len(serviceIDs) == 0 {
		tenant, ok := r.Context().Value(ctxutil.ContextKey("tenant")).(*model.Tenants)
		if !ok || tenant == nil {
			httputil.ReturnError(r, w, 500, "tenant not found in request context")
			return
		}
		services, err := db.GetManager().TenantServiceDao().GetServicesByTenantID(tenant.UUID)
		if err != nil {
			logrus.Errorf("get services of tenant %s failure %s", tenant.UUID, err.Error())
			httputil.ReturnError(r, w, 500, fmt.Sprintf("error getting tenant services: %v", err))
			return
		}
		for _, s := range services {
			serviceIDs = append(serviceIDs, s.ServiceID)
		}
	}

	var allpods []*handler.K8sPodInfo
	if len(serviceIDs) == 0 {
		// The tenant has no services, so there is nothing to look up.
		httputil.ReturnSuccess(r, w, allpods)
		return
	}

	podinfo, err := handler.GetServiceManager().GetMultiServicePods(serviceIDs)
	if err != nil {
		logrus.Errorf("get service pod failure %s", err.Error())
	}
	if podinfo != nil {
		// Append into a fresh slice so the backing array of NewPods is never
		// written to.
		allpods = append(allpods, podinfo.NewPods...)
		allpods = append(allpods, podinfo.OldPods...)
	}
	httputil.ReturnSuccess(r, w, allpods)
}
// PodNums responds with the pod counts of the components listed in the
// comma-separated "service_ids" form value. Errors from the service manager
// are reported through httputil.ReturnBcodeError.
func PodNums(w http.ResponseWriter, r *http.Request) {
	rawIDs := r.FormValue("service_ids")
	nums, err := handler.GetServiceManager().GetComponentPodNums(r.Context(), strings.Split(rawIDs, ","))
	if err == nil {
		httputil.ReturnSuccess(r, w, nums)
		return
	}
	httputil.ReturnBcodeError(r, w, err)
}
// SystemPodDetail lists the pods in the namespace given by the "ns" query
// parameter whose "name" label equals the "name" query parameter, and returns
// the resulting pod list. A failed list call is logged and answered with a 500
// so the client never receives an empty success response.
func (p *PodController) SystemPodDetail(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	ns := query.Get("ns")
	name := query.Get("name")
	// NOTE(review): name is concatenated into the label selector unescaped, so
	// a caller can append extra selector terms (e.g. "a,b!=c") — confirm this
	// endpoint is only reachable by trusted callers.
	list, err := k8s.Default().Clientset.CoreV1().Pods(ns).List(r.Context(), metav1.ListOptions{
		LabelSelector: "name=" + name,
	})
	if err != nil {
		logrus.Errorf("error getting pod detail: %v", err)
		httputil.ReturnError(r, w, 500, fmt.Sprintf("error getting pod detail: %v", err))
		return
	}
	httputil.ReturnSuccess(r, w, list)
}
// PodDetail returns the detail of the pod named by the "pod_name" URL
// parameter, looked up in the namespace of the tenant stored in the request
// context. It answers 404 when the pod handler reports server.ErrPodNotFound
// and 500 for any other failure.
func (p *PodController) PodDetail(w http.ResponseWriter, r *http.Request) {
	podName := chi.URLParam(r, "pod_name")
	tenant := r.Context().Value(ctxutil.ContextKey("tenant")).(*model.Tenants)
	pd, err := handler.GetPodHandler().PodDetail(tenant.Namespace, podName)
	if err != nil {
		logrus.Errorf("error getting pod detail: %v", err)
		status := http.StatusInternalServerError
		// errors.Is also matches the sentinel when it has been wrapped.
		if errors.Is(err, server.ErrPodNotFound) {
			status = http.StatusNotFound
		}
		httputil.ReturnError(r, w, status, fmt.Sprintf("error getting pod detail: %v", err))
		return
	}
	httputil.ReturnSuccess(r, w, pd)
}
// logs streams the logs of pod podName in namespace to the client as
// server-sent events ("data: <line>\n\n").
//
// Query parameters:
//   - lines: number of trailing lines to start from; defaults to 100 when
//     missing, unparsable or negative.
//   - follow: whether to keep streaming; parsed by parseFollow, an invalid
//     value is answered with 400.
//   - container: container to read from. When empty, pods whose name starts
//     with "rbd-gateway" use the "apisix" container and every other pod
//     streams all of its containers.
//
// A single container is delegated to streamContainerLogs. With several
// containers, one goroutine per container feeds a shared channel and each
// line is prefixed with "[<container>] ". The handler returns when the client
// goes away, when a write fails, or when every container stream has ended.
func logs(w http.ResponseWriter, r *http.Request, podName string, namespace string) {
	lines, err := strconv.Atoi(r.URL.Query().Get("lines"))
	if err != nil || lines < 0 {
		// A negative count is not a meaningful tail size; treat it like a
		// missing value.
		lines = 100
	}
	tailLines := int64(lines)
	follow, err := parseFollow(r)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	container := r.URL.Query().Get("container")
	logrus.Infof(
		"resource center pod logs request path=%s namespace=%s pod=%s container=%s lines=%d follow=%t",
		r.URL.String(), namespace, podName, container, lines, follow,
	)

	pod, err := k8s.Default().Clientset.CoreV1().Pods(namespace).Get(r.Context(), podName, metav1.GetOptions{})
	if err != nil {
		logrus.Errorf("Error getting pod %s: %v", podName, err)
		http.Error(w, fmt.Sprintf("Error getting pod: %v", err), http.StatusInternalServerError)
		return
	}

	var containers []string
	switch {
	case container != "":
		containers = append(containers, container)
		logrus.Infof("Streaming logs from specified container: %s", container)
	case strings.HasPrefix(podName, "rbd-gateway"):
		containers = append(containers, "apisix")
		// NOTE(review): the message names "ingress-apisix" while the container
		// actually used is "apisix" — confirm which one is intended.
		logrus.Infof("rbd-gateway pod detected, using container: ingress-apisix")
	default:
		for _, c := range pod.Spec.Containers {
			containers = append(containers, c.Name)
		}
		logrus.Infof("No container specified, streaming logs from all containers: %v", containers)
	}
	if len(containers) == 0 {
		http.Error(w, "No containers found in pod", http.StatusNotFound)
		return
	}

	flusher, ok := w.(http.Flusher)
	if !ok {
		logrus.Errorf("Streaming not supported")
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	if len(containers) == 1 {
		streamContainerLogs(w, r, podName, namespace, containers[0], tailLines, follow, flusher)
		return
	}

	logrus.Infof("Opening log streams for pod %s with %d containers", podName, len(containers))

	// Cancelling ctx on return stops every producer, including ones blocked on
	// a read from the log stream or on a send into logChan.
	ctx, cancel := context.WithCancel(r.Context())
	defer cancel()

	logChan := make(chan string, 100)
	finished := make(chan struct{})
	for _, containerName := range containers {
		go func(cName string) {
			defer func() { finished <- struct{}{} }()
			forwardContainerLogLines(ctx, podName, namespace, cName, tailLines, follow, logChan)
		}(containerName)
	}
	// Close logChan once every producer has reported completion so the loop
	// below can end instead of waiting for the client to disconnect.
	go func() {
		for range containers {
			<-finished
		}
		close(logChan)
	}()

	for {
		select {
		case <-ctx.Done():
			logrus.Warningf("Request context done: %v", ctx.Err())
			return
		case logLine, open := <-logChan:
			if !open {
				return
			}
			// Fprint, not Fprintf: log text must never be used as a format
			// string.
			_, err := fmt.Fprint(w, "data: "+logLine+"\n\n")
			flusher.Flush()
			if err != nil {
				logrus.Errorf("Error writing to response: %v", err)
				return
			}
		}
	}
}

// forwardContainerLogLines reads the log stream of one container line by line
// and sends each line, prefixed with "[<container>] ", to out until the stream
// ends or ctx is done.
func forwardContainerLogLines(
	ctx context.Context, podName, namespace, container string, tailLines int64, follow bool, out chan<- string,
) {
	// Upper bound for a single log line; the scanner default of 64 KiB would
	// abort the stream on longer lines.
	const maxLogLineBytes = 1 << 20

	req := k8s.Default().Clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
		Follow:     follow,
		Timestamps: true,
		TailLines:  &tailLines,
		Container:  container,
	})
	stream, err := req.Stream(ctx)
	if err != nil {
		logrus.Errorf("Error opening log stream for container %s: %v", container, err)
		return
	}
	defer stream.Close()

	scanner := bufio.NewScanner(stream)
	scanner.Buffer(make([]byte, 0, 64*1024), maxLogLineBytes)
	for scanner.Scan() {
		// The send is part of the select so a producer can never stay blocked
		// on a full channel after the consumer has gone.
		select {
		case <-ctx.Done():
			return
		case out <- fmt.Sprintf("[%s] %s", container, scanner.Text()):
		}
	}
	if err := scanner.Err(); err != nil {
		if isExpectedLogStreamClose(ctx, err) {
			logrus.Infof("Log stream closed for container %s in pod %s/%s: %v", container, namespace, podName, err)
			return
		}
		logrus.Errorf("Error scanning log stream for container %s in pod %s/%s: %v", container, namespace, podName, err)
		return
	}
	logrus.Infof("Log stream ended for container %s in pod %s/%s", container, namespace, podName)
}
// streamContainerLogs streams the logs of a single container to w as
// server-sent events ("data: <line>\n\n"), flushing after every line.
//
// tailLines and follow are passed through to the Kubernetes log request, which
// also asks for timestamps. The function returns when the stream ends, when
// the request context is done, or as soon as a write to the client fails.
// Failing to open the stream is answered with a 500.
func streamContainerLogs(
	w http.ResponseWriter, r *http.Request, podName, namespace, container string, tailLines int64, follow bool,
	flusher http.Flusher,
) {
	// Upper bound for a single log line; the scanner default of 64 KiB would
	// abort the stream on longer lines.
	const maxLogLineBytes = 1 << 20

	ctx := r.Context()
	req := k8s.Default().Clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
		Follow:     follow,
		Timestamps: true,
		TailLines:  &tailLines,
		Container:  container,
	})
	logrus.Infof("Opening log stream for pod %s, container %s", podName, container)
	stream, err := req.Stream(ctx)
	if err != nil {
		logrus.Errorf("Error opening log stream: %v", err)
		http.Error(w, "Error opening log stream", http.StatusInternalServerError)
		return
	}
	defer stream.Close()

	scanner := bufio.NewScanner(stream)
	scanner.Buffer(make([]byte, 0, 64*1024), maxLogLineBytes)
	for scanner.Scan() {
		select {
		case <-ctx.Done():
			logrus.Warningf("Request context done: %v", ctx.Err())
			return
		default:
		}
		// Fprint, not Fprintf: log text must never be used as a format string.
		if _, err := fmt.Fprint(w, "data: "+scanner.Text()+"\n\n"); err != nil {
			// The client is gone; reading further lines would only fail again.
			logrus.Errorf("Error writing to response: %v", err)
			return
		}
		flusher.Flush()
	}
	if err := scanner.Err(); err != nil {
		if isExpectedLogStreamClose(ctx, err) {
			logrus.Infof("Single-container log stream closed for pod %s/%s container %s: %v", namespace, podName, container, err)
			return
		}
		logrus.Errorf("Error scanning single-container log stream for pod %s/%s container %s: %v", namespace, podName, container, err)
		return
	}
	logrus.Infof("Single-container log stream ended for pod %s/%s container %s", namespace, podName, container)
}
// parseFollow reads the "follow" query parameter of r. A missing or empty
// value means true; any other value must be accepted by strconv.ParseBool,
// otherwise false and an error naming the offending value are returned.
func parseFollow(r *http.Request) (bool, error) {
	raw := r.URL.Query().Get("follow")
	if raw == "" {
		return true, nil
	}
	if parsed, err := strconv.ParseBool(raw); err == nil {
		return parsed, nil
	}
	return false, fmt.Errorf("invalid follow value: %s", raw)
}
// isExpectedLogStreamClose reports whether err is an ordinary way for a log
// stream to end rather than a real failure. A nil err is never "expected".
// Otherwise the close is expected when ctx is non-nil and was canceled, or
// when err matches context.Canceled, io.EOF or net.ErrClosed.
func isExpectedLogStreamClose(ctx context.Context, err error) bool {
	switch {
	case err == nil:
		return false
	case ctx != nil && errors.Is(ctx.Err(), context.Canceled):
		return true
	}
	for _, benign := range []error{context.Canceled, io.EOF, net.ErrClosed} {
		if errors.Is(err, benign) {
			return true
		}
	}
	return false
}
// SystemPodLogs streams the logs of the pod named by the "name" query
// parameter. The namespace comes from util.GetMyNamespace; when that yields an
// empty string the RBD_NAMESPACE environment variable is used, defaulting to
// constants.Namespace. A namespace lookup error is answered with a 500.
func (p *PodController) SystemPodLogs(w http.ResponseWriter, r *http.Request) {
	namespace, nsErr := util.GetMyNamespace()
	switch {
	case nsErr != nil:
		httputil.ReturnError(r, w, 500, fmt.Sprintf("error getting namespace: %v", nsErr))
		return
	case namespace == "":
		namespace = utils.GetenvDefault("RBD_NAMESPACE", constants.Namespace)
	}
	logs(w, r, r.URL.Query().Get("name"), namespace)
}
// PodLogs streams the logs of the pod named by the "pod_name" URL parameter,
// using the namespace of the tenant stored in the request context.
func (p *PodController) PodLogs(w http.ResponseWriter, r *http.Request) {
	podName := chi.URLParam(r, "pod_name")
	namespace := r.Context().Value(ctxutil.ContextKey("tenant")).(*model.Tenants).Namespace
	logs(w, r, podName, namespace)
}