テックブログ

はじめに:なぜ Go + Python のハイブリッド構成か

はじめに:なぜ Go + Python のハイブリッド構成か

本記事では、Go言語のWebフレームワーク `chi` と、PythonのLLMライブラリ `LangGraph` を組み合わせたハイブリッドなバックエンドアーキテクチャについて、ドメイン駆動設計(DDD)の観点から解説します。公開APIはGoに一元化し、AI/LLM関連の重い処理はPythonに分離するこの構成は、それぞれの言語の得意分野を活かすための設計判断です。現在、LangGraphの公式実装はPythonのみであり、Go実装にはプロダクション採用レベルのものが存在しないという実情があります。このアーキテクチャでは、認証、認可、レートリミット、監査ログといった横断的関心事をGo製のAPI Gateway層に集約することで、堅牢性とメンテナンス性を両立させることを目指します。

採用アーキテクチャの全体像

本構成では、クリーンアーキテクチャとヘキサゴナルアーキテクチャ(ポーツ&アダプターズ)の思想を基盤に、戦術的DDDのパターンを適用します。これにより、ビジネスロジックを中心とした疎結合でテスト容易性の高いシステムを構築します。

Go (chi) 側の DDD ディレクトリ構成

Go側のAPI Gatewayは、以下のようなディレクトリ構成を採用しています。これは一般的なDDDのレイヤードアーキテクチャに沿っており、各レイヤーの責務を明確に分離しています。

internal/
├── interfaces/http/
│   ├── controller/   # 1 endpoint = 1 method
│   ├── schema/       # request/response + validate tag
│   ├── mapper/       # DTO ⇔ Schema
│   ├── route/        # chi のルート登録
│   └── presenter/    # error → HTTP status
├── application/<ctx>/
│   ├── dto/
│   ├── port/         # 外部サービス interface
│   └── usecase/      # 1 FE endpoint = 1 file
├── domain/<ctx>/
│   ├── entity/
│   ├── value_object/
│   ├── repository/   # interface
│   └── error/
└── infrastructure/
    ├── persistence/
    │   ├── sqlc/queries/   # *.sql
    │   ├── sqlc/gen/       # sqlc generate 出力
    │   └── repository/     # domain interface 実装
    └── dispatcher/         # 外部API集約

レイヤごとのサンプルコードと責務

Domain Entity

ドメインエンティティは、ビジネスの中核となる概念とルールを表現します。単なるデータ構造ではなく、不変条件を保護するためのロジックを持ちます。`Validate()`のようなメソッドを配置し、エンティティが常に正当な状態であることを保証します。

// internal/domain/subject/entity/subject.go

package entity

type Subject struct {
	ID   string
	Name string
	Code SubjectCode
}

func (s *Subject) Validate() error {
	if s.ID == "" || s.Name == "" {
		return errors.New("ID and Name are required")
	}
	return s.Code.Validate()
}

Value Object

値オブジェクトは、ドメインの「値」を表現する不変のオブジェクトです。例えば、単なる`string`ではなく、特定のフォーマットを持つ「科目コード」として定義することで、生成時にバリデーションを強制し、不正な値の存在を防ぎます。これにより、システムの堅牢性が向上します。

// internal/domain/subject/value_object/subject_code.go

package value_object

import "fmt"

type SubjectCode string

func NewSubjectCode(code string) (SubjectCode, error) {
	if len(code) != 5 {
		return "", fmt.Errorf("invalid subject code format")
	}
	return SubjectCode(code), nil
}

func (sc SubjectCode) Validate() error {
    // ... more validation logic
    return nil
}

Repository, Domain Error, DTO

// internal/domain/subject/repository/subject_repository.go
package repository

import "context"

// ...

type SubjectRepository interface {
	FindAll(ctx context.Context) ([]*entity.Subject, error)
}

// internal/domain/subject/error/errors.go
package error

import "errors"

var ErrSubjectNotFound = errors.New("subject not found")

// internal/application/subject/dto/subject_dto.go
package dto

type SubjectDTO struct {
	ID   string
	Name string
	Code string
}

UseCase

// internal/application/subject/usecase/create_subject.go
package usecase

// ...

type CreateSubjectUseCase struct {
	repo repository.SubjectRepository
}

func (uc *CreateSubjectUseCase) Execute(ctx context.Context, inputDTO dto.CreateSubjectInputDTO) (*dto.SubjectDTO, error) {
	// ... business logic ...
	// 1. DTO to Entity
	// 2. repo.Save(entity)
	// 3. Entity to DTO
	return &dto.SubjectDTO{}, nil
}

Repository 実装 + sqlc

// internal/infrastructure/persistence/repository/subject_repository.go
package repository

