/* -------------------------------------------------------------------------
 *
 * dropcmds.cpp
 *	  handle various "DROP" operations
 *
 * Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/gausskernel/optimizer/commands/dropcmds.cpp
 *
 * -------------------------------------------------------------------------
 */
#include "postgres.h"
#include "knl/knl_variable.h"
#include "catalog/indexing.h"
#include "utils/fmgroids.h"
#include "access/heapam.h"
#include "catalog/dependency.h"
#include "catalog/gs_db_privilege.h"
#include "catalog/namespace.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_class.h"
#include "catalog/pg_proc.h"
#include "commands/defrem.h"
#include "commands/trigger.h"
#include "commands/typecmds.h"
#include "gs_policy/gs_policy_masking.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_type.h"
#include "storage/tcap.h"
#include "utils/acl.h"
#include "utils/snapmgr.h"
#include "utils/inval.h"
#include "utils/builtins.h"
#include "utils/syscache.h"
#include "utils/lsyscache.h"

static void does_not_exist_skipping(ObjectType objtype, List* objname, List* objargs, bool missing_ok);
static bool schema_does_not_exist_skipping(List *object, char **msg, char **name);

static bool CheckObjectDropPrivilege(ObjectType removeType, Oid objectId, const Relation relation)
{
    AclResult aclresult = ACLCHECK_NO_PRIV;
    bool anyresult = false;
    switch (removeType) {
        case OBJECT_FUNCTION:
            aclresult = pg_proc_aclcheck(objectId, GetUserId(), ACL_DROP);
            break;
        case OBJECT_PACKAGE:
            aclresult = pg_package_aclcheck(objectId, GetUserId(), ACL_DROP);
            break;
        case OBJECT_SCHEMA:
            aclresult = pg_namespace_aclcheck(objectId, GetUserId(), ACL_DROP);
            break;
        case OBJECT_TYPE:
            aclresult = pg_type_aclcheck(objectId, GetUserId(), ACL_DROP);
            break;
        case OBJECT_FOREIGN_SERVER:
            aclresult = pg_foreign_server_aclcheck(objectId, GetUserId(), ACL_DROP);
            break;
        case OBJECT_TRIGGER:
            if (!IsSysSchema(RelationGetNamespace(relation))) {
                anyresult = HasSpecAnyPriv(GetUserId(), DROP_ANY_TRIGGER, false);
            }
            break;
        default:
            break;
    }

    return ((aclresult == ACLCHECK_OK) ? true : false) || anyresult;
}

static void DropExtensionInListIsSupported(List* objname)
{
    static const char *supportList[] = {
        "drop",
        "postgis",
        "packages",
        "ndpplugin",
        "datavec",
        "chparser",
        "timescaledb",
#ifndef ENABLE_MULTIPLE_NODES
        "mysql_fdw",
        "oracle_fdw",
        "postgres_fdw",
        "dblink",
        "security_plugin",
        "db_a_parser",
        "db_b_parser",
        "db_c_parser",
        "db_pg_parser",
        "hdfs_fdw",
        "age",
        "gms_compress",
        "gms_utility",
        "gms_stats",
        "gms_output",
        "gms_inaddr",
        "gms_profiler",
        "gms_xmlgen",
        "gms_lob",
        "gms_sql",
        "gms_i18n",
        "gms_raw",
        "gms_match",
        "gms_assert",
        "gms_tcp"
#endif
    };
    int len = lengthof(supportList);
    const char* name = strVal(linitial(objname));

#if (!defined(ENABLE_MULTIPLE_NODES)) && (!defined(ENABLE_PRIVATEGAUSS))
    static const char *unsupportList[] = {
        "dolphin"
    };
    int len_unsupport = lengthof(unsupportList);
    for (int i = 0; i < len_unsupport; i++) {
        if (pg_strcasecmp(name, unsupportList[i]) == 0) {
            if (u_sess->attr.attr_common.IsInplaceUpgrade && t_thrd.proc->workingVersionNum < DOLPHIN_ENABLE_DROP_NUM) {
                return;
            }
            ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("EXTENSION is not yet supported.")));
        }
    }
