1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3 4# Copyright (c) 2023 Huawei Device Co., Ltd. 5# 6# HDF is dual licensed: you can use it either under the terms of 7# the GPL, or the BSD license, at your option. 8# See the LICENSE file in the root of this repository for complete details. 9 10 11import json 12import os 13import re 14import shutil 15import string 16 17import hdf_tool_settings 18import hdf_utils 19from command_line.hdf_command_error_code import CommandErrorCode 20from command_line.hdf_command_handler_base import HdfCommandHandlerBase 21from command_line.hdf_device_info_hcs import HdfDeviceInfoHcsFile 22from command_line.hdi_operate.hdi_get_handler import HdiGetHandler 23from command_line.operate_group_passwd import OperateGroupPasswd 24from hdf_tool_exception import HdfToolException 25 26 27class HdiDeleteHandler(HdfCommandHandlerBase): 28 def __init__(self, args): 29 super(HdiDeleteHandler, self).__init__() 30 self.cmd = 'deletei' 31 self.parser.add_argument("--action_type", 32 help=' '.join(self.handlers.keys()), 33 required=True) 34 self.parser.add_argument("--root_dir", required=True) 35 self.parser.add_argument("--vendor_name") 36 self.parser.add_argument("--board_name") 37 self.parser.add_argument("--driver_name") 38 self.args_original = args 39 self.root = None 40 self.type = None 41 self.vendor = None 42 self.board = None 43 self.driver = None 44 self.args = self.parser.parse_args(args) 45 self.hdi_get = self._call_hdi_get() 46 self.config_info = None 47 self.create_info = None 48 49 def run(self): 50 self.root = self.args.root_dir 51 self.operate_delete() 52 53 def _call_hdi_get(self): 54 get_argument = ["--action_type", "--root_dir"] 55 get_args = [] 56 for argument_name in get_argument: 57 index_num = self.args_original.index(argument_name) 58 get_args.append(self.args_original[index_num]) 59 get_args.append(self.args_original[index_num + 1]) 60 return HdiGetHandler(get_args) 61 62 def _del_comm_handler(self, type_name): 63 return self.hdi_get.get_type_create_info(type_name)() 64 65 def operate_delete(self): 66 self.type = self.args.action_type 67 self.driver = self.args.driver_name 68 if self.type == "interface": 69 temp_json = self._del_comm_handler("interface") 70 elif self.type == "peripheral": 71 temp_json = self._del_comm_handler("peripheral") 72 elif self.type == "unittest": 73 temp_json = self._del_comm_handler("unittest") 74 else: 75 temp_json = {} 76 json_format = json.loads(temp_json) 77 res_json = json_format.get(self.driver) 78 if res_json is None: 79 raise HdfToolException( 80 "this %s driver name is not exists" % self.driver, 81 CommandErrorCode.TARGET_NOT_EXIST) 82 self.config_info = res_json.get("config") 83 self.create_info = res_json.get("create_file") 84 self.operate_create_file() 85 self.operate_config_file() 86 self.modify_idl_config() 87 del json_format[self.driver] 88 print(json.dumps(json_format, indent=4)) 89 90 def modify_idl_config(self): 91 hdf_setting = hdf_tool_settings.HdfToolSettings() 92 config_hdi_dir_path = hdf_setting.get_file_path() 93 config_dict = hdf_setting.get_config_setting_info() 94 hdi_config_path = os.path.join( 95 config_hdi_dir_path, 96 config_dict.get("config_pre_dir"), 97 config_dict.get("create_hdi_file")) 98 hdi_config_info = hdf_utils.read_file(hdi_config_path) 99 json_hdi_info = json.loads(hdi_config_info) 100 del json_hdi_info[self.type][self.driver] 101 hdf_utils.write_file(hdi_config_path, json.dumps(json_hdi_info, indent=4)) 102 103 def operate_create_file(self): 104 re_dir = r".*%s/" % self.driver 105 pre_handler = re.search(re_dir, self.create_info[0]) 106 for file_path in self.create_info: 107 if os.path.exists(file_path): 108 os.remove(file_path) 109 if pre_handler is None: 110 return 111 pre_handler = pre_handler.group() 112 pre_handler_file_list = os.listdir(pre_handler) 113 state_flag = False 114 if len(pre_handler_file_list) == 1: 115 dir_path = os.path.join(pre_handler, pre_handler_file_list[0]) 116 while True and os.path.isdir(dir_path): 117 temp_list = os.listdir(dir_path) 118 if len(temp_list) == 1: 119 dir_path = os.path.join(dir_path, temp_list[0]) 120 elif len(temp_list) == 0: 121 state_flag = True 122 break 123 else: 124 break 125 if state_flag: 126 shutil.rmtree(pre_handler) 127 128 def operate_config_file(self): 129 if self.type == "interface": 130 for file_path in self.config_info: 131 self._delete_interface_config_file(file_path) 132 elif self.type == "peripheral": 133 for file_path in self.config_info: 134 self._delete_peripheral_config_file(file_path) 135 elif self.type == "unittest": 136 for file_path in self.config_info: 137 self._delete_unittest_config_file(file_path) 138 139 def _delete_peripheral_config_file(self, config_json_path): 140 hdi_config = hdf_tool_settings.HdiToolConfig() 141 if config_json_path.endswith(".hcs"): 142 HdfDeviceInfoHcsFile.hdi_hcs_delete(config_json_path, self.driver) 143 elif config_json_path.endswith(".te"): 144 if config_json_path.endswith("type.te"): 145 _, selinux_temp = hdi_config.get_hdi_selinux_type() 146 elif config_json_path.endswith("hdf_service.te"): 147 _, selinux_temp = hdi_config.get_hdi_selinux_hdf_service() 148 elif config_json_path.endswith("hdf_host.te"): 149 _, selinux_temp = hdi_config.get_hdi_selinux_hdf_host() 150 self._selinux_config_file_delete(selinux_temp, config_json_path) 151 elif config_json_path.endswith("group") \ 152 or config_json_path.endswith("passwd"): 153 hdi_config = hdf_tool_settings.HdfToolSettings() 154 group_passwd = OperateGroupPasswd(tool_settings=hdi_config, root_path=self.root) 155 group_passwd.delete_group_passwd(name=self.args.driver_name, file_path=config_json_path) 156 else: 157 _, selinux_temp = hdi_config.get_hdi_selinux_hdf_service_contexts() 158 self._selinux_config_file_delete(selinux_temp, config_json_path, align=True) 159 160 def _selinux_config_file_delete(self, selinux_temp, config_path, align=False): 161 if isinstance(selinux_temp["info_temp"], str): 162 if align: 163 temp_new_line = string.Template( 164 selinux_temp["info_temp"]).safe_substitute( 165 {"peripheral_name": self.driver}) 166 temp_line = temp_new_line.split(" ") 167 space_num = selinux_temp["space_len"] - len(temp_line[0]) 168 new_line = (" " * space_num).join(temp_line) 169 else: 170 new_line = string.Template( 171 selinux_temp["info_temp"]).safe_substitute( 172 {"peripheral_name": self.driver}) 173 hdf_service_contexts_lines = hdf_utils.read_file_lines(config_path) 174 hdf_service_contexts_lines.remove(new_line) 175 elif isinstance(selinux_temp["info_temp"], list): 176 hdf_service_contexts_lines = hdf_utils.read_file_lines(config_path) 177 temp_re = r"pid=\d+ scontext=u:r:{driver_name}_host:s0".format(driver_name=self.driver) 178 for line in hdf_service_contexts_lines: 179 temp_res = re.search(temp_re, line) 180 if temp_res is not None: 181 pid_num = re.search(r"\d+", temp_res.group()).group() 182 break 183 temp_replace_list = [] 184 for temp_line in selinux_temp["info_temp"]: 185 temp_replace_list.append( 186 string.Template(temp_line).safe_substitute({ 187 "pid_num": pid_num, 188 "peripheral_name": self.driver 189 })) 190 [hdf_service_contexts_lines.remove(temp) for temp in temp_replace_list] 191 hdf_utils.write_file_lines(config_path, hdf_service_contexts_lines) 192 193 def _delete_interface_config_file(self, config_json_path): 194 file_config = hdf_utils.read_file(config_json_path) 195 file_json_type = json.loads(file_config) 196 for info in file_json_type['subsystems']: 197 if info.get("subsystem") == "hdf": 198 key_list_name = [] 199 for key_name in info.get("components"): 200 key_list_name.append(key_name.get('component')) 201 component_name = 'drivers_interface_%s' % self.driver 202 if component_name in key_list_name: 203 interface_dict = { 204 'component': component_name, 205 'features': []} 206 info.get("components").remove(interface_dict) 207 hdf_utils.write_file( 208 config_json_path, json.dumps(file_json_type, indent=4)) 209 210 def _delete_unittest_config_file(self, config_json_path): 211 file_info_str = hdf_utils.read_file(config_json_path) 212 file_info_json = json.loads(file_info_str) 213 pre_dict = file_info_json['component']["build"] 214 temp_str = pre_dict[list(pre_dict.keys())[0]][0].split(":")[0] 215 result_test_str = "/".join([temp_str, "test:{peripheral_name}_unittest". 216 format(peripheral_name=self.driver)]) 217 if result_test_str in pre_dict[list(pre_dict.keys())[1]]: 218 pre_dict[list(pre_dict.keys())[1]].remove(result_test_str) 219 hdf_utils.write_file( 220 config_json_path, content=json.dumps(file_info_json, indent=4)) 221