Post

Slack App 整合 ChatGPT|利用 OpenAI API 与 Google Cloud Functions 自建智能助理

针对团队协作痛点,打造可自订 OpenAI API Key 的 Slack ChatGPT 机器人,运用 Google Cloud Functions 快速部署,实现即时问答与上下文串接,提升工作效率并保障资料安全。

Slack App 整合 ChatGPT|利用 OpenAI API 与 Google Cloud Functions 自建智能助理

Click here to view the English version of this article.

點擊這裡查看本文章正體中文版本。

基于 SEO 考量,本文标题与描述经 AI 调整,原始版本请参考内文。


Slack & ChatGPT Integration

自行打造 ChatGPT OpenAI API for Slack App (Google Cloud Functions & Python)

背景

最近在团队内推广运用 Generative AI 提升工作效率,初步只希望达成 AI Assistant (ChatGPT 功能),从减少日常资料查询、整理繁琐资料、手工处理资料的时间,以提升工作效率;希望是工程师、设计师、PM、行销…都能自由使用的。

最简单的方法就是直接买 ChatGPT Team 方案,一个席位 $25 美金一年;但由于尚不确定大家的使用频率(量)及希望能在外来与更多协作、开发流程进行整合,所以改采用 OpenAI API 方式,再透过其他服务封装整合供团队成员使用。

OpenAI API Key 可从 此页面产生 ,Key 没有分对应的 Model 版本,使用时才需指定要使用的 Model 版本并产生对应的 Token 费用。

我们需要一个服务能自行设定 OpenAI API Key 并使用该 Key 进行类 ChatGPT 使用。

不管是 Chrome Extension 或 Slack App 都蛮难找到能自行设定 OpenAI API Key 的服务,大部分服务都是要卖他们自己的订阅制,让使用者自订 API Key 等于赚不到钱纯做慈善。

[Chrome Extension] SidebarGPT

安装完后可到设定 -> General -> 填入 OpenAI API Key。

可从浏览器小工具栏、侧边 Icon 直接呼叫出聊天界面,直接使用:

[Chrome Extension] OpenAI Translator

如果只有翻译需求可使用这个,能自订 OpenAI API Key 用于翻译。

另外他是 开源专案 ,并同时提供 macOS/Windows 桌面版程式:

Chrome Extension 的优点是快速简单方便,直接装直接使用;缺点是需要将 API Key 提供给所有成员,难以管控外泄问题,还有使用第三方服务也难以保证大家的资料安全。

[Self-hosted] LibreChat

研发部同事推荐的 OpenAI API Chat 封装服务,提供身份认证使用及几乎还原 ChatGPT 使用介面、功能比 ChatGPT 更强大的开源专案。

只需专案、装好 Docker、设定好 .env、启 Docker 服务就能直接透过网站连入使用。

试了一下简直无懈可击,就是本地版 ChatGPT 服务;要说缺点的话就只有需要伺服器部署服务吧;如果没其他考量,可以直接使用此开源专案。

Slack App

其实 LibreChat 服务起起来放上伺服器就已经达成效果了,但是灵光一闪想说如果能整合在日常工具当中是不是更方便?加上公司伺服器有严格的权限设定,不太能随意起服务。

当时也没想太多,想说 Slack App 的 OpenAI API 整合服务应该很多,找一个设定一下就好;没想到事情没有那么简单。

Google 搜寻只找到一篇 Slack x OpenAI 2023/03 官方的新闻稿「 Why we built the ChatGPT app for Slack 」及一些 Beta 图片:

<https://www.salesforce.com/news/stories/chatgpt-app-for-slack/>{:target="_blank"}

https://www.salesforce.com/news/stories/chatgpt-app-for-slack/

看起来功能非常完整而且能大大提升工作效率,不过截自 2024/01 为止尚无释出的消息,文末提供的 Beta 注册连结 也已经失效,暂时没有下文。(还是微软想先让 Teams 支援?)

[2024/02/14 Update]:

Slack Apps

因无官方的 App 转而搜寻第三方开发者的 App,搜寻并试用了几个都碰壁;符合的 App 不多就算了,也没有一个是能提供自订 Key 功能的,每个都是做来卖服务、卖钱的。

自行实现 ChatGPT OpenAI API for Slack App

之前有一些 Slack App 开发经验,决定自己动手做。

⚠️声明⚠️

本文是以串接 OpenAI API 为例演示如何建立 Slack App 和快速使用 Google Cloud Funtions 来达成需求,Slack App 有需多应用可做,大家可以自由发挥。

⚠️⚠️ Google Cloud Functions,Function as a Service (FaaS) 的优点是方便快速、有免费额度,程式写好就能直接部署执行、自动扩充,缺点是服务环境由 GCP 控制,当服务太久没被呼叫会进入休眠,此时再次呼叫会进入 Cold Start 冷启动,需要较长的反应时间;另外也较难起多个服务互相使用。

更完整或使用需求量大的话还是建议自己起 VM (App Engine) 架 Server 跑服务。

最终成果图

完整 Cloud Functions Python 程式码、Slack App 设定已附在文末,懒得一步一步看的朋友可快速前往查阅。

Step 1. 建立 Slack App

前往 Slack App

点击「Create New App」

选择「From scratch」

输入「App Name」、选择要加入的 Workspace。

建立完成后先到「OAuth & Permissions」新增 Bot 需要的权限。

下滑找到「Scopes」区块,点击「Add an OAuth Scope」搜寻加入以下几个权限:

  • chat:write

  • im:history

  • im:read

  • im:write

Bot 权限加完之后点击左方「Install App」->「Install to Workspace」

尔后如果 Slack App 有新增其他权限,都需要再点击一次「Reinstall」才会生效。

但请放心,Bot Token 不会因为重新安装而改变。

Slack Bot Token 权限设定好之后,前往「App Home」:

下滑找到「Show Tabs」区块,启用「Messages Tab」及「Allow users to send Slash commands and messages from the messages tab」(这个没勾一样不能传讯息,会显示「Sending messages to this app has been turned off.」)

回到 Slack Workspace,按「Command+R」更新画面就能看到新建立的 Slack App 和讯息输入匡:

此时传送讯息给 App 还没有任何功能。

启用 Event Subscriptions 功能

再来,我们需要启用 Slack App 的事件订阅功能,当指定事件发生时会打 API 到指定 URL。

新增 Google Cloud Funtions

Request URL 的部分, Google Cloud Funtions 就要上场了。

设定好专案、帐单资讯后点击「Create Function」

Function name 输入专案名称、Authentication 选择「Allow unauthenticated invocations」意即知道网址就能存取。

如果你无法建立 Function 或无法更改 Authentication 代表你的 GCP 帐号没有完整的 Google Cloud Functions 权限,需要请组织管理员在原本的身份之外额外加上 Cloud Functions Admin 的权限,才能使用。

Runtime: Python 3.8 or 更高

main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import functions_framework

@functions_framework.http
def hello_http(request):
    request_json = request.get_json(silent=True)
    request_args = request.args
    request_headers = request.headers

    # 可以简单使用 print 纪录执行阶段 Log,可在 Logs 中查看
    # 进阶 Logging Level 使用可参考:https://cloud.google.com/logging/docs/reference/libraries
    print(request_json)

    # 受限 FAAS(Cloud Functions),服务太久没呼叫,再次呼叫会进入冷启动的特性,可能无法在 Slack 规定的 3 秒内回应
    # 加上 OpenAI API 请求到答复需要一定时间(依照回应长度可能要接近 1 分钟才能结束)
    # Slack 在时限内收不到回应会认为 Request lost,Slack 会再次重复呼叫
    # 会造成重复请求、回应的问题,因此我们可以在 Response Headers 设置 X-Slack-No-Retry: 1 告知 Slack ,就算没在时限内收到回应也不需 Retry    
    headers = {'X-Slack-No-Retry':1}

    # 如果是 Slack Retry 的请求...忽略
    if request_headers and 'X-Slack-Retry-Num' in request_headers:
        return ('OK!', 200, headers)

    # Slack App Event Subscriptions Verify
    # https://api.slack.com/events/url_verification
    if request_json and 'type' in request_json and request_json['type'] == 'url_verification':
        challenge = ""
        if 'challenge' in request_json:
            challenge = request_json['challenge']
        return (challenge, 200, headers)

    return ("Access Denied!", 400, headers)

