Skyframe StateMachines のガイド

7.3 · 7.2 · 7.1 · 7.0 · 6.5

概要

Skyframe StateMachine は、ヒープにある分解された関数オブジェクトです。必要な値がすぐに利用できないものの、非同期で計算される場合に、冗長性のない柔軟性1 の評価がサポートされます。StateMachine は待機中にスレッド リソースを結合できませんが、代わりに一時停止して再開する必要があります。したがって、逆コンストラクションにより明示的な再入ポイントが公開され、以前の計算をスキップできます。

StateMachine は、シーケンス、分岐、構造化論理同時実行を表現するために使用でき、Skyframe の操作用に特別に調整されています。StateMachine は、より大きな StateMachine にコンポーズしてサブ StateMachine を共有できます。コンカレンスは、常に構造的に階層的であり、純粋に論理的です。すべての同時実行サブタスクは、単一の共有親 SkyFunction スレッドで実行されます。

はじめに

このセクションでは、java.com.google.devtools.build.skyframe.state パッケージにある StateMachine について簡単に説明します。

Skyframe の再起動の概要

Skyframe は、依存関係グラフを並列評価するフレームワークです。グラフ内の各ノードは、パラメータを指定する SkyKey とその結果を指定する SkyValue による SkyFunction の評価に対応しています。計算モデルは、SkyFunction が SkyKey で SkyValue を検索し、追加の SkyFunction の再帰的な並列評価をトリガーするように設計されています。計算の一部のサブグラフが不完全であるために要求された SkyValue の準備ができていない場合、ブロックではなく、要求された SkyFunction が null getValue レスポンスを監視し、SkyValue の代わりに null を返し、入力がないために不完全であることを知らせます。以前にリクエストされた SkyValues がすべて使用可能になると、Skyframe は SkyFunctions を再起動します。

SkyKeyComputeState の導入前は、再起動を処理する従来の方法は、計算を完全に再実行することでした。これは複雑さは二次関数ですが、この方法で記述された関数は、再実行するたびに、null を返すルックアップが減るため、最終的には完了します。SkyKeyComputeState を使用すると、手動で指定したチェックポイント データを SkyFunction に関連付けることができるため、再計算を大幅に削減できます。

StateMachineSkyKeyComputeState 内に存在するオブジェクトであり、停止と再開の実行フックを公開することで、SkyFunction の再起動時にほぼすべての再計算を排除します(SkyKeyComputeState がキャッシュから外れないことを前提としています)。

SkyKeyComputeState 内のステートフル計算

オブジェクト指向設計の観点からは、純粋なデータ値ではなく、計算オブジェクトを SkyKeyComputeState 内に保存することを検討するのが理にかなっています。Java では、動作を伝播するオブジェクトの最小限の記述は関数インターフェースであり、これで十分です。StateMachine には、次の再帰的な定義があります2

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks インターフェースは SkyFunction.Environment に似ていますが、非同期用に設計されており、論理的に同時実行されるサブタスクのサポートが追加されています3

step の戻り値は別の StateMachine であり、一連のステップを帰納的に指定できます。StateMachine が完了すると、stepDONE を返します。例:

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

次の出力で StateMachine を記述します。

hello
world

step2StateMachine の関数インターフェース定義を満たしているため、メソッド参照 this::step2StateMachine です。メソッド参照は、StateMachine で次の状態を指定する最も一般的な方法です。

一時停止と再開

直感的に、計算をモノリシックな関数ではなく StateMachine ステップに分割すると、計算を停止して再開するために必要なフックが提供されます。StateMachine.step が返されると、明示的な停止ポイントがあります。返される StateMachine 値で指定された継続は、明示的な再開ポイントです。中断したところから計算を再開できるため、再計算を回避できます。

コールバック、継続、非同期計算

技術的な用語では、StateMachine は継続として機能し、実行される後続の計算を決定します。StateMachine は、ブロックを行う代わりに step 関数から戻ることで自発的に一時停止できます。この関数は制御を Driver インスタンスに戻します。その後、Driver は準備完了の StateMachine に切り替えるか、制御を Skyframe に戻すことができます。