#endif

    for (int i = 0; i < len; i++) {
        if (pg_strcasecmp(name, supportList[i]) == 0) {
            return;
        }
    }

    if (pg_strcasecmp(name, "file_fdw") == 0 && !u_sess->attr.attr_common.IsInplaceUpgrade) {
        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
            errmsg("EXTENSION file_fdw does not allow to drop.")));
    }

    /* Enable DROP operation of the above objects during inplace upgrade or support_extended_features is true */
    if (!u_sess->attr.attr_common.IsInplaceUpgrade && !g_instance.attr.attr_common.support_extended_features) {
        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("EXTENSION is not yet supported.")));
    }
}

/*
 * @Description: drop one or more objects.
 *               We don't currently handle all object types here.  Relations, for example,
 *               require special handling, because (for example) indexes have additional
 *               locking requirements.
 *               We look up all the objects first, and then delete them in a single
 *               performMultipleDeletions() call.  This avoids unnecessary DROP RESTRICT
 *               errors if there are dependencies between them.
 * @in stmt : the info of dropstmt.
 * @in is_securityadmin : whether the is a security administrator doing this.
 * @return : nothing.
 */
void RemoveObjects(DropStmt* stmt, bool missing_ok, bool is_securityadmin)
{
    ObjectAddresses* objects = NULL;
    ListCell* cell1 = NULL;
    bool skip_check = false;
    ListCell* cell2 = NULL;
    Oid pkgOid = InvalidOid;
    objects = new_object_addresses();

    foreach (cell1, stmt->objects) {
        ObjectAddress address;
        List* objname = (List*)lfirst(cell1);
        List* objargs = NIL;
        Relation relation = NULL;
        Oid namespaceId;

        if (stmt->arguments) {
            cell2 = (!cell2 ? list_head(stmt->arguments) : lnext(cell2));
            objargs = (List*)lfirst(cell2);
        }

        /* Get an ObjectAddress for the object. */
        address =
            get_object_address(stmt->removeType, objname, objargs, &relation, AccessExclusiveLock, stmt->missing_ok);
        /* Issue NOTICE if supplied object was not found. */
        if (!OidIsValid(address.objectId)) {
            if (u_sess->attr.attr_common.xc_maintenance_mode)
                missing_ok = stmt->missing_ok;
            does_not_exist_skipping(stmt->removeType, objname, objargs, missing_ok);
            continue;
        } else if (stmt->removeType == OBJECT_EXTENSION) {
            DropExtensionInListIsSupported(objname);
        }

        TrForbidAccessRbObject(address.classId, address.objectId);

        /*
         * Although COMMENT ON FUNCTION, SECURITY LABEL ON FUNCTION, etc. are
         * happy to operate on an aggregate as on any other function, we have
         * historically not allowed this for DROP FUNCTION.
         */

        if (stmt->removeType == OBJECT_FUNCTION) {
            Oid funcOid = address.objectId;
            if (IsMaskingFunctionOid(funcOid) && !u_sess->attr.attr_common.IsInplaceUpgrade) {
                ereport(ERROR,
                    (errmodule(MOD_FUNCTION),
                        errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
                            errmsg("function \"%s\" is a masking function, it can not be droped", 
                                NameListToString(objname)),
                                errdetail("cannot drop masking function")));
            }

            HeapTuple tup;
            bool isnull = false;
            tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcOid));
            if (!HeapTupleIsValid(tup)) /* should not happen */
                ereport(ERROR,
                    (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmsg("cache lookup failed for function %u", funcOid)));
            Datum packageOidDatum = SysCacheGetAttr(PROCOID, tup, Anum_pg_proc_packageid, &isnull);
            if (!isnull) {
                Oid pkgFuncOid = DatumGetObjectId(packageOidDatum);
                if (OidIsValid(pkgFuncOid)) {
                    ereport(ERROR,
                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                            errmsg("\"%s\" is a function in package", NameListToString(objname)),
                            errhint("Use DROP PACKAGE.")));
                }
            }
            isnull = false;
            bool ispackage = SysCacheGetAttr(PROCOID, tup, Anum_pg_proc_package, &isnull);
            Datum typeOidDatum = SysCacheGetAttr(PROCOID, tup, Anum_pg_proc_packageid, &isnull);
            if (!isnull && !ispackage) {
                Oid typFuncOid = DatumGetObjectId(typeOidDatum);
                if (OidIsValid(typFuncOid)) {
                    ereport(ERROR,
                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                            errmsg("\"%s\" is a function in TYPE", NameListToString(objname)),
                            errhint("Use DROP TYPE.")));
                }
            }

            if (((Form_pg_proc)GETSTRUCT(tup))->proisagg)
                ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                        errmsg("\"%s\" is an aggregate function", NameListToString(objname)),
                        errhint("Use DROP AGGREGATE to drop aggregate functions.")));

            CacheInvalidateFunction(funcOid, InvalidOid);
            /* send invalid message for for relation holding replaced function as trigger */
            InvalidRelcacheForTriggerFunction(funcOid, ((Form_pg_proc)GETSTRUCT(tup))->prorettype);
            ReleaseSysCache(tup);
        }
        /* For DROP TYPE */
        if (stmt->removeType == OBJECT_TYPE) {
            char typtype = get_typtype(address.objectId);
            if ((typtype == TYPTYPE_ABSTRACT_OBJECT) || (typtype == TYPTYPE_TABLEOF) || (typtype == TYPTYPE_VARRAY)) {
                /* Also need to drop related methods of object type */
                RemoveTypeMethod(address.objectId);
            }
        }
        /* For DROP PACKAGE */
        if (stmt->removeType == OBJECT_PACKAGE) {
            pkgOid = address.objectId;

            HeapTuple oldtup;
            ScanKeyData entry;
            ScanKeyInit(&entry, Anum_pg_proc_packageid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(pkgOid));
            Relation pg_proc_rel = heap_open(ProcedureRelationId, RowExclusiveLock);
            SysScanDesc scan = systable_beginscan(pg_proc_rel, InvalidOid, false, NULL, 1, &entry);
            /*
             * do we need job status not run, we can change the owner, 
             * now only check if have the permission to change the owner, if after change, the running job may 
             * failed in check permission
             */
            while ((oldtup = systable_getnext(scan)) != NULL) {
                HeapTuple proctup = heap_copytuple(oldtup);
                Oid funcOid = InvalidOid;
                if (HeapTupleIsValid(proctup)) {
                    funcOid = HeapTupleGetOid(proctup);
                    if (!OidIsValid(funcOid)) {
                        ereport(ERROR,
                            (errcode(ERRCODE_CACHE_LOOKUP_FAILED),
                                errmodule(MOD_PLSQL),
                                errmsg("cache lookup failed for relid %u", funcOid)));
                    }
                } else {
                    ereport(ERROR,
                        (errcode(ERRCODE_CACHE_LOOKUP_FAILED),
                            errmodule(MOD_PLSQL),
                            errmsg("cache lookup failed for relid %u", funcOid)));
                }
                heap_freetuple(proctup);
            }
            systable_endscan(scan);
            heap_close(pg_proc_rel, RowExclusiveLock);

            HeapTuple tup = SearchSysCache1(PACKAGEOID, ObjectIdGetDatum(pkgOid));
            if (!HeapTupleIsValid(tup)) /* should not happen */
                ereport(ERROR,
                    (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmsg("cache lookup failed for package %u", pkgOid)));
            CacheInvalidateFunction(InvalidOid, pkgOid);
            ReleaseSysCache(tup);
        }

