Getting Started
mqttkit organizes MQTT applications with MqttApp, ordered use() middleware, and router().topic(). Broker adapters own connections, QoS, retain, sessions, and persistence.
Install
bash
bun add @mqttkit/core @mqttkit/aedes aedesCreate an App
ts
import { aedes } from '@mqttkit/aedes'
import { MqttApp, router } from '@mqttkit/core'
const app = new MqttApp()
.use(aedes({ tcp: { port: 1883 }, ws: { port: 8888, path: '/mqtt' } }))
.use(async (ctx, next) => {
console.log(ctx.clientId, ctx.topic)
await next()
})
.use(
router()
.topic('devices/:uid/events', {
async onMessage(ctx) {
console.log(ctx.params.uid, ctx.payload.toString())
await ctx.publish(`server/${ctx.params.uid}/echo`, ctx.payload)
},
})
.topic('server/:uid/echo'),
)
await app.listen()Topic Routes
topic(pattern, config) declares publish policy, subscribe policy, and an inbound message handler.
ts
router()
.topic('devices/:uid/events', {
publish: ({ params, principal }) => params.uid === principal?.uid,
subscribe: false,
async onMessage(ctx) {
await ctx.publish(`server/${ctx.params.uid}/commands`, ctx.payload)
},
})
.topic('server/:uid/commands', {
publish: false,
subscribe: ({ params, principal }) => params.uid === principal?.uid,
})Default policies:
- A topic with
onMessagedefaults topublish: trueandsubscribe: false. - A topic without
onMessagedefaults topublish: falseandsubscribe: true.
Middleware
use() runs in registration order.
text
app middleware -> route middleware -> onMessageIf middleware does not call next(), the remaining middleware and handler do not run.
ts
new MqttApp()
.use(async (ctx, next) => {
if (!ctx.clientId) return
await next()
})
.use(
router()
.use(async (ctx, next) => {
if (ctx.params.uid === 'blocked') return
await next()
})
.topic('devices/:uid/events', { onMessage }),
)Service Injection
Use decorate() to inject business dependencies. Handlers access them through ctx.services.
ts
const app = new MqttApp<{ services: { audit: AuditService } }>()
.decorate('audit', audit)
.use(
router<{ services: { audit: AuditService } }>().topic('devices/:uid/events', {
async onMessage(ctx) {
await ctx.services.audit.write(ctx.topic, ctx.payload)
},
}),
)Run an Example
bash
bun install
bun run --cwd examples/aedes-basic devThen connect with mqtt.js, MQTTX, Mosquitto CLI, or any standard MQTT client.