requirements.txt 输入以下依赖:

1
2
3
functions-framework==3.*
requests==2.31.0
openai==1.9.0

目前还没有什么功能,只是让 Slack App 能通过 Event Subscriptions 启用验证,可以直接点击「Deploy」完成首次部署。

⚠️如果你不熟悉 Cloud Functions 编辑器,可先下拉到文章底部查看补充内容。

等待部署完成后(绿勾勾) ,复制 Cloud Functions URL:

将 Request URL 贴回 Slack App Enable Events。

如果没问题就会出现「Verified」完成验证。

这边做的事是收到 Slack 发来的验证请求时:

1
2
3
4
5
{
    "token": "Jhj5dZrVaK7ZwHHjRyZWjbDl",
    "challenge": "3eZbrw1aBm2rZgRNFdxV2595E9CY3gmdALWMmHkvFXO7tYXAYM8P",
    "type": "url_verification"
}

回应 challenge 栏位内容,就能通过验证。

启用成功后往下滑找到「Subscribe to bot events」区块,点击「Add Bot User Event」新增「message.im」权限。

新增完全权限后点击上方「reinstall your app」连结重新安装 Slack App 到 Workspace 后,Slack App 的设定就告一段落啰。

也可以去「App Home」或「Basic Information」客制化 Slack App 的名称与大头贴。

Basic Information

Basic Information

Step 2. 完善 OpenAI API 与 Slack App 串接 (Direct 讯息)

首先我们先要取得必备的 OPENAI API KEYBot User OAuth Token 两个 Key。

处理 Direct Message (IM) Event & 串接 OpenAI API 回应

当使用者在与 Slack App 讯息传送时会收到以下 Event Json Paylod :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
{
  "token": "XXX",
  "team_id": "XXX",
  "context_team_id": "XXX",
  "context_enterprise_id": null,
  "api_app_id": "XXX",
  "event": {
    "client_msg_id": "XXX",
    "type": "message",
    "text": "你好",
    "user": "XXX",
    "ts": "1707920753.115429",
    "blocks": [
      {
        "type": "rich_text",
        "block_id": "orfng",
        "elements": [
          {
            "type": "rich_text_section",
            "elements": [
              {
                "type": "text",
                "text": "你好"
              }
            ]
          }
        ]
      }
    ],
    "team": "XXX",
    "channel": "XXX",
    "event_ts": "1707920753.115429",
    "channel_type": "im"
  },
  "type": "event_callback",
  "event_id": "XXX",
  "event_time": 1707920753,
  "authorizations": [
    {
      "enterprise_id": null,
      "team_id": "XXX",
      "user_id": "XXX",
      "is_bot": true,
      "is_enterprise_install": false
    }
  ],
  "is_ext_shared_channel": false,
  "event_context": "4-XXX"
}

基于以上 Json Payload,我们可以完善 Slack 讯息到 OpenAI API 再到回复 Slack 讯息的串接:

Cloud Functions main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import functions_framework
import requests
import asyncio
import json
import time
from openai import AsyncOpenAI

OPENAI_API_KEY = "OPENAI API KEY"
SLACK_BOT_TOKEN = "Bot User OAuth Token"

# 使用的 OPENAI API Model
# https://platform.openai.com/docs/models
OPENAI_MODEL = "gpt-4-1106-preview"

@functions_framework.http
def hello_http(request):
    request_json = request.get_json(silent=True)
    request_args = request.args
    request_headers = request.headers

    # 可以简单使用 print 纪录执行阶段 Log,可在 Logs 中查看
    # 进阶 Logging Level 使用可参考:https://cloud.google.com/logging/docs/reference/libraries
    print(request_json)

    # 受限 FAAS(Cloud Functions),服务太久没呼叫,再次呼叫会进入冷启动的特性,可能无法在 Slack 规定的 3 秒内回应
    # 加上 OpenAI API 请求到答复需要一定时间(依照回应长度可能要接近 1 分钟才能结束)
    # Slack 在时限内收不到回应会认为 Request lost,Slack 会再次重复呼叫
    # 会造成重复请求、回应的问题,因此我们可以在 Response Headers 设置 X-Slack-No-Retry: 1 告知 Slack ,就算没在时限内收到回应也不需 Retry    
    headers = {'X-Slack-No-Retry':1}

    # 如果是 Slack Retry 的请求...忽略
    if request_headers and 'X-Slack-Retry-Num' in request_headers:
        return ('OK!', 200, headers)

    # Slack App Event Subscriptions Verify
    # https://api.slack.com/events/url_verification
    if request_json and 'type' in request_json and request_json['type'] == 'url_verification':
        challenge = ""
        if 'challenge' in request_json:
            challenge = request_json['challenge']
        return (challenge, 200, headers)

    # Handle Event Subscriptions Events...
    if request_json and 'event' in request_json and 'type' in request_json['event']:
        # 如果 Event 来源是 App 且 App ID == Slack App ID 代表是自己 Slack App 触发的事件
        # 忽略不处理,否则会陷入无限循环 Slack App -> Cloud Functions -> Slack App -> Cloud Functions...
        if 'api_app_id' in request_json and 'app_id' in request_json['event'] and request_json['api_app_id'] == request_json['event']['app_id']:
            return ('OK!', 200, headers)

        # 事件名称,例如:message(讯息相关), app_mention(被标记提及)....
        eventType = request_json['event']['type']

        # SubType,例如:message_changed(编辑讯息), message_deleted(删除讯息)...
        # 新讯息无 Sub Type
        eventSubType = None
        if 'subtype' in request_json['event']:
            eventSubType = request_json['event']['subtype']
        
        if eventType == 'message':
            #  有 Sub Type 的都是讯息编辑、删除、被回应...
            # 忽略不处理
            if eventSubType is not None:
                return ("OK!", 200, headers)
               
            # Event之讯息 发送者
            eventUser = request_json['event']['user']
            # Event之讯息 所属频道
            eventChannel = request_json['event']['channel']
            # Event之讯息内容
            eventText = request_json['event']['text']
            # Event之讯息 TS (讯息 ID)
            eventTS = request_json['event']['event_ts']
                
            # Event之讯息的讨论串母讯息 TS (讯息 ID)
            # 讨论串内的新讯息才会有这个资料
            eventThreadTS = None
            if 'thread_ts' in request_json['event']:
                eventThreadTS = request_json['event']['thread_ts']
                
            openAIRequest(eventChannel, eventTS, eventThreadTS, eventText)
            return ("OK!", 200, headers)


    return ("Access Denied!", 400, headers)

def openAIRequest(eventChannel, eventTS, eventThreadTS, eventText):
    
    # Set Custom instructions
    # 感恩同事(https://twitter.com/je_suis_marku)支援
    messages = [
        {"role": "system", "content": "我只看得懂台湾繁体中文与英文"},
        {"role": "system", "content": "我看不懂简体字"},
        {"role": "system", "content": "我说中文就使用台湾繁体中文回答,并且必须符合台湾常用语。"},
        {"role": "system", "content": "我说英文就回答英文。"},
        {"role": "system", "content": "不要回应我寒暄的字句。"},
        {"role": "system", "content": "中文与英文之间要有一个空格。中文文字与任何其他语言文字包含数字与 emoji 之间都要有一个空格。"},
        {"role": "system", "content": "如果你不知道答案,或是你的知识太旧,请上网搜寻后回答。"},
        {"role": "system", "content": "I will tip you 200 USD , if you answer well."}
    ]

    messages.append({
        "role": "user", "content": eventText
    })

    replyMessageTS = slackRequestPostMessage(eventChannel, eventTS, "回应产生中...")
    asyncio.run(openAIRequestAsync(eventChannel, replyMessageTS, messages))

