# 508c3ae3, created on 2022-06-10 (commit history)
# coding: utf-8
import json
import os
import time
import traceback
from urllib.parse import quote, unquote_plus

from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkmoderation.v2 import ModerationClient, \
    RunImageModerationRequest, ImageDetectionReq
from huaweicloudsdkmoderation.v2.region.moderation_region import \
    ModerationRegion
from obs import ObsClient

# Region used when an event record carries no region field of its own.
default_region = 'cn-north-4'
# Code root, read from the RUNTIME_CODE_ROOT environment variable; importing
# this module raises KeyError when the variable is not set.
currentDir = os.environ["RUNTIME_CODE_ROOT"]
# Local file uploaded in place of an image that moderation blocks.
default_image = f"{currentDir}/data/default.png"

def check_configuration(context):
    """Validate the settings the function needs before doing any work.

    Checks that the ``region`` and ``obs_server`` user data entries are set
    and that a usable access key / secret key pair is available, either from
    the context itself or, failing that, from the ``ak`` / ``sk`` user data
    entries.

    :param context: function context exposing ``getUserData``,
        ``getAccessKey`` and ``getSecretKey``
    :return: a human-readable error message when something is missing,
        otherwise ``None``
    """
    if not context.getUserData('region'):
        return 'region is not configured'
    if not context.getUserData('obs_server'):
        return 'obs_server is not configured'

    # ``or ''`` guards against a context that hands back None instead of an
    # empty string, which would otherwise make ``.strip()`` raise.
    ak = (context.getAccessKey() or '').strip()
    sk = (context.getSecretKey() or '').strip()
    if ak and sk:
        return None

    # Fall back to credentials supplied through user data.
    ak = (context.getUserData('ak', '') or '').strip()
    sk = (context.getUserData('sk', '') or '').strip()
    if not ak or not sk:
        return 'ak or sk is empty'
    return None


def handler(event, context):
    """Entry point: run image moderation for every record in *event*.

    :param event: mapping expected to carry a ``Records`` list; each record
        is handed to ``Processor.process``
    :param context: function context providing the logger, credentials and
        user data
    :return: a configuration error message, ``'Records is empty'`` when
        there is nothing to process, otherwise ``'Done'`` (also returned
        when processing failed; the failure is only logged)
    """
    log = context.getLogger()
    result = check_configuration(context)
    if result is not None:
        return result

    # Treat a missing key and an empty list alike so no clients are created
    # when there is nothing to do.
    records = event.get("Records")
    if not records:
        return 'Records is empty'

    processor = Processor(context)
    try:
        for record in records:
            processor.process(record)
    except Exception:
        # Best effort: the first failing record stops the batch and the
        # traceback is logged instead of propagating to the caller.
        log.error("failed to process image, exception: %s",
                  traceback.format_exc())
    finally:
        # Always release the OBS client, whether or not processing failed.
        processor.obs_client.close()

    return 'Done'