import (
	"context"
	domain_repo "your-project/internal/domain/subject/repository"
	"your-project/internal/infrastructure/persistence/sqlc/gen"
)

type subjectRepository struct {
	q *gen.Queries
}

func NewSubjectRepository(q *gen.Queries) domain_repo.SubjectRepository {
	return &subjectRepository{q: q}
}

func (r *subjectRepository) FindAll(ctx context.Context) ([]*entity.Subject, error) {
	// sqlc generated method
	dbSubjects, err := r.q.ListSubjects(ctx)
	// ... map db model to entity
	return subjects, nil
}

-- internal/infrastructure/persistence/sqlc/queries/subject.sql
-- name: ListSubjects :many
SELECT * FROM subjects ORDER BY name;

Controller, Route, Presenter

// internal/interfaces/http/controller/subject_controller.go
func (c *SubjectController) ListSubjects(w http.ResponseWriter, r *http.Request) {
	// ... call usecase ...
	// ... map DTO to Schema ...
	presenter.JSON(w, http.StatusOK, schema)
}

// internal/interfaces/http/route/subject_route.go
func (sr *SubjectRoutes) RegisterRoutes(r *chi.Mux) {
	r.Get("/subjects", sr.controller.ListSubjects)
}

// internal/interfaces/http/presenter/presenter.go
func Error(w http.ResponseWriter, err error) {
	switch {
	case errors.Is(err, domain_error.ErrSubjectNotFound):
		JSON(w, http.StatusNotFound, map[string]string{"error": err.Error()})
	default:
		JSON(w, http.StatusInternalServerError, map[string]string{"error": "internal server error"})
	}
}

Python LangGraph 側のアーキテクチャ

Pythonで実装されたAIワーカーは、LangGraphを利用して複雑なLLMワークフローを構築します。ここでも重要なのは責務の分離です。LangGraphのワークフロー、ノード、ツールから、直接Go側のユースケースやリポジトリを呼び出すことはありません。必要なビジネスロジックは、Goのアプリケーション層で定義されたPort(インターフェース)を通じて抽象化され、Python側はそのインターフェースの実装(アダプター)を呼び出す形を取ります。

StateGraphの組み立て

# infrastructure/langgraph/problem_creation/workflows/problem_creation_workflow.py
from langgraph.graph import StateGraph, END
from .state import ProblemCreationState
from ..nodes import core_logic_node, tool_node

def create_problem_creation_workflow():
    workflow = StateGraph(ProblemCreationState)

    workflow.add_node("core_logic", core_logic_node)
    workflow.add_node("tools", tool_node)

    workflow.set_entry_point("core_logic")

    workflow.add_conditional_edges(
        "core_logic",
        lambda state: "tools" if state.get("tool_calls") else END,
    )
    workflow.add_edge("tools", "core_logic")

    return workflow.compile()

PortとAdapterによる依存性逆転

Goのアプリケーション層で定義されたPort(インターフェース)を、Python側で実装(アダプター)することで、依存性の逆転を実現します。LangGraphのワークフローは、具象クラスである`LangGraphShapeExtractor`ではなく、抽象インターフェースである`ShapeExtractorPort`に依存します。これにより、ビジネスロジックと具体的なLLM実装が分離され、テストや将来の変更が容易になります。

# application/problem/port/shape_extractor_port.py (Interface)
from abc import ABC, abstractmethod

class ShapeExtractorPort(ABC):

    @abstractmethod
    def extract_and_save_shapes(self, problem_id: str, image_url: str) -> None:
        pass

# infrastructure/langgraph/problem_creation/adapter/langgraph_shape_extractor.py (Implementation)
from application.problem.port.shape_extractor_port import ShapeExtractorPort
from ..workflows import create_problem_creation_workflow

class LangGraphShapeExtractor(ShapeExtractorPort):

    def extract_and_save_shapes(self, problem_id: str, image_url: str) -> None:
        workflow = create_problem_creation_workflow()
        # ... invoke workflow with inputs ...

Go ⇔ Python の接続: Redis Streams + Pub/Sub

同期的なCRUD処理はGoのAPI内で完結させますが、LLMが関わるような時間のかかる非同期処理は、Redis Streamsを介してGoからPythonワーカーに委譲します。Goのコントローラーはジョブをエンキューするだけですぐにレスポンスを返し、Pythonのワーカーがそのジョブを非同期に処理します。処理の進捗や結果はRedis Pub/Subを通じてリアルタイムにフロントエンドに通知されます。

// infrastructure/redis/job_publisher.go

func (p *JobPublisher) EnqueueProblemCreationJob(ctx context.Context, jobData map[string]interface{}) (string, error) {
	jobID, err := p.redisClient.XAdd(ctx, &redis.XAddArgs{
		Stream: "jobs:problem_creation",
		Values: jobData,
	}).Result()

	return jobID, err
}
# infrastructure/redis/problem_creation_stream_consumer.py