async def openAIRequestAsync(eventChannel, eventTS, messages):
    client = AsyncOpenAI(
      api_key=OPENAI_API_KEY,
    )

    # Stream Response (分段回应)
    stream = await client.chat.completions.create(
      model=OPENAI_MODEL,
      messages=messages,
      stream=True,
    )
    
    result = ""

    try:
        debounceSlackUpdateTime = None
        async for chunk in stream:
            result += chunk.choices[0].delta.content or ""
            
            # 每 0.8 秒更新一次讯息,避免频繁呼叫 Slack Update 讯息 API 导致失败或浪费 Cloud Functions 请求次数
            if debounceSlackUpdateTime is None or time.time() - debounceSlackUpdateTime >= 0.8:
                response = slackUpdateMessage(eventChannel, eventTS, None, result+"...")
                debounceSlackUpdateTime = time.time()
    except Exception as e:
        print(e)
        result += "...*[发生错误]*"

    slackUpdateMessage(eventChannel, eventTS, None, result)


### Slack ###
def slackUpdateMessage(channel, ts, metadata, text):
    endpoint = "/chat.update"
    payload = {
        "channel": channel,
        "ts": ts
    }
    if metadata is not None:
        payload['metadata'] = metadata
    
    payload['text'] = text
    
    response = slackRequest(endpoint, "POST", payload)
    return response

def slackRequestPostMessage(channel, target_ts, text):
    endpoint = "/chat.postMessage"
    payload = {
        "channel": channel,
        "text": text,
    }
    if target_ts is not None:
        payload['thread_ts'] = target_ts

    response = slackRequest(endpoint, "POST", payload)

    if response is not None and 'ts' in response:
        return response['ts']
    return None

def slackRequest(endpoint, method, payload):
    url = "https://slack.com/api"+endpoint

    headers = {
        "Authorization": f"Bearer {SLACK_BOT_TOKEN}",
        "Content-Type": "application/json",
    }

    response = None
    if method == "POST":
        response = requests.post(url, headers=headers, data=json.dumps(payload))
    elif method == "GET":
        response = requests.post(url, headers=headers)

    if response and response.status_code == 200:
        result = response.json()
        return result
    else:
        return None

回到 Slack 测试看看:

现在你已经可以进行类 ChatGPT 与 OpenAI API 进行问与答了。

增加中断 Stream Response 功能节省 Token

实现方式有很多种,可在同条讨论串下如果还没回应完,使用者又输入新讯息则中断之前的 Response 抑或是点击讯息增加中断回应的 Shortcut。

本文以新增「讯息中断」Shortcut 为例。

不管哪一种中断方式,核心原理都是相同的,因为我们没有 Database 储存产生的讯息、讯息状态的资讯;因此实现方式是依赖 Slack 讯息的 metadata 栏位 (可存放客制化资讯在指定讯息内)。

我们在使用 chat.update API Endpoint 时,如果呼叫成功会回传当前讯息的文字内容与 metadata,因此我们在上面的 OpenAI API Stream -> Slack Update Message 的程式码之中多加入判断修改请求的回应中的 metadata 是否有「中断」的标记,如果有则中断 OpenAI Stream Response。

首先需要先新增 Slack App 讯息 Shortcut

前往 Slack App 管理介面,找到「Interactivity & Shortcuts」区块,点击启用,网址一样使用同个 Cloud Functions URL。

点击「Create New Shortcut」新增新的讯息 Shortcut。

选择「On messages」。

  • Name 动作标题: 停止 OpenAI API 产生回应

  • Short Description 简介: 停止 OpenAI API 产生回应

  • Callback ID: abort_openai_api (程式识别用,可自订)

点击「Create」建立完成后,最后记得点右下「Save Changes」储存设定。

再次点击上方「reinstall your app」才会生效。

回到 Slack 在讯息上点击右上角「…」就会出现「停止 OpenAI API 产生回应」Shortcut (此时点击无作用)。

当使用者在讯息上按下 Shortcut 会发送一个 Event Json Payload:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
{
  "type": "message_action",
  "token": "XXXXXX",
  "action_ts": "1706188005.387646",
  "team": {
    "id": "XXXXXX",
    "domain": "XXXXXX-XXXXXX"
  },
  "user": {
    "id": "XXXXXX",
    "username": "zhgchgli",
    "team_id": "XXXXXX",
    "name": "zhgchgli"
  },
  "channel": {
    "id": "XXXXXX",
    "name": "directmessage"
  },
  "is_enterprise_install": false,
  "enterprise": null,
  "callback_id": "abort_openai_api",
  "trigger_id": "XXXXXX",
  "response_url": "https://hooks.slack.com/app/XXXXXX/XXXXXX/XXXXXX",
  "message_ts": "1706178957.161109",
  "message": {
    "bot_id": "XXXXXX",
    "type": "message",
    "text": "高丽菜包 的英文翻译是 \"cabbage wrap\"。如果您是将它作为菜名使用,有时候会具体到菜里的内容来命名,比如 \"pork cabbage wrap\"(猪肉高丽菜包)或 \"vegetable cabbage wrap\"(蔬菜高丽菜包)。",
    "user": "XXXXXX",
    "ts": "1706178957.161109",
    "app_id": "XXXXXX",
    "blocks": [
      {
        "type": "rich_text",
        "block_id": "eKgaG",
        "elements": [
          {
            "type": "rich_text_section",
            "elements": [
              {
                "type": "text",
                "text": "高丽菜包 的英文翻译是 \"cabbage wrap\"。如果您是将它作为菜名使用,有时候会具体到菜里的内容来命名,比如 \"pork cabbage wrap\"(猪肉高丽菜包)或 \"vegetable cabbage wrap\"(蔬菜高丽菜包)。"
              }
            ]
          }
        ]
      }
    ],
    "team": "XXXXXX",
    "bot_profile": {
      "id": "XXXXXX",
      "deleted": false,
      "name": "Rick C-137",
      "updated": 1706001605,
      "app_id": "XXXXXX",
      "icons": {
        "image_36": "https://avatars.slack-edge.com/2024-01-23/6517244582244_0c708dfa3f893c72d4c2_36.png",
        "image_48": "https://avatars.slack-edge.com/2024-01-23/6517244582244_0c708dfa3f893c72d4c2_48.png",
        "image_72": "https://avatars.slack-edge.com/2024-01-23/6517244582244_0c708dfa3f893c72d4c2_72.png"
      },
      "team_id": "XXXXXX"
    },
    "edited": {
      "user": "XXXXXX",
      "ts": "1706187989.000000"
    },
    "thread_ts": "1706178832.102439",
    "parent_user_id": "XXXXXX"
  }
}

完善 Cloud Functions main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
import functions_framework
import requests
import asyncio
import json
import time
from openai import AsyncOpenAI

OPENAI_API_KEY = "OPENAI API KEY"
SLACK_BOT_TOKEN = "Bot User OAuth Token"

# 使用的 OPENAI API Model
# https://platform.openai.com/docs/models
OPENAI_MODEL = "gpt-4-1106-preview"