class Processor:
    """Moderate the image referenced by an event record and act on the verdict.

    ``pass`` does nothing, ``review`` uploads a JSON report to the dump
    bucket, and ``block`` additionally copies the image to the dump bucket
    and overwrites the original with the default placeholder image.
    """

    def __init__(self, context=None):
        """Build the OBS and moderation clients from *context*.

        :param context: function context; required in practice (the
            ``None`` default is kept only for signature compatibility)
            because its logger and user data are read here
        """
        self.log = context.getLogger()
        self.obs_client = new_obs_client(context)
        self.moderation_client = new_moderation_client(context)
        self.dump_bucket = context.getUserData('dump_bucket')
        # Per-record state, filled in by parse_record().
        self.image_url = None
        self.src_bucket = None
        self.src_object_key = None
        self.log_object_key = None

    @staticmethod
    def _build_image_url(bucket_name, region, object_key):
        """Return the https URL of an object, percent-encoding its key."""
        # quote() leaves "/" untouched, so the key's path structure survives
        # while spaces and non-ASCII characters become valid URL text.
        return "https://" + bucket_name + ".obs." + region + \
            ".myhuaweicloud.com/" + quote(object_key)

    @staticmethod
    def _build_log_key(object_key):
        """Return *object_key* with its file extension replaced by ``.json``."""
        # splitext only inspects the last path component, so the directory
        # part and its "/" separators are preserved as-is.
        (stem, _) = os.path.splitext(object_key)
        return stem + ".json"

    def parse_record(self, record):
        """Extract bucket, object key, image URL and report key from *record*.

        The results are stored on the instance for the other methods to use.

        :param record: one entry of the event's ``Records`` list
        """
        region = get_region(record)
        (bucket_name, object_key) = get_obs_obj_info(record)
        self.log.info("input bucket_name: %s", bucket_name)
        # The key is decoded here before it is used as an object name.
        object_key = unquote_plus(object_key)
        self.log.info("input object: %s", object_key)
        self.src_bucket = bucket_name
        self.src_object_key = object_key
        self.image_url = self._build_image_url(bucket_name, region,
                                               object_key)
        self.log_object_key = self._build_log_key(object_key)

    def process(self, record):
        """Moderate the image described by *record* and apply the verdict.

        :param record: one entry of the event's ``Records`` list
        """
        self.parse_record(record)

        response = self.run_image_moderation(self.image_url)
        if response is None or response.status_code != 200:
            self.log.error("failed to run image moderation")
            return

        result = response.result
        suggestion = result.suggestion
        if suggestion == "pass":
            return
        if suggestion == "block":
            # Back up, then overwrite the original, then report.
            # NOTE(review): the original is still overwritten when the
            # backup copy fails - confirm this is the intended policy.
            self.dump_image()
            self.replace_image()
            self.report_to_obs(result)
        elif suggestion == "review":
            self.report_to_obs(result)
        else:
            self.log.warn("suggestion %s is not in (pass, block, review)",
                          suggestion)

    def report_to_obs(self, result):
        """Upload a JSON moderation report to the dump bucket.

        :param result: moderation result; its ``suggestion`` and string
            form are written into the report
        :return: ``True`` when the upload succeeded, ``False`` otherwise
        """
        labels = {
            "moderation_result": result.suggestion,
            "original_path": "obs://" + self.src_bucket + "/" +
                             self.src_object_key,
            "dump_path": "obs://" + self.dump_bucket + "/" +
                         self.src_object_key,
        }
        payload = {"log_time_ns": int(round(time.time() * 1000000000)),
                   "contents": str(result), "labels": labels}
        resp = self.obs_client.putObject(self.dump_bucket,
                                         self.log_object_key,
                                         json.dumps(payload))
        if resp.status < 300:
            return True
        self.log.error('failed to upload log to obs, errorCode: %s, '
                       'errorMessage: %s', resp.errorCode, resp.errorMessage)
        return False

    def run_image_moderation(self, image_url):
        """Submit *image_url* to the image moderation service.

        :param image_url: URL of the image to check
        :return: the service response, or ``None`` when the request was
            rejected with a ``ClientRequestException`` (which is logged)
        """
        categories = [
            "politics",
            "terrorism",
            "porn",
            "ad",
        ]
        try:
            request = RunImageModerationRequest()
            request.body = ImageDetectionReq(
                threshold=0,
                categories=categories,
                moderation_rule="default",
                url=image_url
            )
            return self.moderation_client.run_image_moderation(request)
        except exceptions.ClientRequestException as e:
            self.log.error(f"failed to run image moderation: "
                           f"status_code: {e.status_code}, "
                           f"request_id:{e.request_id}, "
                           f"error_code:{e.error_code}. "
                           f"error_msg:{e.error_msg}")
            return None

    def replace_image(self):
        """Overwrite the source object with the default placeholder image.

        Failures are logged rather than raised.

        :return: ``True`` when the upload succeeded, ``False`` otherwise
        """
        try:
            resp = self.obs_client.putFile(self.src_bucket,
                                           self.src_object_key,
                                           default_image)
        except Exception:
            self.log.error("failed to replace image, "
                           f"exception: {traceback.format_exc()}")
            return False
        if resp.status < 300:
            self.log.info("succeed to replace image.")
            return True
        self.log.error("failed to replace image, "
                       f"requestId: {resp.requestId} "
                       f"errorCode: {resp.errorCode} "
                       f"errorMessage: {resp.errorMessage}")
        return False

    def dump_image(self):
        """Copy the source object into the dump bucket under the same key.

        Failures are logged rather than raised.

        :return: ``True`` when the copy succeeded, ``False`` otherwise
        """
        try:
            resp = self.obs_client.copyObject(self.src_bucket,
                                              self.src_object_key,
                                              self.dump_bucket,
                                              self.src_object_key)
        except Exception:
            self.log.error("failed to dump image, "
                           f"exception: {traceback.format_exc()}")
            return False
        if resp.status < 300:
            self.log.info("succeed to dump image.")
            return True
        self.log.error("failed to dump image, "
                       f"requestId: {resp.requestId} "
                       f"errorCode: {resp.errorCode} "
                       f"errorMessage: {resp.errorMessage}")
        return False


