import csv from typing import List, Dict, Union, Any, Optional class CSVReader: """ CSV文件读取工具类 支持有表头和无表头模式,可按列索引或表头字段名返回数据 """ def __init__(self, file_path: str): """ 初始化CSV读取器 Args: file_path: CSV文件路径 """ self.file_path = file_path def read(self, has_header: bool = True, encoding: str = 'utf-8') -> List[Dict[str, Any]]: """ 读取CSV文件并返回查询结果 Args: has_header: 是否有表头,默认为True encoding: 文件编码,默认为utf-8 Returns: List[Dict[str, Any]]: 查询结果列表,每个元素是一行数据的字典表示 """ data = [] with open(self.file_path, 'r', encoding=encoding) as file: if has_header: # 使用DictReader处理有表头的CSV reader = csv.DictReader(file) for row in reader: data.append(dict(row)) else: # 使用普通reader处理无表头的CSV reader = csv.reader(file) for row in reader: # 为无表头的行创建字典,使用列索引作为键 row_dict = {f'col_{i}': value for i, value in enumerate(row)} data.append(row_dict) return data def read_column(self, column: Union[int, str], has_header: bool = True, encoding: str = 'utf-8') -> List[Any]: """ 读取指定列的数据 Args: column: 列索引(从0开始)或列名 has_header: 是否有表头 encoding: 文件编码 Returns: List[Any]: 指定列的数据列表 """ data = self.read(has_header, encoding) if has_header and isinstance(column, str): # 有表头且指定了列名 return [row[column] for row in data] elif isinstance(column, int): # 指定了列索引 if has_header: # 有表头时需要获取列名 if data: keys = list(data[0].keys()) if 0 <= column < len(keys): column_name = keys[column] return [row[column_name] for row in data] else: raise IndexError(f"Column index {column} out of range") else: return [] else: # 无表头时使用默认列名 column_name = f'col_{column}' return [row[column_name] for row in data if column_name in row] else: raise ValueError("Invalid column parameter") def read_columns(self, columns: Dict[str, Union[int, str]], has_header: bool = True, encoding: str = 'utf-8') -> List[Dict[str, Any]]: """ 读取指定的多列数据,可以重命名列名 Args: columns: 字典,键为返回结果中的列名,值为原CSV中的列索引或列名 has_header: 是否有表头 encoding: 文件编码 Returns: List[Dict[str, Any]]: 指定列的数据列表 """ all_data = self.read(has_header, encoding) result = [] # 获取所有列名 if all_data: header_keys = list(all_data[0].keys()) else: header_keys = [] for row in all_data: new_row = {} for new_name, old_column in columns.items(): if isinstance(old_column, str) and has_header: # 按列名获取值 new_row[new_name] = row.get(old_column, '') elif isinstance(old_column, int): # 按列索引获取值 if has_header: if 0 <= old_column < len(header_keys): key = header_keys[old_column] new_row[new_name] = row.get(key, '') else: new_row[new_name] = '' else: key = f'col_{old_column}' new_row[new_name] = row.get(key, '') result.append(new_row) return result # 使用示例 if __name__ == "__main__": # 示例1: 有表头的CSV文件 # 假设有一个名为data.csv的文件内容如下: # name,age,city # Alice,25,Beijing # Bob,30,Shanghai # Charlie,35,Guangzhou reader = CSVReader('../data.csv') # 读取所有数据 #all_data = reader.read(has_header=False) # 读取所有数据 # all_data = reader.read(has_header=True) # print("所有数据:", all_data) selected_data_no_header = reader.read_columns({ 'company_name': 0 }, has_header=False) print("所有数据:", selected_data_no_header) selected_data_no_header = reader.read_column(0, has_header=False) print("所有数据:", selected_data_no_header) # # 读取指定列(按列名) # names = reader.read_column('name', has_header=True) # print("姓名列:", names) # # # 读取指定列(按索引) # ages = reader.read_column(1, has_header=True) # print("年龄列:", ages) # # # 读取多列并重命名 # selected_data = reader.read_columns({ # '姓名': 'name', # '年龄': 1 # }, has_header=True) # print("选择的数据:", selected_data) # # # 示例2: 无表头的CSV文件 # # 假设有一个名为data_no_header.csv的文件内容如下: # # Alice,25,Beijing # # Bob,30,Shanghai # # Charlie,35,Guangzhou # # reader2 = CSVReader('data_no_header.csv') # # # 读取所有数据 # all_data_no_header = reader2.read(has_header=False) # print("无表头所有数据:", all_data_no_header) # # # 读取指定列(按索引) # first_column = reader2.read_column(0, has_header=False) # print("第一列:", first_column) # # # 读取多列并指定名称 # selected_data_no_header = reader2.read_columns({ # '姓名': 0, # '城市': 2 # }, has_header=False) # print("无表头选择的数据:", selected_data_no_header)