@functions_framework.http
def hello_http(request):
    request_json = request.get_json(silent=True)
    request_args = request.args
    request_headers = request.headers

    # Shortcut 的 Event 会从 post payload field 给
    # https://api.slack.com/reference/interaction-payloads/shortcuts
    payload = request.form.get('payload')
    if payload is not None:
        payload = json.loads(payload)

    # 可以简单使用 print 纪录执行阶段 Log,可在 Logs 中查看
    # 进阶 Logging Level 使用可参考:https://cloud.google.com/logging/docs/reference/libraries
    print(payload)

    # 受限 FAAS(Cloud Functions),服务太久没呼叫,再次呼叫会进入冷启动的特性,可能无法在 Slack 规定的 3 秒内回应
    # 加上 OpenAI API 请求到答复需要一定时间(依照回应长度可能要接近 1 分钟才能结束)
    # Slack 在时限内收不到回应会认为 Request lost,Slack 会再次重复呼叫
    # 会造成重复请求、回应的问题,因此我们可以在 Response Headers 设置 X-Slack-No-Retry: 1 告知 Slack ,就算没在时限内收到回应也不需 Retry    
    headers = {'X-Slack-No-Retry':1}

    # 如果是 Slack Retry 的请求...忽略
    if request_headers and 'X-Slack-Retry-Num' in request_headers:
        return ('OK!', 200, headers)

    # Slack App Event Subscriptions Verify
    # https://api.slack.com/events/url_verification
    if request_json and 'type' in request_json and request_json['type'] == 'url_verification':
        challenge = ""
        if 'challenge' in request_json:
            challenge = request_json['challenge']
        return (challenge, 200, headers)

    # Handle Event Subscriptions Events...
    if request_json and 'event' in request_json and 'type' in request_json['event']:
        # 如果 Event 来源是 App 且 App ID == Slack App ID 代表是自己 Slack App 触发的事件
        # 忽略不处理,否则会陷入无限循环 Slack App -> Cloud Functions -> Slack App -> Cloud Functions...
        if 'api_app_id' in request_json and 'app_id' in request_json['event'] and request_json['api_app_id'] == request_json['event']['app_id']:
            return ('OK!', 200, headers)

        # 事件名称,例如:message(讯息相关), app_mention(被标记提及)....
        eventType = request_json['event']['type']

        # SubType,例如:message_changed(编辑讯息), message_deleted(删除讯息)...
        # 新讯息无 Sub Type
        eventSubType = None
        if 'subtype' in request_json['event']:
            eventSubType = request_json['event']['subtype']
        
        if eventType == 'message':
            #  有 Sub Type 的都是讯息编辑、删除、被回应...
            # 忽略不处理
            if eventSubType is not None:
                return ("OK!", 200, headers)
               
            # Event之讯息 发送者
            eventUser = request_json['event']['user']
            # Event之讯息 所属频道
            eventChannel = request_json['event']['channel']
            # Event之讯息内容
            eventText = request_json['event']['text']
            # Event之讯息 TS (讯息 ID)
            eventTS = request_json['event']['event_ts']
                
            # Event之讯息的讨论串母讯息 TS (讯息 ID)
            # 讨论串内的新讯息才会有这个资料
            eventThreadTS = None
            if 'thread_ts' in request_json['event']:
                eventThreadTS = request_json['event']['thread_ts']
                
            openAIRequest(eventChannel, eventTS, eventThreadTS, eventText)
            return ("OK!", 200, headers)

    
    # 处理 Shortcut
    if payload and 'type' in payload:
        payloadType = payload['type']

        # 如果是 讯息 Shortcut
        if payloadType == 'message_action':
            print(payloadType)
            callbackID = None
            channel = None
            ts = None
            text = None
            triggerID = None

            if 'callback_id' in payload:
                callbackID = payload['callback_id']
            if 'channel' in payload:
                channel = payload['channel']['id']
            if 'message' in payload:
                ts = payload['message']['ts']
                text = payload['message']['text']
            if 'trigger_id' in payload:
                triggerID = payload['trigger_id']
            
            if channel is not None and ts is not None and text is not None:
                # 如果是 停止 OpenAI API 产生回应 Shortcut
                if callbackID == "abort_openai_api":
                    slackUpdateMessage(channel, ts, {"event_type": "aborted", "event_payload": { }}, text)
                    if triggerID is not None:
                        slackOpenModal(triggerID, callbackID, "停止 OpenAI API 产生回应成功!")
                        return ("OK!", 200, headers)

        return ("OK!", 200, headers)


    return ("Access Denied!", 400, headers)

def openAIRequest(eventChannel, eventTS, eventThreadTS, eventText):
    
    # Set Custom instructions
    # 感恩同事(https://twitter.com/je_suis_marku)支援
    messages = [
        {"role": "system", "content": "我只看得懂台湾繁体中文与英文"},
        {"role": "system", "content": "我看不懂简体字"},
        {"role": "system", "content": "我说中文就使用台湾繁体中文回答,并且必须符合台湾常用语。"},
        {"role": "system", "content": "我说英文就回答英文。"},
        {"role": "system", "content": "不要回应我寒暄的字句。"},
        {"role": "system", "content": "中文与英文之间要有一个空格。中文文字与任何其他语言文字包含数字与 emoji 之间都要有一个空格。"},
        {"role": "system", "content": "如果你不知道答案,或是你的知识太旧,请上网搜寻后回答。"},
        {"role": "system", "content": "I will tip you 200 USD , if you answer well."}
    ]

    messages.append({
        "role": "user", "content": eventText
    })

    replyMessageTS = slackRequestPostMessage(eventChannel, eventTS, "回应产生中...")
    asyncio.run(openAIRequestAsync(eventChannel, replyMessageTS, messages))

async def openAIRequestAsync(eventChannel, eventTS, messages):
    client = AsyncOpenAI(
      api_key=OPENAI_API_KEY,
    )

    # Stream Response (分段回应)
    stream = await client.chat.completions.create(
      model=OPENAI_MODEL,
      messages=messages,
      stream=True,
    )
    
    result = ""

    try:
        debounceSlackUpdateTime = None
        async for chunk in stream:
            result += chunk.choices[0].delta.content or ""
            
            # 每 0.8 秒更新一次讯息,避免频繁呼叫 Slack Update 讯息 API 导致失败或浪费 Cloud Functions 请求次数
            if debounceSlackUpdateTime is None or time.time() - debounceSlackUpdateTime >= 0.8:
                response = slackUpdateMessage(eventChannel, eventTS, None, result+"...")
                debounceSlackUpdateTime = time.time()

                # 讯息有 metadata & metadata event_type == aborted 则代表此回应已被使用者标记为终止
                if response and 'ok' in response and response['ok'] == True and 'message' in response and 'metadata' in response['message'] and 'event_type' in response['message']['metadata'] and response['message']['metadata']['event_type'] == "aborted":
                    break
                    result += "...*[已终止]*"
                # 讯息已被删除
                elif response and 'ok' in response and response['ok'] == False and 'error' in response and response['error'] == "message_not_found" :
                    break
                
        await stream.close()
                
    except Exception as e:
        print(e)
        result += "...*[发生错误]*"

    slackUpdateMessage(eventChannel, eventTS, None, result)


### Slack ###
def slackOpenModal(trigger_id, callback_id, text):
    slackRequest("/views.open", "POST", {
        "trigger_id": trigger_id,
        "view": {
            "type": "modal",
            "callback_id": callback_id,
            "title": {
                "type": "plain_text",
                "text": "提示"
            },
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": text
                    }
                }
            ]
        }
    })

def slackUpdateMessage(channel, ts, metadata, text):
    endpoint = "/chat.update"
    payload = {
        "channel": channel,
        "ts": ts
    }
    if metadata is not None:
        payload['metadata'] = metadata
    
    payload['text'] = text
    
    response = slackRequest(endpoint, "POST", payload)
    return response