従来、コールバック継続は 1 つのコンセプトに統合されていました。ただし、StateMachine では 2 つの区別が維持されます。

  • コールバック - 非同期計算の結果を保存する場所を指定します。
  • 継続 - 次の実行状態を指定します。

非同期処理を呼び出す場合はコールバックが必要です。つまり、SkyValue 検索の場合のように、メソッド呼び出し直後に実際の処理が実行されることはありません。コールバックはできるだけシンプルにする必要があります。

継続処理StateMachineStateMachine 戻り値であり、すべての非同期計算が解決された後に続く複雑な実行をカプセル化します。この構造化されたアプローチにより、コールバックの複雑さを管理しやすくなります。

タスク

Tasks インターフェースは、SkyKey で SkyValue を検索し、同時実行サブタスクをスケジュールする API を StateMachine に提供します。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}
StateMachine

SkyValue ルックアップ

StateMachineTasks.lookUp オーバーロードを使用して SkyValue を検索します。SkyFunction.Environment.getValueSkyFunction.Environment.getValueOrThrow に類似しており、例外処理のセマンティクスも同様です。この実装では、ルックアップはすぐに実行されず、可能な限り多くのルックアップをバッチ処理してから実行されます。値がすぐに利用できない場合があります(Skyframe の再起動が必要な場合など)。そのため、呼び出し元はコールバックを使用して、結果の値をどのように処理するかを指定します。4

StateMachine プロセッサ(Driver と SkyFrame へのブリッジ)は、次の状態が開始される前に値が使用可能であることを保証します。以下の例を

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

上記の例では、最初のステップで new Key() を検索し、コンシューマとして this を渡します。これは、DoesLookupConsumer<SkyValue> を実装しているため可能です。

契約により、次の状態 DoesLookup.processValue が開始される前に、DoesLookup.step のすべてのルックアップが完了します。したがって、valueprocessValue でアクセスされた場合に使用できます。

サブタスク

Tasks.enqueue は、論理的に同時実行されるサブタスクの実行をリクエストします。サブタスクも StateMachine であり、通常の StateMachine ができることは何でもできます。たとえば、サブタスクを再帰的に作成したり、SkyValue を検索したりできます。lookUp と同様に、ステートマシン ドライバは、次のステップに進む前にすべてのサブタスクが完了していることを確認します。以下の例を

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

Subtask1Subtask2 は論理的には同時実行されますが、すべてが単一スレッドで実行されるため、i の「同時実行」更新に同期は必要ありません。

構造化された同時実行

すべての lookUpenqueue は次の状態に進む前に解決する必要があるため、同時実行は当然ながらツリー構造に限定されます。次の例に示すように、階層型の 5 同時実行を作成できます。

構造化された同時実行

UML からは、同時実行構造がツリーを形成していることを判断するのは困難です。ツリー構造をより適切に示す代替ビューがあります。

非構造化の同時実行

構造化された同時実行は、推論がはるかに簡単です。

コンポジションと制御フロー パターン

このセクションでは、複数の StateMachine をコンポーズする方法の例と、特定の制御フローの問題の解決策について説明します。

連続状態

これは最も一般的でわかりやすい制御フロー パターンです。例については、SkyKeyComputeState 内のステートフル計算をご覧ください。

ブランチ

StateMachine の分岐状態は、次の例に示すように、通常の Java 制御フローを使用して異なる値を返すことで実現できます。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  
}

一部のブランチでは、早期に完了するために DONE を返すのが一般的です。

高度な順次合成

StateMachine 制御構造はメモリレスであるため、StateMachine 定義をサブタスクとして共有するのは難しい場合があります。M1M2StateMachineS を共有する StateMachine インスタンスとします。M1M2 はそれぞれシーケンス <A, S, B><X, S, Y> です。問題は、S が完了後に B に進むか Y に進むかを認識しておらず、StateMachine が呼び出しスタックを保持していないことです。このセクションでは、これを実現するための手法をいくつか確認します。

StateMachine を終端シーケンス要素として使用

これでは、最初に提示された問題は解決しません。共有 StateMachine がシーケンスの終端にある場合にのみ、シーケンシャル コンポジションを示します。

// S is the shared state machine.
class S implements StateMachine {  }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

これは、S 自体が複雑な状態マシンであっても機能します。