#if (!defined(ENABLE_MULTIPLE_NODES)) && (!defined(ENABLE_PRIVATEGAUSS))
        if (stmt->removeType == OBJECT_SCHEMA && u_sess->attr.attr_sql.dolphin && strcmp(strVal(linitial(objname)), "public") == 0) {
            ereport(ERROR,
                (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
                    errmsg("cannot drop schema public because dolphin depends on it")));
        }
#endif
        // @Temp Table. myTempNamespace and myTempToastNamespace's owner is
        // bootstrap user, so can not be deleted by ordinary user. to ensuer this two
        // schema be deleted on session quiting, we should bypass acl check when
        // drop my own temp namespace
        if (stmt->removeType == OBJECT_SCHEMA && (address.objectId == u_sess->catalog_cxt.myTempNamespace ||
                                                     address.objectId == u_sess->catalog_cxt.myTempToastNamespace))
            skip_check = true;

        /* Check permissions. */
        if (!skip_check) {
            skip_check = CheckObjectDropPrivilege(stmt->removeType, address.objectId, relation);
        }
        namespaceId = get_object_namespace(&address);
        if ((!is_securityadmin) && (!skip_check) &&
            (!OidIsValid(namespaceId) || !pg_namespace_ownercheck(namespaceId, GetUserId())))
            check_object_ownership(GetUserId(), stmt->removeType, address, objname, objargs, relation);

        /* Release any relcache reference count, but keep lock until commit. */
        if (relation)
            heap_close(relation, NoLock);

        add_exact_object_address(&address, objects);
    }

    /* Here we really delete them. */
    performMultipleDeletions(objects, stmt->behavior, 0);

    free_object_addresses(objects);
}