def slackRequestPostMessage(channel, target_ts, text):
    endpoint = "/chat.postMessage"
    payload = {
        "channel": channel,
        "text": text,
    }
    if target_ts is not None:
        payload['thread_ts'] = target_ts

    response = slackRequest(endpoint, "POST", payload)

    if response is not None and 'ts' in response:
        return response['ts']
    return None

def slackRequest(endpoint, method, payload):
    url = "https://slack.com/api"+endpoint

    headers = {
        "Authorization": f"Bearer {SLACK_BOT_TOKEN}",
        "Content-Type": "application/json",
    }

    response = None
    if method == "POST":
        response = requests.post(url, headers=headers, data=json.dumps(payload))
    elif method == "GET":
        response = requests.post(url, headers=headers)

    if response and response.status_code == 200:
        result = response.json()
        return result
    else:
        return None

回到 Slack 测试看看:

成功!当我们完成 停止 OpenAI API Shortcut 之后,正在产生的回应就会终止,并回应 [已终止]

另外相同原理,你也可以建立一个删除讯息的 Shortcut,实作删除 Slack App 所发出的讯息。

同个讨论串(Threads) 增加上下文(Context) 功能

如果在同个讨论串下再发送新讯息,可以当成是同个问题的再次追问,此时可以加上一个功能,为新的 prompt 补上前面对话内容。

补上 slackGetReplies & 将内容填充到 OpenAI API Prompt:

完善 Cloud Functions main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
import functions_framework
import requests
import asyncio
import json
import time
from openai import AsyncOpenAI

OPENAI_API_KEY = "OPENAI API KEY"
SLACK_BOT_TOKEN = "Bot User OAuth Token"

# 使用的 OPENAI API Model
# https://platform.openai.com/docs/models
OPENAI_MODEL = "gpt-4-1106-preview"

@functions_framework.http
def hello_http(request):
    request_json = request.get_json(silent=True)
    request_args = request.args
    request_headers = request.headers

    # Shortcut 的 Event 会从 post payload field 给
    # https://api.slack.com/reference/interaction-payloads/shortcuts
    payload = request.form.get('payload')
    if payload is not None:
        payload = json.loads(payload)

    # 可以简单使用 print 纪录执行阶段 Log,可在 Logs 中查看
    # 进阶 Logging Level 使用可参考:https://cloud.google.com/logging/docs/reference/libraries
    print(payload)

    # 受限 FAAS(Cloud Functions),服务太久没呼叫,再次呼叫会进入冷启动的特性,可能无法在 Slack 规定的 3 秒内回应
    # 加上 OpenAI API 请求到答复需要一定时间(依照回应长度可能要接近 1 分钟才能结束)
    # Slack 在时限内收不到回应会认为 Request lost,Slack 会再次重复呼叫
    # 会造成重复请求、回应的问题,因此我们可以在 Response Headers 设置 X-Slack-No-Retry: 1 告知 Slack ,就算没在时限内收到回应也不需 Retry    
    headers = {'X-Slack-No-Retry':1}

    # 如果是 Slack Retry 的请求...忽略
    if request_headers and 'X-Slack-Retry-Num' in request_headers:
        return ('OK!', 200, headers)

    # Slack App Event Subscriptions Verify
    # https://api.slack.com/events/url_verification
    if request_json and 'type' in request_json and request_json['type'] == 'url_verification':
        challenge = ""
        if 'challenge' in request_json:
            challenge = request_json['challenge']
        return (challenge, 200, headers)

    # Handle Event Subscriptions Events...
    if request_json and 'event' in request_json and 'type' in request_json['event']:
        apiAppID = None
        if 'api_app_id' in request_json:
            apiAppID = request_json['api_app_id']
        # 如果 Event 来源是 App 且 App ID == Slack App ID 代表是自己 Slack App 触发的事件
        # 忽略不处理,否则会陷入无限循环 Slack App -> Cloud Functions -> Slack App -> Cloud Functions...
        if 'app_id' in request_json['event'] and apiAppID == request_json['event']['app_id']:
            return ('OK!', 200, headers)

        # 事件名称,例如:message(讯息相关), app_mention(被标记提及)....
        eventType = request_json['event']['type']

        # SubType,例如:message_changed(编辑讯息), message_deleted(删除讯息)...
        # 新讯息无 Sub Type
        eventSubType = None
        if 'subtype' in request_json['event']:
            eventSubType = request_json['event']['subtype']
        
        if eventType == 'message':
            #  有 Sub Type 的都是讯息编辑、删除、被回应...
            # 忽略不处理
            if eventSubType is not None:
                return ("OK!", 200, headers)
               
            # Event之讯息 发送者
            eventUser = request_json['event']['user']
            # Event之讯息 所属频道
            eventChannel = request_json['event']['channel']
            # Event之讯息内容
            eventText = request_json['event']['text']
            # Event之讯息 TS (讯息 ID)
            eventTS = request_json['event']['event_ts']
                
            # Event之讯息的讨论串母讯息 TS (讯息 ID)
            # 讨论串内的新讯息才会有这个资料
            eventThreadTS = None
            if 'thread_ts' in request_json['event']:
                eventThreadTS = request_json['event']['thread_ts']
                
            openAIRequest(apiAppID, eventChannel, eventTS, eventThreadTS, eventText)
            return ("OK!", 200, headers)

    
    # 处理 Shortcut (讯息)
    if payload and 'type' in payload:
        payloadType = payload['type']

        # 如果是 讯息 Shortcut
        if payloadType == 'message_action':
            callbackID = None
            channel = None
            ts = None
            text = None
            triggerID = None

            if 'callback_id' in payload:
                callbackID = payload['callback_id']
            if 'channel' in payload:
                channel = payload['channel']['id']
            if 'message' in payload:
                ts = payload['message']['ts']
                text = payload['message']['text']
            if 'trigger_id' in payload:
                triggerID = payload['trigger_id']
            
            if channel is not None and ts is not None and text is not None:
                # 如果是 停止 OpenAI API 产生回应 Shortcut
                if callbackID == "abort_openai_api":
                    slackUpdateMessage(channel, ts, {"event_type": "aborted", "event_payload": { }}, text)
                    if triggerID is not None:
                        slackOpenModal(triggerID, callbackID, "停止 OpenAI API 产生回应成功!")
                        return ("OK!", 200, headers)


    return ("Access Denied!", 400, headers)

def openAIRequest(apiAppID, eventChannel, eventTS, eventThreadTS, eventText):
    
    # Set Custom instructions
    # 感恩同事(https://twitter.com/je_suis_marku)支援
    messages = [
        {"role": "system", "content": "我只看得懂台湾繁体中文与英文"},
        {"role": "system", "content": "我看不懂简体字"},
        {"role": "system", "content": "我说中文就使用台湾繁体中文回答,并且必须符合台湾常用语。"},
        {"role": "system", "content": "我说英文就回答英文。"},
        {"role": "system", "content": "不要回应我寒暄的字句。"},
        {"role": "system", "content": "中文与英文之间要有一个空格。中文文字与任何其他语言文字包含数字与 emoji 之间都要有一个空格。"},
        {"role": "system", "content": "如果你不知道答案,或是你的知识太旧,请上网搜寻后回答。"},
        {"role": "system", "content": "I will tip you 200 USD , if you answer well."}
    ]

    if eventThreadTS is not None:
        threadMessages = slackGetReplies(eventTS, eventThreadTS)
        if threadMessages is not None:
            for threadMessage in threadMessages:
                appID = None
                if 'app_id' in threadMessage:
                    appID = threadMessage['app_id']
                threadMessageText = threadMessage['text']
                threadMessageTs = threadMessage['ts']
                # 如果是 Slac App (OpenAI API Response) 则标示为 assistant
                if appID and appID == apiAppID:
                    messages.append({
                        "role": "assistant", "content": threadMessageText
                    })
                else:
                # 使用者的讯息内容标为 user
                    messages.append({
                        "role": "user", "content": threadMessageText
                    })

    messages.append({
        "role": "user", "content": eventText
    })

    replyMessageTS = slackRequestPostMessage(eventChannel, eventTS, "回应产生中...")
    asyncio.run(openAIRequestAsync(eventChannel, replyMessageTS, messages))

