Gunosyデータ分析ブログ

Gunosyで働くデータエンジニアが知見を共有するブログです。

ChatGPTを活用した業務支援ツール「ウデキキ」のチャット実装

はじめに

こんにちは。最近所属が変わり、新しく新規事業開発チームに配属された上村です。 本記事は、Gunosyアドベントカレンダー2023 13日目の記事です。 昨日の記事は、田辺さんの「A/Bテストの情報過多と戦う」でした。

今回は、新規事業開発チームで開発している「ウデキキ」という業務支援ツールについてご紹介します。 また、ウデキキのチャット機能の実装についてもご紹介します。

「ウデキキ」とは

サービス紹介

ウデキキ

ウデキキは、ChatGPTという強力な自然言語処理モデルを活用した業務支援特化の生成AIサービスです。

通常、ChatGPTから望ましい回答を引き出すためには、LLM(Language Model Learning)の特性を理解し、適切なプロンプトを設定する専門知識が必要です。 これには、研究論文の読解や英語のスキルが必要とされることもあります。 しかし、ウデキキを利用することで、そんな面倒な手間を省くことができます。 ウデキキでは、事前に最適なプロンプトがセットされたスキルを選択することで、目的に応じたサポートを受けることができます。

例えば、アイデアブレストや文章校正、エクセル支援サポートなど、さまざまな業務に特化したスキルが用意されており、それぞれの目的に最適な回答を生成することができます。 ウデキキを活用することで、専門知識や英語のスキルに頼らずに、効率的かつ正確な情報を得ることができます。 また、AIの回答によって業務の効率化や品質向上が期待できるため、生産性の向上にもつながります。

ウデキキのチャット機能

ウデキキのバックエンドでは、Azure OpenAI の ChatGPT *1と、ChatGPTなどの大規模言語モデルの機能拡張を効率的に実装するためのフレームワークである LangChain*2 を組み合わせて、チャット機能を実装しています。これにより、ユーザーは自然な対話形式で質問やメッセージを入力し、ウデキキはChatGPTを活用することで、適切な回答を生成します。

チャット機能の実装について

ChatGPTの生成について

チャットの実装において、通常の ChatGPT 呼び出しでは、レスポンスが全て生成されるまで出力を待つ必要があります。 これは、ユーザーが入力した質問やメッセージに対して、一度に全ての回答が生成されるまで待たなければならないということを意味します。

しかし、この方法では、レスポンスの生成に時間がかかる場合や、長いテキストの場合には、ユーザーとの対話が途切れてしまう可能性があります。 ユーザーは、自分の質問やメッセージに対する回答を待ちながら、何も表示されない状態になり、不便を感じることがあります。

この問題を解決するために、LangChain の ストリーミング Callback を利用することが有効です。

LangChainのストリーミング Callback

Callback を利用することで、ストリーミング形式でレスポンスを出力することができます。つまり、レスポンスが生成されるたびに、その都度ユーザーに表示することができます。

具体的には、ユーザーからの入力をChatGPTに送信し、レスポンスが生成されるたびに、その都度ユーザーに対して回答を表示することができます。これにより、ユーザーは待つことなく、リアルタイムで回答を受け取ることができます。

Callback を利用することで、ユーザーとの対話がスムーズに進み、待ち時間がなくなるため、より良いユーザーエクスペリエンスを提供することができます。

標準的なストリーミング Callback を用いた実装例を以下に示します。 基本的なコードの構成はこちらの記事を参考にしています。

import asyncio

from langchain import ConversationChain
from langchain.chat_models import AzureChatOpenAI
from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler

# 実際は、APIにて非同期処理を行うため、Async Callback Handler を利用
async_callback = AsyncIteratorCallbackHandler()
llm = AzureChatOpenAI(
    ...,
    streaming=True,
    callback_manager=CallbackManager([async_callback]),
)

# 必要に応じて、Prompt, Memory を設定
prompt = ...
memory = ...
chain = ConversationChain(llm=llm, prompt=prompt, memory=memory)


# 実行環境に合わせて、非同期実行する関数を定義
async def stream_conversation(chain, async_callback):
    async def predict():
    """生成のリクエストが異常終了した場合でも、チャットが終了するようにイベントを設定"""
        try:
            await chain.apredict(input="こんにちは")
        finally:
            # done 自体は Callback Class に定義されている
            async_callback.done.set()

    task = asyncio.create_task(predict())
    async for token in async_callback.aiter():
        yield token
    await task

Langchain 標準の Callback の課題

