2d62036f创建于 2025年8月30日历史提交
import cv2
import numpy as np
import torch
from torch.autograd import Variable
import torch.nn.functional as F
import torch.nn as nn
import pickle
import os
from torch.nn.modules import CrossMapLRN2d as SpatialCrossMapLRN
#from torch.legacy.nn import SpatialCrossMapLRN as SpatialCrossMapLRNOld
from torch.autograd import Function, Variable
from torch.nn import Module


def clip_boxes(boxes, im_shape):
    """
    Clip boxes to image boundaries.
    """
    boxes = np.asarray(boxes)
    if boxes.shape[0] == 0:
        return boxes
    boxes = np.copy(boxes)
    # x1 >= 0
    boxes[:, 0::4] = np.maximum(np.minimum(boxes[:, 0::4], im_shape[1] - 1), 0)
    # y1 >= 0
    boxes[:, 1::4] = np.maximum(np.minimum(boxes[:, 1::4], im_shape[0] - 1), 0)
    # x2 < im_shape[1]
    boxes[:, 2::4] = np.maximum(np.minimum(boxes[:, 2::4], im_shape[1] - 1), 0)
    # y2 < im_shape[0]
    boxes[:, 3::4] = np.maximum(np.minimum(boxes[:, 3::4], im_shape[0] - 1), 0)
    return boxes


def load_net(fname, net, prefix='', load_state_dict=False):
    import h5py
    with h5py.File(fname, mode='r') as h5f:
        h5f_is_module = True
        for k in h5f.keys():
            if not str(k).startswith('module.'):
                h5f_is_module = False
                break
        if prefix == '' and not isinstance(net, nn.DataParallel) and h5f_is_module:
            prefix = 'module.'

        for k, v in net.state_dict().items():
            k = prefix + k
            if k in h5f:
                param = torch.from_numpy(np.asarray(h5f[k]))
                if v.size() != param.size():
                    print('Inconsistent shape: {}, {}'.format(v.size(), param.size()))
                else:
                    v.copy_(param)
            else:
                print.warning('No layer: {}'.format(k))

        epoch = h5f.attrs['epoch'] if 'epoch' in h5f.attrs else -1

        if not load_state_dict:
            if 'learning_rates' in h5f.attrs:
                lr = h5f.attrs['learning_rates']
            else:
                lr = h5f.attrs.get('lr', -1)
                lr = np.asarray([lr] if lr > 0 else [], dtype=np.float)

            return epoch, lr

        state_file = fname + '.optimizer_state.pk'
        if os.path.isfile(state_file):
            with open(state_file, 'rb') as f:
                state_dicts = pickle.load(f)
                if not isinstance(state_dicts, list):
                    state_dicts = [state_dicts]
        else:
            state_dicts = None
        return epoch, state_dicts


# class SpatialCrossMapLRNFunc(Function):

#     def __init__(self, size, alpha=1e-4, beta=0.75, k=1):
#         self.size = size
#         self.alpha = alpha
#         self.beta = beta
#         self.k = k

#     def forward(self, input):
#         self.save_for_backward(input)
#         self.lrn = SpatialCrossMapLRNOld(self.size, self.alpha, self.beta, self.k)
#         self.lrn.type(input.type())
#         return self.lrn.forward(input)

#     def backward(self, grad_output):
#         input, = self.saved_tensors
#         return self.lrn.backward(input, grad_output)


# # use this one instead
# class SpatialCrossMapLRN(Module):
#     def __init__(self, size, alpha=1e-4, beta=0.75, k=1):
#         super(SpatialCrossMapLRN, self).__init__()
#         self.size = size
#         self.alpha = alpha
#         self.beta = beta
#         self.k = k

#     def forward(self, input):
#         return SpatialCrossMapLRNFunc(self.size, self.alpha, self.beta, self.k)(input)


class Inception(nn.Module):
    def __init__(self, in_planes, n1x1, n3x3red, n3x3, n5x5red, n5x5, pool_planes):
        super(Inception, self).__init__()
        # 1x1 conv branch
        self.b1 = nn.Sequential(
            nn.Conv2d(in_planes, n1x1, kernel_size=1),
            nn.ReLU(True),
        )

        # 1x1 conv -> 3x3 conv branch
        self.b2 = nn.Sequential(
            nn.Conv2d(in_planes, n3x3red, kernel_size=1),
            nn.ReLU(True),
            nn.Conv2d(n3x3red, n3x3, kernel_size=3, padding=1),
            nn.ReLU(True),
        )

        # 1x1 conv -> 5x5 conv branch
        self.b3 = nn.Sequential(
            nn.Conv2d(in_planes, n5x5red, kernel_size=1),
            nn.ReLU(True),

            nn.Conv2d(n5x5red, n5x5, kernel_size=5, padding=2),
            nn.ReLU(True),
        )

        # 3x3 pool -> 1x1 conv branch
        self.b4 = nn.Sequential(
            nn.MaxPool2d(3, stride=1, padding=1),

            nn.Conv2d(in_planes, pool_planes, kernel_size=1),
            nn.ReLU(True),
        )

    def forward(self, x):
        y1 = self.b1(x)
        y2 = self.b2(x)
        y3 = self.b3(x)
        y4 = self.b4(x)
        return torch.cat([y1,y2,y3,y4], 1)


class GoogLeNet(nn.Module):

    output_channels = 832

    def __init__(self):
        super(GoogLeNet, self).__init__()
        self.pre_layers = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
            nn.ReLU(True),

            nn.MaxPool2d(3, stride=2, ceil_mode=True),
            SpatialCrossMapLRN(5),

            nn.Conv2d(64, 64, 1),
            nn.ReLU(True),

            nn.Conv2d(64, 192, 3, padding=1),
            nn.ReLU(True),

            SpatialCrossMapLRN(5),
            nn.MaxPool2d(3, stride=2, ceil_mode=True),
        )

        self.a3 = Inception(192,  64,  96, 128, 16, 32, 32)
        self.b3 = Inception(256, 128, 128, 192, 32, 96, 64)

        self.maxpool = nn.MaxPool2d(3, stride=2, ceil_mode=True)

        self.a4 = Inception(480, 192,  96, 208, 16,  48,  64)
        self.b4 = Inception(512, 160, 112, 224, 24,  64,  64)
        self.c4 = Inception(512, 128, 128, 256, 24,  64,  64)
        self.d4 = Inception(512, 112, 144, 288, 32,  64,  64)
        self.e4 = Inception(528, 256, 160, 320, 32, 128, 128)

    def forward(self, x):
        out = self.pre_layers(x)
        out = self.a3(out)
        out = self.b3(out)
        out = self.maxpool(out)
        out = self.a4(out)
        out = self.b4(out)
        out = self.c4(out)
        out = self.d4(out)
        out = self.e4(out)

        return out


class Model(nn.Module):
    def __init__(self, n_parts=8):
        super(Model, self).__init__()
        self.n_parts = n_parts

        self.feat_conv = GoogLeNet()
        self.conv_input_feat = nn.Conv2d(self.feat_conv.output_channels, 512, 1)

        # part net
        self.conv_att = nn.Conv2d(512, self.n_parts, 1)

        for i in range(self.n_parts):
            setattr(self, 'linear_feature{}'.format(i+1), nn.Linear(512, 64))

    def forward(self, x):
        feature = self.feat_conv(x)
        feature = self.conv_input_feat(feature)

        att_weights = torch.sigmoid(self.conv_att(feature))

        linear_feautres = []
        for i in range(self.n_parts):
            masked_feature = feature * torch.unsqueeze(att_weights[:, i], 1)
            pooled_feature = F.avg_pool2d(masked_feature, masked_feature.size()[2:4])
            linear_feautres.append(
                getattr(self, 'linear_feature{}'.format(i+1))(pooled_feature.view(pooled_feature.size(0), -1))
            )

        concat_features = torch.cat(linear_feautres, 1)
        normed_feature = concat_features / torch.clamp(torch.norm(concat_features, 2, 1, keepdim=True), min=1e-6)

        return normed_feature


def load_reid_model(ckpt):
    model = Model(n_parts=8)
    model.inp_size = (80, 160)
    load_net(ckpt, model)
    print('Load ReID model from {}'.format(ckpt))

    model = model.cuda()
    model.eval()
    return model


def im_preprocess(image):
    image = np.asarray(image, np.float32)
    image -= np.array([104, 117, 123], dtype=np.float32).reshape(1, 1, -1)
    image = image.transpose((2, 0, 1))
    return image


def extract_image_patches(image, bboxes):
    bboxes = np.round(bboxes).astype(np.int)
    bboxes = clip_boxes(bboxes, image.shape)
    patches = [image[box[1]:box[3], box[0]:box[2]] for box in bboxes]
    return patches


def extract_reid_features(reid_model, image, tlbrs):
    if len(tlbrs) == 0:
        return torch.FloatTensor()

    patches = extract_image_patches(image, tlbrs)
    patches = np.asarray([im_preprocess(cv2.resize(p, reid_model.inp_size)) for p in patches], dtype=np.float32)

    with torch.no_grad():
        im_var = Variable(torch.from_numpy(patches))
        im_var = im_var.cuda()
        features = reid_model(im_var).data
    return features