* This file is part of the KubeVirt project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright The KubeVirt Authors.
*
*/
package virthandler
import (
"context"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"time"
k8sv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
v1 "kubevirt.io/api/core/v1"
"kubevirt.io/client-go/kubecli"
"kubevirt.io/client-go/log"
"kubevirt.io/kubevirt/pkg/controller"
hostdisk "kubevirt.io/kubevirt/pkg/host-disk"
"kubevirt.io/kubevirt/pkg/pointer"
"kubevirt.io/kubevirt/pkg/util/migrations"
virtconfig "kubevirt.io/kubevirt/pkg/virt-config"
cmdclient "kubevirt.io/kubevirt/pkg/virt-handler/cmd-client"
"kubevirt.io/kubevirt/pkg/virt-handler/isolation"
launcherclients "kubevirt.io/kubevirt/pkg/virt-handler/launcher-clients"
migrationproxy "kubevirt.io/kubevirt/pkg/virt-handler/migration-proxy"
"kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/api"
)
// errWaitingForTargetPorts is the sentinel error returned by
// handleSourceMigrationProxy while the VMI's migration state carries no
// TargetDirectMigrationNodePorts. migrateVMI matches it with errors.Is and
// re-enqueues the VMI after a short delay instead of reporting a failure.
var errWaitingForTargetPorts = errors.New("waiting for target to publish migration ports")
// passtRepairSourceHandler is the hook that migrateVMI invokes on the
// migration source when the cluster config reports
// PasstIPStackMigrationEnabled.
type passtRepairSourceHandler interface {
// HandleMigrationSource receives the VMI being migrated and a callback that
// resolves a string (or an error) for a given VMI. migrateVMI passes
// c.passtSocketDirOnHostForVMI as that callback, so the string is presumably
// the passt socket directory on the host - confirm against the implementation.
HandleMigrationSource(*v1.VirtualMachineInstance, func(*v1.VirtualMachineInstance) (string, error)) error
}
// MigrationSourceController reconciles VMIs for which this virt-handler node
// is the source of a live migration.
type MigrationSourceController struct {
// BaseController is embedded; the work queue, logger, event recorder, stores
// and clients used throughout this file are presumably promoted from it.
*BaseController
// vmiExpectations tracks VMI updates issued by sync. sync sets an expectation
// before calling Update, the VMI add/update informer handlers reset it, and
// execute skips a key until its expectations are satisfied.
vmiExpectations *controller.UIDTrackingControllerExpectations
// passtRepairHandler is called from migrateVMI before the migration is
// started, but only when PasstIPStackMigrationEnabled is reported.
passtRepairHandler passtRepairSourceHandler
}
// NewMigrationSourceController assembles the source-side migration controller.
//
// It creates a rate-limited work queue named "virt-handler-source", builds the
// embedded BaseController from the supplied dependencies, and registers event
// handlers on both the VMI informer and the domain informer so that every
// add/update/delete enqueues the affected key.
//
// An error is returned when the base controller cannot be created or when
// registering either event handler fails.
func NewMigrationSourceController(
	recorder record.EventRecorder,
	clientset kubecli.KubevirtClient,
	host string,
	launcherClients launcherclients.LauncherClientsManager,
	vmiInformer cache.SharedIndexInformer,
	domainInformer cache.SharedInformer,
	clusterConfig *virtconfig.ClusterConfig,
	podIsolationDetector isolation.PodIsolationDetector,
	migrationProxy migrationproxy.ProxyManager,
	virtLauncherFSRunDirPattern string,
	netStat netstat,
	passtRepairHandler passtRepairSourceHandler,
) (*MigrationSourceController, error) {
	sourceQueue := workqueue.NewTypedRateLimitingQueueWithConfig[string](
		workqueue.DefaultTypedControllerRateLimiter[string](),
		workqueue.TypedRateLimitingQueueConfig[string]{Name: "virt-handler-source"},
	)

	base, err := NewBaseController(
		log.Log.With("controller", "migration-source"),
		host,
		recorder,
		clientset,
		sourceQueue,
		vmiInformer,
		domainInformer,
		clusterConfig,
		podIsolationDetector,
		launcherClients,
		migrationProxy,
		virtLauncherFSRunDirPattern,
		netStat,
	)
	if err != nil {
		return nil, err
	}

	ctrl := &MigrationSourceController{
		BaseController:     base,
		vmiExpectations:    controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		passtRepairHandler: passtRepairHandler,
	}

	// VMI events reset the update expectations and enqueue the key.
	vmiHandlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    ctrl.addFunc,
		DeleteFunc: ctrl.deleteFunc,
		UpdateFunc: ctrl.updateFunc,
	}
	if _, err := vmiInformer.AddEventHandler(vmiHandlers); err != nil {
		return nil, err
	}

	// Domain events only enqueue the key.
	domainHandlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    ctrl.addDeleteDomainFunc,
		DeleteFunc: ctrl.addDeleteDomainFunc,
		UpdateFunc: ctrl.updateDomainFunc,
	}
	if _, err := domainInformer.AddEventHandler(domainHandlers); err != nil {
		return nil, err
	}

	return ctrl, nil
}
// hasTargetDetectedReadyDomain reports whether the migration target has
// acknowledged a ready domain, together with the number of seconds the source
// is still willing to wait for that acknowledgement.
//
// Possible results:
//   - (false, 60) while the migration state or its EndTimestamp is unset;
//   - (true, 0) once the target state has DomainDetected set and carries a
//     DomainReadyTimestamp;
//   - (false, 0) when more than 60 seconds have passed since EndTimestamp;
//   - (false, remaining) otherwise. In this case the VMI key is re-enqueued
//     after the remaining time, but never sooner than 5 seconds.
//
// Elapsed time is computed on whole Unix seconds.
func (c *MigrationSourceController) hasTargetDetectedReadyDomain(vmi *v1.VirtualMachineInstance) (bool, int64) {
	const (
		targetDelayTimeoutSeconds int64 = 60
		minRequeueSeconds         int64 = 5
	)

	state := vmi.Status.MigrationState
	if state == nil || state.EndTimestamp == nil {
		return false, targetDelayTimeoutSeconds
	}

	if target := state.TargetState; target != nil && target.DomainDetected && target.DomainReadyTimestamp != nil {
		return true, 0
	}

	elapsed := time.Now().UTC().Unix() - state.EndTimestamp.Time.UTC().Unix()
	if elapsed > targetDelayTimeoutSeconds {
		return false, 0
	}

	remaining := targetDelayTimeoutSeconds - elapsed
	requeueIn := remaining
	if requeueIn < minRequeueSeconds {
		requeueIn = minRequeueSeconds
	}
	c.queue.AddAfter(controller.VirtualMachineInstanceKey(vmi), time.Duration(requeueIn)*time.Second)

	return false, remaining
}
// domainMigrated reports whether the domain is shut off with the reason
// "migrated". A nil domain yields false.
func domainMigrated(domain *api.Domain) bool {
	if domain == nil {
		return false
	}
	status := &domain.Status
	return status.Status == api.Shutoff && status.Reason == api.ReasonMigrated
}
// setMigrationProgressStatus copies the migration progress recorded in the
// domain metadata into vmi.Status.MigrationState.
//
// Nothing is changed when the domain is nil, the domain has no migration
// metadata, the VMI has no migration state, this node is not the migration
// source, or the metadata belongs to a different migration UID.
//
// Otherwise the start timestamp (if present), failure flag, abort status and
// mode are mirrored. On failure the end timestamp and failure reason are
// copied as well and a warning event is emitted; a successful abort also
// copies the end timestamp.
func (c *MigrationSourceController) setMigrationProgressStatus(vmi *v1.VirtualMachineInstance, domain *api.Domain) {
	if domain == nil {
		return
	}
	reported := domain.Spec.Metadata.KubeVirt.Migration
	if reported == nil {
		return
	}
	state := vmi.Status.MigrationState
	if state == nil || !c.isMigrationSource(vmi) {
		return
	}
	// Ignore metadata left behind by another migration attempt.
	if reported.UID != state.MigrationUID {
		return
	}

	if reported.StartTimestamp != nil {
		state.StartTimestamp = reported.StartTimestamp
	}

	state.Failed = reported.Failed
	if reported.Failed {
		state.EndTimestamp = reported.EndTimestamp
		state.FailureReason = reported.FailureReason
		message := fmt.Sprintf("VirtualMachineInstance migration uid %s failed. reason:%s", string(reported.UID), reported.FailureReason)
		c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), message)
	}

	state.AbortStatus = v1.MigrationAbortStatus(reported.AbortStatus)
	if reported.AbortStatus == string(v1.MigrationAbortSucceeded) {
		state.EndTimestamp = reported.EndTimestamp
	}

	state.Mode = reported.Mode
}
// updateStatus refreshes the migration-related parts of vmi.Status in place.
//
// It first mirrors the progress reported by the domain (see
// setMigrationProgressStatus) and then decides on the outcome:
//   - no target node recorded: the VMI phase becomes Failed and the migration
//     is marked completed and failed;
//   - the target did not report a ready domain and no waiting time is left:
//     same failure handling;
//   - the VMI is Failed and the migration is decentralized: the migration is
//     marked completed and failed;
//   - the target reported a ready domain for a completed decentralized
//     migration: the VMI phase becomes Succeeded.
//
// A VMI without a migration state is left untouched. The function currently
// always returns nil; the error result is kept for the caller's benefit.
func (c *MigrationSourceController) updateStatus(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
	c.setMigrationProgressStatus(vmi, domain)

	state := vmi.Status.MigrationState
	if state == nil {
		// Without a migration state there is nothing to report on, and every
		// branch below would dereference a nil pointer.
		return nil
	}

	targetNodeDetectedDomain, timeLeft := c.hasTargetDetectedReadyDomain(vmi)

	// markFailed moves the VMI to Failed and closes the migration, keeping an
	// already recorded end timestamp.
	markFailed := func() {
		vmi.Status.Phase = v1.Failed
		state.Completed = true
		state.Failed = true
		if state.EndTimestamp == nil {
			state.EndTimestamp = pointer.P(metav1.NewTime(time.Now()))
		}
	}

	switch {
	case state.TargetNode == "":
		markFailed()
		c.logger.Object(vmi).Warning("the vmi migrated to an unknown host")
		c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), "The VirtualMachineInstance migrated to unknown host.")
	case !targetNodeDetectedDomain && timeLeft <= 0:
		markFailed()
		c.logger.Object(vmi).Warning("the domain was never observed on the target after the migration completed within the timeout period")
		c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), "The VirtualMachineInstance's domain was never observed on the target after the migration completed within the timeout period.")
	}

	if vmi.Status.Phase == v1.Failed && vmi.IsDecentralizedMigration() {
		state.Completed = true
		state.Failed = true
		c.logger.Object(vmi).Warning("the decentralized migration failed due to the source VMI being failed")
		c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.Migrated.String(), "The VirtualMachineInstance's decentralized migration failed due to the source VMI being failed.")
	}

	if targetNodeDetectedDomain && vmi.IsDecentralizedMigration() && state.Completed {
		c.logger.Object(vmi).V(2).Infof("decentralized migration completed successfully, marking VMI as succeeded")
		vmi.Status.Phase = v1.Succeeded
	}

	return nil
}
// Run starts the controller and blocks until stopCh is closed.
//
// After waiting for the informer caches to sync, it enqueues the key of every
// cached domain whose VMI is absent from the VMI store, then launches
// threadiness workers. Each worker is restarted by wait.Until one second after
// it returns. The work queue is shut down when Run returns.
func (c *MigrationSourceController) Run(threadiness int, stopCh chan struct{}) {
	defer c.queue.ShutDown()
	c.logger.Info("Starting virt-handler source controller.")

	cache.WaitForCacheSync(stopCh, c.hasSynced)

	for _, obj := range c.domainStore.List() {
		dom := obj.(*api.Domain)
		ref := v1.NewVMIReferenceWithUUID(
			dom.ObjectMeta.Namespace,
			dom.ObjectMeta.Name,
			dom.Spec.Metadata.KubeVirt.UID)
		vmiKey := controller.VirtualMachineInstanceKey(ref)
		if _, found, _ := c.vmiStore.GetByKey(vmiKey); found {
			continue
		}
		c.queue.Add(vmiKey)
	}

	for remaining := threadiness; remaining > 0; remaining-- {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
	c.logger.Info("Stopping virt-handler source controller.")
}
// runWorker keeps processing queue items until Execute reports that the
// queue has been shut down.
func (c *MigrationSourceController) runWorker() {
	for {
		if !c.Execute() {
			return
		}
	}
}
// Execute takes one key from the work queue and reconciles it.
//
// It returns false only when the queue reports shutdown. A failed
// reconciliation re-adds the key with rate limiting; a successful one clears
// the key's rate-limit history.
func (c *MigrationSourceController) Execute() bool {
	key, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	defer c.queue.Done(key)

	err := c.execute(key)
	if err == nil {
		c.logger.V(4).Infof("processed VirtualMachineInstance %v", key)
		c.queue.Forget(key)
		return true
	}

	c.logger.Reason(err).Infof("re-enqueuing VirtualMachineInstance %v", key)
	c.queue.AddRateLimited(key)
	return true
}
// sync performs one reconciliation pass for a VMI this node is migrating away.
//
// The three steps - processVMI, updateStatus and the network status update -
// always run, even when an earlier one fails, so that the VMI status reflects
// as much as possible. If the status changed, the VMI is written back through
// the API; an update expectation is set beforehand and reset when the write
// fails. vmi is mutated in place, so callers must pass a copy they own.
//
// The returned error is, in order of precedence: the API update error, the
// processVMI error, the updateStatus error, the network status error.
func (c *MigrationSourceController) sync(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
	if domain != nil {
		c.logger.Object(vmi).Infof("VMI is in phase: %v | Domain status: %v, reason: %v", vmi.Status.Phase, domain.Status.Status, domain.Status.Reason)
	} else {
		c.logger.Object(vmi).Infof("VMI is in phase: %v", vmi.Status.Phase)
	}

	// Snapshot the status so an API write is only issued on an actual change.
	oldStatus := vmi.Status.DeepCopy()

	syncErr := c.processVMI(vmi, domain)
	if syncErr != nil {
		c.recorder.Event(vmi, k8sv1.EventTypeWarning, v1.SyncFailed.String(), syncErr.Error())
		c.logger.Object(vmi).Reason(syncErr).Error("Synchronizing the VirtualMachineInstance failed.")
	}

	updateErr := c.updateStatus(vmi, domain)
	if updateErr != nil {
		c.logger.Object(vmi).Reason(updateErr).Error("Updating the migration status failed.")
	}

	netUpdateErr := c.netStat.UpdateStatus(vmi, domain)
	if netUpdateErr != nil {
		// Log the network error itself, not the unrelated updateErr.
		c.logger.Object(vmi).Reason(netUpdateErr).Error("Updating network interfaces status failed.")
	}

	if !equality.Semantic.DeepEqual(*oldStatus, vmi.Status) {
		key := controller.VirtualMachineInstanceKey(vmi)
		// Expect the informer to deliver our own update before reconciling again.
		c.vmiExpectations.SetExpectations(key, 1, 0)
		_, err := c.clientset.VirtualMachineInstance(vmi.ObjectMeta.Namespace).Update(context.Background(), vmi, metav1.UpdateOptions{})
		if err != nil {
			// No update will arrive; clear the expectation so the key is not blocked.
			c.vmiExpectations.SetExpectations(key, 0, 0)
			return err
		}
	}

	if syncErr != nil {
		return syncErr
	}
	if updateErr != nil {
		return updateErr
	}
	if netUpdateErr != nil {
		return netUpdateErr
	}

	c.logger.Object(vmi).V(4).Info("Source synchronization loop succeeded.")
	return nil
}
// execute reconciles a single VMI key.
//
// It returns nil without doing any work when:
//   - the VMI is not in the cache, is being deleted, or has finished
//     (phase Succeeded for decentralized migrations, IsFinal otherwise);
//   - update expectations for the key are not yet satisfied;
//   - the cached domain belongs to a different VMI UID;
//   - the VMI has no migration state;
//   - this node is not the migration source.
//
// When the migration is already done it only stops the source listener of the
// migration proxy. Otherwise it hands a deep copy of the VMI to sync.
func (c *MigrationSourceController) execute(key string) error {
	vmi, vmiExists, err := c.getVMIFromCache(key)
	if err != nil {
		return err
	}

	skip := !vmiExists
	if !skip {
		if vmi.IsDecentralizedMigration() {
			skip = vmi.Status.Phase == v1.Succeeded
		} else {
			skip = vmi.IsFinal()
		}
		skip = skip || vmi.DeletionTimestamp != nil
	}
	if skip {
		c.logger.V(4).Infof("vmi for key %v is terminating, succeeded or does not exists", key)
		return nil
	}

	if !c.vmiExpectations.SatisfiedExpectations(key) {
		c.logger.V(4).Object(vmi).Info("waiting for expectations to be satisfied")
		return nil
	}

	domain, domainExists, _, err := c.getDomainFromCache(key)
	if err != nil {
		return err
	}

	if domainExists && domain.Spec.Metadata.KubeVirt.UID != vmi.UID {
		c.logger.V(4).Object(vmi).Infof("Detected stale vmi %s that still needs cleanup before new vmi with identical name/namespace can be processed", vmi.UID)
		return nil
	}

	switch {
	case vmi.Status.MigrationState == nil:
		c.logger.V(4).Object(vmi).Info("no migration is in progress")
		return nil
	case isMigrationDone(vmi.Status.MigrationState):
		c.migrationProxy.StopSourceListener(string(vmi.UID))
		return nil
	case !c.isMigrationSource(vmi):
		c.logger.Object(vmi).V(4).Info("not a migration source")
		return nil
	}

	// sync mutates the VMI, so never hand it the cached object.
	return c.sync(vmi.DeepCopy(), domain)
}
// isMigrationSource reports whether this node is the source of the VMI's
// current migration.
//
// In every case a migration state must exist and name this host as
// SourceNode. A decentralized migration additionally requires
// vmi.IsMigrationSource(); any other migration requires the VMI to be
// currently assigned to this host (Status.NodeName).
func (c *MigrationSourceController) isMigrationSource(vmi *v1.VirtualMachineInstance) bool {
	decentralized := vmi.IsDecentralizedMigration()

	state := vmi.Status.MigrationState
	if state == nil {
		return false
	}

	if decentralized {
		return state.SourceNode == c.host && vmi.IsMigrationSource()
	}
	return vmi.Status.NodeName == c.host && state.SourceNode == c.host
}
// handleSourceMigrationProxy starts the source listener of the migration
// proxy for the VMI.
//
// The listener's base directory is derived by formatting
// virtLauncherFSRunDirPattern joined with "kubevirt" with the PID returned by
// the pod isolation detector. errWaitingForTargetPorts is returned while the
// migration state has no TargetDirectMigrationNodePorts; errors from the
// isolation detector and from the proxy are returned unchanged.
//
// The caller must ensure vmi.Status.MigrationState is non-nil.
func (c *MigrationSourceController) handleSourceMigrationProxy(vmi *v1.VirtualMachineInstance) error {
	isolationResult, err := c.podIsolationDetector.Detect(vmi)
	if err != nil {
		return err
	}

	dirPattern := filepath.Join(c.virtLauncherFSRunDirPattern, "kubevirt")
	launcherBaseDir := fmt.Sprintf(dirPattern, isolationResult.Pid())

	state := vmi.Status.MigrationState
	if state.TargetDirectMigrationNodePorts == nil {
		return errWaitingForTargetPorts
	}

	return c.migrationProxy.StartSourceListener(
		string(vmi.UID),
		state.TargetNodeAddress,
		state.TargetDirectMigrationNodePorts,
		launcherBaseDir,
	)
}
// migrateVMI drives the source side of a live migration for a VMI whose
// domain is alive.
//
// Steps, in order:
//  1. Obtain the launcher client; checkLauncherClient may short-circuit.
//  2. If an abort was requested, delegate to handleMigrationAbort and return.
//  3. Return early when the migration is already in progress.
//  4. Start the source migration proxy. While the target has not published
//     its ports yet, re-enqueue the VMI after one second and return nil.
//  5. Build the migration options from the migration state's configuration,
//     falling back to the cluster-wide configuration.
//  6. Optionally call the passt repair handler (best effort: failures are
//     only logged).
//  7. Ask the launcher to migrate a copy of the VMI whose PVCs were replaced
//     by host disks, and emit a Migrating event on success.
//
// The caller must ensure vmi.Status.MigrationState is non-nil.
func (c *MigrationSourceController) migrateVMI(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
	shouldReturn, err := c.checkLauncherClient(vmi)
	if shouldReturn {
		return err
	}

	client, err := c.launcherClients.GetLauncherClient(vmi)
	if err != nil {
		return fmt.Errorf(unableCreateVirtLauncherConnectionFmt, err)
	}

	if vmi.Status.MigrationState.AbortRequested {
		return c.handleMigrationAbort(vmi, client)
	}

	if isMigrationInProgress(vmi, domain) {
		c.logger.Object(vmi).V(4).Infof("migration %s has already been started", vmi.Status.MigrationState.MigrationUID)
		return nil
	}

	if err := c.handleSourceMigrationProxy(vmi); err != nil {
		if errors.Is(err, errWaitingForTargetPorts) {
			c.logger.Object(vmi).V(4).Info("waiting for target node to publish migration ports")
			c.queue.AddAfter(controller.VirtualMachineInstanceKey(vmi), 1*time.Second)
			return nil
		}
		// Wrap with %w so callers can still inspect the underlying error.
		return fmt.Errorf("failed to handle migration proxy: %w", err)
	}

	// Always work on a private copy: the configuration is modified below and
	// neither the VMI's nor the cluster's configuration object may be altered.
	var migrationConfiguration *v1.MigrationConfiguration
	if vmi.Status.MigrationState.MigrationConfiguration != nil {
		migrationConfiguration = vmi.Status.MigrationState.MigrationConfiguration.DeepCopy()
	} else {
		migrationConfiguration = c.clusterConfig.GetMigrationConfiguration().DeepCopy()
	}

	// NOTE(review): the dereferences below assume every pointer field has been
	// defaulted before reaching this point - confirm against the defaulting code.
	if migrationConfiguration.AllowWorkloadDisruption == nil {
		migrationConfiguration.AllowWorkloadDisruption = pointer.P(*migrationConfiguration.AllowPostCopy)
	}

	options := &cmdclient.MigrationOptions{
		Bandwidth:               *migrationConfiguration.BandwidthPerMigration,
		ProgressTimeout:         *migrationConfiguration.ProgressTimeout,
		CompletionTimeoutPerGiB: *migrationConfiguration.CompletionTimeoutPerGiB,
		UnsafeMigration:         *migrationConfiguration.UnsafeMigrationOverride,
		AllowAutoConverge:       *migrationConfiguration.AllowAutoConverge,
		AllowPostCopy:           *migrationConfiguration.AllowPostCopy,
		AllowWorkloadDisruption: *migrationConfiguration.AllowWorkloadDisruption,
	}
	configureParallelMigrationThreads(options, vmi)

	// The marshalled options are only used for logging; a failure is not fatal.
	marshalledOptions, err := json.Marshal(options)
	if err != nil {
		c.logger.Object(vmi).Reason(err).Warning("failed to marshall matched migration options")
	} else {
		c.logger.Object(vmi).Infof("migration options matched for vmi %s: %s", vmi.Name, string(marshalledOptions))
	}

	vmiCopy := vmi.DeepCopy()
	if err := hostdisk.ReplacePVCByHostDisk(vmiCopy); err != nil {
		return err
	}

	if c.clusterConfig.PasstIPStackMigrationEnabled() {
		if err := c.passtRepairHandler.HandleMigrationSource(vmi, c.passtSocketDirOnHostForVMI); err != nil {
			c.logger.Object(vmi).Warningf("failed to call passt-repair for migration source, %v", err)
		}
	}

	if err := client.MigrateVirtualMachine(vmiCopy, options); err != nil {
		return err
	}

	c.recorder.Event(vmi, k8sv1.EventTypeNormal, v1.Migrating.String(), VMIMigrating)
	return nil
}
// isMigrationDone reports whether there is nothing left to migrate: either no
// migration state exists, or the state has an end timestamp and is marked
// completed or failed.
func isMigrationDone(state *v1.VirtualMachineInstanceMigrationState) bool {
	if state == nil {
		return true
	}
	if state.EndTimestamp == nil {
		return false
	}
	return state.Completed || state.Failed
}
// processVMI starts or continues the migration when the domain is alive.
// A domain counts as alive when it exists and its status is neither Shutoff,
// Crashed nor empty; otherwise the call is a logged no-op.
func (c *MigrationSourceController) processVMI(vmi *v1.VirtualMachineInstance, domain *api.Domain) error {
	if domain != nil {
		switch domain.Status.Status {
		case api.Shutoff, api.Crashed, "":
			// Not alive; fall through to the no-op below.
		default:
			return c.migrateVMI(vmi, domain)
		}
	}

	c.logger.V(4).Object(vmi).Info("domain is not alive")
	return nil
}
// addFunc handles VMI add events: it resets the update expectations for the
// VMI's key and enqueues it. Objects without a derivable key are ignored.
func (c *MigrationSourceController) addFunc(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		return
	}
	c.vmiExpectations.SetExpectations(key, 0, 0)
	c.queue.Add(key)
}
// deleteFunc handles VMI delete events by enqueueing the VMI's key. Objects
// without a derivable key are ignored.
func (c *MigrationSourceController) deleteFunc(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		return
	}
	c.queue.Add(key)
}
// updateFunc handles VMI update events: it resets the update expectations for
// the new object's key and enqueues it. The old object is not inspected, and
// objects without a derivable key are ignored.
func (c *MigrationSourceController) updateFunc(_, new interface{}) {
	key, err := controller.KeyFunc(new)
	if err != nil {
		return
	}
	c.vmiExpectations.SetExpectations(key, 0, 0)
	c.queue.Add(key)
}
// addDeleteDomainFunc handles domain add and delete events by enqueueing the
// object's key. Objects without a derivable key are ignored.
func (c *MigrationSourceController) addDeleteDomainFunc(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		return
	}
	c.queue.Add(key)
}
// updateDomainFunc handles domain update events by enqueueing the new
// object's key. The old object is not inspected, and objects without a
// derivable key are ignored.
func (c *MigrationSourceController) updateDomainFunc(_, new interface{}) {
	key, err := controller.KeyFunc(new)
	if err != nil {
		return
	}
	c.queue.Add(key)
}
// handleMigrationAbort asks the launcher to cancel the VMI's migration.
//
// Nothing happens when the abort is already in progress or has succeeded. On
// a successful cancellation request a Migrating event with the aborting
// message is emitted. Any error from the launcher is returned to the caller.
//
// NOTE(review): when the launcher reports that the VMI is not migrating, the
// log says the cancellation is skipped, yet the error is still returned -
// confirm whether that error is meant to propagate.
func (c *MigrationSourceController) handleMigrationAbort(vmi *v1.VirtualMachineInstance, client cmdclient.LauncherClient) error {
	switch vmi.Status.MigrationState.AbortStatus {
	case v1.MigrationAbortInProgress, v1.MigrationAbortSucceeded:
		return nil
	}

	err := client.CancelVirtualMachineMigration(vmi)
	if err == nil {
		c.recorder.Event(vmi, k8sv1.EventTypeNormal, v1.Migrating.String(), VMIAbortingMigration)
		return nil
	}

	if err.Error() == migrations.CancelMigrationFailedVmiNotMigratingErr {
		c.logger.Object(vmi).Infof("skipping migration cancellation since vmi is not migrating")
	}
	return err
}
// configureParallelMigrationThreads sets options.ParallelMigrationThreads to
// parallelMultifdMigrationThreads, unless the VMI has a non-zero CPU limit or
// post-copy is allowed; in those cases options is left untouched.
func configureParallelMigrationThreads(options *cmdclient.MigrationOptions, vm *v1.VirtualMachineInstance) {
	cpuLimit, hasCPULimit := vm.Spec.Domain.Resources.Limits[k8sv1.ResourceCPU]
	cpuLimited := hasCPULimit && !cpuLimit.IsZero()

	if cpuLimited || options.AllowPostCopy {
		return
	}

	options.ParallelMigrationThreads = pointer.P(parallelMultifdMigrationThreads)
}