class ProblemCreationStreamConsumer:
    def __init__(self, redis_client, use_case):
        self.redis_client = redis_client
        self.use_case = use_case
        # ...

    def consume(self):
        while True:
            # XREADGROUP from "jobs:problem_creation"
            # ...
            self.use_case.execute(job_data)
            # XACK ...

FE リクエストの全フロー

同期CRUDパス (例: GET /api/v1/subjects)

sequenceDiagram
    participant FE as Frontend
    participant GW as Go API Gateway
    participant DB as PostgreSQL

    FE->>+GW: GET /api/v1/subjects
    GW->>GW: Middleware (Auth, CORS, etc)
    GW->>GW: Controller -> Usecase
    GW->>+DB: repo.FindAll()
    DB-->>-GW: Rows
    GW->>GW: Entity -> DTO -> Schema
    GW-->>-FE: 200 OK (JSON)
  1. FrontendがGo API Gatewayにリクエストを送信
  2. Goのmiddleware群(認証、CORS、ロギング)が実行
  3. `controller`がリクエストを受け取り、`usecase`を呼び出す
  4. `usecase`が`repository`(sqlc実装)を呼び出す
  5. `pgx`経由でPostgreSQLにクエリが発行される
  6. DBから返された行を`mapper`が`entity`に変換
  7. `usecase`が`entity`を`DTO`に変換して返す
  8. `controller`が`DTO`を`schema`に変換
  9. `presenter`がHTTPステータスコードとJSONレスポンスを生成し、Frontendに返す

非同期AIパス (例: POST /api/problem-creation/start)

sequenceDiagram
    participant FE as Frontend
    participant GW as Go API Gateway
    participant RS as Redis Streams
    participant PW as Python Worker
    participant LLM as AI/LLM Service
    participant RPubSub as Redis Pub/Sub

    FE->>+GW: POST /api/problem-creation/start
    GW->>+RS: XADD jobs:problem_creation
    RS-->>-GW: job_id
    GW-->>-FE: 202 Accepted (session_id)

    FE->>+GW: GET /sse/stream?sid=...
    GW->>+RPubSub: SUBSCRIBE sse:<sid>

    PW->>+RS: XREADGROUP
    RS-->>-PW: job
    PW->>+LLM: Execute LangGraph Workflow
    LLM-->>-PW: Result
    PW->>+RPubSub: PUBLISH sse:<sid>
    RPubSub-->>-GW: chunk
    GW-->>-FE: Stream chunk
  1. FrontendがGo API Gatewayに非同期処理開始リクエストを送信
  2. Goの`controller`が認可とバリデーションを行い、`usecase`を呼び出す
  3. `usecase`がRedis Streamsにジョブをエンキュー (`XADD`)
  4. Goは即座に`202 Accepted`とセッションIDをFrontendに返す
  5. Pythonの`stream-worker`がブロッキング読み取り (`XREADGROUP`) でジョブを取得
  6. ワーカーは`usecase`を起動し、Port経由でLangGraphワークフローを実行
  7. ワークフロー内でLLM呼び出しやDB書き込みが行われる
  8. 進捗があるたびに、ワーカーはRedis Pub/SubにイベントをPUBLISHする
  9. Frontendは別途EventSourceでSSEストリームに接続 (`/sse/stream`)
  10. GoのSSEハンドラがPub/SubをSUBSCRIBEし、受け取ったメッセージをそのままFrontendにプロキシする

テスト戦略

  • **Entity / Value Object**: ビジネスロジックの正当性を確認するため、単体テストを徹底します。
  • **UseCase**: 依存するRepositoryをインラインモックに差し替え、ユースケースのロジックのみをテストします。
  • **Repository**: `testcontainers-go`を使い、実際のPostgreSQLコンテナを起動して、DBとの結合をテストします。
  • **Controller**: `httptest`パッケージを利用し、UseCaseをモック化して、HTTPリクエストからレスポンスまでをテストします。
  • **E2E**: 主要なシナリオ(同期CRUD、非同期AIフローなど)に絞り、システム全体を通したEnd-to-Endテストを実施します。
Presentation Layer(Controllers, Schemas)Application Layer(Use Cases, DTOs)Domain Layer(Entities, VOs, Repositories)Infrastructure Layer(DB, External APIs)depends ondepends onimplements
クリーンアーキテクチャのレイヤー構造と依存関係の方向
Frontend(React/Next.js)Go API Gateway(chi)Redis Streams(Job Queue)Python Worker(LangGraph)LLM Service(OpenAI, etc)Database(PostgreSQL)
全体構成図: Go API GatewayとPython AIワーカーの連携