-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathapp-service.ts
312 lines (288 loc) · 10.6 KB
/
app-service.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
import {Application, Request, Response, default as express} from "express";
import bodyParser from "body-parser";
import morgan from "morgan";
import util from "util";
import { EventEmitter } from "events";
import fs from "fs";
import https from "https";
import { Server, default as http } from "http";
import { AppserviceHttpError } from "./AppserviceHttpError";
/**
 * Fallback limit, in bytes, for the size of an incoming HTTP request body (5MB).
 * Used when `httpMaxSizeBytes` is not supplied in the service configuration.
 */
const MAX_SIZE_BYTES = 5 * 1000 * 1000;
/**
 * Typed `on()` overloads for the events emitted by the `AppService` class.
 * This interface merges with the class declaration of the same name, so the
 * overloads below describe the listeners accepted by an `AppService` instance.
 */
export declare interface AppService {
/**
 * Emitted for every entry of the `events` list of a transaction that is
 * pushed to the appservice.
 * The format of the event object is documented at
 * https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid
 * @event
 * @example
 * appService.on("event", function(ev) {
 *   console.log("ID: %s", ev.event_id);
 * });
 */
on(event: "event", cb: (event: Record<string, unknown>) => void): this;
/**
 * Emitted for every ephemeral event (the `de.sorunome.msc2409.ephemeral`
 * list of a transaction) that is pushed to the appservice.
 * The format of the event object is documented at
 * https://github.com/matrix-org/matrix-doc/pull/2409
 * @event
 * @example
 * appService.on("ephemeral", function(ev) {
 *   console.log("Type: %s", ev.type);
 * });
 */
on(event: "ephemeral", cb: (event: Record<string, unknown>) => void): this;
/**
 * Emitted when the HTTP listener logs a request.
 * The line is delivered without a trailing newline and with the value of
 * `access_token` replaced by `<REDACTED>`.
 * @event
 * @example
 * appService.on("http-log", function(line) {
 *   console.log(line);
 * });
 */
on(event: "http-log", cb: (line: string) => void): this;
/**
 * Emitted when an event of a particular type is pushed
 * to the appservice. This will be emitted *in addition*
 * to "event", so ensure your bridge deduplicates events.
 * Ephemeral events are announced the same way under the
 * "ephemeral_type:" prefix, in addition to "ephemeral".
 * @event
 * @param event Should start with "type:" (or "ephemeral_type:" for
 * ephemeral events), followed by the event type.
 * @example
 * appService.on("type:m.room.message", function(ev) {
 *   console.log("Body: %s", ev.content.body);
 * });
 */
on(event: string, cb: (event: Record<string, unknown>) => void): this;
}
/**
 * HTTP listener for a Matrix application service.
 *
 * Wraps an Express app exposing the `/_matrix/app/v1` endpoints, checks the
 * homeserver token on each of them, and re-emits pushed events as
 * `EventEmitter` events ("event", "type:<type>", "ephemeral",
 * "ephemeral_type:<type>") plus "http-log" for request logging.
 */
export class AppService extends EventEmitter {
    /**
     * @deprecated Use `AppService.expressApp`
     */
    public readonly app: Application;
    private server?: Server;
    /** ID of the last fully processed transaction; used to drop an immediate re-delivery. */
    private lastProcessedTxnId = "";

    /**
     * Construct a new application service.
     *
     * Builds the Express app: request logging through `morgan` (forwarded as
     * "http-log" events), JSON body parsing, the `/_matrix/app/v1` endpoints,
     * permanent redirects for the removed un-prefixed endpoints, and `/health`.
     * @constructor
     * @param {Object} config Configuration for this service.
     * @param {String} config.homeserverToken The incoming HS token to expect. Must
     * be set (here or through `setHomeserverToken`) prior to calling listen(port);
     * while it is empty, every token-protected request is rejected.
     * @param {Number} config.httpMaxSizeBytes The max number of bytes allowed on an
     * incoming HTTP request. Default: 5000000.
     */
    constructor (private config: { homeserverToken: string; httpMaxSizeBytes?: number}) {
        super();
        const app = express();
        app.use(morgan("combined", {
            stream: {
                write: this.onMorganLog.bind(this),
            }
        }));
        app.use(bodyParser.json({
            limit: this.config.httpMaxSizeBytes || MAX_SIZE_BYTES,
        }));

        // The un-prefixed endpoints were removed; point callers at the /_matrix/app/v1 equivalent.
        const legacyEndpointHandler = (req: Request, res: Response) => {
            res.status(308)
                .location("/_matrix/app/v1" + req.originalUrl)
                .send({ errcode: "M_UNKNOWN", error: "This non-standard endpoint has been removed" });
        };

        app.get("/_matrix/app/v1/users/:userId", this.onGetUsers.bind(this));
        app.get("/_matrix/app/v1/rooms/:alias", this.onGetRoomAlias.bind(this));
        app.put("/_matrix/app/v1/transactions/:txnId", this.onTransaction.bind(this));
        app.get("/users/:userId", legacyEndpointHandler);
        app.get("/rooms/:alias", legacyEndpointHandler);
        app.put("/transactions/:txnId", legacyEndpointHandler);
        app.get("/health", this.onHealthCheck.bind(this));

        this.app = app;
    }

