forked from HJYHJYHJY/xianyu-auto-reply
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdb_manager.py
More file actions
3987 lines (3466 loc) · 170 KB
/
db_manager.py
File metadata and controls
3987 lines (3466 loc) · 170 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import sqlite3
import os
import threading
import hashlib
import time
import json
import random
import string
import aiohttp
import io
import base64
from PIL import Image, ImageDraw, ImageFont
from typing import List, Tuple, Dict, Optional, Any
from loguru import logger
class DBManager:
"""SQLite数据库管理,持久化存储Cookie和关键字"""
def __init__(self, db_path: str = None):
"""初始化数据库连接和表结构"""
# 支持环境变量配置数据库路径
if db_path is None:
db_path = os.getenv('DB_PATH', 'xianyu_data.db')
# 确保数据目录存在并有正确权限
db_dir = os.path.dirname(db_path)
if db_dir and not os.path.exists(db_dir):
try:
os.makedirs(db_dir, mode=0o755, exist_ok=True)
logger.info(f"创建数据目录: {db_dir}")
except PermissionError as e:
logger.error(f"创建数据目录失败,权限不足: {e}")
# 尝试使用当前目录
db_path = os.path.basename(db_path)
logger.warning(f"使用当前目录作为数据库路径: {db_path}")
except Exception as e:
logger.error(f"创建数据目录失败: {e}")
raise
# 检查目录权限
if db_dir and os.path.exists(db_dir):
if not os.access(db_dir, os.W_OK):
logger.error(f"数据目录没有写权限: {db_dir}")
# 尝试使用当前目录
db_path = os.path.basename(db_path)
logger.warning(f"使用当前目录作为数据库路径: {db_path}")
self.db_path = db_path
logger.info(f"数据库路径: {self.db_path}")
self.conn = None
self.lock = threading.RLock() # 使用可重入锁保护数据库操作
# SQL日志配置 - 默认启用
self.sql_log_enabled = True # 默认启用SQL日志
self.sql_log_level = 'INFO' # 默认使用INFO级别
# 允许通过环境变量覆盖默认设置
if os.getenv('SQL_LOG_ENABLED'):
self.sql_log_enabled = os.getenv('SQL_LOG_ENABLED', 'true').lower() == 'true'
if os.getenv('SQL_LOG_LEVEL'):
self.sql_log_level = os.getenv('SQL_LOG_LEVEL', 'INFO').upper()
logger.info(f"SQL日志已启用,日志级别: {self.sql_log_level}")
self.init_db()
def init_db(self):
"""初始化数据库表结构"""
try:
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
cursor = self.conn.cursor()
# 创建用户表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建邮箱验证码表
cursor.execute('''
CREATE TABLE IF NOT EXISTS email_verifications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL,
code TEXT NOT NULL,
expires_at TIMESTAMP NOT NULL,
used BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建图形验证码表
cursor.execute('''
CREATE TABLE IF NOT EXISTS captcha_codes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
code TEXT NOT NULL,
expires_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建cookies表(添加user_id字段和auto_confirm字段)
cursor.execute('''
CREATE TABLE IF NOT EXISTS cookies (
id TEXT PRIMARY KEY,
value TEXT NOT NULL,
user_id INTEGER NOT NULL,
auto_confirm INTEGER DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
)
''')
# 创建keywords表
cursor.execute('''
CREATE TABLE IF NOT EXISTS keywords (
cookie_id TEXT,
keyword TEXT,
reply TEXT,
item_id TEXT,
type TEXT DEFAULT 'text',
image_url TEXT,
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE
)
''')
# 创建cookie_status表
cursor.execute('''
CREATE TABLE IF NOT EXISTS cookie_status (
cookie_id TEXT PRIMARY KEY,
enabled BOOLEAN DEFAULT TRUE,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE
)
''')
# 创建AI回复配置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS ai_reply_settings (
cookie_id TEXT PRIMARY KEY,
ai_enabled BOOLEAN DEFAULT FALSE,
model_name TEXT DEFAULT 'qwen-plus',
api_key TEXT,
base_url TEXT DEFAULT 'https://dashscope.aliyuncs.com/compatible-mode/v1',
max_discount_percent INTEGER DEFAULT 10,
max_discount_amount INTEGER DEFAULT 100,
max_bargain_rounds INTEGER DEFAULT 3,
custom_prompts TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE
)
''')
# 创建AI对话历史表
cursor.execute('''
CREATE TABLE IF NOT EXISTS ai_conversations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cookie_id TEXT NOT NULL,
chat_id TEXT NOT NULL,
user_id TEXT NOT NULL,
item_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
intent TEXT,
bargain_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (cookie_id) REFERENCES cookies (id) ON DELETE CASCADE
)
''')
# 创建AI商品信息缓存表
cursor.execute('''
CREATE TABLE IF NOT EXISTS ai_item_cache (
item_id TEXT PRIMARY KEY,
data TEXT NOT NULL,
price REAL,
description TEXT,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建卡券表
cursor.execute('''
CREATE TABLE IF NOT EXISTS cards (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
type TEXT NOT NULL CHECK (type IN ('api', 'text', 'data', 'image')),
api_config TEXT,
text_content TEXT,
data_content TEXT,
image_url TEXT,
description TEXT,
enabled BOOLEAN DEFAULT TRUE,
delay_seconds INTEGER DEFAULT 0,
is_multi_spec BOOLEAN DEFAULT FALSE,
spec_name TEXT,
spec_value TEXT,
user_id INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# 检查并添加 user_id 列(用于数据库迁移)
try:
self._execute_sql(cursor, "SELECT user_id FROM cards LIMIT 1")
except sqlite3.OperationalError:
# user_id 列不存在,需要添加
logger.info("正在为 cards 表添加 user_id 列...")
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN user_id INTEGER NOT NULL DEFAULT 1")
self._execute_sql(cursor, "CREATE INDEX IF NOT EXISTS idx_cards_user_id ON cards(user_id)")
logger.info("cards 表 user_id 列添加完成")
# 检查并添加 delay_seconds 列(用于自动发货延时功能)
try:
self._execute_sql(cursor, "SELECT delay_seconds FROM cards LIMIT 1")
except sqlite3.OperationalError:
# delay_seconds 列不存在,需要添加
logger.info("正在为 cards 表添加 delay_seconds 列...")
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN delay_seconds INTEGER DEFAULT 0")
logger.info("cards 表 delay_seconds 列添加完成")
# 检查并添加 item_id 列(用于自动回复商品ID功能)
try:
self._execute_sql(cursor, "SELECT item_id FROM keywords LIMIT 1")
except sqlite3.OperationalError:
# item_id 列不存在,需要添加
logger.info("正在为 keywords 表添加 item_id 列...")
self._execute_sql(cursor, "ALTER TABLE keywords ADD COLUMN item_id TEXT")
logger.info("keywords 表 item_id 列添加完成")
# 创建商品信息表
cursor.execute('''
CREATE TABLE IF NOT EXISTS item_info (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cookie_id TEXT NOT NULL,
item_id TEXT NOT NULL,
item_title TEXT,
item_description TEXT,
item_category TEXT,
item_price TEXT,
item_detail TEXT,
is_multi_spec BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE,
UNIQUE(cookie_id, item_id)
)
''')
# 创建自动发货规则表
cursor.execute('''
CREATE TABLE IF NOT EXISTS delivery_rules (
id INTEGER PRIMARY KEY AUTOINCREMENT,
keyword TEXT NOT NULL,
card_id INTEGER NOT NULL,
delivery_count INTEGER DEFAULT 1,
enabled BOOLEAN DEFAULT TRUE,
description TEXT,
delivery_times INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (card_id) REFERENCES cards(id) ON DELETE CASCADE
)
''')
# 创建默认回复表
cursor.execute('''
CREATE TABLE IF NOT EXISTS default_replies (
cookie_id TEXT PRIMARY KEY,
enabled BOOLEAN DEFAULT FALSE,
reply_content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE
)
''')
# 创建通知渠道表
cursor.execute('''
CREATE TABLE IF NOT EXISTS notification_channels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
type TEXT NOT NULL CHECK (type IN ('qq')),
config TEXT NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建系统设置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS system_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
description TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建消息通知配置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS message_notifications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cookie_id TEXT NOT NULL,
channel_id INTEGER NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE,
FOREIGN KEY (channel_id) REFERENCES notification_channels(id) ON DELETE CASCADE,
UNIQUE(cookie_id, channel_id)
)
''')
# 创建用户设置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_settings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
UNIQUE(user_id, key)
)
''')
# 插入默认系统设置(不包括管理员密码,由reply_server.py初始化)
cursor.execute('''
INSERT OR IGNORE INTO system_settings (key, value, description) VALUES
('theme_color', 'blue', '主题颜色')
''')
# 检查并升级数据库
self.check_and_upgrade_db(cursor)
# 执行数据库迁移
self._migrate_database(cursor)
self.conn.commit()
logger.info("数据库初始化完成")
except Exception as e:
logger.error(f"数据库初始化失败: {e}")
self.conn.rollback()
raise
def _migrate_database(self, cursor):
"""执行数据库迁移"""
try:
# 检查cards表是否存在image_url列
cursor.execute("PRAGMA table_info(cards)")
columns = [column[1] for column in cursor.fetchall()]
if 'image_url' not in columns:
logger.info("添加cards表的image_url列...")
cursor.execute("ALTER TABLE cards ADD COLUMN image_url TEXT")
logger.info("数据库迁移完成:添加image_url列")
# 检查并更新CHECK约束(重建表以支持image类型)
self._update_cards_table_constraints(cursor)
except Exception as e:
logger.error(f"数据库迁移失败: {e}")
# 迁移失败不应该阻止程序启动
pass
def _update_cards_table_constraints(self, cursor):
"""更新cards表的CHECK约束以支持image类型"""
try:
# 尝试插入一个测试的image类型记录来检查约束
cursor.execute('''
INSERT INTO cards (name, type, user_id)
VALUES ('__test_image_constraint__', 'image', 1)
''')
# 如果插入成功,立即删除测试记录
cursor.execute("DELETE FROM cards WHERE name = '__test_image_constraint__'")
logger.info("cards表约束检查通过,支持image类型")
except Exception as e:
if "CHECK constraint failed" in str(e) or "constraint" in str(e).lower():
logger.info("检测到旧的CHECK约束,开始更新cards表...")
# 重建表以更新约束
try:
# 1. 创建新表
cursor.execute('''
CREATE TABLE IF NOT EXISTS cards_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
type TEXT NOT NULL CHECK (type IN ('api', 'text', 'data', 'image')),
api_config TEXT,
text_content TEXT,
data_content TEXT,
image_url TEXT,
description TEXT,
enabled BOOLEAN DEFAULT TRUE,
delay_seconds INTEGER DEFAULT 0,
is_multi_spec BOOLEAN DEFAULT FALSE,
spec_name TEXT,
spec_value TEXT,
user_id INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# 2. 复制数据
cursor.execute('''
INSERT INTO cards_new (id, name, type, api_config, text_content, data_content, image_url,
description, enabled, delay_seconds, is_multi_spec, spec_name, spec_value,
user_id, created_at, updated_at)
SELECT id, name, type, api_config, text_content, data_content, image_url,
description, enabled, delay_seconds, is_multi_spec, spec_name, spec_value,
user_id, created_at, updated_at
FROM cards
''')
# 3. 删除旧表
cursor.execute("DROP TABLE cards")
# 4. 重命名新表
cursor.execute("ALTER TABLE cards_new RENAME TO cards")
logger.info("cards表约束更新完成,现在支持image类型")
except Exception as rebuild_error:
logger.error(f"重建cards表失败: {rebuild_error}")
# 如果重建失败,尝试回滚
try:
cursor.execute("DROP TABLE IF EXISTS cards_new")
except:
pass
else:
logger.error(f"检查cards表约束时出现未知错误: {e}")
def check_and_upgrade_db(self, cursor):
"""检查数据库版本并执行必要的升级"""
try:
# 获取当前数据库版本
current_version = self.get_system_setting("db_version") or "1.0"
logger.info(f"当前数据库版本: {current_version}")
if current_version == "1.0":
logger.info("开始升级数据库到版本1.0...")
self.update_admin_user_id(cursor)
self.set_system_setting("db_version", "1.0", "数据库版本号")
logger.info("数据库升级到版本1.0完成")
# 如果版本低于需要升级的版本,执行升级
if current_version < "1.1":
logger.info("开始升级数据库到版本1.1...")
self.upgrade_notification_channels_table(cursor)
self.set_system_setting("db_version", "1.1", "数据库版本号")
logger.info("数据库升级到版本1.1完成")
# 升级到版本1.2 - 支持更多通知渠道类型
if current_version < "1.2":
logger.info("开始升级数据库到版本1.2...")
self.upgrade_notification_channels_types(cursor)
self.set_system_setting("db_version", "1.2", "数据库版本号")
logger.info("数据库升级到版本1.2完成")
# 升级到版本1.3 - 添加关键词类型和图片URL字段
if current_version < "1.3":
logger.info("开始升级数据库到版本1.3...")
self.upgrade_keywords_table_for_image_support(cursor)
self.set_system_setting("db_version", "1.3", "数据库版本号")
logger.info("数据库升级到版本1.3完成")
# 迁移遗留数据(在所有版本升级完成后执行)
self.migrate_legacy_data(cursor)
except Exception as e:
logger.error(f"数据库版本检查或升级失败: {e}")
raise
def update_admin_user_id(self, cursor):
"""更新admin用户ID"""
try:
logger.info("开始更新admin用户ID...")
# 创建默认admin用户(只在首次初始化时创建)
cursor.execute('SELECT COUNT(*) FROM users WHERE username = ?', ('admin',))
admin_exists = cursor.fetchone()[0] > 0
if not admin_exists:
# 首次创建admin用户,设置默认密码
default_password_hash = hashlib.sha256("admin123".encode()).hexdigest()
cursor.execute('''
INSERT INTO users (username, email, password_hash) VALUES
('admin', 'admin@localhost', ?)
''', (default_password_hash,))
logger.info("创建默认admin用户,密码: admin123")
# 获取admin用户ID,用于历史数据绑定
self._execute_sql(cursor, "SELECT id FROM users WHERE username = 'admin'")
admin_user = cursor.fetchone()
if admin_user:
admin_user_id = admin_user[0]
# 将历史cookies数据绑定到admin用户(如果user_id列不存在)
try:
self._execute_sql(cursor, "SELECT user_id FROM cookies LIMIT 1")
except sqlite3.OperationalError:
# user_id列不存在,需要添加并更新历史数据
self._execute_sql(cursor, "ALTER TABLE cookies ADD COLUMN user_id INTEGER")
self._execute_sql(cursor, "UPDATE cookies SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
else:
# user_id列存在,更新NULL值
self._execute_sql(cursor, "UPDATE cookies SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
# 为cookies表添加auto_confirm字段(如果不存在)
try:
self._execute_sql(cursor, "SELECT auto_confirm FROM cookies LIMIT 1")
except sqlite3.OperationalError:
# auto_confirm列不存在,需要添加并设置默认值
self._execute_sql(cursor, "ALTER TABLE cookies ADD COLUMN auto_confirm INTEGER DEFAULT 1")
self._execute_sql(cursor, "UPDATE cookies SET auto_confirm = 1 WHERE auto_confirm IS NULL")
else:
# auto_confirm列存在,更新NULL值
self._execute_sql(cursor, "UPDATE cookies SET auto_confirm = 1 WHERE auto_confirm IS NULL")
# 为delivery_rules表添加user_id字段(如果不存在)
try:
self._execute_sql(cursor, "SELECT user_id FROM delivery_rules LIMIT 1")
except sqlite3.OperationalError:
# user_id列不存在,需要添加并更新历史数据
self._execute_sql(cursor, "ALTER TABLE delivery_rules ADD COLUMN user_id INTEGER")
self._execute_sql(cursor, "UPDATE delivery_rules SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
else:
# user_id列存在,更新NULL值
self._execute_sql(cursor, "UPDATE delivery_rules SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
# 为notification_channels表添加user_id字段(如果不存在)
try:
self._execute_sql(cursor, "SELECT user_id FROM notification_channels LIMIT 1")
except sqlite3.OperationalError:
# user_id列不存在,需要添加并更新历史数据
self._execute_sql(cursor, "ALTER TABLE notification_channels ADD COLUMN user_id INTEGER")
self._execute_sql(cursor, "UPDATE notification_channels SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
else:
# user_id列存在,更新NULL值
self._execute_sql(cursor, "UPDATE notification_channels SET user_id = ? WHERE user_id IS NULL", (admin_user_id,))
# 为email_verifications表添加type字段(如果不存在)
try:
self._execute_sql(cursor, "SELECT type FROM email_verifications LIMIT 1")
except sqlite3.OperationalError:
# type列不存在,需要添加并更新历史数据
self._execute_sql(cursor, "ALTER TABLE email_verifications ADD COLUMN type TEXT DEFAULT 'register'")
self._execute_sql(cursor, "UPDATE email_verifications SET type = 'register' WHERE type IS NULL")
else:
# type列存在,更新NULL值
self._execute_sql(cursor, "UPDATE email_verifications SET type = 'register' WHERE type IS NULL")
# 为cards表添加多规格字段(如果不存在)
try:
self._execute_sql(cursor, "SELECT is_multi_spec FROM cards LIMIT 1")
except sqlite3.OperationalError:
# 多规格字段不存在,需要添加
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN is_multi_spec BOOLEAN DEFAULT FALSE")
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN spec_name TEXT")
self._execute_sql(cursor, "ALTER TABLE cards ADD COLUMN spec_value TEXT")
logger.info("为cards表添加多规格字段")
# 为item_info表添加多规格字段(如果不存在)
try:
self._execute_sql(cursor, "SELECT is_multi_spec FROM item_info LIMIT 1")
except sqlite3.OperationalError:
# 多规格字段不存在,需要添加
self._execute_sql(cursor, "ALTER TABLE item_info ADD COLUMN is_multi_spec BOOLEAN DEFAULT FALSE")
logger.info("为item_info表添加多规格字段")
# 处理keywords表的唯一约束问题
# 由于SQLite不支持直接修改约束,我们需要重建表
self._migrate_keywords_table_constraints(cursor)
self.conn.commit()
logger.info(f"admin用户ID更新完成")
except Exception as e:
logger.error(f"更新admin用户ID失败: {e}")
raise
def upgrade_notification_channels_table(self, cursor):
"""升级notification_channels表的type字段约束"""
try:
logger.info("开始升级notification_channels表...")
# 检查表是否存在
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='notification_channels'")
if not cursor.fetchone():
logger.info("notification_channels表不存在,无需升级")
return True
# 检查表中是否有数据
cursor.execute("SELECT COUNT(*) FROM notification_channels")
count = cursor.fetchone()[0]
# 删除可能存在的临时表
cursor.execute("DROP TABLE IF EXISTS notification_channels_new")
# 创建临时表
cursor.execute('''
CREATE TABLE notification_channels_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
user_id INTEGER NOT NULL,
type TEXT NOT NULL CHECK (type IN ('qq','ding_talk')),
config TEXT NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 复制数据,并转换不兼容的类型
if count > 0:
logger.info(f"复制 {count} 条通知渠道数据到新表")
# 先查看现有数据的类型
cursor.execute("SELECT DISTINCT type FROM notification_channels")
existing_types = [row[0] for row in cursor.fetchall()]
logger.info(f"现有通知渠道类型: {existing_types}")
# 获取所有现有数据进行逐行处理
cursor.execute("SELECT * FROM notification_channels")
existing_data = cursor.fetchall()
# 逐行转移数据,确保类型映射正确
for row in existing_data:
old_type = row[3] if len(row) > 3 else 'qq' # type字段,默认为qq
# 类型映射规则
type_mapping = {
'dingtalk': 'ding_talk',
'ding_talk': 'ding_talk',
'qq': 'qq',
'email': 'qq', # 暂时映射为qq,后续版本会支持
'webhook': 'qq', # 暂时映射为qq,后续版本会支持
'wechat': 'qq', # 暂时映射为qq,后续版本会支持
'telegram': 'qq' # 暂时映射为qq,后续版本会支持
}
new_type = type_mapping.get(old_type, 'qq') # 默认转换为qq类型
if old_type != new_type:
logger.info(f"转换通知渠道类型: {old_type} -> {new_type}")
# 插入到新表
cursor.execute('''
INSERT INTO notification_channels_new
(id, name, user_id, type, config, enabled, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
row[0], # id
row[1], # name
row[2], # user_id
new_type, # type (转换后的)
row[4] if len(row) > 4 else '{}', # config
row[5] if len(row) > 5 else True, # enabled
row[6] if len(row) > 6 else None, # created_at
row[7] if len(row) > 7 else None # updated_at
))
# 删除旧表
cursor.execute("DROP TABLE notification_channels")
# 重命名新表
cursor.execute("ALTER TABLE notification_channels_new RENAME TO notification_channels")
logger.info("notification_channels表升级完成")
return True
except Exception as e:
logger.error(f"升级notification_channels表失败: {e}")
raise
def upgrade_notification_channels_types(self, cursor):
"""升级notification_channels表支持更多渠道类型"""
try:
logger.info("开始升级notification_channels表支持更多渠道类型...")
# 检查表是否存在
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='notification_channels'")
if not cursor.fetchone():
logger.info("notification_channels表不存在,无需升级")
return True
# 检查表中是否有数据
cursor.execute("SELECT COUNT(*) FROM notification_channels")
count = cursor.fetchone()[0]
# 获取现有数据
existing_data = []
if count > 0:
cursor.execute("SELECT * FROM notification_channels")
existing_data = cursor.fetchall()
logger.info(f"备份 {count} 条通知渠道数据")
# 创建新表,支持更多渠道类型
cursor.execute('''
CREATE TABLE notification_channels_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
user_id INTEGER NOT NULL,
type TEXT NOT NULL CHECK (type IN ('qq','ding_talk','dingtalk','email','webhook','wechat','telegram')),
config TEXT NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 复制数据,同时处理类型映射
if existing_data:
logger.info(f"迁移 {len(existing_data)} 条通知渠道数据到新表")
for row in existing_data:
# 处理类型映射,支持更多渠道类型
old_type = row[3] if len(row) > 3 else 'qq' # type字段
# 扩展的类型映射规则
type_mapping = {
'ding_talk': 'dingtalk', # 统一为dingtalk
'dingtalk': 'dingtalk',
'qq': 'qq',
'email': 'email', # 现在支持email
'webhook': 'webhook', # 现在支持webhook
'wechat': 'wechat', # 现在支持wechat
'telegram': 'telegram' # 现在支持telegram
}
new_type = type_mapping.get(old_type, 'qq') # 默认为qq
if old_type != new_type:
logger.info(f"转换通知渠道类型: {old_type} -> {new_type}")
# 插入到新表,确保字段完整性
cursor.execute('''
INSERT INTO notification_channels_new
(id, name, user_id, type, config, enabled, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
row[0], # id
row[1], # name
row[2], # user_id
new_type, # type (转换后的)
row[4] if len(row) > 4 else '{}', # config
row[5] if len(row) > 5 else True, # enabled
row[6] if len(row) > 6 else None, # created_at
row[7] if len(row) > 7 else None # updated_at
))
# 删除旧表
cursor.execute("DROP TABLE notification_channels")
# 重命名新表
cursor.execute("ALTER TABLE notification_channels_new RENAME TO notification_channels")
logger.info("notification_channels表类型升级完成")
return True
except Exception as e:
logger.error(f"升级notification_channels表类型失败: {e}")
raise
def migrate_legacy_data(self, cursor):
"""迁移遗留数据到新表结构"""
try:
logger.info("开始检查和迁移遗留数据...")
# 检查是否有需要迁移的老表
legacy_tables = [
'old_notification_channels',
'legacy_delivery_rules',
'old_keywords',
'backup_cookies'
]
for table_name in legacy_tables:
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
if cursor.fetchone():
logger.info(f"发现遗留表: {table_name},开始迁移数据...")
self._migrate_table_data(cursor, table_name)
logger.info("遗留数据迁移完成")
return True
except Exception as e:
logger.error(f"迁移遗留数据失败: {e}")
return False
def _migrate_table_data(self, cursor, table_name: str):
"""迁移指定表的数据"""
try:
if table_name == 'old_notification_channels':
# 迁移通知渠道数据
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
count = cursor.fetchone()[0]
if count > 0:
cursor.execute(f"SELECT * FROM {table_name}")
old_data = cursor.fetchall()
for row in old_data:
# 处理数据格式转换
cursor.execute('''
INSERT OR IGNORE INTO notification_channels
(name, user_id, type, config, enabled)
VALUES (?, ?, ?, ?, ?)
''', (
row[1] if len(row) > 1 else f"迁移渠道_{row[0]}",
row[2] if len(row) > 2 else 1, # 默认admin用户
self._normalize_channel_type(row[3] if len(row) > 3 else 'qq'),
row[4] if len(row) > 4 else '{}',
row[5] if len(row) > 5 else True
))
logger.info(f"成功迁移 {count} 条通知渠道数据")
# 迁移完成后删除老表
cursor.execute(f"DROP TABLE {table_name}")
logger.info(f"已删除遗留表: {table_name}")
except Exception as e:
logger.error(f"迁移表 {table_name} 数据失败: {e}")
def _normalize_channel_type(self, old_type: str) -> str:
"""标准化通知渠道类型"""
type_mapping = {
'ding_talk': 'dingtalk',
'dingtalk': 'dingtalk',
'qq': 'qq',
'email': 'email',
'webhook': 'webhook',
'wechat': 'wechat',
'telegram': 'telegram',
# 处理一些可能的变体
'dingding': 'dingtalk',
'weixin': 'wechat',
'tg': 'telegram'
}
return type_mapping.get(old_type.lower(), 'qq')
def _migrate_keywords_table_constraints(self, cursor):
"""迁移keywords表的约束,支持基于商品ID的唯一性校验"""
try:
# 检查是否已经迁移过(通过检查是否存在新的唯一索引)
cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name='idx_keywords_unique_with_item'")
if cursor.fetchone():
logger.info("keywords表约束已经迁移过,跳过")
return
logger.info("开始迁移keywords表约束...")
# 1. 创建临时表,不设置主键约束
cursor.execute('''
CREATE TABLE IF NOT EXISTS keywords_temp (
cookie_id TEXT,
keyword TEXT,
reply TEXT,
item_id TEXT,
FOREIGN KEY (cookie_id) REFERENCES cookies(id) ON DELETE CASCADE
)
''')
# 2. 复制现有数据到临时表
cursor.execute('''
INSERT INTO keywords_temp (cookie_id, keyword, reply, item_id)
SELECT cookie_id, keyword, reply, item_id FROM keywords
''')
# 3. 删除原表
cursor.execute('DROP TABLE keywords')
# 4. 重命名临时表
cursor.execute('ALTER TABLE keywords_temp RENAME TO keywords')
# 5. 创建复合唯一索引来实现我们需要的约束逻辑
# 对于item_id为空的情况:(cookie_id, keyword)必须唯一
cursor.execute('''
CREATE UNIQUE INDEX idx_keywords_unique_no_item
ON keywords(cookie_id, keyword)
WHERE item_id IS NULL OR item_id = ''
''')
# 对于item_id不为空的情况:(cookie_id, keyword, item_id)必须唯一
cursor.execute('''
CREATE UNIQUE INDEX idx_keywords_unique_with_item
ON keywords(cookie_id, keyword, item_id)
WHERE item_id IS NOT NULL AND item_id != ''
''')
logger.info("keywords表约束迁移完成")
except Exception as e:
logger.error(f"迁移keywords表约束失败: {e}")
# 如果迁移失败,尝试回滚
try:
cursor.execute('DROP TABLE IF EXISTS keywords_temp')
except:
pass
raise
def close(self):
"""关闭数据库连接"""
if self.conn:
self.conn.close()
self.conn = None
def get_connection(self):
"""获取数据库连接,如果已关闭则重新连接"""
if self.conn is None:
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
return self.conn
def _log_sql(self, sql: str, params: tuple = None, operation: str = "EXECUTE"):
"""记录SQL执行日志"""
if not self.sql_log_enabled:
return
# 格式化参数
params_str = ""
if params:
if isinstance(params, (list, tuple)):
if len(params) > 0:
# 限制参数长度,避免日志过长
formatted_params = []
for param in params:
if isinstance(param, str) and len(param) > 100:
formatted_params.append(f"{param[:100]}...")
else:
formatted_params.append(repr(param))
params_str = f" | 参数: [{', '.join(formatted_params)}]"
# 格式化SQL(移除多余空白)
formatted_sql = ' '.join(sql.split())
# 根据配置的日志级别输出
log_message = f"🗄️ SQL {operation}: {formatted_sql}{params_str}"
if self.sql_log_level == 'DEBUG':
logger.debug(log_message)
elif self.sql_log_level == 'INFO':
logger.info(log_message)
elif self.sql_log_level == 'WARNING':
logger.warning(log_message)
else:
logger.debug(log_message)
def _execute_sql(self, cursor, sql: str, params: tuple = None):
"""执行SQL并记录日志"""
self._log_sql(sql, params, "EXECUTE")
if params:
return cursor.execute(sql, params)
else:
return cursor.execute(sql)
def _executemany_sql(self, cursor, sql: str, params_list):
"""批量执行SQL并记录日志"""
self._log_sql(sql, f"批量执行 {len(params_list)} 条记录", "EXECUTEMANY")
return cursor.executemany(sql, params_list)
# -------------------- Cookie操作 --------------------
def save_cookie(self, cookie_id: str, cookie_value: str, user_id: int = None) -> bool:
"""保存Cookie到数据库,如存在则更新"""
with self.lock:
try:
cursor = self.conn.cursor()
# 如果没有提供user_id,尝试从现有记录获取,否则使用admin用户ID
if user_id is None:
self._execute_sql(cursor, "SELECT user_id FROM cookies WHERE id = ?", (cookie_id,))
existing = cursor.fetchone()
if existing:
user_id = existing[0]
else:
# 获取admin用户ID作为默认值
self._execute_sql(cursor, "SELECT id FROM users WHERE username = 'admin'")
admin_user = cursor.fetchone()
user_id = admin_user[0] if admin_user else 1
self._execute_sql(cursor,
"INSERT OR REPLACE INTO cookies (id, value, user_id) VALUES (?, ?, ?)",
(cookie_id, cookie_value, user_id)
)
self.conn.commit()
logger.info(f"Cookie保存成功: {cookie_id} (用户ID: {user_id})")
# 验证保存结果
self._execute_sql(cursor, "SELECT user_id FROM cookies WHERE id = ?", (cookie_id,))
saved_user_id = cursor.fetchone()
if saved_user_id: