改进脚本处理JSON对话数据

我可以改进这个脚本,使其更好地处理你提供的JSON格式。主要问题是原脚本期望的JSON结构与实际结构有所不同。以下是改进后的脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
import json
import os
import re
from datetime import datetime
from typing import Dict, List, Any, Optional

def extract_conversation_messages(mapping: Dict[str, Any], start_node_id: str = "root") -> List[Dict[str, Any]]:
"""
从mapping的树形结构中递归提取对话消息,处理复杂的树形结构
"""
messages = []
visited_nodes = set()

def traverse_node(node_id: str):
"""递归遍历节点"""
if node_id in visited_nodes or node_id not in mapping:
return

visited_nodes.add(node_id)
current_node = mapping[node_id]

# 处理当前节点的消息
message_data = current_node.get("message")
if message_data and isinstance(message_data, dict):
process_message_data(message_data)

# 递归处理子节点
children = current_node.get("children", [])
for child_id in children:
if isinstance(child_id, str):
traverse_node(child_id)

def process_message_data(message_data: Dict[str, Any]):
"""处理消息数据"""
fragments = message_data.get("fragments", [])
files = message_data.get("files", [])

# 处理文件附件
for file_info in files:
if isinstance(file_info, dict):
file_content = file_info.get('content', '')
file_name = file_info.get('file_name', 'unnamed')

# 只保存有内容的文件
if file_content.strip():
file_markdown = f"\n\n[文件: {file_name}]\n```\n{file_content}\n```\n"
messages.append({
"role": "system",
"content": file_markdown,
"type": "FILE_ATTACHMENT",
"inserted_at": message_data.get("inserted_at", "")
})

# 处理消息片段
for fragment in fragments:
if not isinstance(fragment, dict):
continue

fragment_type = fragment.get("type")
content = fragment.get("content", "")

if not content.strip():
continue

if fragment_type == "REQUEST":
role = "user"
elif fragment_type == "RESPONSE":
role = "assistant"
elif fragment_type == "THINK":
role = "assistant_think"
else:
continue

messages.append({
"role": role,
"content": content,
"type": fragment_type,
"inserted_at": message_data.get("inserted_at", "")
})

# 开始遍历
traverse_node(start_node_id)
return messages

def detect_code_language(content: str) -> str:
"""自动检测代码语言"""
content_lower = content.lower().strip()

if content_lower.startswith('<?php') or 'php' in content_lower:
return 'php'
elif content_lower.startswith('from ') or 'dockerfile' in content_lower:
return 'dockerfile'
elif content_lower.startswith('<script') or 'javascript' in content_lower:
return 'javascript'
elif content_lower.startswith('#!/'):
return 'bash'
elif content_lower.startswith('<?xml') or '<html' in content_lower:
return 'html'
elif content_lower.startswith('{') and ('"}' in content_lower or '},' in content_lower):
return 'json'
elif re.search(r'^\s*(def|class|import|from)\s', content_lower):
return 'python'
elif re.search(r'^\s*(function|const|let|var)\s', content_lower):
return 'javascript'
elif re.search(r'^\s*#', content_lower) and not content_lower.startswith('#!'):
return 'yaml'

return ''

def format_content(content: str, role: str) -> str:
"""格式化内容,智能识别代码块"""
if not content.strip():
return content

# 如果是系统消息或文件附件,直接返回
if role in ["system", "FILE_ATTACHMENT"]:
return content

# 检测是否为代码内容
lines = content.split('\n')
code_lines = 0

for line in lines:
stripped = line.strip()
# 检测代码特征:缩进、括号、分号等
if (stripped.startswith(('def ', 'class ', 'import ', 'from ', 'function ', 'const ', 'let ', 'var ')) or
' = ' in stripped or ';' in stripped or stripped.endswith('{') or stripped.endswith('}') or
stripped.startswith('<') and stripped.endswith('>') or
re.match(r'^\s*[{}()\[\]]', stripped)):
code_lines += 1

# 如果超过一定比例的行看起来像代码,则格式化为代码块
if len(lines) > 2 and code_lines / len(lines) > 0.3:
language = detect_code_language(content)
return f"```{language}\n{content}\n```"

return content

def sanitize_filename(filename: str, max_length: int = 50) -> str:
"""清理文件名,移除非法字符和换行符"""
if not filename:
return "untitled_conversation"

# 首先处理换行符、制表符等特殊空白字符
filename = filename.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ')