しかし、LangChain 標準の StreamingCallback では、正常に動作しないことがあるのが課題でした。 特に、生成された文章が長い場合には、文章が途切れてしまうことがよくある印象です。

この問題の原因は、Callback 内での asyncio.wait() の処理方法にあります。 2023年12月現在の実装では、ChatGPTがトークンを生成するたびに queue にトークンが詰められて、都度吐き出され、ChatGPTが生成を終了したことをdone Event (生成が終了したことを示すフラグのようなもの)で管理するような形になっています。しかし、それらを管理するための asyncio.wait()FIRST_COMPLETED モードで非同期タスクを監視しているため、 done Event が早いうちにセットされると、queue (生成結果のトークンがつめられているキュー) の中身が残っていてもwhileループから抜けてしまい、応答が終了してしまうことがあるみたいです*3。この結果、生成された文章が長い場合には、途中で文章が切れてしまう問題が生じているようでした。

このような問題点が存在するため、ウデキキでは独自のカスタムな Callback を開発し利用しています。このカスタムな Callbackでは、上記の問題を解決するために適切な処理方法を採用しています。

CustomCallback の実装

CustomCallback では、従来の asyncio.wait()FIRST_COMPLETED モードで非同期タスクを監視する方法をやめています。 具体的には、ChatGPT が文章の生成を完了した後に EOS_TOKEN を追加することで、queue の中身が空になるまでwhileループを回すようにしています。

import asyncio
from typing import Any, AsyncIterator, cast

from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
from langchain.schema.output import LLMResult

EOS_TOKEN = "<__EOS__>"  # LLMの応答が終わったことを示すトークン (実際のトークンと被らなければどんな値でもよい)


class CustomAsyncIteratorCallbackHandler(AsyncIteratorCallbackHandler):
    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        self.queue.put_nowait(EOS_TOKEN)  # LLMの応答が終わったら EOS_TOKEN を queue に詰める
        self.done.set()

    async def aiter(self) -> AsyncIterator[str]:
        future = None
        while not self.queue.empty() or not self.done.is_set():
            done, future = await asyncio.wait(
                [asyncio.ensure_future(self.queue.get())],
                # done がセットされている場合は、token を取り出すだけなので短めのタイムアウトを設定しておく
                timeout=None if not self.done.is_set() else 5.0,
            )
            if future:
                # 基本処理が詰まったりするなどなければこの分岐には入らないはず
                # 何らかの影響で timeout などになった場合は、ハンドリングとして処理を終了させる
                break

            token = cast(str, done.pop().result())
            if token == EOS_TOKEN:
                # EOS_TOKEN が queue に入ってきたら、応答生成が終わったと判断して処理を終了させる
                break

            yield token


# 利用する際には、指定する Callback を新しく定義した CustomCallback に置き換えるだけ
async_callback = AsyncIteratorCallbackHandler()
llm = AzureChatOpenAI(
    ...,
    streaming=True,
    callback_manager=CallbackManager([async_callback]),
)

この処理方法により、生成された文章が長い場合でも途切れることなく完全な回答を出力することができるため、ユーザーはスムーズな対話を楽しむことができます。

まとめ

今回のチャット実装の紹介では、CustomCallbackを利用したストリーミング形式の回答生成についてご紹介しました。 さらに実際のサービスでは、ユーザーの入力に対して、適切な回答を生成するために、プロンプトの最適化やチャット履歴の活用など、様々な工夫を行っています。

今後もウデキキは、ユーザーのニーズに合わせた高品質なチャット機能を提供するために、継続的な改善と技術の進化を行っていきます。ユーザーのフィードバックや要望にも耳を傾けながら、より使いやすく効果的な業務支援ツールとしてのウデキキを提供していきます。

以上が、ウデキキのチャット機能とCustomCallbackについてのご紹介でした。ウデキキを活用して、効率的で正確な情報の提供や業務の効率化を実現しましょう。

最後に

以上の文章は、ウデキキを通して生成されました。

ウデキキは、強力な自然言語処理モデルを活用した業務支援特化の生成AIサービスです。 現時点では、ウデキキはチャット機能がメインですが、今後はRAGやその他生成AIを活用した様々な機能を提供していく予定です。 ぜひ、ウデキキのサービスをご活用いただき、効率的な業務支援を体験していただければ幸いです。

さらに、お知らせです。ウデキキでは、新規契約を受け付けております。 新規契約やお問い合わせに関しては、ウデキキの公式ウェブサイトからご連絡ください。

明日は、fujishiroさんの「tfaction を導入したら便利だった話」です。お楽しみに!