import traceback
from urllib.parse import unquote_plus
import time
import os
import json
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkmoderation.v2.region.moderation_region import \
ModerationRegion
from huaweicloudsdkmoderation.v2 import ModerationClient, \
RunImageModerationRequest, ImageDetectionReq
from obs import ObsClient
# Region that get_region() falls back to when an event record carries
# neither an "eventRegion" nor an "awsRegion" field.
default_region = 'cn-north-4'
# Read at import time; a missing RUNTIME_CODE_ROOT environment variable
# raises KeyError before any handler code runs.
# NOTE(review): presumably the root directory of the deployed function
# code - confirm against the runtime documentation.
currentDir = os.environ["RUNTIME_CODE_ROOT"]
# Local file that Processor.replace_image() uploads over a blocked image.
default_image = currentDir + '/data/default.png'
def check_configuration(context):
    """Validate the settings the handler needs before doing any work.

    Checks, in order, that the ``region`` and ``obs_server`` user-data
    entries are set and that a non-blank access key / secret key pair is
    available. The pair supplied by the context itself is preferred; if
    either half is missing or blank, the ``ak`` / ``sk`` user-data entries
    are used instead.

    Args:
        context: Runtime context exposing ``getUserData``, ``getAccessKey``
            and ``getSecretKey``.

    Returns:
        A human-readable error message describing the first problem found,
        or ``None`` when the configuration is usable.
    """
    if not context.getUserData('region'):
        return 'region is not configured'
    if not context.getUserData('obs_server'):
        return 'obs_server is not configured'

    # ``or ''`` guards against accessors that return None instead of an
    # empty string, which would otherwise make .strip() raise.
    ak = (context.getAccessKey() or '').strip()
    sk = (context.getSecretKey() or '').strip()
    if not ak or not sk:
        # Fall back to credentials configured explicitly as user data.
        ak = (context.getUserData('ak', '') or '').strip()
        sk = (context.getUserData('sk', '') or '').strip()
        if not ak or not sk:
            return 'ak or sk is empty'
    return None
def handler(event, context):
    """Moderate every image referenced by the records of ``event``.

    Args:
        event: Mapping with a ``"Records"`` entry holding the records to
            process.
        context: Runtime context providing the logger, credentials and
            user data.

    Returns:
        The message from ``check_configuration`` when the configuration is
        incomplete, ``'Records is empty'`` when the event has no
        ``"Records"`` entry, and ``'Done'`` otherwise. ``'Done'`` is also
        returned when processing failed: the failure is logged, not raised.
    """
    log = context.getLogger()
    config_error = check_configuration(context)
    if config_error is not None:
        return config_error

    records = event.get("Records", None)
    if records is None:
        return 'Records is empty'

    processor = Processor(context)
    try:
        for record in records:
            processor.process(record)
    except Exception:
        # The first failing record aborts the remaining ones. Only regular
        # errors are swallowed, so SystemExit / KeyboardInterrupt still
        # propagate after the client has been closed.
        log.error("failed to process image, "
                  f"exception: {traceback.format_exc()}")
    finally:
        processor.obs_client.close()
    return 'Done'
