# Source listing metadata: 191 lines, 6.3 KiB, Python
import csv
|
||
from typing import List, Dict, Union, Any, Optional
|
||
|
||
class CSVReader:
    """Utility for reading a CSV file, with or without a header row.

    Every row is exposed as a dictionary. With a header row the keys are the
    header field names; without one the keys are generated as ``col_0``,
    ``col_1``, ... from the zero-based column position.
    """

    def __init__(self, file_path: str):
        """Remember which file to read; the file is not opened here.

        Args:
            file_path: Path of the CSV file.
        """
        self.file_path = file_path

    def read(self, has_header: bool = True, encoding: str = 'utf-8') -> List[Dict[str, Any]]:
        """Read the whole file and return one dictionary per row.

        Args:
            has_header: Whether the first row holds the column names.
            encoding: Text encoding used to open the file.

        Returns:
            A list of row dictionaries. Keys are header names when
            ``has_header`` is true, otherwise ``col_<index>``.
        """
        # newline='' is what the csv module asks for: without it, universal
        # newline translation rewrites "\r\n" / "\r" inside quoted fields
        # before the parser ever sees them.
        with open(self.file_path, 'r', encoding=encoding, newline='') as file:
            if has_header:
                return [dict(row) for row in csv.DictReader(file)]
            return [
                {f'col_{i}': value for i, value in enumerate(row)}
                for row in csv.reader(file)
            ]

    def read_column(self, column: Union[int, str], has_header: bool = True,
                    encoding: str = 'utf-8') -> List[Any]:
        """Return the values of a single column.

        Args:
            column: Zero-based column index, or a header name (names are only
                accepted when ``has_header`` is true).
            has_header: Whether the first row holds the column names.
            encoding: Text encoding used to open the file.

        Returns:
            The column's values in row order. With a header and an integer
            index, an empty list is returned when the file has no data rows.
            Without a header, rows that are too short to contain the column
            are skipped.

        Raises:
            ValueError: If ``column`` is neither an ``int`` nor (with a
                header) a ``str``.
            IndexError: If an integer index is out of range for a file with a
                header and at least one data row.
            KeyError: If a header name is not present in a row.
        """
        data = self.read(has_header, encoding)

        by_name = has_header and isinstance(column, str)
        if by_name:
            return [row[column] for row in data]

        if not isinstance(column, int):
            raise ValueError("Invalid column parameter")

        if not has_header:
            # Generated keys only exist for positions a row actually has.
            column_name = f'col_{column}'
            return [row[column_name] for row in data if column_name in row]

        if not data:
            return []

        # Translate the index into a header name using the first data row.
        keys = list(data[0])
        if not 0 <= column < len(keys):
            raise IndexError(f"Column index {column} out of range")
        column_name = keys[column]
        return [row[column_name] for row in data]

    def read_columns(self, columns: Dict[str, Union[int, str]], has_header: bool = True,
                     encoding: str = 'utf-8') -> List[Dict[str, Any]]:
        """Return selected columns, optionally renamed.

        Args:
            columns: Mapping of output name -> source column, where the source
                is a zero-based index or a row key (a header name, or a
                generated ``col_<index>`` key when there is no header).
            has_header: Whether the first row holds the column names.
            encoding: Text encoding used to open the file.

        Returns:
            One dictionary per row containing every requested output name, in
            the order given by ``columns``. A source column that cannot be
            found (unknown key, index out of range, unsupported selector
            type) yields ``''`` instead of raising.
        """
        all_data = self.read(has_header, encoding)
        header_keys = list(all_data[0]) if all_data else []

        # Resolve every selector to a concrete row key once, instead of
        # repeating the same type/range checks for each row. The boolean
        # marks selectors that cannot match anything, because ``None`` can
        # itself be a legitimate row key.
        plan = []
        for new_name, old_column in columns.items():
            if isinstance(old_column, str):
                # Also covers headerless files addressed as 'col_<index>';
                # previously such a selector was silently dropped.
                plan.append((new_name, old_column, True))
            elif isinstance(old_column, int):
                if not has_header:
                    plan.append((new_name, f'col_{old_column}', True))
                elif 0 <= old_column < len(header_keys):
                    plan.append((new_name, header_keys[old_column], True))
                else:
                    plan.append((new_name, None, False))
            else:
                plan.append((new_name, None, False))

        return [
            {
                new_name: row.get(key, '') if resolvable else ''
                for new_name, key, resolvable in plan
            }
            for row in all_data
        ]
|
||
|
||
# Usage example
|
||
if __name__ == "__main__":
    # Demo run against ../data.csv, treated here as a file WITHOUT a header.
    #
    # Example 1 (file with a header) assumes data.csv looks like:
    #   name,age,city
    #   Alice,25,Beijing
    #   Bob,30,Shanghai
    #   Charlie,35,Guangzhou
    csv_reader = CSVReader('../data.csv')

    # Read everything (disabled):
    # all_data = csv_reader.read(has_header=False)
    # all_data = csv_reader.read(has_header=True)
    # print("all data:", all_data)

    # Pick the first column by index and expose it under a new name.
    column_mapping = {'company_name': 0}
    renamed_rows = csv_reader.read_columns(column_mapping, has_header=False)
    print("所有数据:", renamed_rows)

    # Same column again, this time as a flat list of values.
    first_column_values = csv_reader.read_column(0, has_header=False)
    print("所有数据:", first_column_values)

    # Further examples (disabled).
    #
    # Read one column by header name:
    # names = csv_reader.read_column('name', has_header=True)
    # print("name column:", names)
    #
    # Read one column by index:
    # ages = csv_reader.read_column(1, has_header=True)
    # print("age column:", ages)
    #
    # Read several columns and rename them:
    # selected_data = csv_reader.read_columns({
    #     'full_name': 'name',
    #     'age': 1
    # }, has_header=True)
    # print("selected data:", selected_data)
    #
    # Example 2 (file without a header) assumes data_no_header.csv looks like:
    #   Alice,25,Beijing
    #   Bob,30,Shanghai
    #   Charlie,35,Guangzhou
    #
    # reader2 = CSVReader('data_no_header.csv')
    #
    # Read everything:
    # all_data_no_header = reader2.read(has_header=False)
    # print("all data (no header):", all_data_no_header)
    #
    # Read one column by index:
    # first_column = reader2.read_column(0, has_header=False)
    # print("first column:", first_column)
    #
    # Read several columns and give them names:
    # selected_data_no_header = reader2.read_columns({
    #     'full_name': 0,
    #     'city': 2
    # }, has_header=False)
    # print("selected data (no header):", selected_data_no_header)
|