async def openAIRequestAsync(eventChannel, eventTS, messages):
    client = AsyncOpenAI(
      api_key=OPENAI_API_KEY,
    )

    # Stream Response (分段回应)
    stream = await client.chat.completions.create(
      model=OPENAI_MODEL,
      messages=messages,
      stream=True,
    )
    
    result = ""

    try:
        debounceSlackUpdateTime = None
        async for chunk in stream:
            result += chunk.choices[0].delta.content or ""
            
            # 每 0.8 秒更新一次讯息,避免频繁呼叫 Slack Update 讯息 API 导致失败或浪费 Cloud Functions 请求次数
            if debounceSlackUpdateTime is None or time.time() - debounceSlackUpdateTime >= 0.8:
                response = slackUpdateMessage(eventChannel, eventTS, None, result+"...")
                debounceSlackUpdateTime = time.time()

                # 讯息有 metadata & metadata event_type == aborted 则代表此回应已被使用者标记为终止
                if response and 'ok' in response and response['ok'] == True and 'message' in response and 'metadata' in response['message'] and 'event_type' in response['message']['metadata'] and response['message']['metadata']['event_type'] == "aborted":
                    break
                    result += "...*[已终止]*"
                # 讯息已被删除
                elif response and 'ok' in response and response['ok'] == False and 'error' in response and response['error'] == "message_not_found" :
                    break
                
        await stream.close()
                
    except Exception as e:
        print(e)
        result += "...*[发生错误]*"

    slackUpdateMessage(eventChannel, eventTS, None, result)


### Slack ###
def slackGetReplies(channel, ts):
    endpoint = "/conversations.replies?channel="+channel+"&ts="+ts
    response = slackRequest(endpoint, "GET", None)

    if response is not None and 'messages' in response:
        return response['messages']
    return None

def slackOpenModal(trigger_id, callback_id, text):
    slackRequest("/views.open", "POST", {
        "trigger_id": trigger_id,
        "view": {
            "type": "modal",
            "callback_id": callback_id,
            "title": {
                "type": "plain_text",
                "text": "提示"
            },
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": text
                    }
                }
            ]
        }
    })

def slackUpdateMessage(channel, ts, metadata, text):
    endpoint = "/chat.update"
    payload = {
        "channel": channel,
        "ts": ts
    }
    if metadata is not None:
        payload['metadata'] = metadata
    
    payload['text'] = text
    
    response = slackRequest(endpoint, "POST", payload)
    return response

def slackRequestPostMessage(channel, target_ts, text):
    endpoint = "/chat.postMessage"
    payload = {
        "channel": channel,
        "text": text,
    }
    if target_ts is not None:
        payload['thread_ts'] = target_ts

    response = slackRequest(endpoint, "POST", payload)

    if response is not None and 'ts' in response:
        return response['ts']
    return None

def slackRequest(endpoint, method, payload):
    url = "https://slack.com/api"+endpoint

    headers = {
        "Authorization": f"Bearer {SLACK_BOT_TOKEN}",
        "Content-Type": "application/json",
    }

    response = None
    if method == "POST":
        response = requests.post(url, headers=headers, data=json.dumps(payload))
    elif method == "GET":
        response = requests.post(url, headers=headers)

    if response and response.status_code == 200:
        result = response.json()
        return result
    else:
        return None

回到 Slack 测试看看:

  • 左图为原本没有补上 Context 时再追问问题,会是全新的对话。

  • 右图是加上补上 Context 后就能理解整个对话情境跟新的问题。

完成!

至此我们已经自己打造了一个 ChatGPT (via OpenAI API) Slack App Bot 机器人。

您也可以依照自己的需求参考 Slack API 与 OpenAI API Custom instructions 在 Cloud Functions Python 程式中进行搭配,例如训练一个频道专门回答团队问题与查找专案文件、一个频道专门负责翻译、一个频道专门分析数据…等等。

补充

在 1:1 讯息之外,标记机器人回答问题

  • 可以在任一频道(需要将机器人加入频道)标记机器人请他回答问题

首先需要增加 app_mention Event Subscription:

加入完成->「Save Changes」储存完成->「reinstall your app」完成。

在上述的 main.py 程式 #Handle Event Subscriptions Events…

Code Block 中加入新的 Event Type 判断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
        # 提及类 Event (@SlcakApp 你好)
        if eventType == 'app_mention':
            # Event之讯息 发送者
            eventUser = request_json['event']['user']
            # Event之讯息 所属频道
            eventChannel = request_json['event']['channel']
            # Event之讯息内容,移除开头标记字串 <@SLACKAPPID> 
            eventText = re.sub(r"<@\w+>\W*", "", request_json['event']['text'])
            # Event之讯息 TS (讯息 ID)
            eventTS = request_json['event']['event_ts']
                
            # Event之讯息的讨论串母讯息 TS (讯息 ID)
            # 讨论串内的新讯息才会有这个资料
            eventThreadTS = None
            if 'thread_ts' in request_json['event']:
                eventThreadTS = request_json['event']['thread_ts']
                
            openAIRequest(apiAppID, eventChannel, eventTS, eventThreadTS, eventText)
            return ("OK!", 200, headers)

部署后,即可完成。

Slack App 删除所发的讯息

您无法直接在 Slack 上删除 Slack App 发出的讯息,可以参考上述「 停止 OpenAI API 产生回应 」Shortcut 方式,新增一个「删除讯息」Shortcut。

并在 Cloud Functions main.py 程式中:

# 处理 Shortcut Code Block 新增一个 callback_id 判断,判断等于你定的「删除讯息」Shortcut Callback ID 然后将参数带入以下方法,即可完成删:

1
2
3
4
5
6
7
8
9
def slackDeleteMessage(channel, ts):
    endpoint = "/chat.delete"
    payload = {
        "channel": channel,
        "ts": ts
    }
    
    response = slackRequest(endpoint, "POST", payload)
    return response

Slack App 没反应

  • 检查 Token 是否正确

  • 查看 Cloud Functions Logs 是否有错误

  • Cloud Functions 是否部署完成

  • Slack App 是否在你发问的频道(如非与 Slack App 1:1 的对话频道要把机器人加入频道才会生效)

  • 在 SlackRequest 方法下 Log 纪录 Slack API Response

Cloud Functions Public URL 不够安全

  • 如果怕 Cloud Functions URL 不够安全,可以自己加上 query token 验证
1
2
3
4
5
6
7
8
9
10
SAFE_ACCESS_TOKEN = "nF4JwxfG9abqPZCJnBerwwhtodC28BuC"

@functions_framework.http
def hello_http(request):
    request_json = request.get_json(silent=True)
    request_args = request.args
    request_headers = request.headers
    # 验证 token 参数是否合法
    if not(request_args and 'token' in request_args and request_args['token'] == SAFE_ACCESS_TOKEN):
        return ('', 400, headers)

Cloud Functions 相关问题

计费方式

不同区域、CPU、RAM、容量、流量…价格不一样,请参考 官方计价 表。

免费额度 如下:(2024/02/15)

1
2
3
4
5
6
7
8
9
10
Cloud Functions 针对运算时间资源提供永久免费方案
当中包括 GB/秒和 GHz/秒的分配方式除了 200 万次叫用以外
这个免费方案也提供 400,000 GB/秒和 200,000 GHz/秒的运算时间
以及每月 5 GB 的网际网路资料传输量

