Skip to content

Commit

Permalink
feat: add functions-framework implementation of message center example
Browse files Browse the repository at this point in the history
  • Loading branch information
WoodenStone committed Dec 30, 2024
1 parent 9510ef9 commit 1428bf6
Show file tree
Hide file tree
Showing 25 changed files with 2,835 additions and 1 deletion.
14 changes: 13 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
node_modules
.DS_Store
package-lock.json
yarn.lock
yarn.lock
.env
node_modules
build
built
dist
coverage
.log
.idea
.history
.DS_Store
logs
.vscode
13 changes: 13 additions & 0 deletions functions-framework/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# README

函数式托管是云开发推出的支持在云托管环境中部署函数式代码的开发框架,它相比直接使用云托管优势在于:

- 像写函数代码一样写服务:函数式托管支持云托管中部署函数式代码,更适合函数式编程风格,更容易编写
- 心智负担少:函数式托管不涉及容器、镜像等概念,不需要编写 Dockerfile,更轻量
- 内置的服务端优化:函数式托管框架内部内置 Graceful 优雅重启、并发控制、超时控制、参数优化等
- 更可读的日志:函数式托管框架提供`accesslog`日志能力,相比直接使用云托管需要自行处理日志更方便
- 本地开发友好:函数式托管提供了更多的开发辅助功能,如 cli 工具,本地调试、热重启、函数间路由和代码复用等
- 云原生底座:函数式托管基于云托管,支持云托管的所有功能,如自动扩容、自动伸缩等

函数式托管主要基于 [`@cloudbase/functions-framework`](https://www.npmjs.com/package/@cloudbase/functions-framework) 进行开发。本目录中的示例提供如何基于该 SDK 开发实用功能。

2 changes: 2 additions & 0 deletions functions-framework/message-center/.npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
link-workspace-packages=true
prefer-workspace-packages=true
107 changes: 107 additions & 0 deletions functions-framework/message-center/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# README

函数式托管集成消息队列和 Redis 实现与客户端收发消息的示例。

## 架构示意图

![architecture](./architecture.png)

## 时序图

```mermaid
sequenceDiagram
autonumber
actor c as 客户端
box rgb(199,239,251) 函数式托管集群
participant t as 函数 trigger
participant m as 函数 message
end
participant k as Kafka
participant r as Redis
m --> k: 消费消息(异步)
c ->> +m: 建立 Websocket 连接
note over m: 订阅客户端 channel,异步完成
m --> r: 订阅 Redis channel(异步)
c ->> +t: HTTP 请求触发函数,执行操作
t -->> -c: 返回(同步)
t ->> k: 投递消息
note over k: Kafka 投递消息完成后,函数 message 进行消费
m --> r: 收到消息后,发布到客户端 channel(异步)
m -->> c: 推送消息到指定客户端(异步)
```

## 使用

1. 前置条件

- 安装 `Node.js`
- 安装 `pnpm`
- 安装 `@cloudbase/functions-framework`

2. 进入项目根目录,安装依赖

```sh
pnpm i
```

3. 在项目根目录创建 `.env` 文件,填入中间件参数

```sh
touch .env
```

`.env` 文件内容:

```env
KAFKA_BROKER=ip:port # Kafka 连接地址
KAFKA_TOPIC=topic-xxx # Kafka 主题名
REDIS_URL=redis://ip:port # Redis 连接地址
```

4. 启动服务

```sh
pnpm start
```

5. 客户端向 `message` 函数发起 websocket 长链接

```sh
# User-Agent、客户端 IP 用于关联客户端
wscat -c "ws://127.0.0.1:3000/message" -H "User-Agent:unique-client"
```

6. 客户端向 `trigger` 函数发送消息

```sh
# User-Agent、客户端 IP 用于关联客户端
curl -v http://127.0.0.1:3000/trigger \
-H "content-type:application/json" \
-H "User-Agent:unique-client" \
-d '{"id":"Hello"}'
```

7. 连接到 `message` 函数的客户端收到服务端推送的消息,消息内容为客户端向 `trigger` 函数发送的消息

```text
< {"id":"Hello"}
```

8. 使用 `pm2` 扩展为多节点部署

8.1 全局安装 `pm2`

```sh
npm i -g pm2
```

8.2 使用 pm2 启动项目

```sh
pm2 start ecosystem.config.js
```
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
25 changes: 25 additions & 0 deletions functions-framework/message-center/cloudbase-functions.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"functionsRoot": ".",
"functions": [
{
"name": "trigger",
"directory": "./packages/trigger",
"source": "built/index.js"
},
{
"name": "message",
"directory": "./packages/message",
"source": "built/index.js"
}
],
"routes": [
{
"functionName": "trigger",
"path": "/trigger"
},
{
"functionName": "message",
"path": "/message"
}
]
}
19 changes: 19 additions & 0 deletions functions-framework/message-center/ecosystem.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
const os = require('os')
require('dotenv').config()