/*
 * schema_does_not_exist_skipping
 *      Subroutine for RemoveObjects
 *
 * After determining that a specification for a schema-qualifiable object
 * refers to an object that does not exist, test whether the specified schema
 * exists or not.  If no schema was specified, or if the schema does exist,
 * return false -- the object itself is missing instead.  If the specified
 * schema does not exist, fill the error message format string and the
 * specified schema name, and return true.
 */
static bool schema_does_not_exist_skipping(List *object, char **msg, char **name)
{
    RangeVar   *rel;

    rel = makeRangeVarFromNameList(object);

    if (rel->schemaname != NULL &&
        !OidIsValid(LookupNamespaceNoError(rel->schemaname))) {
        *msg = gettext_noop("schema \"%s\" does not exist, skipping");
        *name = rel->schemaname;

        return true;
    }

    return false;
}

/*
 * Generate a NOTICE stating that the named object was not found, and is
 * being skipped.  This is only relevant when "IF EXISTS" is used; otherwise,
 * get_object_address() will throw an ERROR.
 */
static void does_not_exist_skipping(ObjectType objtype, List* objname, List* objargs, bool missing_ok)
{
    char* msg = NULL;
    char* name = NULL;
    char* args = NULL;
    List* relname = NIL;
    StringInfo message = makeStringInfo();

    switch (objtype) {
        case OBJECT_ACCESS_METHOD:
            msg = gettext_noop("access method \"%s\" does not exist, skipping");
            name = NameListToString(objname);
            break;
        case OBJECT_TYPE:
        case OBJECT_DOMAIN:
        {
            /*objname migth be a TypeName list or a String list, check its node type firstly*/
            Node * ptype = (Node*) linitial(objname);
            TypeName   *typ = NULL;
            if (ptype->type == T_String)
                typ = makeTypeNameFromNameList(objname);
            else if (ptype->type == T_TypeName)
                typ = (TypeName*)linitial(objname);
            else
                ereport(ERROR, (errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE), (errmsg("unknown type: %d",(int)ptype->type))));
 
            List *typeNames = list_copy(typ->names);
            if (!schema_does_not_exist_skipping(typeNames, &msg, &name)) {
                msg = gettext_noop("type \"%s\" does not exist");
                name = TypeNameToString(typ);
            }
            list_free_ext(typeNames);
        }  break;
        case OBJECT_COLLATION:
            msg = gettext_noop("collation \"%s\" does not exist");
            name = NameListToString(objname);
            break;
        case OBJECT_CONVERSION:
            msg = gettext_noop("conversion \"%s\" does not exist");
            name = NameListToString(objname);
            break;
        case OBJECT_SCHEMA:
            msg = gettext_noop("schema \"%s\" does not exist");
            name = NameListToString(objname);
            break;
        case OBJECT_TSPARSER:
            msg = gettext_noop("text search parser \"%s\" does not exist");
            name = NameListToString(objname);
            break;
        case OBJECT_TSDICTIONARY:
            msg = gettext_noop("text search dictionary \"%s\" does not exist");
            name = NameListToString(objname);
            break;
        case OBJECT_TSTEMPLATE:
            msg = gettext_noop("text search template \"%s\" does not exist");
            name = NameListToString(objname);
            break;
        case OBJECT_TSCONFIGURATION:
            msg = gettext_noop("text search configuration \"%s\" does not exist");
            name = NameListToString(objname);
            break;
        case OBJECT_EXTENSION:
            msg = gettext_noop("extension \"%s\" does not exist");
            name = NameListToString(objname);
            break;
        case OBJECT_FUNCTION:
            msg = gettext_noop("function %s(%s) does not exist");
            name = NameListToString(objname);
            args = TypeNameListToString(objargs);
            break;
        case OBJECT_PACKAGE:
        case OBJECT_PACKAGE_BODY:
            msg = gettext_noop("package %s(%s) does not exist");
            name = NameListToString(objname);
            args = TypeNameListToString(objargs);
            break;
        case OBJECT_AGGREGATE:
            /* Given ordered set aggregate with no direct args, aggr_args variable is modified in gram.y.
               So the parse of aggr_args should be changed. See gram.y for detail. */
            objargs = (List*)linitial(objargs);
            
            msg = gettext_noop("aggregate %s(%s) does not exist");
            name = NameListToString(objname);
            args = TypeNameListToString(objargs);
            break;
        case OBJECT_OPERATOR:
            msg = gettext_noop("operator %s does not exist");
            name = NameListToString(objname);
            break;
        case OBJECT_LANGUAGE:
            msg = gettext_noop("language \"%s\" does not exist");
            name = NameListToString(objname);
            break;
        case OBJECT_CAST:
            msg = gettext_noop("cast from type %s to type %s does not exist");
            name = format_type_be(typenameTypeId(NULL, (TypeName*)linitial(objname)));
            args = format_type_be(typenameTypeId(NULL, (TypeName*)linitial(objargs)));
            break;
        case OBJECT_TRIGGER:
            if (list_length(objname) == 1) {
                msg = gettext_noop("trigger \"%s\" does not exist");
                name = NameListToString(list_make1(lfirst(list_tail((List*)lfirst(list_tail(objname))))));
                break;
            } else {
                msg = gettext_noop("trigger \"%s\" for table \"%s\" does not exist");
                relname = list_truncate(list_copy(objname), list_length(objname) - 1);
                args = NameListToString(relname);
                name = NameListToString(lappend(relname, lfirst(list_tail((List*)lfirst(list_tail(objname))))));
                break;
            }
        case OBJECT_EVENT_TRIGGER:
            msg = gettext_noop("event trigger \"%s\" does not exist, skipping");
            name = NameListToString(objname);
            break;
        case OBJECT_RULE:
            msg = gettext_noop("rule \"%s\" for relation \"%s\" does not exist");
            name = strVal(llast(objname));
            args = NameListToString(list_truncate(list_copy(objname), list_length(objname) - 1));
            break;
        case OBJECT_FDW:
            msg = gettext_noop("foreign-data wrapper \"%s\" does not exist");
            name = NameListToString(objname);
            break;
        case OBJECT_FOREIGN_SERVER:
            msg = gettext_noop("server \"%s\" does not exist");
            name = NameListToString(objname);
            break;
        case OBJECT_OPCLASS:
            {
                List *opcname = list_copy_tail(objname, 1);
                if (!schema_does_not_exist_skipping(opcname, &msg, &name)) {
                    msg = gettext_noop("operator class \"%s\" does not exist for access method \"%s\"");
                    name = NameListToString(opcname);
                    args = strVal(linitial(objname));
                }
                list_free_ext(opcname);
            } break;
        case OBJECT_OPFAMILY:
            {
                List *opfname = list_copy_tail(objname, 1);
                if (!schema_does_not_exist_skipping(opfname, &msg, &name)) {
                    msg = gettext_noop("operator family \"%s\" does not exist for access method \"%s\"");
                    name = NameListToString(opfname);
                    args = strVal(linitial(objname));
                }
                list_free_ext(opfname);
            } break;
        case OBJECT_DATA_SOURCE:
            msg = gettext_noop("data source \"%s\" does not exist");
            name = NameListToString(objname);
            break;
        case OBJECT_RLSPOLICY:
            msg = gettext_noop("row level security policy \"%s\" for relation \"%s\" does not exist");
            name = pstrdup(strVal(llast(objname)));
            args = NameListToString(list_truncate(list_copy(objname), list_length(objname) - 1));
            break;
        case OBJECT_PUBLICATION:
            msg = gettext_noop("publication \"%s\" does not exist, skipping");
            name = NameListToString(objname);
            break;
        default:
            pfree_ext(message->data);
            pfree_ext(message);
            ereport(
                ERROR, (errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE), errmsg("unexpected object type (%d)", (int)objtype)));
            break;
    }

    if (missing_ok) {
        if (args == NULL) {
            appendStringInfo(message, msg, name);
        } else {
            appendStringInfo(message, msg, name, args);
        }

        appendStringInfo(message, ", skipping");

        ereport(NOTICE, (errmsg("%s", message->data)));

        pfree_ext(message->data);
        pfree_ext(message);
        pfree_ext(name);
    } else {
        pfree_ext(message->data);
        pfree_ext(message);

        if (args == NULL)
            ereport(ERROR, (errcode(ERRCODE_UNDEFINED_SCHEMA), errmsg(msg, name)));
        else
            ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION), errmsg(msg, name, args)));
    }
}