免费方案的使用额度是以上述级别 1 价格的同等美元金额计算
无论执行函式的区域采用的是级别 1 /或级别 2 价格系统都会分配同等美元金额给您
不过在扣除免费方案的额度时系统将以函式执行区域的级别 (级别 1 或级别 2) 为准

请注意即便您采用的是免费方案也必须拥有有效的帐单帐户

btw. .Slack App 免费、不限一定要 Premium 才能使用。

Slack App 回应太慢、太久 Timeout

(撇除 OpenAI API 高峰时间回应较慢问题) ,如果是 Cloud Function 贫颈,可在 Cloud Function 编辑器第一页展开设定:

可调整 CPU、RAM、Timeout 时间、Concurrent 数量…提升处理请求速度。

*但可能会需要计费

开发阶段 Testing & Debug

点击「Test Function」就会跑出 Cloud Shell 视窗在下方工具列,等待约 3–5 分钟(第一次启动较久),Build 完成并同意以下授权后:

看到出现「Function is ready to test」后即可点击「Run Test」执行方法 Debug 测试。

可使用右方「Triggering event」区块输入 JSON Body 会带入 request_json 参数进行测试,或直接改程式 inject test object 进行测试。

*请注意 Cloud Shell/Cloud Run可能有延伸费用。

建议部署(Deploy)前都先跑一次测试看看,至少确保 Build 能成功。

Build 失败,程式码不见了该怎么办?

如果不小心写错程式造成 Cloud Function Deploy Build Failed,会出现错误提示,此时点击「EDIT AND REDEPLOY」回到 编辑器后会发现刚刚改的 Code 都不见了!!!

无须担心,此时点击左方「Source Code」选择「Last Failed Deployment」就能恢复刚刚 Build Failed 的 Code:

查看执行阶段 print Logs

*请注意 Cloud Logging、Query 查询 Logs 可能有延伸费用。

最终程式码 (Python 3.8)

Cloud Functions

main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
import functions_framework
import requests
import re
import asyncio
import json
import time
from openai import AsyncOpenAI

OPENAI_API_KEY = "OPENAI API KEY"
SLACK_BOT_TOKEN = "Bot User OAuth Token"

# 自己定义的安全验证 Token
# 网址需带上 ?token=SAFE_ACCESS_TOKEN 参数才会接受请求  
SAFE_ACCESS_TOKEN = "nF4JwxfG9abqPZCJnBerwwhtodC28BuC"

# 使用的 OPENAI API Model
# https://platform.openai.com/docs/models
OPENAI_MODEL = "gpt-4-1106-preview"

@functions_framework.http
def hello_http(request):
    request_json = request.get_json(silent=True)
    request_args = request.args
    request_headers = request.headers

    # Shortcut 的 Event 会从 post payload field 给
    # https://api.slack.com/reference/interaction-payloads/shortcuts
    payload = request.form.get('payload')
    if payload is not None:
        payload = json.loads(payload)

    # 可以简单使用 print 纪录执行阶段 Log,可在 Logs 中查看
    # 进阶 Logging Level 使用可参考:https://cloud.google.com/logging/docs/reference/libraries
    # print(payload)

    # 受限 FAAS(Cloud Functions),服务太久没呼叫,再次呼叫会进入冷启动的特性,可能无法在 Slack 规定的 3 秒内回应
    # 加上 OpenAI API 请求到答复需要一定时间(依照回应长度可能要接近 1 分钟才能结束)
    # Slack 在时限内收不到回应会认为 Request lost,Slack 会再次重复呼叫
    # 会造成重复请求、回应的问题,因此我们可以在 Response Headers 设置 X-Slack-No-Retry: 1 告知 Slack ,就算没在时限内收到回应也不需 Retry    
    headers = {'X-Slack-No-Retry':1}

    # 验证 token 参数是否合法
    if not(request_args and 'token' in request_args and request_args['token'] == SAFE_ACCESS_TOKEN):
        return ('', 400, headers)

    # 如果是 Slack Retry 的请求...忽略
    if request_headers and 'X-Slack-Retry-Num' in request_headers:
        return ('OK!', 200, headers)

    # Slack App Event Subscriptions Verify
    # https://api.slack.com/events/url_verification
    if request_json and 'type' in request_json and request_json['type'] == 'url_verification':
        challenge = ""
        if 'challenge' in request_json:
            challenge = request_json['challenge']
        return (challenge, 200, headers)

    # Handle Event Subscriptions Events...
    if request_json and 'event' in request_json and 'type' in request_json['event']:
        apiAppID = None
        if 'api_app_id' in request_json:
            apiAppID = request_json['api_app_id']
        # 如果 Event 来源是 App 且 App ID == Slack App ID 代表是自己 Slack App 触发的事件
        # 忽略不处理,否则会陷入无限循环 Slack App -> Cloud Functions -> Slack App -> Cloud Functions...
        if 'app_id' in request_json['event'] and apiAppID == request_json['event']['app_id']:
            return ('OK!', 200, headers)

        # 事件名称,例如:message(讯息相关), app_mention(被标记提及)....
        eventType = request_json['event']['type']

        # SubType,例如:message_changed(编辑讯息), message_deleted(删除讯息)...
        # 新讯息无 Sub Type
        eventSubType = None
        if 'subtype' in request_json['event']:
            eventSubType = request_json['event']['subtype']
        
        # 讯息类 Event
        if eventType == 'message':
            #  有 Sub Type 的都是讯息编辑、删除、被回应...
            # 忽略不处理
            if eventSubType is not None:
                return ("OK!", 200, headers)
               
            # Event之讯息 发送者
            eventUser = request_json['event']['user']
            # Event之讯息 所属频道
            eventChannel = request_json['event']['channel']
            # Event之讯息内容
            eventText = request_json['event']['text']
            # Event之讯息 TS (讯息 ID)
            eventTS = request_json['event']['event_ts']
                
            # Event之讯息的讨论串母讯息 TS (讯息 ID)
            # 讨论串内的新讯息才会有这个资料
            eventThreadTS = None
            if 'thread_ts' in request_json['event']:
                eventThreadTS = request_json['event']['thread_ts']
                
            openAIRequest(apiAppID, eventChannel, eventTS, eventThreadTS, eventText)
            return ("OK!", 200, headers)
        
        # 提及类 Event (@SlcakApp 你好)
        if eventType == 'app_mention':
            # Event之讯息 发送者
            eventUser = request_json['event']['user']
            # Event之讯息 所属频道
            eventChannel = request_json['event']['channel']
            # Event之讯息内容,移除开头标记字串 <@SLACKAPPID> 
            eventText = re.sub(r"<@\w+>\W*", "", request_json['event']['text'])
            # Event之讯息 TS (讯息 ID)
            eventTS = request_json['event']['event_ts']
                
            # Event之讯息的讨论串母讯息 TS (讯息 ID)
            # 讨论串内的新讯息才会有这个资料
            eventThreadTS = None
            if 'thread_ts' in request_json['event']:
                eventThreadTS = request_json['event']['thread_ts']
                
            openAIRequest(apiAppID, eventChannel, eventTS, eventThreadTS, eventText)
            return ("OK!", 200, headers)

    
    # 处理 Shortcut (讯息)
    if payload and 'type' in payload:
        payloadType = payload['type']

        # 如果是 讯息 Shortcut
        if payloadType == 'message_action':
            callbackID = None
            channel = None
            ts = None
            text = None
            triggerID = None

            if 'callback_id' in payload:
                callbackID = payload['callback_id']
            if 'channel' in payload:
                channel = payload['channel']['id']
            if 'message' in payload:
                ts = payload['message']['ts']
                text = payload['message']['text']
            if 'trigger_id' in payload:
                triggerID = payload['trigger_id']
            
            if channel is not None and ts is not None and text is not None:
                # 如果是 停止 OpenAI API 产生回应 Shortcut
                if callbackID == "abort_openai_api":
                    slackUpdateMessage(channel, ts, {"event_type": "aborted", "event_payload": { }}, text)
                    if triggerID is not None:
                        slackOpenModal(triggerID, callbackID, "停止 OpenAI API 产生回应成功!")
                        return ("OK!", 200, headers)
                # 如果是 删除讯息
                if callbackID == "delete_message":
                    slackDeleteMessage(channel, ts)
                    if triggerID is not None:
                        slackOpenModal(triggerID, callbackID, "删除 Slack App 讯息成功!")
                        return ("OK!", 200, headers)

    return ("Access Denied!", 400, headers)