# 移除Windows文件名非法字符
illegal_chars = r'[<>:"/\\|?*]'
filename = re.sub(illegal_chars, '', filename)

# 移除首尾空格和点
filename = filename.strip().strip('.')

# 限制长度
if len(filename) > max_length:
# 尝试在空格处截断,保持可读性
if ' ' in filename:
parts = filename.split(' ')
truncated = []
current_length = 0
for part in parts:
if current_length + len(part) + 1 <= max_length - 3: # 保留空间给"..."
truncated.append(part)
current_length += len(part) + 1
else:
break
filename = ' '.join(truncated) + '...'
else:
filename = filename[:max_length - 3] + '...'

# 确保文件名不为空
if not filename:
filename = f"conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

return filename

def parse_timestamp(timestamp_str: str) -> str:
"""解析时间戳"""
if not timestamp_str:
return "Unknown time"

try:
# 尝试解析ISO格式时间
if 'T' in timestamp_str:
# 处理时区信息
if timestamp_str.endswith('Z'):
timestamp_str = timestamp_str[:-1] + '+00:00'
elif '+' in timestamp_str and ':' not in timestamp_str.split('+')[1]:
# 处理简化的时区格式
parts = timestamp_str.split('+')
timestamp_str = parts[0] + '+' + parts[1][:2] + ':' + parts[1][2:]

dt = datetime.fromisoformat(timestamp_str)
return dt.strftime("%Y-%m-%d %H:%M:%S")
else:
return timestamp_str
except (ValueError, AttributeError):
return str(timestamp_str)[:19]

def process_single_conversation(conversation: Dict[str, Any], output_folder: str, index: int, stats: Dict[str, int]) -> bool:
"""处理单个对话"""
# 提取对话信息
conv_id = conversation.get("id", f"unknown_{index}")
title = conversation.get("title", f"Conversation {conv_id}")
inserted_at = conversation.get("inserted_at", "")
mapping = conversation.get("mapping", {})

# 使用repr显示原始标题,便于调试
print(f"\n[{index + 1}] 处理: {repr(title)}")

# 提取消息
try:
messages = extract_conversation_messages(mapping)
stats['total_messages'] += len(messages)
except Exception as e:
print(f" 提取消息时出错: {e}")
stats['error'] += 1
return False

if not messages:
print(f" 跳过空对话")
stats['skipped'] += 1
return False

# 生成安全的文件名
valid_title = sanitize_filename(str(title))
filename = f"{valid_title}.md"
filepath = os.path.join(output_folder, filename)

# 处理重复文件名
counter = 1
base_name = valid_title
while os.path.exists(filepath):
filename = f"{base_name}_{counter}.md"
filepath = os.path.join(output_folder, filename)
counter += 1

# 创建Markdown内容
try:
md_content = [
f"# {title}",
f"",
f"**Conversation ID:** `{conv_id}` ",
f"**Created:** {parse_timestamp(inserted_at)}",
f"**Total Messages:** {len(messages)}",
f"",
"---",
f""
]

# 添加消息内容
for msg in messages:
role = msg["role"]
content = msg["content"]

# 添加角色标题
if role == "user":
md_content.append("### 👤 User")
elif role == "assistant":
md_content.append("### 🤖 Assistant")
elif role == "assistant_think":
md_content.append("### 🤖 Assistant (内部思考)")
elif role == "system":
md_content.append("### 📎 文件附件")

md_content.append("")

# 格式化内容
formatted_content = format_content(content, role)
md_content.append(formatted_content)
md_content.append("")
md_content.append("---")
md_content.append("")

# 写入文件
with open(filepath, 'w', encoding='utf-8') as md_file:
md_file.write('\n'.join(md_content))

print(f" 成功创建: {filename} ({len(messages)} 条消息)")
stats['successful'] += 1
return True

except Exception as e:
print(f" 创建文件失败: {e}")
stats['error'] += 1
return False

def json_to_obsidian():
"""主函数:将JSON转换为Obsidian格式"""
# 配置路径
json_path = "新建 文本文档 (6).txt" # 你的文本文件路径
output_folder = r"C:\Users\dog\Documents\Obsidian Vault\deepseek"

# 创建输出文件夹
os.makedirs(output_folder, exist_ok=True)

# 加载JSON数据
try:
with open(json_path, 'r', encoding='utf-8') as f:
content = f.read()