class Processor:
    """Moderates the image named by one event record and acts on the verdict.

    One instance holds an OBS client, a moderation client and the name of
    the dump bucket; ``process`` can be called repeatedly, once per record.
    """

    def __init__(self, context=None):
        """Create the service clients from ``context``.

        ``context`` defaults to ``None`` only for signature compatibility;
        a real context object is required.
        """
        self.log = context.getLogger()
        self.obs_client = new_obs_client(context)
        self.moderation_client = new_moderation_client(context)
        self.dump_bucket = context.getUserData('dump_bucket')
        # Per-record state, overwritten by parse_record() on every call.
        self.image_url = None
        self.src_bucket = None
        self.src_object_key = None
        self.log_object_key = None

    @staticmethod
    def _log_key_for(object_key):
        """Return the key of the JSON report that belongs to ``object_key``.

        The report sits next to the image, under the same prefix, with the
        file extension replaced by ``.json`` (``a/b/img.png`` ->
        ``a/b/img.json``).
        """
        # splitext() only strips an extension from the last path component,
        # so the directory part (and its separator) is preserved as-is.
        stem, _ = os.path.splitext(object_key)
        return stem + ".json"

    def parse_record(self, record):
        """Extract bucket, object key, image URL and report key from ``record``."""
        region = get_region(record)
        bucket_name, object_key = get_obs_obj_info(record)
        self.log.info("input bucket_name: %s", bucket_name)
        # Decode the percent/plus-encoded key found in the record.
        object_key = unquote_plus(object_key)
        self.log.info("input object: %s", object_key)
        self.src_bucket = bucket_name
        self.src_object_key = object_key
        # NOTE(review): the decoded key is placed in the URL unescaped;
        # confirm the moderation service accepts keys containing spaces or
        # non-ASCII characters.
        self.image_url = "https://" + bucket_name + ".obs." + \
            region + ".myhuaweicloud.com/" + object_key
        self.log_object_key = self._log_key_for(object_key)

    def process(self, record):
        """Moderate the image described by ``record`` and act on the result.

        * ``pass``   - nothing is done.
        * ``block``  - the image is copied to the dump bucket, replaced by
          the default image, and a report is written.
        * ``review`` - only a report is written.

        Any other suggestion is logged as a warning. A failed moderation
        call is logged and the record is skipped.
        """
        self.parse_record(record)
        response = self.run_image_moderation(self.image_url)
        if response is None or response.status_code != 200:
            self.log.error("failed to run image moderation")
            return
        result = response.result
        if result.suggestion == "pass":
            return
        elif result.suggestion == "block":
            # NOTE(review): dump_image() only logs its failures, so the
            # replacement below still happens when the copy did not succeed.
            self.dump_image()
            self.replace_image()
            self.report_to_obs(result)
        elif result.suggestion == "review":
            self.report_to_obs(result)
        else:
            self.log.warn("suggestion %s is not in (pass, block, review)",
                          result.suggestion)

    def report_to_obs(self, result):
        """Upload a JSON report about ``result`` to the dump bucket.

        The report is stored under ``self.log_object_key``. ``dump_path``
        is always included in the labels, including for ``review`` results
        where no copy was made.

        Returns:
            ``True`` when the upload status is below 300, else ``False``.
        """
        labels = {
            "moderation_result": result.suggestion,
            "original_path": "obs://" + self.src_bucket + "/" +
                             self.src_object_key,
            "dump_path": "obs://" + self.dump_bucket + "/" +
                         self.src_object_key,
        }
        payload = {"log_time_ns": int(round(time.time() * 1000000000)),
                   "contents": str(result), "labels": labels}
        resp = self.obs_client.putObject(self.dump_bucket,
                                         self.log_object_key,
                                         json.dumps(payload))
        if resp.status < 300:
            return True
        else:
            self.log.error('failed to upload log to obs, errorCode: %s, '
                           'errorMessage: %s' % (resp.errorCode,
                                                 resp.errorMessage))
            return False

    def run_image_moderation(self, image_url):
        """Submit ``image_url`` to the moderation service.

        Returns:
            The service response, or ``None`` when the request was rejected
            with a ``ClientRequestException`` (which is logged).
        """
        try:
            request = RunImageModerationRequest()
            categories = [
                "politics",
                "terrorism",
                "porn",
                "ad"
            ]
            request.body = ImageDetectionReq(
                threshold=0,
                categories=categories,
                moderation_rule="default",
                url=image_url
            )
            return self.moderation_client.run_image_moderation(request)
        except exceptions.ClientRequestException as e:
            self.log.error("failed to run image moderation: "
                           f"status_code: {e.status_code}, "
                           f"request_id:{e.request_id}, "
                           f"error_code:{e.error_code}. "
                           f"error_msg:{e.error_msg}")
            return None

    def replace_image(self):
        """Overwrite the source object with the default image.

        Best effort: failures are logged and never raised.
        """
        try:
            resp = self.obs_client.putFile(self.src_bucket,
                                           self.src_object_key,
                                           default_image)
            if resp.status < 300:
                self.log.info("succeed to replace image.")
            else:
                self.log.error("failed to replace image, "
                               f"requestId: {resp.requestId} "
                               f"errorCode: {resp.errorCode} "
                               f"errorMessage: {resp.errorMessage}")
        except Exception:
            self.log.error("failed to replace image, "
                           f"exception: {traceback.format_exc()}")

    def dump_image(self):
        """Copy the source object to the dump bucket under the same key.

        Best effort: failures are logged and never raised.
        """
        try:
            resp = self.obs_client.copyObject(self.src_bucket,
                                              self.src_object_key,
                                              self.dump_bucket,
                                              self.src_object_key)
            if resp.status < 300:
                self.log.info("succeed to dump image.")
            else:
                self.log.error("failed to dump image, "
                               f"requestId: {resp.requestId} "
                               f"errorCode: {resp.errorCode} "
                               f"errorMessage: {resp.errorMessage}")
        except Exception:
            self.log.error("failed to dump image, "
                           f"exception: {traceback.format_exc()}")
def get_obs_obj_info(record):
    """Return ``(bucket_name, object_key)`` for an event record.

    The details are read from the record's ``'s3'`` entry when present and
    from its ``'obs'`` entry otherwise.
    """
    section = 's3' if 's3' in record else 'obs'
    details = record[section]
    return (details['bucket']['name'], details['object']['key'])
def _resolve_credentials(context):
    """Return the ``(ak, sk)`` pair the service clients should use.

    Applies the same rule as ``check_configuration``: the pair supplied by
    the context is preferred; if either half is missing or blank, the
    ``ak`` / ``sk`` user-data entries are used instead. Values are
    stripped of surrounding whitespace.
    """
    ak = (context.getAccessKey() or '').strip()
    sk = (context.getSecretKey() or '').strip()
    if not ak or not sk:
        ak = (context.getUserData('ak', '') or '').strip()
        sk = (context.getUserData('sk', '') or '').strip()
    return ak, sk


def new_obs_client(context):
    """Build an ``ObsClient`` for the ``obs_server`` user-data endpoint."""
    ak, sk = _resolve_credentials(context)
    return ObsClient(
        access_key_id=ak,
        secret_access_key=sk,
        server=context.getUserData('obs_server'),
    )


def new_moderation_client(context):
    """Build a ``ModerationClient`` for the ``region`` user-data entry."""
    ak, sk = _resolve_credentials(context)
    credentials = BasicCredentials(ak, sk)
    region = ModerationRegion.value_of(context.getUserData('region'))
    return ModerationClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(region) \
        .build()
def get_region(record):
    """Return the region named by an event record.

    The ``"eventRegion"`` field is used when the record has one;
    otherwise ``"awsRegion"`` is read, falling back to ``default_region``
    when that field is absent as well.
    """
    field = "eventRegion" if 'eventRegion' in record else "awsRegion"
    return record.get(field, default_region)