Know Your Wisdom

Connecting ChatGPT to a QQ Bot

2022-04-09

How to

The Celery broker is Redis; since my bot only handles a modest volume of messages, RabbitMQ would be overkill.
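
For reference, pointing Celery at a Redis broker is a one-liner. A minimal sketch; the app name and Redis URL below are placeholders, not the bot's real settings:

from celery import Celery

# minimal Celery app with a Redis broker (name and URL are placeholders)
app = Celery('qqbot', broker='redis://localhost:6379/0')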

Every message a user sends that is not captured by the normal bot rules is passed to a ChatGPT or GPT-3 Celery task, as sketched below.
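
To make the flow concrete, here is a hypothetical dispatch hook: the normal rule handlers get first pick, and anything left over is queued for the GPT tasks defined below. matches_rules and the '/chat' trigger are made-up placeholders, not the bot's real rules:

def on_message(bot_id, user_id, group_id, text):
    if matches_rules(text):
        # handled by a normal bot rule (not shown in this post)
        return
    if group_id and text.startswith('/chat'):
        # assumed trigger for a stateful group conversation
        start_group_chatGPT_conversation(bot_id, group_id, text)
    else:
        # one-shot GPT-3 completion
        get_GPT3_reply.delay(text, bot_id, user_id, group_id=group_id)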

GPT-3

A simple GPT-3 completion task:

import time

import openai

@app.task(bind=True, expires=60, ignore_result=True, rate_limit='10/m')
def get_GPT3_reply(self, prompt, bot_id, user_id, group_id=None):
    # per-task logger; create_logger is a project helper whose .info/.warning/.error
    # attributes are themselves logger objects
    logger = create_logger(taskid='gpt3-{}'.format(self.request.id))
    start_time = time.time()
    logger.info.info('start asking gpt3 question: %s', prompt)
    try:
        resp = generate_response(prompt)
    except openai.error.APIConnectionError as e:
        logger.warning.warning('openAI api failure: %s', e)
        time.sleep(3)   # cool down, then retry once
        resp = generate_response(prompt)
    except openai.error.RateLimitError as e:
        logger.warning.warning('openAI api rate limit: %s', e)
        time.sleep(3)
        resp = generate_response(prompt)
    logger.info.info('got gpt3 response, elapsed=%f, resp=%s', time.time()-start_time, resp)
    api = get_onebot_api(bot_id=bot_id).with_logger(logger.info)
    if group_id:
        api.send_group_msg(group_id=group_id, message=resp)
    else:
        api.send_private_msg(user_id=user_id, message=resp)
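
The post doesn't show generate_response. A plausible version against the pre-1.0 openai SDK (which this code targets, given the openai.error exception names); the model and sampling parameters are assumptions:

import openai

openai.api_key = 'sk-...'   # placeholder

def generate_response(prompt):
    # single-shot completion; model and parameters are assumed values
    resp = openai.Completion.create(
        model='text-davinci-003',
        prompt=prompt,
        max_tokens=512,
        temperature=0.7,
    )
    return resp.choices[0].text.strip()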

Reverse-engineered ChatGPT

Below is the main chat event loop. For every group, when somebody triggers a conversation with the bot, Celery creates a job that preserves the conversation context and long-polls for new messages, using Redis SETNX to lock the conversation so that only one session runs per group at a time. The lock comes from a redis_lock decorator; a sketch of it is given first, then the loop itself.
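
The redis_lock implementation isn't shown in the post. Here is a minimal sketch of what it could look like, assuming Django's cache framework backed by Redis, where cache.add() maps to SETNX; it also provides the is_locked and get_cache_key attributes used further down:

import functools

from django.core.cache import cache

class LockError(Exception):
    pass

def redis_lock(key, timeout=3600 * 4):
    """Sketch: allow only one running call per computed key."""
    def decorator(func):
        def get_cache_key(*args, **kwargs):
            return key(*args, **kwargs)

        def is_locked(*args, **kwargs):
            return cache.get(get_cache_key(*args, **kwargs)) is not None

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            cache_key = get_cache_key(*args, **kwargs)
            # cache.add() only stores the key if it is absent -- SETNX on a Redis backend
            if not cache.add(cache_key, 1, timeout):
                raise LockError('already locked: {}'.format(cache_key))
            try:
                return func(*args, **kwargs)
            finally:
                cache.delete(cache_key)

        wrapper.is_locked = is_locked
        wrapper.get_cache_key = get_cache_key
        return wrapper
    return decorator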

import random
import time
from datetime import timedelta

@redis_lock(key=lambda self, bot_id, group_id, prompt: 'group_chatGPT_conversation-{}-{}'.format(bot_id, group_id))
def group_chatGPT_conversation(self, bot_id, group_id, prompt):
    # per-task logger (same create_logger helper as above)
    logger = create_logger(taskid='chatGPT-{}'.format(self.request.id))
    api = get_onebot_api(bot_id=bot_id).with_logger(logger.info)
    start_time = get_datetime_now()
    session_lifetime, sleep_time = 300, 3   # seconds
    logger.info.info('group_chatGPT_conversation logging in group %s, bot=%s', group_id, bot_id)
    chat = newConversation()    # log in and open a fresh ChatGPT conversation
    logger.info.info('group_chatGPT_conversation logged in, elapsed %f', (get_datetime_now() - start_time).seconds)
    start_time = get_datetime_now()
    while get_datetime_now() < start_time + timedelta(seconds=session_lifetime):
        # get conversation response
        if prompt:
            resp = getResponse(chat, prompt).strip()
            if not resp:
                logger.warning.warning('chatGPT empty response, prompt=%s', prompt)
                # tell the group the bot is down
                # ("🤖️脑子瓦特了,群主快来!" = "the bot's brain is fried, group owner please come!")
                ret, err = api.send_group_msg(message='🤖️脑子瓦特了,群主快来!\nempty Response', group_id=group_id)
                if err:
                    logger.error.error('send group message failed, err=%s, resp=%s', err, ret)
                break
            ret, err = api.send_group_msg(message=resp, group_id=group_id)
            if err:
                logger.error.error('send group message failed, err=%s, resp=%s', err, ret)
                break
            logger.info.info(
                'group conversation response, elapsed=%f, usersay=%s, botsay=%s',
                (get_datetime_now()-start_time).seconds, prompt, resp)
            prompt = ''

        # poll for new input from the user after a sleep; back off with jitter
        time.sleep(sleep_time)
        sleep_time += sleep_time * random.random()
        # long polling: look for the newest @bot group message since start_time
        at_prefix = f'[CQ:at,qq={bot_id}]'
        message = OneBotEventTab.objects.filter(
            self_id=bot_id,
            group_id=group_id,
            post_type=OneBotEventTab.POST_TYPE_MESSAGE,
            message_type=OneBotEventTab.MESSAGE_TYPE_GROUP,
            time__gt=start_time,
            message__startswith=at_prefix,
        ).order_by('-time').first()
        if message:
            logger.info.info('got new group conversation user message, message=%s', message.message)
            prompt = message.message.replace(at_prefix, '').strip()
            start_time = message.time   # a new message extends the session
            sleep_time = 3              # and resets the polling back-off
    logger.info.info('group_chatGPT_conversation ended')

# grab the lock helpers before app.task() wraps the function, then apply the
# task decorator manually, declaring LockError as an expected exception
group_chatGPT_conversation_is_locked = group_chatGPT_conversation.is_locked
group_chatGPT_conversation_get_cache_key = group_chatGPT_conversation.get_cache_key
group_chatGPT_conversation = app.task(
    bind=True, expires=3600 * 4, ignore_result=True, throws=(LockError,))(group_chatGPT_conversation)
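
newConversation and getResponse aren't shown either; they presumably wrap a reverse-engineered ChatGPT client (projects like revChatGPT were the usual choice at the time). The sketch below is purely illustrative; ChatGPTClient and its methods are placeholders, not a real API:

def newConversation():
    # placeholder client and credentials; logging in is the slow (~7s) step
    # measured in the Performance section below
    client = ChatGPTClient(email='...', password='...')
    client.login()
    return client

def getResponse(chat, prompt):
    return chat.ask(prompt)   # returns the assistant's reply as a string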
        

@app.task
def on_group_chatGPT_conversation(request, exc, traceback):
    # error callback: Celery invokes it with the failed task's request context
    logger = create_logger(taskid='chatGPT-{}'.format(request.id))
    bot_id, group_id, _ = request.args
    api = get_onebot_api(bot_id=bot_id)
    # the cache key doubles as the conversation lock
    lock_key = group_chatGPT_conversation_get_cache_key(None, bot_id, group_id, '')

    logger.error.error('group_chatGPT_conversation failed, exc=%s, trace=%s', exc, traceback)
    if not isinstance(exc, LockError):
        # the task died while holding the lock: notify the group and release the lock
        # ("🤖️脑子瓦特了,群主快来!" = "the bot's brain is fried, group owner please come!")
        api.send_group_msg(group_id=group_id, message='🤖️脑子瓦特了,群主快来!')
        cache.delete(lock_key)


def start_group_chatGPT_conversation(bot_id, group_id, prompt):
    # advisory pre-check; the SETNX inside redis_lock is the real guard
    if not group_chatGPT_conversation_is_locked(None, bot_id, group_id, ''):
        group_chatGPT_conversation.apply_async(
            args=(bot_id, group_id, prompt),
            ignore_result=False,    # keep the result so the task can be monitored
            link_error=on_group_chatGPT_conversation.s(),
        )
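
Note that the is_locked pre-check is only advisory: if two triggers race past it, the SETNX inside redis_lock still admits just one task. The loser raises LockError, which throws=(LockError,) downgrades to a plain log line without a traceback, while the link_error callback above still fires and deliberately skips the cleanup for that case.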

Performance

The bottleneck is the OpenAI API: in my tests, the ChatGPT login takes around 7 seconds on average and each request around 6 seconds, which is very high latency. So all OpenAI-related requests go into the async queue, but each in-flight request then occupies a dedicated Celery worker sub-process for its whole lifetime, which wastes a lot of resources. Since the tasks are I/O-bound, a green-thread pool such as Eventlet is worth considering. For now, throughput is bounded by how many worker sub-processes are running.
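
Switching the worker to an Eventlet pool would let a single process multiplex many concurrent API calls instead of parking one sub-process per request, for example (the module name is a placeholder):

pip install eventlet
celery -A qqbot worker --pool=eventlet --concurrency=100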