シーケンシャル コンポジションのサブタスク

キューに登録されたサブタスクは、次の状態になる前に必ず完了するため、サブタスク メカニズムを少し悪用する6こともできます。

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

runAfter インジェクション

S の実行前に完了する必要がある他の並列サブタスクや Tasks.lookUp 呼び出しがあるため、Tasks.enqueue を悪用できない場合があります。この場合、runAfter パラメータを S に挿入して、次に何を行うかを S に通知できます。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
     // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
     // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

このアプローチは、サブタスクを不正使用するよりもクリーンです。ただし、複数の StateMachinerunAfter でネストするなど、これを過度に適用すると、コールバック ヘルルに陥る可能性があります。代わりに、通常の連続状態を使用して連続した runAfter を分割することをおすすめします。

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

は次のように置き換えることができます。

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

禁止の代替: runAfterUnlessError

以前のドラフトでは、エラーが発生した時点で早期に中止する runAfterUnlessError を検討していました。これは、エラーが 2 回チェックされることがよくあるという事実に基づいています。1 回は runAfter 参照を持つ StateMachine によって、もう 1 回は runAfter マシン自体によってチェックされます。

慎重に検討した結果、エラーチェックの重複除去よりもコードの統一性が重要であると判断しました。runAfter メカニズムが tasks.enqueue メカニズムと整合して動作しない場合、エラー チェックが常に必要になります。

直接委任

正式な状態遷移が発生するたびに、メインの Driver ループが進みます。契約に基づき、状態を進めるということは、次の状態が実行される前に、以前にキューに登録されたすべての SkyValue ルックアップとサブタスクが解決されることを意味します。デリゲート StateMachine のロジックにより、フェーズ進展が不要または逆効果になることがあります。たとえば、委任先の最初の step が、委任元の状態の検索と並列化できる SkyKey 検索を実行する場合、フェーズ アドバンスにより、検索が順序付けられます。次の例に示すように、直接委任を行う方が適切な場合があります。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return ;
  }

  // Rest of implementation.
  

  private StateMachine complete(Tasks tasks) {
    
    return runAfter;
  }
}

データフロー

これまでの説明では、制御フローの管理に重点を置いてきました。このセクションでは、データ値の伝播について説明します。

Tasks.lookUp コールバックの実装

以下は、SkyValue ルックアップTasks.lookUp コールバックを実装する例です。このセクションでは、複数の SkyValue を処理する理由とアプローチについて説明します。

Tasks.lookUp コールバック

Tasks.lookUp メソッドは、コールバック sink をパラメータとして受け取ります。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

慣用的な方法では、Java ラムダを使用してこれを実装します。

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

ここで、myValue はルックアップを行う StateMachine インスタンスのメンバー変数です。ただし、ラムダには、StateMachine 実装で Consumer<SkyValue> インターフェースを実装する場合と比較して、追加のメモリ割り当てが必要です。このラムダは、あいまいになるような複数のルックアップがある場合にも有用です。

SkyFunction.Environment.getValueOrThrow と同様に、Tasks.lookUp のオーバーロードに対するエラー処理もあります。

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

実装例を以下に示します。

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      
      return DONE;
    }
    // Processes `value`, which is non-null.
    
  }
}

エラー処理のないルックアップと同様に、StateMachine クラスがコールバックを直接実装すると、ラムバのメモリ割り当てを節約できます。

エラー処理で詳しく説明していますが、基本的には、エラーと通常の値の伝播に大きな違いはありません。

複数の SkyValue を使用する

多くの場合、複数の SkyValue ルックアップが必要になります。SkyValue のタイプに切り替えるのが、ほとんどの場合で有効なアプローチです。次の例は、プロトタイプの本番環境コードから簡素化したものです。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

値の型が異なるため、Consumer<SkyValue> コールバックの実装は明確に共有できます。そうでない場合は、ラムダベースの実装または適切なコールバックを実装する完全な内部クラス インスタンスにフォールバックできます。

StateMachine 間の値の伝播

このドキュメントでは、サブタスクで作業を整理する方法のみを説明してきましたが、サブタスクは呼び出し元に値を報告する必要もあります。サブタスクは論理的に非同期であるため、その結果はコールバックを使用して呼び出し元に通知されます。これを実現するため、サブタスクでは、コンストラクタを介して挿入されるシンク インターフェースを定義します。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

   // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

