DeepTrade / docs
开发者手册

Notify API

顶层 notify / notification_session 异步推送 + ChannelPlugin.push + NotificationPayload 语义化数据 — 从策略侧产生通知到 channel 侧消费的完整链路

策略产出的"提醒/告警/分析摘要"不直接打到 IM 协议——而是通过框架的语义化 NotificationPayload 分发给所有启用的 channel 插件,channel 各自负责协议层渲染。

本章拆开两件事:作为策略作者怎么发通知,以及作为 channel 作者怎么实现 push

1. 策略侧:发通知

一次性通知

from deeptrade import notify
from deeptrade.plugins_api import (
    NotificationPayload,
    NotificationSection,
    NotificationItem,
)

def alert_pump_dump(ts_code: str, score: int) -> None:
    payload = NotificationPayload(
        title="异动告警",
        sections=[
            NotificationSection(
                heading="候选标的",
                items=[
                    NotificationItem(label="代码", value=ts_code),
                    NotificationItem(label="启动评分", value=str(score)),
                ],
            ),
        ],
        metrics={
            "score": score,
        },
    )
    notify(payload)   # 顶层一次性 API

notify() 一次性 API:每次调用重新构造 AsyncDispatchNotifier,写一条消息后等 worker drain 完才返回。开销略大但简单。

批量场景:notification_session

如果一次 run 要发数十条通知,逐条 notify() 会反复重建 worker。改用 session:

from deeptrade import notification_session

def alert_batch(candidates: list[Candidate]) -> None:
    with notification_session() as session:
        for c in candidates:
            session.notify(make_payload(c))
    # exit context 时框架 join(timeout=5s) 让 worker 把队列清空

session 复用同一个 AsyncDispatchNotifier,性能上比 N 次独立 notify() 显著好。

零成本退化

如果没有任何启用的 channel:

# build_notifier 检查 SELECT * FROM plugins WHERE type='channel' AND enabled=true
# 结果空 → 返回 NoopNotifier
notify(payload)   # 瞬间返回,零成本

策略代码无需写 if channels_enabled: notify(...) 之类的条件判断——notify 自己零成本退化。

2. NotificationPayload 结构

@dataclass
class NotificationPayload:
    title: str                                  # 必填,简短主题
    sections: list[NotificationSection] = field(default_factory=list)
    metrics: dict[str, float | int] = field(default_factory=dict)
    metadata: dict[str, str] = field(default_factory=dict)


@dataclass
class NotificationSection:
    heading: str
    items: list[NotificationItem]


@dataclass
class NotificationItem:
    label: str
    value: str
    emphasis: bool = False    # channel 可选用作高亮

关键设计:完全语义化,不预渲染 markdown / HTML / 富文本。每个 channel 看到同一个 payload,自己决定怎么渲染:

Channel渲染产物
stdout-channel一行 ANSI text 到 stdout
feishu-channel飞书 Markdown 卡片
wechat-channel微信公众号文本(受限)
slack-channelSlack Block Kit

不要在策略侧拼 markdown 字符串然后丢给 notify()。一旦那么做,所有 channel 都被锁死成同一种渲染——失去多渠道分发的意义。语义化的 sections / items / metrics 是项目的核心约束

3. Channel 侧:实现 push

Channel 插件多一个 push 方法:

from deeptrade.plugins_api import (
    ChannelPlugin,
    NotificationPayload,
    PluginContext,
    PluginMetadata,
)

