"""
Triggers to action user_account database.
"""
import logging
from sqlalchemy.exc import SQLAlchemyError
from analysis.ui.database import tables
from analysis.ui.database.table_user_account import UserAccount
from analysis.ui.database.table_ip_addrs import IpAddrs
from analysis.ui.database.table_tuning import TuningTable
from analysis.ui.database.table_collection import CollectionTable
LOGGER = logging.getLogger(__name__)
def user_exist(email, pwd):
"""check if user exist"""
session = tables.get_session()
uid = -1
account_name = ''
if session is None:
return uid, account_name
try:
user_account = UserAccount()
password = user_account.get_field_by_key(UserAccount.password, UserAccount.email, email,
session)
if password is not None and password == pwd:
uid = user_account.get_field_by_key(UserAccount.user_id, UserAccount.email, email,
session)
account_name = user_account.get_field_by_key(UserAccount.account_name,
UserAccount.email, email, session)
except SQLAlchemyError as err:
LOGGER.error('User login failed: %s', err)
return -1, account_name
finally:
session.close()
return uid, account_name
def create_user(email, pwd, name):
"""create new user"""
session = tables.get_session()
if session is None:
return False, False
try:
user_account = UserAccount()
password = user_account.get_field_by_key(UserAccount.password, UserAccount.email, email,
session)
if password is not None:
return False, True
uid = user_account.get_max_uid(session)
if uid is None:
uid = 1
else:
uid += 1
user_account.insert_new_user(uid, email, name, pwd, session)
session.commit()
except SQLAlchemyError as err:
LOGGER.error('Create new user failed: %s', err)
return False, False
finally:
session.close()
return True, False
def user_ip_list(uid):
"""get all ip as list for given user"""
session = tables.get_session()
res = []
if session is None:
return res
try:
ip_table = IpAddrs()
res.extend(ip_table.get_ips_by_uid(uid, session))
except SQLAlchemyError as err:
LOGGER.error('Get user ip list failed: %s', err)
return []
finally:
session.close()
return res
def ip_info_list(ip_addr):
"""get analysis and tuning list with ip_addr"""
session = tables.get_session()
if session is None:
return {'getData': False}
response = {}
try:
response['getData'] = True
tuning_table = TuningTable()
response['tuning'] = tuning_table.get_all_tunings_by_ip(ip_addr, session)
collection_table = CollectionTable()
response['analysis'] = collection_table.get_all_collection_by_ip(ip_addr, session)
except SQLAlchemyError as err:
LOGGER.error('Get analysis and tuning list failed: %s', err)
return response
finally:
session.close()
return response
def add_ip(uid, ip_addrs, ip_port, server_user, server_password, description):
"""add ip to table"""
session = tables.get_session()
if session is None:
return False
res = False
try:
ip_table = IpAddrs()
if not ip_table.check_exist_by_ip(ip_addrs, uid, session):
res = ip_table.insert_ip_by_user(ip_addrs, ip_port, server_user, server_password, description, uid, session)
session.commit()
except SQLAlchemyError as err:
LOGGER.error('Insert new ip failed: %s', err)
return res
finally:
session.close()
return res
def update_ip(uid, ip_addrs, ip_port, server_user, server_password, description):
"""update ip from table"""
session = tables.get_session()
if session is None:
return False
res = False
try:
ip_table = IpAddrs()
res = ip_table.update_ip_by_user(ip_addrs, ip_port, server_user, server_password, description, uid, session)
session.commit()
except SQLAlchemyError as err:
LOGGER.error('Insert new ip failed: %s', err)
return res
finally:
session.close()
return res
def delete_ip(uid, ip_addrs):
"""delete ip from table"""
session = tables.get_session()
if session is None:
return False
res = False
try:
ip_table = IpAddrs()
if ip_table.check_exist_by_ip(ip_addrs, uid, session):
res = ip_table.delete_ip_by_user(ip_addrs, uid, session)
session.commit()
except SQLAlchemyError as err:
LOGGER.error('Delete ip failed: %s', err)
return res
finally:
session.close()
return res
def get_user_info(uid):
"""get user info for given user"""
session = tables.get_session()
account_name = ''
description = ''
if session is None:
return account_name, description
try:
user_account = UserAccount()
account_name = user_account.get_field_by_key(UserAccount.account_name, UserAccount.user_id, uid,
session)
description = user_account.get_field_by_key(UserAccount.description, UserAccount.user_id, uid,
session)
except SQLAlchemyError as err:
LOGGER.error('User login failed: %s', err)
return account_name, description
finally:
session.close()
return account_name, description
def change_user_pwd(uid, pwd, new_pwd):
"""check if user password match pwd"""
session = tables.get_session()
if session is None:
return {'oldMatch': False}
response = {}
try:
user_account = UserAccount()
password = user_account.get_field_by_key(UserAccount.password, UserAccount.user_id, uid,
session)
if password is None or password != pwd:
return {'oldMatch': False}
response['oldMatch'] = True
response['newMatch'] = user_account.update_password(uid, new_pwd, session)
session.commit()
except SQLAlchemyError as err:
LOGGER.error('Check user password failed: %s', err)
return response
finally:
session.close()
return response
def change_user_info(uid, name, description):
"""change basic info of user"""
session = tables.get_session()
if session is None:
return {'success': False}
response = {}
try:
user_account = UserAccount()
response['success'] = user_account.update_user_basic_info(uid, name, description, session)
session.commit()
except SQLAlchemyError as err:
LOGGER.error('Check user password failed: %s', err)
return response
finally:
session.close()
return response
def count_user():
"""count the number of user in user_account table"""
session = tables.get_session()
if session is None:
return 0
count = 0
try:
user_account = UserAccount()
uid = user_account.get_max_uid(session)
if uid is None:
return 0
return uid + 1
except SQLAlchemyError as err:
LOGGER.error('Count user number failed: %s', err)
return count
finally:
session.close()
return count
def create_admin(pwd):
"""create admin account"""
session = tables.get_session()
if session is None:
return {'success': False, 'reason': 'failed'}
try:
user_account = UserAccount()
user_account.insert_new_user(0, 'admin@openeuler.org', 'Admin', pwd, session, 'admin')
session.commit()
except SQLAlchemyError as err:
LOGGER.error('Create admin account failed: %s', err)
return {'success': False, 'reason': 'failed'}
finally:
session.close()
return {'success': True}
def get_user_list():
"""get user details in user_account table"""
session = tables.get_session()
if session is None:
return {'success': False, 'reason': 'failed'}
try:
user_account = UserAccount()
user_account.find_all_user(session)
except SQLAlchemyError as err:
LOGGER.error('Get user details failed: %s', err)
return {'success': False, 'reason': 'failed'}
finally:
session.close()
return {'success': True}
def delete_user(uid):
"""delete user account by uid"""
session = tables.get_session()
if session is None:
return {'success': False, 'reason': 'failed'}
response = {}
try:
user_account = UserAccount()
user_account.delete_user(uid, session)
session.commit()
except SQLAlchemyError as err:
LOGGER.error('delete user account failed: %s', err)
return response
finally:
session.close()
return response
def update_user(uid, pwd, name, role):
"""delete user account by uid"""
session = tables.get_session()
if session is None:
return {'success': False, 'reason': 'failed'}
response = {}
try:
user_account = UserAccount()
user_account.update_user(uid, pwd, name, role, session)
session.commit()
except SQLAlchemyError as err:
LOGGER.error('update user account failed: %s', err)
return response
finally:
session.close()
return response