呼び出し元 StateMachine は次のようになります。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上記の例は、いくつかの点を示しています。Caller は結果を伝播し、独自の Caller.ResultSink を定義する必要があります。CallerBarProducer.ResultSink コールバックを実装します。再開時に、processResultvalue が null かどうかを確認し、エラーが発生したかどうかを判断します。これは、サブタスクまたは SkyValue ルックアップの出力を受け入れた後の一般的な動作パターンです。

acceptBarError の実装は、エラーバブリングで要求されているように、結果を Caller.ResultSink に積極的に転送します。

トップレベルの StateMachine の代替方法については、Driver と SkyFunctions へのブリッジをご覧ください。

エラー処理

Tasks.lookUp コールバックStateMachines 間での値のプロパゲーションに、エラー処理の例がいくつかあります。InterruptedException 以外の例外はスローされず、代わりに値としてコールバックを介して渡されます。このようなコールバックには、値またはエラーのいずれか 1 つが渡される排他的論理和のセマンティクスがよくあります。

次のセクションでは、Skyframe のエラー処理との微妙ながらも重要な操作について説明します。

エラーの再帰(--nokeep_going)

エラーバブリング中に、リクエストされたすべての SkyValue が使用可能でなくても、SkyFunction が再起動されることがあります。このような場合、Tasks API コントラクトにより、後続の状態に到達することはありません。ただし、StateMachine は引き続き例外を伝播する必要があります。

次の状態に到達したかどうかにかかわらず伝播が行われる必要があるため、エラー処理コールバックがこのタスクを実行する必要があります。内部 StateMachine の場合、これは親コールバックを呼び出すことで実現されます。

SkyFunction とインターフェースする最上位の StateMachine では、ValueOrExceptionProducersetException メソッドを呼び出します。その後、SkyValues が欠落している場合でも、ValueOrExceptionProducer.tryProduceValue は例外をスローします。

Driver が直接使用されている場合は、マシンが処理を完了していなくても、SkyFunction から伝播されたエラーを確認することが重要です。

イベント処理

イベントを出力する必要がある SkyFunctions の場合、StoredEventHandler が SkyKeyComputeState に挿入され、さらにそれを必要とする StateMachine に挿入されます。以前は、Skyframe が特定のイベントを再生しない限りドロップするため、StoredEventHandler が必要でしたが、これはその後修正されました。StoredEventHandler インジェクションが保持されるのは、エラー処理コールバックから出力されるイベントの実装が簡素化されるためです。

Driver と SkyFunctions へのブリッジ

Driver は、指定されたルート StateMachine から始まる StateMachine の実行を管理します。StateMachine はサブタスク StateMachine を再帰的にキューに追加できるため、1 つの Driver で複数のサブタスクを管理できます。これらのサブタスクは、構造化された同時実行の結果としてツリー構造を作成します。Driver は、サブタスク間で SkyValue ルックアップを一括処理して効率を高めます。

Driver を中心に構築されたクラスはいくつかあり、次の API があります。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver は、単一のルート StateMachine をパラメータとして受け取ります。Driver.drive を呼び出すと、Skyframe を再起動せずに StateMachine が実行されます。StateMachine が完了すると true を返し、それ以外の場合は false を返します。これは、すべての値が使用可能ではなかったことを示します。

DriverStateMachine の同時状態を維持できるため、SkyKeyComputeState への埋め込みに適しています。

Driver を直接インスタンス化する

StateMachine の実装では、通常、コールバックを介して結果が通知されます。次の例に示すように、Driver を直接インスタンス化できます。

Driver は、後で定義する対応する ResultSink の実装とともに、SkyKeyComputeState の実装に埋め込まれます。最上位レベルでは、State オブジェクトは Driver よりも長く存続することが保証されているため、計算結果の適切なレシーバーです。

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

以下のコードでは、ResultProducer をスケッチしています。

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

   // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
      // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

結果を遅延計算するコードは次のようになります。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

エンベディング Driver

StateMachine が値を生成し、例外を発生させない場合は、次の例に示すように、Driver を埋め込むこともできます。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
      // Implementation.
}