    /**
     * Begin listening on the specified port.
     *
     * The server speaks HTTPS when the `MATRIX_AS_TLS_KEY` and `MATRIX_AS_TLS_CERT`
     * environment variables name a key and certificate file, plain HTTP otherwise.
     * @param {Number} port The port to listen on.
     * @param {String} hostname Optional hostname to listen on
     * @param {Number} backlog Maximum length of the queue of pending connections
     * @param {Function} callback The callback for the "listening" event. Optional.
     * @returns {Promise} When the server is listening, if a callback is not provided.
     * Nothing is returned when a callback is provided.
     * @throws If only one of the two TLS variables is set, or if a named file does not exist.
     */
    public listen(port: number, hostname: string, backlog: number, callback?: () => void): Promise<void> | undefined {
        const serverApp = this.createHttpServer();
        if (callback) {
            this.server = serverApp.listen(port, hostname, backlog, callback);
            return undefined;
        }
        return new Promise<void>((resolve, reject) => {
            serverApp.once("error", reject);
            serverApp.once("listening", resolve);
            this.server = serverApp.listen(port, hostname, backlog);
        });
    }

    /**
     * Closes the HTTP server.
     * @returns {Promise} When the operation has completed
     * @throws If the server has not been started.
     */
    public async close() {
        const server = this.server;
        if (!server) {
            throw Error("Server has not started");
        }
        return util.promisify(server.close).apply(server);
    }

    /**
     * Override this method to handle alias queries.
     * The default implementation invokes the callback immediately and returns null.
     * @param {string} alias The queried room alias
     * @param {Function} callback The callback to invoke when complete.
     * @return {Promise} A promise to resolve when complete (if callback isn't supplied)
     */
    public onAliasQuery(alias: string, callback: () => void): PromiseLike<void>|null {
        callback(); // stub impl
        return null;
    }

    /**
     * Override this method to handle user queries.
     * The default implementation invokes the callback immediately and returns null.
     * @param {string} userId The queried user ID.
     * @param {Function} callback The callback to invoke when complete.
     * @return {Promise} A promise to resolve when complete (if callback isn't supplied)
     */
    public onUserQuery(userId: string, callback: () => void): PromiseLike<void>|null {
        callback(); // stub impl
        return null;
    }

    /**
     * Set the token that should be used to verify incoming events.
     * @param {string} hsToken The home server token
     */
    public setHomeserverToken(hsToken: string) {
        this.config.homeserverToken = hsToken;
    }

    /**
     * The Express App instance for the appservice, which
     * can be extended with paths.
     */
    public get expressApp() {
        return this.app;
    }

    /**
     * Keep only the object entries of a list taken from a request body.
     * Anything that is not an array yields an empty list, so a malformed
     * transaction cannot make the handler throw or emit non-object "events".
     */
    private static toEventList(value: unknown): Record<string, unknown>[] {
        if (!Array.isArray(value)) {
            return [];
        }
        return value.filter(
            (entry): entry is Record<string, unknown> => typeof entry === "object" && entry !== null,
        );
    }

    /**
     * Create the underlying server: HTTPS when both TLS environment variables
     * are set, plain HTTP when neither is.
     */
    private createHttpServer(): Server {
        const tlsKey = process.env.MATRIX_AS_TLS_KEY;
        const tlsCert = process.env.MATRIX_AS_TLS_CERT;
        if (!tlsKey && !tlsCert) {
            return http.createServer({}, this.app);
        }
        if (!(tlsKey && tlsCert)) {
            throw new Error("MATRIX_AS_TLS_KEY and MATRIX_AS_TLS_CERT should be defined together!");
        }
        if (!fs.existsSync(tlsKey)) {
            throw new Error("Could not open MATRIX_AS_TLS_KEY: " + tlsKey);
        }
        if (!fs.existsSync(tlsCert)) {
            throw new Error("Could not open MATRIX_AS_TLS_CERT: " + tlsCert);
        }
        const options = {
            key : fs.readFileSync(tlsKey),
            cert : fs.readFileSync(tlsCert)
        };
        return https.createServer(options, this.app);
    }

    /**
     * Forward one `morgan` log line as an "http-log" event.
     */
    private onMorganLog(str: string) {
        // The dependency `morgan` expects to write to a stream and adds a new line at the end.
        // Listeners of the `http-log` event expect there not to be a new line, so the string
        // can be handed to a logger like `console.log()` without displaying empty lines.
        const line = str.replace(/\n$/, "");
        // Global flag: a "combined" line holds both the request URL and the referrer,
        // and either of them may carry a token.
        const redacted = line.replace(/access_token=.*?(&|\s|$)/g, "access_token=<REDACTED>$1");
        this.emit("http-log", redacted);
    }

