将 ChatGPT 接入 QQ 机器人
2022-04-09
Reference
- OpenAI API (GPT3 as a candidate)
- reversed ChatGPT API
- Go-CQHttp
- Celery
- Django-Redis
How to
The Celery broker is Redis, since my bot only accepts numerous messages, the RabbitMQ is too excessive for me.
Every time user sends a message which not captured by the normal robot rules will be passed to ChatGPT or GPT-3 celery task.
GPT-3
simple GPT-3 compeletion
@app.task(bind=True, expires=60, ignore_result=True, rate_limit='10/m')
def get_GPT3_reply(self, prompt, bot_id, user_id, group_id=None):
# global
logger = create_logger(taskid='gpt3-{}'.format(self.request.id))
start_time = time.time()
logger.info.info('start asking gpt3 question: %s', prompt)
try:
resp = generate_response(prompt)
except openai.error.APIConnectionError as e:
logger.warning.warn('openAI api failure: %s', e)
time.sleep(3) # cool down
resp = generate_response(prompt)
except openai.error.RateLimitError as e:
logger.warning.warn('openAI api rate limit: %s', e)
time.sleep(3)
resp = generate_response(prompt)
logger.info.info('got gpt3 response, elapsed=%f, resp=%s', time.time()-start_time, resp)
api = get_onebot_api(bot_id=bot_id).with_logger(logger.info)
if group_id:
api.send_group_msg(group_id=group_id, message=resp)
else:
api.send_private_msg(user_id=user_id, message=resp)
reversed ChatGPT
Below gives the main chat event loop. For every group, if somebody triggers a chat conversation with the bot, the celery will create a Job to preserve the context, using the Long polling method to check whether have a new message. using Redis SETNX to lock the conversation.
@redis_lock(key=lambda self, bot_id, user_id, prompt: 'group_chatGPT_conversation-{}-{}'.format(bot_id, user_id))
def group_chatGPT_conversation(self, bot_id, group_id, prompt):
# global
logger = create_logger(taskid='chatGPT-{}'.format(self.request.id))
api = get_onebot_api(bot_id=bot_id).with_logger(logger.info)
start_time = get_datetime_now()
session_lifetime, sleep_time = 300, 3
logger.info.info('group_chatGPT_conversation logging in group %s, bot=%s', group_id, bot_id)
chat = newConversation()
logger.info.info('group_chatGPT_conversation logged, elapsed %f', (get_datetime_now() - start_time).seconds)
start_time = get_datetime_now()
while get_datetime_now() < start_time + timedelta(seconds=session_lifetime):
# get conversation response
if prompt:
resp = getResponse(chat, prompt).strip()
if not resp:
logger.warning.warn('chatGPT empty response, prompt=%s', prompt)
# tell user bot is down
ret, err = api.send_group_msg('🤖️脑子瓦特了,群主快来!\nempty Response', group_id)
if err:
logger.error.error('send response to bot failed, err=%s, resp=%s', err, ret)
break
ret, err = api.send_group_msg(resp, group_id)
if err:
logger.error.error('send response to bot failed, err=%s, resp=%s', err, ret)
break
logger.info.info(
'group conversation response, elapsed=%f, usersay=%s, botsay=%s',
(get_datetime_now()-start_time).seconds, prompt, resp)
prompt = ''
# get new input from user after sleep
time.sleep(sleep_time)
sleep_time += sleep_time * random.random()
at_prefix = f'[CQ:at,qq={bot_id}]'
message = OneBotEventTab.objects.filter(
self_id=bot_id,
group_id=group_id,
post_type=OneBotEventTab.POST_TYPE_MESSAGE,
message_type=OneBotEventTab.MESSAGE_TYPE_GROUP,
time__gt=start_time,
message__startswith=at_prefix,
).order_by('-time').first()
if message:
logger.info.info('got new group conversation user message, message=%s', message.message)
prompt = message.message.replace(at_prefix, '').strip()
start_time = message.time
sleep_time = 3
logger.info.info('group_chatGPT_conversation ended')
group_chatGPT_conversation_is_locked = group_chatGPT_conversation.is_locked
group_chatGPT_conversation_get_cache_key = group_chatGPT_conversation.get_cache_key
group_chatGPT_conversation = app.task(
bind=True, expires=3600 * 4, ignore_result=True, throws=(LockError,))(group_chatGPT_conversation)
@app.task
def on_group_chatGPT_conversation(request, exc, traceback):
# global
logger = create_logger(taskid='chatGPT-{}'.format(request.id))
bot_id, group_id, _ = request.args
api = get_onebot_api(bot_id=bot_id)
task_id = group_chatGPT_conversation_get_cache_key(None, bot_id, group_id, '')
# log
logger.error.error('group_chatGPT_conversation failed, exc=%s, trace=%s', exc, traceback)
if not isinstance(exc, LockError):
api.send_group_msg('🤖️脑子瓦特了,群主快来!', group_id)
cache.delete(task_id)
def start_group_chatGPT_conversation(bot_id, group_id, prompt):
if not group_chatGPT_conversation_is_locked(None, bot_id, group_id, ''):
group_chatGPT_conversation.apply_async(
args=(bot_id, group_id, prompt),
ignore_result=False, # to monitor whether the task has done
link_error=on_group_chatGPT_conversation.s(),
)
Performance
The bottleneck is OpenAI API, after my test, the average logging operation cost around 7 seconds, and each request needs around 6 secs. It's very high latency. So I put all the OpenAI-related requests into the async queue, every request has a specified worker sub-process in celery. Which also is a high resource waste. Can consider using Eventlet. Current performance depends on how many workers(sub-processes) were created.