import unittest
import pandas as pd
from mock import patch, MagicMock
from msprof_analyze.prof_common.constant import Constant
from msprof_analyze.prof_common.database_service import DatabaseService
class TestDatabaseService(unittest.TestCase):
def setUp(self):
"""设置测试环境"""
self.db_path = "/path/to/test.db"
self.step_range = {
Constant.START_NS: 1000000,
Constant.END_NS: 2000000
}
self.database_service = DatabaseService(self.db_path, self.step_range)
def test_init_with_step_range(self):
"""测试使用时间范围初始化数据库服务"""
self.assertEqual(self.database_service._db_path, self.db_path)
self.assertEqual(self.database_service._step_range, self.step_range)
self.assertEqual(self.database_service._table_info, {})
self.assertEqual(self.database_service._param, (1000000, 2000000))
def test_init_without_step_range(self):
"""测试不使用时间范围初始化数据库服务"""
database_service = DatabaseService(self.db_path, None)
self.assertEqual(database_service._db_path, self.db_path)
self.assertIsNone(database_service._step_range)
self.assertEqual(database_service._table_info, {})
self.assertIsNone(database_service._param)
def test_add_table_for_query_valid(self):
"""测试添加有效的表和列信息"""
table_name = "TEST_TABLE"
columns = ["col1", "col2", "col3"]
self.database_service.add_table_for_query(table_name, columns)
self.assertIn(table_name, self.database_service._table_info)
self.assertEqual(self.database_service._table_info[table_name], columns)
table_name2 = "TEST_TABLE2"
self.database_service.add_table_for_query(table_name2)
self.assertIn(table_name2, self.database_service._table_info)
self.assertIsNone(self.database_service._table_info[table_name2])
@patch('msprof_analyze.prof_common.database_service.logger.error')
def test_add_table_for_query_invalid(self, mock_error):
"""测试添加无效的表和列信息"""
self.database_service.add_table_for_query(123, ["col1", "col2"])
mock_error.assert_called_once_with("Parameter table_name must be type of string.")
mock_error.reset_mock()
self.database_service.add_table_for_query("TEST_TABLE", "not_a_list")
mock_error.assert_called_once_with("Parameter columns must be type of list.")
@patch('msprof_analyze.prof_common.database_service.DBManager.create_connect_db')
def test_query_data_empty_table_info(self, mock_create_connect):
"""测试查询数据时表信息为空的情况"""
self.database_service._table_info = {}
result = self.database_service.query_data()
self.assertEqual(result, {})
mock_create_connect.assert_not_called()
@patch('msprof_analyze.prof_common.database_service.DBManager.create_connect_db')
def test_query_data_empty_db_path(self, mock_create_connect):
"""测试查询数据时数据库路径为空的情况"""
self.database_service._db_path = ""
self.database_service.add_table_for_query("TEST_TABLE")
result = self.database_service.query_data()
self.assertEqual(result, {})
mock_create_connect.assert_not_called()
@patch('msprof_analyze.prof_common.database_service.DBManager.destroy_db_connect')
@patch('pandas.read_sql')
@patch('msprof_analyze.prof_common.database_service.DBManager.get_table_columns_name')
@patch('msprof_analyze.prof_common.database_service.DBManager.judge_table_exists')
@patch('msprof_analyze.prof_common.database_service.DBManager.create_connect_db')
@patch('msprof_analyze.prof_common.database_service.os.path.exists')
def test_query_data_successful(self, mock_exists, mock_create_connect, mock_judge_table,
mock_get_columns, mock_read_sql, mock_destroy_connect):
"""测试成功查询数据的情况"""
mock_conn = MagicMock()
mock_cursor = MagicMock()
mock_create_connect.return_value = (mock_conn, mock_cursor)
mock_exists.return_value = True
mock_judge_table.return_value = True
mock_get_columns.return_value = ["col1", "col2", "col3"]
mock_data = pd.DataFrame({"col1": [1, 2], "col2": [3, 4], "col3": [5, 6]})
mock_read_sql.return_value = mock_data
self.database_service.add_table_for_query("TEST_TABLE", ["col1", "col2"])
result = self.database_service.query_data()
self.assertIn("TEST_TABLE", result)
pd.testing.assert_frame_equal(result["TEST_TABLE"], mock_data)
mock_create_connect.assert_called_once_with(self.db_path)
mock_destroy_connect.assert_called_once_with(mock_conn, mock_cursor)
@patch('msprof_analyze.prof_common.database_service.logger.warning')
@patch('msprof_analyze.prof_common.database_service.DBManager.destroy_db_connect')
@patch('msprof_analyze.prof_common.database_service.DBManager.judge_table_exists')
@patch('msprof_analyze.prof_common.database_service.DBManager.create_connect_db')
def test_query_data_table_not_exists(self, mock_create_connect, mock_judge_table,
mock_destroy_connect, mock_warning):
"""测试查询不存在的表的情况"""
mock_conn = MagicMock()
mock_cursor = MagicMock()
mock_create_connect.return_value = (mock_conn, mock_cursor)
mock_judge_table.return_value = False
self.database_service.add_table_for_query("NON_EXISTENT_TABLE")
result = self.database_service.query_data()
self.assertEqual(result, {})
@patch('msprof_analyze.prof_common.database_service.logger.error')
@patch('msprof_analyze.prof_common.database_service.DBManager.destroy_db_connect')
@patch('msprof_analyze.prof_common.database_service.DBManager.get_table_columns_name')
@patch('msprof_analyze.prof_common.database_service.DBManager.judge_table_exists')
@patch('msprof_analyze.prof_common.database_service.DBManager.create_connect_db')
@patch('msprof_analyze.prof_common.database_service.os.path.exists')
def test_query_data_invalid_columns(self, mock_exists, mock_create_connect, mock_judge_table,
mock_get_columns, mock_destroy_connect, mock_error):
"""测试查询无效列的情况"""
mock_conn = MagicMock()
mock_cursor = MagicMock()
mock_create_connect.return_value = (mock_conn, mock_cursor)
mock_exists.return_value = True
mock_judge_table.return_value = True
mock_get_columns.return_value = ["col1", "col2"]
self.database_service.add_table_for_query("TEST_TABLE", ["non_existent_col"])
result = self.database_service.query_data()
self.assertEqual(result, {})
expected_error = "The fields to be queried in Table TEST_TABLE are invalid."
mock_error.assert_called_once_with(expected_error)
# Run the test suite when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()