# 尝试解析为JSON
data = json.loads(content)
print(f"成功加载JSON数据")
except Exception as e:
print(f"加载JSON文件失败: {e}")
# 尝试修复可能的JSON格式问题
try:
# 如果是文本文件包含JSON,尝试提取JSON部分
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
data = json.loads(json_match.group())
print("从文本中提取JSON成功")
else:
raise ValueError("无法从文本中提取JSON")
except Exception as e2:
print(f"解析JSON失败: {e2}")
return

# 处理数据格式
if isinstance(data, dict):
conversations = [data]
elif isinstance(data, list):
conversations = data
else:
print("错误:不支持的JSON数据格式")
return

print(f"开始处理 {len(conversations)} 个对话...")

# 统计信息
stats = {
'successful': 0,
'skipped': 0,
'error': 0,
'total_messages': 0
}

# 处理每个对话
for index, conversation in enumerate(conversations):
if not isinstance(conversation, dict):
print(f"跳过非字典类型的对话 {index}")
stats['skipped'] += 1
continue

process_single_conversation(conversation, output_folder, index, stats)

# 打印总结
print(f"\n=== 处理完成 ===")
print(f"总对话数: {len(conversations)}")
print(f"成功导出: {stats['successful']}")
print(f"跳过: {stats['skipped']}")
print(f"错误: {stats['error']}")
print(f"总消息数: {stats['total_messages']}")
print(f"输出文件夹: {output_folder}")

if __name__ == "__main__":
json_to_obsidian()

主要改进:

  1. 更好的JSON处理:脚本现在可以处理文本文件中的JSON内容,而不仅仅是纯JSON文件
  2. 改进的代码检测:更智能地识别代码内容,包括YAML格式
  3. 时间戳解析:更好地处理ISO 8601时间格式,包括时区信息
  4. 文件处理优化:只保存有实际内容的文件附件
  5. 错误处理:更健壮的错误处理和恢复机制
  6. 模块化设计:将单个对话处理逻辑分离到独立函数中

使用方法:

  1. 将你的文本文件重命名为 新建 文本文档 (6).txt 或修改脚本中的路径
  2. 确保输出文件夹路径正确
  3. 运行脚本

这个改进版本应该能更好地处理你提供的对话数据格式。

时间: 2025-09-03T23:45:24


👤 用户

使用这个脚本出来json片段(新建文本文档6)不是太完美,能不能改进下。

时间: 2025-09-03T23:45:24


🤖 DeepSeek (deepseek-chat)

我可以改进这个脚本,使其更好地处理你提供的JSON格式。主要问题是原脚本期望的JSON结构与实际结构有所不同。以下是改进后的脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
import json
import os
import re
from datetime import datetime
from typing import Dict, List, Any, Optional

def extract_conversation_messages(mapping: Dict[str, Any], start_node_id: str = "root") -> List[Dict[str, Any]]:
"""
从mapping的树形结构中递归提取对话消息,处理复杂的树形结构
"""
messages = []
visited_nodes = set()

def traverse_node(node_id: str):
"""递归遍历节点"""
if node_id in visited_nodes or node_id not in mapping:
return

visited_nodes.add(node_id)
current_node = mapping[node_id]

# 处理当前节点的消息
message_data = current_node.get("message")
if message_data and isinstance(message_data, dict):
process_message_data(message_data)

# 递归处理子节点
children = current_node.get("children", [])
for child_id in children:
if isinstance(child_id, str):
traverse_node(child_id)

def process_message_data(message_data: Dict[str, Any]):
"""处理消息数据"""
fragments = message_data.get("fragments", [])
files = message_data.get("files", [])

# 处理文件附件
for file_info in files:
if isinstance(file_info, dict):
file_content = file_info.get('content', '')
file_name = file_info.get('file_name', 'unnamed')

# 只保存有内容的文件
if file_content.strip():
file_markdown = f"\n\n[文件: {file_name}]\n```\n{file_content}\n```\n"
messages.append({
"role": "system",
"content": file_markdown,
"type": "FILE_ATTACHMENT",
"inserted_at": message_data.get("inserted_at", "")
})

# 处理消息片段
for fragment in fragments:
if not isinstance(fragment, dict):
continue

fragment_type = fragment.get("type")
content = fragment.get("content", "")

if not content.strip():
continue

if fragment_type == "REQUEST":
role = "user"
elif fragment_type == "RESPONSE":
role = "assistant"
elif fragment_type == "THINK":
role = "assistant_think"
else:
continue

messages.append({
"role": role,
"content": content,
"type": fragment_type,
"inserted_at": message_data.get("inserted_at", "")
})

