Skip to content

Aedes Adapter

@mqttkit/aedes 把 Aedes broker 接入 MqttApp。Aedes 继续负责 MQTT 协议语义,mqttkit 负责应用层路由、middleware、策略、服务注入和事件监听。

TCP MQTT

ts
import { aedes } from '@mqttkit/aedes'
import { MqttApp, router } from '@mqttkit/core'

const app = new MqttApp()
  .use(aedes({ tcp: { port: 1883 } }))
  .use(
    router().topic('devices/:uid/events', {
      onMessage(ctx) {
        console.log(ctx.params.uid, ctx.payload.toString())
      },
    }),
  )

await app.listen()

MQTT-over-WebSocket

ts
const app = new MqttApp()
  .use(aedes({ tcp: false, ws: { port: 8888, path: '/mqtt' } }))
  .use(
    router()
      .topic('browser/:clientId/ping', {
        async onMessage(ctx) {
          await ctx.publish(`browser/${ctx.params.clientId}/pong`, ctx.payload)
        },
      })
      .topic('browser/:clientId/pong'),
  )

await app.listen()

客户端使用标准 MQTT-over-WebSocket:

ts
import mqtt from 'mqtt'

const client = mqtt.connect('ws://localhost:8888/mqtt')
client.subscribe('browser/demo/pong')
client.publish('browser/demo/ping', 'hello')

认证与 principal

authenticate 返回的对象会作为 principal 注入 publish policy、subscribe policy 和 handler context。

ts
const app = new MqttApp<{ principal?: { uid: string } }>()
  .use(
    aedes({
      tcp: { port: 1883 },
      authenticate({ username }) {
        if (!username) return false
        return { uid: username }
      },
    }),
  )
  .use(
    router<{ principal?: { uid: string } }>()
      .topic('devices/:uid/events', {
        publish: ({ params, principal }) => params.uid === principal?.uid,
        onMessage(ctx) {
          console.log(ctx.principal?.uid)
        },
      })
      .topic('server/:uid/commands', {
        subscribe: ({ params, principal }) => params.uid === principal?.uid,
      }),
  )

外部 Aedes 与 server

adapter 可以使用外部创建的 Aedes instance,也可以挂载到已有 HTTP server 上:

ts
import { createServer } from 'node:http'
import { createBroker } from 'aedes'

const broker = createBroker()
const server = createServer()

const app = new MqttApp().use(
  aedes({
    instance: broker,
    tcp: false,
    ws: { server, path: '/mqtt' },
  }),
)

await app.listen()
server.listen(8888)

Persistence

persistence 会透传给 Aedes:

ts
new MqttApp().use(
  aedes({
    tcp: { port: 1883 },
    persistence,
  }),
)

QoS、retain、session、offline delivery 和持久化行为由 Aedes 及其 persistence adapter 决定。

基于 MIT 协议发布