def openAIRequest(apiAppID, eventChannel, eventTS, eventThreadTS, eventText):
    
    # Set Custom instructions
    # 感恩同事(https://twitter.com/je_suis_marku)支援
    messages = [
        {"role": "system", "content": "我只看得懂台湾繁体中文与英文"},
        {"role": "system", "content": "我看不懂简体字"},
        {"role": "system", "content": "我说中文就使用台湾繁体中文回答,并且必须符合台湾常用语。"},
        {"role": "system", "content": "我说英文就回答英文。"},
        {"role": "system", "content": "不要回应我寒暄的字句。"},
        {"role": "system", "content": "中文与英文之间要有一个空格。中文文字与任何其他语言文字包含数字与 emoji 之间都要有一个空格。"},
        {"role": "system", "content": "如果你不知道答案,或是你的知识太旧,请上网搜寻后回答。"},
        {"role": "system", "content": "I will tip you 200 USD , if you answer well."}
    ]

    if eventThreadTS is not None:
        threadMessages = slackGetReplies(eventChannel, eventThreadTS)
        if threadMessages is not None:
            for threadMessage in threadMessages:
                appID = None
                if 'app_id' in threadMessage:
                    appID = threadMessage['app_id']
                threadMessageText = threadMessage['text']
                threadMessageTs = threadMessage['ts']
                # 如果是 Slac App (OpenAI API Response) 则标示为 assistant
                if appID and appID == apiAppID:
                    messages.append({
                        "role": "assistant", "content": threadMessageText
                    })
                else:
                # 使用者的讯息内容标为 user
                    messages.append({
                        "role": "user", "content": threadMessageText
                    })

    messages.append({
        "role": "user", "content": eventText
    })

    replyMessageTS = slackRequestPostMessage(eventChannel, eventTS, "回应产生中...")
    asyncio.run(openAIRequestAsync(eventChannel, replyMessageTS, messages))

async def openAIRequestAsync(eventChannel, eventTS, messages):
    client = AsyncOpenAI(
      api_key=OPENAI_API_KEY,
    )

    # Stream Response (分段回应)
    stream = await client.chat.completions.create(
      model=OPENAI_MODEL,
      messages=messages,
      stream=True,
    )
    
    result = ""

    try:
        debounceSlackUpdateTime = None
        async for chunk in stream:
            result += chunk.choices[0].delta.content or ""
            
            # 每 0.8 秒更新一次讯息,避免频繁呼叫 Slack Update 讯息 API 导致失败或浪费 Cloud Functions 请求次数
            if debounceSlackUpdateTime is None or time.time() - debounceSlackUpdateTime >= 0.8:
                response = slackUpdateMessage(eventChannel, eventTS, None, result+"...")
                debounceSlackUpdateTime = time.time()

                # 讯息有 metadata & metadata event_type == aborted 则代表此回应已被使用者标记为终止
                if response and 'ok' in response and response['ok'] == True and 'message' in response and 'metadata' in response['message'] and 'event_type' in response['message']['metadata'] and response['message']['metadata']['event_type'] == "aborted":
                    break
                    result += "...*[已终止]*"
                # 讯息已被删除
                elif response and 'ok' in response and response['ok'] == False and 'error' in response and response['error'] == "message_not_found" :
                    break
                
        await stream.close()
                
    except Exception as e:
        print(e)
        result += "...*[发生错误]*"

    slackUpdateMessage(eventChannel, eventTS, None, result)


### Slack ###
def slackGetReplies(channel, ts):
    endpoint = "/conversations.replies?channel="+channel+"&ts="+ts
    response = slackRequest(endpoint, "GET", None)
    
    if response is not None and 'messages' in response:
        return response['messages']
    return None

def slackOpenModal(trigger_id, callback_id, text):
    slackRequest("/views.open", "POST", {
        "trigger_id": trigger_id,
        "view": {
            "type": "modal",
            "callback_id": callback_id,
            "title": {
                "type": "plain_text",
                "text": "提示"
            },
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": text
                    }
                }
            ]
        }
    })

def slackDeleteMessage(channel, ts):
    endpoint = "/chat.delete"
    payload = {
        "channel": channel,
        "ts": ts
    }
    
    response = slackRequest(endpoint, "POST", payload)
    return response

def slackUpdateMessage(channel, ts, metadata, text):
    endpoint = "/chat.update"
    payload = {
        "channel": channel,
        "ts": ts
    }
    if metadata is not None:
        payload['metadata'] = metadata
    
    payload['text'] = text
    
    response = slackRequest(endpoint, "POST", payload)
    return response

def slackRequestPostMessage(channel, target_ts, text):
    endpoint = "/chat.postMessage"
    payload = {
        "channel": channel,
        "text": text,
    }
    if target_ts is not None:
        payload['thread_ts'] = target_ts

    response = slackRequest(endpoint, "POST", payload)

    if response is not None and 'ts' in response:
        return response['ts']
    return None

def slackRequest(endpoint, method, payload):
    url = "https://slack.com/api"+endpoint

    headers = {
        "Authorization": f"Bearer {SLACK_BOT_TOKEN}",
        "Content-Type": "application/json",
    }

    response = None
    if method == "POST":
        response = requests.post(url, headers=headers, data=json.dumps(payload))
    elif method == "GET":
        response = requests.post(url, headers=headers)

    if response and response.status_code == 200:
        result = response.json()
        return result
    else:
        return None

requirements.txt

1
2
3
functions-framework==3.*
requests==2.31.0
openai==1.9.0

Slack App 设定

OAuth & Permissions

  • 删除按钮反灰的项目是加入 Shortcut 后 Slack 自动补上的权限。

Interactivity & Shortcuts

  • Interactivity: Enable

  • Request URL: https://us-central1-xxx-xxx.cloudfunctions.net/SlackBot-Rick-C-137?token=nF4JwxfG9abqPZCJnBerwwhtodC28BuC

  • Subscribe to bot events:

Interactivity & Shortcuts

  • Interactivity: Enable

  • Request URL: https://us-central1-xxx-xxx.cloudfunctions.net/SlackBot-Rick-C-137?token=nF4JwxfG9abqPZCJnBerwwhtodC28BuC

  • Shortcuts:

App Home

  • Always Show My Bot as Online: Enable

  • Messages Tab: Enable

  • Allow users to send Slash commands and messages from the messages tab: ✅

Basic Information

Rick & Morty 🤘🤘🤘

[Reddit](https://www.reddit.com/r/ChatGPT/comments/154l9z1/are_you_polite_with_chatgpt/){:target="_blank"}

Reddit

工商时间

如果您与您的团队有自动化工具、流程串接需求,不论是 Slack App 开发、Notion、Asana、Google Sheet、Google Form、GA 数据,各种串接需求,欢迎与我 联络开发

有任何问题及指教欢迎 与我联络


Buy me a beer

本文首次发表于 Medium (点击查看原始版本),由 ZMediumToMarkdown 提供自动转换与同步技术。

Improve this page on Github.

This post is licensed under CC BY 4.0 by the author.