# 开始遍历
traverse_node(start_node_id)
return messages

def detect_code_language(content: str) -> str:
"""自动检测代码语言"""
content_lower = content.lower().strip()

if content_lower.startswith('<?php') or 'php' in content_lower:
return 'php'
elif content_lower.startswith('from ') or 'dockerfile' in content_lower:
return 'dockerfile'
elif content_lower.startswith('<script') or 'javascript' in content_lower:
return 'javascript'
elif content_lower.startswith('#!/'):
return 'bash'
elif content_lower.startswith('<?xml') or '<html' in content_lower:
return 'html'
elif content_lower.startswith('{') and ('"}' in content_lower or '},' in content_lower):
return 'json'
elif re.search(r'^\s*(def|class|import|from)\s', content_lower):
return 'python'
elif re.search(r'^\s*(function|const|let|var)\s', content_lower):
return 'javascript'
elif re.search(r'^\s*#', content_lower) and not content_lower.startswith('#!'):
return 'yaml'

return ''

def format_content(content: str, role: str) -> str:
"""格式化内容,智能识别代码块"""
if not content.strip():
return content

# 如果是系统消息或文件附件,直接返回
if role in ["system", "FILE_ATTACHMENT"]:
return content

# 检测是否为代码内容
lines = content.split('\n')
code_lines = 0

for line in lines:
stripped = line.strip()
# 检测代码特征:缩进、括号、分号等
if (stripped.startswith(('def ', 'class ', 'import ', 'from ', 'function ', 'const ', 'let ', 'var ')) or
' = ' in stripped or ';' in stripped or stripped.endswith('{') or stripped.endswith('}') or
stripped.startswith('<') and stripped.endswith('>') or
re.match(r'^\s*[{}()\[\]]', stripped)):
code_lines += 1

# 如果超过一定比例的行看起来像代码,则格式化为代码块
if len(lines) > 2 and code_lines / len(lines) > 0.3:
language = detect_code_language(content)
return f"```{language}\n{content}\n```"

return content

def sanitize_filename(filename: str, max_length: int = 50) -> str:
"""清理文件名,移除非法字符和换行符"""
if not filename:
return "untitled_conversation"

# 首先处理换行符、制表符等特殊空白字符
filename = filename.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ')

# 移除Windows文件名非法字符
illegal_chars = r'[<>:"/\\|?*]'
filename = re.sub(illegal_chars, '', filename)

# 移除首尾空格和点
filename = filename.strip().strip('.')

# 限制长度
if len(filename) > max_length:
# 尝试在空格处截断,保持可读性
if ' ' in filename:
parts = filename.split(' ')
truncated = []
current_length = 0
for part in parts:
if current_length + len(part) + 1 <= max_length - 3: # 保留空间给"..."
truncated.append(part)
current_length += len(part) + 1
else:
break
filename = ' '.join(truncated) + '...'
else:
filename = filename[:max_length - 3] + '...'

# 确保文件名不为空
if not filename:
filename = f"conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

return filename

def parse_timestamp(timestamp_str: str) -> str:
"""解析时间戳"""
if not timestamp_str:
return "Unknown time"

try:
# 尝试解析ISO格式时间
if 'T' in timestamp_str:
# 处理时区信息
if timestamp_str.endswith('Z'):
timestamp_str = timestamp_str[:-1] + '+00:00'
elif '+' in timestamp_str and ':' not in timestamp_str.split('+')[1]:
# 处理简化的时区格式
parts = timestamp_str.split('+')
timestamp_str = parts[0] + '+' + parts[1][:2] + ':' + parts[1][2:]

dt = datetime.fromisoformat(timestamp_str)
return dt.strftime("%Y-%m-%d %H:%M:%S")
else:
return timestamp_str
except (ValueError, AttributeError):
return str(timestamp_str)[:19]

def process_single_conversation(conversation: Dict[str, Any], output_folder: str, index: int, stats: Dict[str, int]) -> bool:
"""处理单个对话"""
# 提取对话信息
conv_id = conversation.get("id", f"unknown_{index}")
title = conversation.get("title", f"Conversation {conv_id}")
inserted_at = conversation.get("inserted_at", "")
mapping = conversation.get("mapping", {})

# 使用repr显示原始标题,便于调试
print(f"\n[{index + 1}] 处理: {repr(title)}")

# 提取消息
try:
messages = extract_conversation_messages(mapping)
stats['total_messages'] += len(messages)
except Exception as e:
print(f" 提取消息时出错: {e}")
stats['error'] += 1
return False