class StdoutChannel:
    metadata: PluginMetadata

    def validate_static(self, ctx: PluginContext) -> None:
        # 如果你的 channel 需要 webhook URL / token,这里不能联网验证;
        # 但可以读 ctx.config 看 secret 是否已配;未配 → raise 告诉用户去 config
        webhook = ctx.config.get("stdout.webhook", default=None)
        # 没就 noop,有就检查格式(不发请求)

    def dispatch(self, argv: list[str]) -> int:
        # channel 也可以有 CLI 子命令(典型:test 自检)
        if argv == ["test"]:
            return self._self_test()
        return 1

    def push(self, ctx: PluginContext, payload: NotificationPayload) -> None:
        """框架的 MultiplexNotifier 同步调用这里。
        
        - 正常情况:把 payload 渲染成你的协议格式 + 发送
        - raise 时:框架 catch + 日志,**不影响其他 channel**
        - 不要在这里做任何"重试"逻辑:那是框架 / 你的 transport 库的责任
        """
        line = self._format(payload)
        print("✔ push success: " + line)

    def _format(self, payload: NotificationPayload) -> str:
        parts = [payload.title]
        for s in payload.sections:
            parts.append(s.heading)
            for it in s.items:
                marker = "**" if it.emphasis else ""
                parts.append(f"  - {marker}{it.label}: {it.value}{marker}")
        return " | ".join(parts)

    def _self_test(self) -> int:
        from deeptrade.plugins_api import NotificationPayload
        sample = NotificationPayload(
            title="自检",
            sections=[],
            metrics={},
        )
        self.push(ctx=None, payload=sample)
        return 0

ChannelPlugin 与普通 Plugin 的差异只有一个——多 push(ctx, payload) -> None 方法。其他完全一样。

Channel 是 Protocol,不是基类

# ✓
class FeishuChannel:
    def push(self, ctx, payload): ...
    # 其他 Plugin 三件套

# ✗ 不需要也不要继承
class FeishuChannel(ChannelPlugin):  # ChannelPlugin 是 Protocol
    ...

4. 通知链路细节(架构层)

策略调 notify(payload)


build_notifier(db, plugin_manager)
    │  查 plugins 表中 type='channel' AND enabled=true
    │  - 0 个 → NoopNotifier(瞬间返回)
    │  - ≥1 个 → AsyncDispatchNotifier

AsyncDispatchNotifier
    │  bounded queue (默认 size=1024)
    │  worker thread 单线程消费
    │  put 满 → 调用方阻塞,避免 OOM

MultiplexNotifier
    │  for each enabled channel:
    │      try: channel.push(ctx, payload)
    │      except: log_warning(traceback), 继续下一个

ChannelPlugin × N

关键性质

性质实现
异步worker thread + bounded queue
失败隔离per-channel try/except
优雅退出进程退出前 join(timeout) 等队列 drain
零成本退化0 enabled channel 时返回 NoopNotifier

5. 用户视角:启用 channel

用户不需要任何"在策略代码里加 if 判断"的事;只要:

deeptrade plugin install <my-channel>     # 装上即启用
deeptrade plugin disable <my-channel>     # 临时关闭
deeptrade plugin enable <my-channel>      # 恢复

策略代码完全不需要感知有哪些 channel 启用了。

6. 自检命令

约定 channel 插件提供 test 子命令:

deeptrade <channel-id> test

实现:

def dispatch(self, argv: list[str]) -> int:
    if argv == ["test"]:
        sample = NotificationPayload(
            title="DeepTrade 自检",
            sections=[
                NotificationSection(
                    heading="状态",
                    items=[NotificationItem(label="ok", value="✔")],
                ),
            ],
        )
        try:
            self.push(ctx=self._fake_ctx(), payload=sample)
            print("self-test passed")
            return 0
        except Exception as e:
            print(f"self-test failed: {e}", file=sys.stderr)
            return 1
    return 2  # usage error

让用户装完后能立刻 deeptrade my-channel test 验证 webhook / token 配置正确。

排错

现象排查
notify() 不发任何东西deeptrade plugin list 看是否有 enabled channel
一个 channel 挂掉,其他都没收到不应该发生 —— per-channel 隔离。看日志 ~/.deeptrade/logs/
进程退出后消息没到join(timeout) 默认 5s;策略提前 sys.exit 会跳过 join
同一条消息收到 N 次检查策略是不是循环里调了 notify(p) 而非 session.notify(p)

下一步

发布到注册表

关键词:notify、notification_session、NotificationPayload、NotificationSection、NotificationItem、ChannelPlugin、push、AsyncDispatchNotifier、MultiplexNotifier、失败隔离、NoopNotifier