module.exports = {
apps: [
{
name: 'tcb-ff',
script: 'tcb-ff',
watch: false,
instances: os.cpus().length,
exec_mode: 'cluster',
env: {
'KAFKA_BROKER': process.env.KAFKA_BROKER,
'KAFKA_TOPIC': process.env.KAFKA_TOPIC,
'REDIS_URL': process.env.REDIS_URL
}
}
]
};
24 changes: 24 additions & 0 deletions functions-framework/message-center/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"name": "message-center",
"private": "true",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"build": "pnpm run --parallel -r build",
"start": "export $(xargs <.env) && npx tcb-ff",
"watch": "pnpm run --parallel -r watch & export $(xargs <.env) && npx tcb-ff -w",
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"devDependencies": {
"@types/node": "^22.10.2",
"typescript": "^5.7.2"
},
"dependencies": {
"@cloudbase/functions-framework": "1.0.0-beta.11",
"dotenv": "^16.4.7"
}
}
19 changes: 19 additions & 0 deletions functions-framework/message-center/packages/common/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "@local/common",
"version": "1.0.0",
"description": "",
"main": "built/index.js",
"private": "true",
"scripts": {
"build": "npx tsc",
"watch": "npx tsc -w",
"test": "echo \"Error: no test specified\" && exit 1"
},

"author": "",
"license": "ISC",
"dependencies": {
"kafkajs": "^2.2.4",
"redis": "^4.7.0"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './kafka'
export * from './utils'
export * from './redis'
94 changes: 94 additions & 0 deletions functions-framework/message-center/packages/common/src/kafka.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs'

export type TMessage = {
clientID: string
event: unknown
}

export class KafkaClient {
private kafka: Kafka
private producer: Producer
private consumer: Consumer

// 初始化 Kafka 客户端,连接 Kafka 集群
constructor(kafkaConfig: { brokers: string[]; clientId: string }) {
this.kafka = new Kafka(kafkaConfig)
this.producer = this.kafka.producer()
this.consumer = this.kafka.consumer({ groupId: kafkaConfig.clientId })
}

public async init(): Promise<void> {
await this.producer.connect()
await this.consumer.connect()
}

// 发送消息到 Kafka topic
public async sendMessage(topic: string, message: TMessage) {
if (!this.producer) {
throw new Error('Kafka producer is not initialized')
}

try {
const metadata = await this.producer.send({
topic,
messages: [{ value: JSON.stringify(message) }]
})
for (const m of metadata) {
console.log(
`Message delivered to ${m.topicName} (${m.partition}) at ${m.logAppendTime}`
)
}
} catch (error) {
throw Error('Failed to send message:' + error)
}
}

// 启动消费消息
public async startConsuming(
topic: string,
eachMessageHandler: (payload: EachMessagePayload) => Promise<void>
) {
if (!this.consumer) {
throw new Error('Kafka consumer is not initialized')
}

// 启动消费者,监听指定的 topic
await this.consumer.subscribe({ topic, fromBeginning: true })

await this.consumer.run({
autoCommitThreshold: 1,
eachMessage: eachMessageHandler
})

console.log(`Start consuming messages from topic "${topic}"...`)
}

// 关闭 Kafka 客户端连接
public async close() {
if (this.producer) {
await this.producer.disconnect()
console.log('Kafka producer disconnected')
}

if (this.consumer) {
await this.consumer.disconnect()
console.log('Kafka consumer disconnected')
}

if (this.kafka) {
console.log('Kafka client disconnected')
}
}
}

export const createKafkaClient = async (config: {
brokers: string[]
clientId: string
}) => {
const kafkaClient = new KafkaClient({
clientId: config.clientId,
brokers: config.brokers
})
await kafkaClient.init()
return kafkaClient
}
Loading

0 comments on commit 1428bf6

Please sign in to comment.