Notify API
顶层 notify / notification_session 异步推送 + ChannelPlugin.push + NotificationPayload 语义化数据 — 从策略侧产生通知到 channel 侧消费的完整链路
策略产出的"提醒/告警/分析摘要"不直接打到 IM 协议——而是通过框架的语义化 NotificationPayload 分发给所有启用的 channel 插件,channel 各自负责协议层渲染。
本章拆开两件事:作为策略作者怎么发通知,以及作为 channel 作者怎么实现 push。
1. 策略侧:发通知
一次性通知
from deeptrade import notify
from deeptrade.plugins_api import (
NotificationPayload,
NotificationSection,
NotificationItem,
)
def alert_pump_dump(ts_code: str, score: int) -> None:
payload = NotificationPayload(
title="异动告警",
sections=[
NotificationSection(
heading="候选标的",
items=[
NotificationItem(label="代码", value=ts_code),
NotificationItem(label="启动评分", value=str(score)),
],
),
],
metrics={
"score": score,
},
)
notify(payload) # 顶层一次性 APInotify() 一次性 API:每次调用重新构造 AsyncDispatchNotifier,写一条消息后等 worker drain 完才返回。开销略大但简单。
批量场景:notification_session
如果一次 run 要发数十条通知,逐条 notify() 会反复重建 worker。改用 session:
from deeptrade import notification_session
def alert_batch(candidates: list[Candidate]) -> None:
with notification_session() as session:
for c in candidates:
session.notify(make_payload(c))
# exit context 时框架 join(timeout=5s) 让 worker 把队列清空session 复用同一个 AsyncDispatchNotifier,性能上比 N 次独立 notify() 显著好。
零成本退化
如果没有任何启用的 channel:
# build_notifier 检查 SELECT * FROM plugins WHERE type='channel' AND enabled=true
# 结果空 → 返回 NoopNotifier
notify(payload) # 瞬间返回,零成本策略代码无需写 if channels_enabled: notify(...) 之类的条件判断——notify 自己零成本退化。
2. NotificationPayload 结构
@dataclass
class NotificationPayload:
title: str # 必填,简短主题
sections: list[NotificationSection] = field(default_factory=list)
metrics: dict[str, float | int] = field(default_factory=dict)
metadata: dict[str, str] = field(default_factory=dict)
@dataclass
class NotificationSection:
heading: str
items: list[NotificationItem]
@dataclass
class NotificationItem:
label: str
value: str
emphasis: bool = False # channel 可选用作高亮关键设计:完全语义化,不预渲染 markdown / HTML / 富文本。每个 channel 看到同一个 payload,自己决定怎么渲染:
| Channel | 渲染产物 |
|---|---|
| stdout-channel | 一行 ANSI text 到 stdout |
| feishu-channel | 飞书 Markdown 卡片 |
| wechat-channel | 微信公众号文本(受限) |
| slack-channel | Slack Block Kit |
不要在策略侧拼 markdown 字符串然后丢给 notify()。一旦那么做,所有 channel 都被锁死成同一种渲染——失去多渠道分发的意义。语义化的 sections / items / metrics 是项目的核心约束。
3. Channel 侧:实现 push
Channel 插件多一个 push 方法:
from deeptrade.plugins_api import (
ChannelPlugin,
NotificationPayload,
PluginContext,
PluginMetadata,
)
class StdoutChannel:
metadata: PluginMetadata
def validate_static(self, ctx: PluginContext) -> None:
# 如果你的 channel 需要 webhook URL / token,这里不能联网验证;
# 但可以读 ctx.config 看 secret 是否已配;未配 → raise 告诉用户去 config
webhook = ctx.config.get("stdout.webhook", default=None)
# 没就 noop,有就检查格式(不发请求)
def dispatch(self, argv: list[str]) -> int:
# channel 也可以有 CLI 子命令(典型:test 自检)
if argv == ["test"]:
return self._self_test()
return 1
def push(self, ctx: PluginContext, payload: NotificationPayload) -> None:
"""框架的 MultiplexNotifier 同步调用这里。
- 正常情况:把 payload 渲染成你的协议格式 + 发送
- raise 时:框架 catch + 日志,**不影响其他 channel**
- 不要在这里做任何"重试"逻辑:那是框架 / 你的 transport 库的责任
"""
line = self._format(payload)
print("✔ push success: " + line)
def _format(self, payload: NotificationPayload) -> str:
parts = [payload.title]
for s in payload.sections:
parts.append(s.heading)
for it in s.items:
marker = "**" if it.emphasis else ""
parts.append(f" - {marker}{it.label}: {it.value}{marker}")
return " | ".join(parts)
def _self_test(self) -> int:
from deeptrade.plugins_api import NotificationPayload
sample = NotificationPayload(
title="自检",
sections=[],
metrics={},
)
self.push(ctx=None, payload=sample)
return 0ChannelPlugin 与普通 Plugin 的差异只有一个——多 push(ctx, payload) -> None 方法。其他完全一样。
Channel 是 Protocol,不是基类
# ✓
class FeishuChannel:
def push(self, ctx, payload): ...
# 其他 Plugin 三件套
# ✗ 不需要也不要继承
class FeishuChannel(ChannelPlugin): # ChannelPlugin 是 Protocol
...4. 通知链路细节(架构层)
策略调 notify(payload)
│
▼
build_notifier(db, plugin_manager)
│ 查 plugins 表中 type='channel' AND enabled=true
│ - 0 个 → NoopNotifier(瞬间返回)
│ - ≥1 个 → AsyncDispatchNotifier
▼
AsyncDispatchNotifier
│ bounded queue (默认 size=1024)
│ worker thread 单线程消费
│ put 满 → 调用方阻塞,避免 OOM
▼
MultiplexNotifier
│ for each enabled channel:
│ try: channel.push(ctx, payload)
│ except: log_warning(traceback), 继续下一个
▼
ChannelPlugin × N关键性质:
| 性质 | 实现 |
|---|---|
| 异步 | worker thread + bounded queue |
| 失败隔离 | per-channel try/except |
| 优雅退出 | 进程退出前 join(timeout) 等队列 drain |
| 零成本退化 | 0 enabled channel 时返回 NoopNotifier |
5. 用户视角:启用 channel
用户不需要任何"在策略代码里加 if 判断"的事;只要:
deeptrade plugin install <my-channel> # 装上即启用
deeptrade plugin disable <my-channel> # 临时关闭
deeptrade plugin enable <my-channel> # 恢复策略代码完全不需要感知有哪些 channel 启用了。
6. 自检命令
约定 channel 插件提供 test 子命令:
deeptrade <channel-id> test实现:
def dispatch(self, argv: list[str]) -> int:
if argv == ["test"]:
sample = NotificationPayload(
title="DeepTrade 自检",
sections=[
NotificationSection(
heading="状态",
items=[NotificationItem(label="ok", value="✔")],
),
],
)
try:
self.push(ctx=self._fake_ctx(), payload=sample)
print("self-test passed")
return 0
except Exception as e:
print(f"self-test failed: {e}", file=sys.stderr)
return 1
return 2 # usage error让用户装完后能立刻 deeptrade my-channel test 验证 webhook / token 配置正确。
排错
| 现象 | 排查 |
|---|---|
notify() 不发任何东西 | deeptrade plugin list 看是否有 enabled channel |
| 一个 channel 挂掉,其他都没收到 | 不应该发生 —— per-channel 隔离。看日志 ~/.deeptrade/logs/ |
| 进程退出后消息没到 | join(timeout) 默认 5s;策略提前 sys.exit 会跳过 join |
| 同一条消息收到 N 次 | 检查策略是不是循环里调了 notify(p) 而非 session.notify(p) |
下一步
→ 发布到注册表
关键词:notify、notification_session、NotificationPayload、NotificationSection、NotificationItem、ChannelPlugin、push、AsyncDispatchNotifier、MultiplexNotifier、失败隔离、NoopNotifier