    /**
     * Check the homeserver token of a request and, when it is wrong, send the
     * 403 response.
     *
     * The token is read from the `Authorization: Bearer <token>` header when an
     * `Authorization` header is present, otherwise from the `access_token`
     * query parameter.
     * @returns true if the request was rejected (a response has been sent),
     * false if the caller may go on handling it.
     */
    private isInvalidToken(req: Request, res: Response): boolean {
        const expectedToken = this.config.homeserverToken;
        const header = req.headers.authorization;
        const scheme = "bearer ";

        let providedToken: unknown;
        if (header === undefined) {
            providedToken = req.query.access_token;
        } else if (header.slice(0, scheme.length).toLowerCase() === scheme) {
            // Auth scheme names are case-insensitive; only a Bearer credential is a token.
            providedToken = header.slice(scheme.length);
        }

        // An unset or empty configured token must never authenticate anything.
        const isValid = typeof expectedToken === "string"
            && expectedToken.length > 0
            && providedToken === expectedToken;
        if (isValid) {
            return false;
        }
        res.status(403);
        res.send({
            errcode: "M_FORBIDDEN",
            error: "Bad token supplied,"
        });
        return true;
    }

    /**
     * Run a user/alias query override and answer the request for it.
     *
     * An override may either call the completion callback (and return a falsy
     * value) or return a promise. Success is answered with `{}`; a failure is
     * answered with the status and errcode of an `AppserviceHttpError`, or with
     * 500 / `M_UNKNOWN` for anything else.
     * @param res The response to answer on.
     * @param query Invokes the override, passing it the completion callback.
     */
    private async respondToQuery(
        res: Response,
        query: (done: () => void) => PromiseLike<void>|null,
    ): Promise<void> {
        // Never answer twice, e.g. when an override calls back AND returns a promise.
        const sendEmpty = () => {
            if (!res.headersSent) {
                res.send({});
            }
        };
        try {
            const possiblePromise = query(sendEmpty);
            if (!possiblePromise) {
                return; // callback style: the override answers through the callback
            }
            await possiblePromise;
            sendEmpty();
        } catch (e) {
            if (res.headersSent) {
                return;
            }
            const isHttpError = e instanceof AppserviceHttpError;
            const text = e instanceof Error ? e.message : "";
            res.status(isHttpError ? e.status : 500);
            res.send({
                errcode: isHttpError ? e.errcode : "M_UNKNOWN",
                // `error` is the key of the Matrix standard error body; `message` is
                // kept because the user query endpoint has been sending it.
                error: text,
                message: text,
            });
        }
    }

    /** Handler for `GET /_matrix/app/v1/users/:userId`. */
    private async onGetUsers(req: Request, res: Response): Promise<void> {
        if (this.isInvalidToken(req, res)) {
            return;
        }
        await this.respondToQuery(res, (done) => this.onUserQuery(req.params.userId, done));
    }

    /** Handler for `GET /_matrix/app/v1/rooms/:alias`. */
    private async onGetRoomAlias(req: Request, res: Response): Promise<void> {
        if (this.isInvalidToken(req, res)) {
            return;
        }
        await this.respondToQuery(res, (done) => this.onAliasQuery(req.params.alias, done));
    }

    /**
     * Emit `name` for every item and, for items with a truthy `type`,
     * `typePrefix + type` as well.
     */
    private emitEach(name: "event" | "ephemeral", typePrefix: string, items: Record<string, unknown>[]): void {
        for (const item of items) {
            this.emit(name, item);
            if (item.type) {
                this.emit(`${typePrefix}${item.type}`, item);
            }
        }
    }

    /**
     * Handler for `PUT /_matrix/app/v1/transactions/:txnId`.
     *
     * Emits the events of the transaction, then records its ID so that the same
     * transaction arriving again right away is acknowledged without re-emitting.
     */
    private onTransaction(req: Request, res: Response): void {
        if (this.isInvalidToken(req, res)) {
            return;
        }
        const txnId = req.params.txnId;
        if (!txnId) {
            // A malformed request is an error, not a delivered transaction.
            res.status(400).send("Missing transaction ID.");
            return;
        }
        if (!req.body) {
            res.status(400).send("Missing body.");
            return;
        }
        if (this.lastProcessedTxnId === txnId) {
            res.send({}); // duplicate
            return;
        }

        this.emitEach("event", "type:", AppService.toEventList(req.body.events));
        this.emitEach(
            "ephemeral",
            "ephemeral_type:",
            AppService.toEventList(req.body["de.sorunome.msc2409.ephemeral"]),
        );

        this.lastProcessedTxnId = txnId;
        res.send({});
    }

    /** Handler for `GET /health`; answers without checking the token. */
    private onHealthCheck(_req: Request, res: Response): void {
        res.send('OK');
    }
}