def get_obs_obj_info(record):
    """Return ``(bucket_name, object_key)`` taken from an event record.

    The ``s3`` section is used when the record has one; otherwise the
    ``obs`` section is read. A missing section or field raises ``KeyError``.
    """
    section_name = 's3' if 's3' in record else 'obs'
    details = record[section_name]
    return details['bucket']['name'], details['object']['key']


def new_obs_client(context):
    """Create an OBS client for the server named in user data.

    Credentials come from the context; when either is empty, the ``ak`` /
    ``sk`` user data entries are used instead, the same fallback that
    ``check_configuration`` accepts as valid.

    :param context: function context exposing ``getAccessKey``,
        ``getSecretKey`` and ``getUserData``
    :return: a new ``ObsClient``
    """
    ak = (context.getAccessKey() or '').strip()
    sk = (context.getSecretKey() or '').strip()
    if not ak or not sk:
        # Without this fallback a configuration that passes validation via
        # user-data credentials would build a client with empty keys.
        ak = (context.getUserData('ak', '') or '').strip()
        sk = (context.getUserData('sk', '') or '').strip()
    return ObsClient(
        access_key_id=ak,
        secret_access_key=sk,
        server=context.getUserData('obs_server'),
    )


def new_moderation_client(context):
    """Create a moderation client for the region named in user data.

    Credentials come from the context; when either is empty, the ``ak`` /
    ``sk`` user data entries are used instead, the same fallback that
    ``check_configuration`` accepts as valid.

    :param context: function context exposing ``getAccessKey``,
        ``getSecretKey`` and ``getUserData``
    :return: a new ``ModerationClient``
    """
    ak = (context.getAccessKey() or '').strip()
    sk = (context.getSecretKey() or '').strip()
    if not ak or not sk:
        # Without this fallback a configuration that passes validation via
        # user-data credentials would build a client with empty keys.
        ak = (context.getUserData('ak', '') or '').strip()
        sk = (context.getUserData('sk', '') or '').strip()
    credentials = BasicCredentials(ak, sk)
    region = ModerationRegion.value_of(context.getUserData('region'))
    return ModerationClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(region) \
        .build()


def get_region(record):
    """Return the region named in an event record.

    ``eventRegion`` is read when the record has that key; otherwise
    ``awsRegion`` is read, falling back to ``default_region`` when it is
    absent.
    """
    region_field = "eventRegion" if 'eventRegion' in record else "awsRegion"
    return record.get(region_field, default_region)