SkyFunction には、次のようなコードが含まれている場合があります(ここで、StateSkyKeyComputeState の関数固有の型です)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

DriverStateMachine 実装に埋め込むことは、Skyframe の同期コーディング スタイルに適しています。

例外を生成する可能性がある StateMachine

それ以外の場合は、SkyKeyComputeState に埋め込める ValueOrExceptionProducer クラスと ValueOrException2Producer クラスがあり、同期 SkyFunction コードに対応する同期 API があります。

ValueOrExceptionProducer 抽象クラスには次のメソッドが含まれています。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
      // Implementation.
  }

  protected final void setValue(V value)  {   // Implementation. }
  protected final void setException(E exception) {   // Implementation. }
}

これには埋め込みの Driver インスタンスが含まれており、埋め込みドライバResultProducer クラスによく似ており、同様の方法で SkyFunction とインターフェースがあります。実装では、ResultSink を定義する代わりに、どちらかが発生したときに setValue または setException を呼び出します。両方が発生した場合は、例外が優先されます。tryProduceValue メソッドは、非同期コールバック コードを同期コードにブリッジし、例外が設定されている場合は例外をスローします。

前述のように、エラーの発生時に、すべての入力が使用できないため、マシンがまだ完了していない場合でもエラーが発生する可能性があります。これに対応するため、tryProduceValue は、マシンが完了する前であっても、設定された例外をスローします。

エピローグ: コールバックの削除

StateMachine は、非同期計算を実行するための非常に効率的ですが、ボイラープレートを多用する方法です。継続(特に ListenableFuture に渡される Runnable の形式)は、Bazel コードの特定部分では広く使用されていますが、SkyFunctions 分析ではあまり使用されていません。分析はほとんど CPU バウンドであり、ディスク I/O に効率的な非同期 API はありません。最終的には、コールバックは習得に時間がかかり、読みやすさを損なうため、最適化することをおすすめします。

最も有望な代替手段の 1 つは、Java 仮想スレッドです。コールバックを記述する代わりに、すべてが同期ブロッキング呼び出しに置き換えられます。これが可能になるのは、プラットフォーム スレッドとは異なり、仮想スレッド リソースのタイアップは安価であるためです。ただし、仮想スレッドを使用する場合でも、単純な同期オペレーションをスレッド作成と同期のプリミティブに置き換えると、コストがかかりすぎることになります。StateMachine から Java 仮想スレッドに移行しましたが、速度が数桁遅くなり、エンドツーエンドの分析レイテンシがほぼ 3 倍になりました。仮想スレッドはまだプレビュー版の機能であるため、パフォーマンスが向上した後でこの移行を実行することもできます。

検討すべきもう 1 つの方法は、Loom コルーチンが利用可能になったら待機することです。この方法の利点は、協調的マルチタスクを使用して同期のオーバーヘッドを削減できることです。

それでも問題が解決しない場合は、低レベルのバイトコードの書き換えも有効な方法です。十分に最適化すると、手書きのコールバック コードに近いパフォーマンスを実現できる場合があります。

付録

コールバック地獄

コールバックの地獄は、コールバックを使用する非同期コードにおける悪名高い問題です。これは、後続のステップの継続が前のステップ内にネストされているという事実に起因しています。ステップが多い場合、このネストはかなり深くなる可能性があります。制御フローと組み合わせると、コードが管理不能になります。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

ネストされた実装の利点の一つは、外側のステップのスタック フレームを保持できることです。Java では、キャプチャされたラムダ変数は実質的に最終的である必要があるため、このような変数を使用するのは面倒です。次のように、ラムダではなく継続としてメソッド参照を返すことで、深いネストを回避できます。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

runAfter インジェクション パターンが高密度に使用された場合にもコールバック HEll が発生することがありますが、これは、連続するステップを注入することで回避できます。

例: 連鎖した SkyValue ルックアップ

多くの場合、アプリ ロジックで SkyValue ルックアップの依存チェーンが必要となることがあります。たとえば、2 つ目の SkyKey が 1 つ目の SkyValue に依存している場合などです。単純なことを考えてみると、複雑で深くネストされたコールバック構造になってしまいます。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

ただし、継続はメソッド参照として指定されるため、コードは状態遷移全体で手続き型に見えます。step2step1 に続くものです。ここでは、ラムダを使用して value2 を割り当てます。これにより、コードの順序が上から下の順に計算の順序と一致します。

その他のヒント

読みやすさ: 実行順序

可読性を高めるには、StateMachine.step の実装を実行順に保ち、コールバックの実装はコード内で渡される場所の直後に配置します。これは、制御フローが分岐する場所では常に実現できるとは限りません。このような場合は、追加のコメントが役立つ場合があります。

例: 連鎖された SkyValue ルックアップでは、この目的で中間メソッド参照が作成されます。これにより、パフォーマンスがわずかに低下しますが、読みやすさが向上します。

世代仮説

中程度の存続期間の Java オブジェクトは、非常に短い存続期間のオブジェクトまたは永続的に存続するオブジェクトを処理するように設計された Java ガベージ コレクタの世代仮説を破ります。定義上、SkyKeyComputeState 内のオブジェクトはこの仮説に違反します。このようなオブジェクトには、Driver をルートとする、まだ実行中のすべての StateMachine の作成済みツリーが含まれます。このオブジェクトは、非同期計算が完了するのを待機しながら一時停止するため、中間の存続期間があります。

JDK19 では問題が軽減されているようですが、StateMachine を使用すると、生成される実際のガベージが大幅に減少しても、GC 時間が長くなることがあります。StateMachine は中程度の存続期間があるため、オールド ジェネレーションに昇格し、オールド ジェネレーションがより速くいっぱいになる可能性があります。そのため、クリーンアップにメジャー GC またはフル GC が必要になります。

最初の予防策は StateMachine 変数の使用を最小限に抑えることですが、複数の状態で値が必要な場合などは、常に実現できるとは限りません。可能な場合、ローカル スタックの step 変数は新しい世代の変数であり、効率的に GC されます。

StateMachine 変数の場合は、物事をサブタスクに分割し、StateMachine 間の値の伝播の推奨パターンに従うことも有効です。このパターンに従うと、子 StateMachine のみが親 StateMachine を参照し、その逆は参照しません。つまり、子が結果のコールバックを使用して親を完了して更新すると、子は自然にスコープ外になり、GC の対象になります。

最後に、以前の状態では StateMachine 変数が必要で、後の状態では必要ない場合があります。大規模なオブジェクトが不要になったことが判明したら、その参照を null にすると便利です。

状態に名前を付ける

メソッドに名前を付けるときは、通常、そのメソッド内で発生する動作にメソッド名を付けることができます。スタックがないため、StateMachine でこれを行う方法は明確ではありません。たとえば、メソッド foo がサブメソッド bar を呼び出すとします。StateMachine では、これは状態シーケンス foo に翻訳され、その後に bar が続きます。foo に動作 bar が含まれなくなりました。その結果、状態のメソッド名はスコープが狭くなり、ローカル動作を反映する可能性があります。

同時実行のツリー図

以下は、ツリー構造をより良く表している構造化された同時実行の図の別のビューです。ブロックが小さな木を形成しています。

構造化された同時実行 3D


  1. これは、値が使用できない場合に最初から再起動する Skyframe の規則とは対照的です。 

  2. stepInterruptedException をスローできますが、この例では省略しています。Bazel コードには、この例外をスローする低レベルのメソッドがいくつかあり、StateMachine を実行する Driver(後述)まで伝播します。必要ない場合は、スローを宣言しなくてもかまいません。 

  3. 同時実行サブタスクは、依存関係ごとに独立した処理を行う ConfiguredTargetFunction を基に作成されました。すべての依存関係を一度に処理する複雑なデータ構造を操作して非効率性が発生するのではなく、各依存関係に独自の独立した StateMachine があります。 

  4. 1 つのステップ内の複数の tasks.lookUp 呼び出しはまとめてバッチ処理されます。追加のバッチ処理は、同時実行サブタスク内で発生するルックアップによって作成できます。 

  5. これは、Java の構造化コンカレンシーの jeps/428 に概念的に似ています。 

  6. これは、スレッドを生成して結合してシーケンシャル コンポジションを実現する方法と似ています。