if not messages:
print(f" 跳过空对话")
stats['skipped'] += 1
return False

# 生成安全的文件名
valid_title = sanitize_filename(str(title))
filename = f"{valid_title}.md"
filepath = os.path.join(output_folder, filename)

# 处理重复文件名
counter = 1
base_name = valid_title
while os.path.exists(filepath):
filename = f"{base_name}_{counter}.md"
filepath = os.path.join(output_folder, filename)
counter += 1

# 创建Markdown内容
try:
md_content = [
f"# {title}",
f"",
f"**Conversation ID:** `{conv_id}` ",
f"**Created:** {parse_timestamp(inserted_at)}",
f"**Total Messages:** {len(messages)}",
f"",
"---",
f""
]

# 添加消息内容
for msg in messages:
role = msg["role"]
content = msg["content"]

# 添加角色标题
if role == "user":
md_content.append("### 👤 User")
elif role == "assistant":
md_content.append("### 🤖 Assistant")
elif role == "assistant_think":
md_content.append("### 🤖 Assistant (内部思考)")
elif role == "system":
md_content.append("### 📎 文件附件")

md_content.append("")

# 格式化内容
formatted_content = format_content(content, role)
md_content.append(formatted_content)
md_content.append("")
md_content.append("---")
md_content.append("")

# 写入文件
with open(filepath, 'w', encoding='utf-8') as md_file:
md_file.write('\n'.join(md_content))

print(f" 成功创建: {filename} ({len(messages)} 条消息)")
stats['successful'] += 1
return True

except Exception as e:
print(f" 创建文件失败: {e}")
stats['error'] += 1
return False

def json_to_obsidian():
"""主函数:将JSON转换为Obsidian格式"""
# 配置路径
json_path = "新建 文本文档 (6).txt" # 你的文本文件路径
output_folder = r"C:\Users\dog\Documents\Obsidian Vault\deepseek"

# 创建输出文件夹
os.makedirs(output_folder, exist_ok=True)

# 加载JSON数据
try:
with open(json_path, 'r', encoding='utf-8') as f:
content = f.read()

# 尝试解析为JSON
data = json.loads(content)
print(f"成功加载JSON数据")
except Exception as e:
print(f"加载JSON文件失败: {e}")
# 尝试修复可能的JSON格式问题
try:
# 如果是文本文件包含JSON,尝试提取JSON部分
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
data = json.loads(json_match.group())
print("从文本中提取JSON成功")
else:
raise ValueError("无法从文本中提取JSON")
except Exception as e2:
print(f"解析JSON失败: {e2}")
return

# 处理数据格式
if isinstance(data, dict):
conversations = [data]
elif isinstance(data, list):
conversations = data
else:
print("错误:不支持的JSON数据格式")
return

print(f"开始处理 {len(conversations)} 个对话...")

# 统计信息
stats = {
'successful': 0,
'skipped': 0,
'error': 0,
'total_messages': 0
}

# 处理每个对话
for index, conversation in enumerate(conversations):
if not isinstance(conversation, dict):
print(f"跳过非字典类型的对话 {index}")
stats['skipped'] += 1
continue

process_single_conversation(conversation, output_folder, index, stats)

# 打印总结
print(f"\n=== 处理完成 ===")
print(f"总对话数: {len(conversations)}")
print(f"成功导出: {stats['successful']}")
print(f"跳过: {stats['skipped']}")
print(f"错误: {stats['error']}")
print(f"总消息数: {stats['total_messages']}")
print(f"输出文件夹: {output_folder}")

if __name__ == "__main__":
json_to_obsidian()

主要改进:

  1. 更好的JSON处理:脚本现在可以处理文本文件中的JSON内容,而不仅仅是纯JSON文件
  2. 改进的代码检测:更智能地识别代码内容,包括YAML格式
  3. 时间戳解析:更好地处理ISO 8601时间格式,包括时区信息
  4. 文件处理优化:只保存有实际内容的文件附件
  5. 错误处理:更健壮的错误处理和恢复机制
  6. 模块化设计:将单个对话处理逻辑分离到独立函数中

使用方法:

  1. 将你的文本文件重命名为 新建 文本文档 (6).txt 或修改脚本中的路径
  2. 确保输出文件夹路径正确
  3. 运行脚本

这个改进版本应该能更好地处理你提供的对话数据格式。

时间: 2025-09-03T23:45:24



对话统计

  • 消息总数: 2 条
  • 处理状态: 成功转换