Skyframe StateMachines のガイド

問題を報告 ソースを表示 Nightly · 7.4 .

概要

Skyframe StateMachine は、ヒープにある分解された関数オブジェクトです。必要な値がすぐに利用できないものの、非同期で計算される場合に、冗長性のない柔軟性1 の評価がサポートされます。StateMachine は待機中にスレッド リソースを結合できませんが、代わりに一時停止して再開する必要があります。したがって、逆コンストラクションにより明示的な再入ポイントが公開され、以前の計算をスキップできます。

StateMachine は、シーケンス、分岐、構造化論理同時実行を表現するために使用でき、Skyframe のインタラクションに合わせて調整されています。StateMachine は、より大きな StateMachine にコンポーズし、サブ StateMachine を共有できます。同時実行は常に構成によって階層化され、純粋に論理的です。すべての同時実行サブタスクは、単一の共有親 SkyFunction スレッドで実行されます。

はじめに

このセクションでは、java.com.google.devtools.build.skyframe.state パッケージにある StateMachine について簡単に説明します。

Skyframe の再起動の概要

Skyframe は、依存関係グラフの並列評価を実行するフレームワークです。グラフの各ノードは、パラメータを指定する SkyKey と結果を指定する SkyValue を持つ SkyFunction の評価に対応しています。計算モデルは、SkyFunction が SkyKey で SkyValue を検索し、追加の SkyFunction の再帰的な並列評価をトリガーするように設計されています。計算のサブグラフの一部が不完全であるために、リクエストされた SkyValue がまだ準備できていない場合、リクエスト元の SkyFunction は null getValue レスポンスを検出し、SkyValue ではなく null を返して、入力がないため不完全であることを通知します。以前にリクエストされたすべての SkyValues が利用可能になると、Skyframe は SkyFunctions を再起動します。

SkyKeyComputeState が導入される前は、再起動を処理する従来の方法は、計算を完全に再実行することでした。これは複雑さは二次関数ですが、この方法で記述された関数は、再実行するたびに、null を返すルックアップが減るため、最終的には完了します。SkyKeyComputeState を使用すると、手動で指定したチェックポイント データを SkyFunction に関連付けることで、再計算を大幅に削減できます。

StateMachineSkyKeyComputeState 内に存在するオブジェクトであり、停止と再開の実行フックを公開することで、SkyFunction の再起動時にほぼすべての再計算を排除します(SkyKeyComputeState がキャッシュから外れないことを前提としています)。

SkyKeyComputeState 内のステートフル計算

オブジェクト指向の設計の観点からは、純粋なデータ値ではなく、SkyKeyComputeState 内に計算オブジェクトを格納することは理にかなっています。Java では、動作を伝播するオブジェクトの最小限の記述は関数インターフェースであり、これで十分です。StateMachine には、不思議なほど再帰的な定義2 があります。

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks インターフェースは SkyFunction.Environment に似ていますが、非同期用に設計されており、論理的に同時実行されるサブタスクのサポートが追加されています3

step の戻り値は別の StateMachine であり、一連のステップを帰納的に指定できます。StateMachine が完了すると、stepDONE を返します。例:

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

次の出力で StateMachine を記述します。

hello
world

step2StateMachine の関数インターフェース定義を満たしているため、メソッド参照 this::step2StateMachine です。メソッド参照は、StateMachine で次の状態を指定する最も一般的な方法です。

一時停止と再開

直感的には、計算をモノリシック関数ではなく StateMachine ステップに分割することで、計算の一時停止と再開に必要なフックが提供されます。StateMachine.step が返されると、明示的な停止ポイントがあります。返された StateMachine 値で指定された連続性は、明示的な再開ポイントです。計算は中断したところから正確に再開できるため、再計算を回避できます。

コールバック、継続、非同期計算

技術的な用語では、StateMachine は継続として機能し、実行される後続の計算を決定します。StateMachine は、ブロックを行う代わりに step 関数から戻ることで自発的に一時停止できます。この関数は制御を Driver インスタンスに戻します。その後、Driver は準備完了の StateMachine に切り替えるか、制御を Skyframe に戻すことができます。

従来、コールバック継続は 1 つのコンセプトに統合されていました。ただし、StateMachine では 2 つの区別が維持されます。

  • コールバック - 非同期計算の結果を保存する場所を指定します。
  • 継続 - 次の実行状態を指定します。

非同期オペレーションを呼び出す場合はコールバックが必要です。つまり、SkyValue ルックアップの場合と同様に、メソッドを呼び出した直後に実際のオペレーションは行われません。コールバックはできる限りシンプルにする必要があります。

継続処理StateMachineStateMachine 戻り値であり、すべての非同期計算が解決された後に続く複雑な実行をカプセル化します。この構造化されたアプローチにより、コールバックの複雑さを管理しやすくなります。

タスク

Tasks インターフェースは、SkyKey で SkyValue を検索し、同時実行サブタスクをスケジュールする API を StateMachine に提供します。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}

SkyValue ルックアップ

StateMachineTasks.lookUp オーバーロードを使用して SkyValue を検索します。SkyFunction.Environment.getValueSkyFunction.Environment.getValueOrThrow に類似しており、例外処理のセマンティクスも同様です。この実装では、ルックアップはすぐに実行されず、可能な限り多くのルックアップをバッチ処理してから実行されます。値がすぐに利用できない場合があります(Skyframe の再起動が必要な場合など)。そのため、呼び出し元はコールバックを使用して、結果の値をどのように処理するかを指定します。4

StateMachine プロセッサ(Driver と SkyFrame へのブリッジング)は、次の状態が開始される前に値が使用可能であることを保証します。以下の例を

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

上記の例では、最初のステップで new Key() を検索し、コンシューマーとして this を渡します。これが可能なのは、DoesLookupConsumer<SkyValue> を実装しているためです。

契約により、次の状態 DoesLookup.processValue が開始される前に、DoesLookup.step のすべてのルックアップが完了します。したがって、valueprocessValue でアクセスされた場合に使用できます。

サブタスク

Tasks.enqueue は、論理的に同時実行されるサブタスクの実行をリクエストします。サブタスクも StateMachine であり、通常の StateMachine ができることは何でもできます。たとえば、サブタスクを再帰的に作成したり、SkyValue を検索したりできます。lookUp と同様に、ステートマシン ドライバは、次のステップに進む前にすべてのサブタスクが完了していることを確認します。以下の例を

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

Subtask1Subtask2 は論理的には同時実行されますが、すべてが単一スレッドで実行されるため、i の「同時実行」更新に同期は必要ありません。

構造化された同時実行

すべての lookUpenqueue は、次の状態に進む前に解決する必要があります。つまり、同時実行は当然、ツリー構造に制限されます。次の例に示すように、階層型5 同時実行を作成できます。

構造化同時実行

UML からは、同時実行構造がツリーを形成していることを区別するのは困難です。ツリー構造をより適切に示す代替ビューがあります。

非構造化の同時実行

構造化された同時実行は、簡単に推測できます。

コンポジションと制御フロー パターン

このセクションでは、複数の StateMachine を作成する方法の例と、特定の制御フローの問題に対する解決策について説明します。

連続状態

これは、最も一般的な直感的な制御フロー パターンです。例については、SkyKeyComputeState 内のステートフル計算をご覧ください。

ブランチ

StateMachine の分岐状態は、次の例に示すように、通常の Java 制御フローを使用して異なる値を返すことで実現できます。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  …
}

特定のブランチが早期に完了するために DONE を返すことは非常に一般的です。

高度な順次合成

StateMachine 制御構造はメモリレスであるため、StateMachine 定義をサブタスクとして共有すると不便な場合があります。M1M2 を、StateMachine インスタンスとし、StateMachine S を共有します。M1M2 は、それぞれシーケンス <A, S, B><X, S, Y> です。問題は、S が完了後に BY のどちらを継続するかがわからず、StateMachine がコールスタックを完全に保持しないことです。このセクションでは、これを実現するための手法について説明します。

終端シーケンス要素としての StateMachine

これでは、最初に提示された問題は解決しません。共有 StateMachine がシーケンスの終端である場合にのみ、シーケンシャル コンポジションを示します。

// S is the shared state machine.
class S implements StateMachine { … }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

これは、S 自体が複雑な状態マシンであっても機能します。

連続コンポジションのサブタスク

キューに登録されたサブタスクは、次の状態になる前に必ず完了するため、サブタスク メカニズムを少し悪用する6こともできます。

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

runAfter インジェクション

S の実行前に完了する必要がある他の並列サブタスクや Tasks.lookUp 呼び出しがあるため、Tasks.enqueue を悪用できない場合があります。この場合、runAfter パラメータを S に挿入することで、次に行うべきことを S に通知できます。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
    … // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
    … // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

このアプローチは、サブタスクを不正使用するよりもクリーンです。ただし、複数の StateMachinerunAfter でネストするなど、これを過度に適用すると、コールバック ヘルルに陥る可能性があります。代わりに、連続した runAfter を通常の連続状態で分割することをおすすめします。

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

は次のように置き換えることができます。

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

禁止の代替: runAfterUnlessError

以前のドラフトでは、エラー発生時に早期に中止する runAfterUnlessError を検討していました。これは、エラーが 2 回チェックされることがよくあるという事実に基づいています。1 回は runAfter 参照を持つ StateMachine によって、もう 1 回は runAfter マシン自体によってチェックされます。

検討を重ねた結果、エラーチェックの重複除去よりもコードの均一性の方が重要であると判断しました。runAfter メカニズムが tasks.enqueue メカニズムと整合して動作しない場合、エラー チェックが常に必要になります。

直接委任

正式な状態遷移が発生するたびに、メイン Driver ループが進行します。契約に基づき、状態を進めるということは、次の状態が実行される前に、以前にキューに登録されたすべての SkyValue ルックアップとサブタスクが解決されることを意味します。デリゲート StateMachine のロジックにより、フェーズ進展が不要または逆効果になることがあります。たとえば、デリゲートの最初の step が委任状態のルックアップと並列化できる SkyKey ルックアップを実行する場合、フェーズを進めると、これらはシーケンシャルになります。次の例に示すように、直接委任を行う方が適切な場合があります。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    …
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return …;
  }

  // Rest of implementation.
  …

  private StateMachine complete(Tasks tasks) {
    …
    return runAfter;
  }
}

データフロー

これまでの説明では、制御フローの管理に重点を置いてきました。このセクションでは、データ値の伝播について説明します。

Tasks.lookUp コールバックの実装

SkyValue ルックアップTasks.lookUp コールバックを実装する例があります。このセクションでは、複数の SkyValue を処理する理由を説明し、提案します。

Tasks.lookUp コールバック

Tasks.lookUp メソッドは、コールバック sink をパラメータとして受け取ります。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

慣用的な方法では、Java ラムダを使用してこれを実装します。

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

ここで、myValue はルックアップを行う StateMachine インスタンスのメンバー変数です。ただし、ラムダには、StateMachine 実装で Consumer<SkyValue> インターフェースを実装する場合と比較して、追加のメモリ割り当てが必要です。ラムダは、あいまいな複数のルックアップがある場合に便利です。

SkyFunction.Environment.getValueOrThrow と同様に、Tasks.lookUp のエラー処理のオーバーロードもあります。

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

実装例を以下に示します。

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      …
      return DONE;
    }
    // Processes `value`, which is non-null.
    …
  }
}

エラー処理のないルックアップと同様に、StateMachine クラスがコールバックを直接実装すると、ラムバのメモリ割り当てを節約できます。

エラー処理ではもう少し詳しく説明しますが、基本的には、エラーの伝播と通常の値に大きな違いはありません。

複数の SkyValue を使用する

多くの場合、複数の SkyValue ルックアップが必要になります。ほとんどの場合、SkyValue のタイプをオンにすると問題が解決します。次の例は、プロトタイプの本番環境コードから簡素化したものです。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

値の型が異なるため、Consumer<SkyValue> コールバックの実装は明確に共有できます。そうでない場合は、ラムダベースの実装または適切なコールバックを実装する完全な内部クラス インスタンスにフォールバックできます。

StateMachine 間での値の伝播

このドキュメントでは、サブタスクで作業を整理する方法のみを説明してきましたが、サブタスクは呼び出し元に値を報告する必要もあります。サブタスクは論理的に非同期であるため、その結果はコールバックを使用して呼び出し元に通知されます。これを実現するため、サブタスクでは、コンストラクタを介して挿入されるシンク インターフェースを定義します。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

  … // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

呼び出し元 StateMachine は次のようになります。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上記の例では、いくつかの点を示しています。Caller は結果を伝播し、独自の Caller.ResultSink を定義する必要があります。CallerBarProducer.ResultSink コールバックを実装します。再開時に、processResultvalue が null かどうかを確認し、エラーが発生したかどうかを判断します。これは、サブタスクまたは SkyValue ルックアップの出力を受け入れた後の一般的な動作パターンです。

acceptBarError の実装は、エラーバブリングで要求されているように、結果を Caller.ResultSink に積極的に転送します。

トップレベルの StateMachine の代替方法については、Driver と SkyFunctions へのブリッジをご覧ください。

エラー処理

Tasks.lookUp コールバックStateMachines 間での値のプロパゲーションに、エラー処理の例がいくつかあります。InterruptedException 以外の例外はスローされず、代わりに値としてコールバックを介して渡されます。このようなコールバックには、値またはエラーのいずれか 1 つが渡される排他的論理和のセマンティクスがよくあります。

次のセクションでは、Skyframe エラー処理との微妙な、しかし重要な相互作用について説明します。

エラーの再帰(--nokeep_going)

エラーの再帰処理中、リクエストされたすべての SkyValue が使用可能でなくても、SkyFunction が再起動されることがあります。このような場合、Tasks API コントラクトにより、後続の状態に到達することはありません。ただし、StateMachine は引き続き例外を伝播する必要があります。

伝播は次の状態に到達したかどうかに関係なく行われる必要があるため、エラー処理コールバックはこのタスクを実行する必要があります。内部 StateMachine の場合、これは親コールバックを呼び出すことで実現されます。

SkyFunction とインターフェースするトップレベルの StateMachine では、ValueOrExceptionProducersetException メソッドを呼び出すことでこれを行えます。その後、SkyValues が欠落している場合でも、ValueOrExceptionProducer.tryProduceValue は例外をスローします。

Driver が直接使用されている場合は、マシンが処理を完了していなくても、SkyFunction から伝播されたエラーを確認することが重要です。

イベント処理

イベントを出力する必要がある SkyFunctions の場合、StoredEventHandler が SkyKeyComputeState に挿入され、さらにそれを必要とする StateMachine に挿入されます。以前は、Skyframe が特定のイベントを再生しない限りドロップするため、StoredEventHandler が必要でしたが、これはその後修正されました。StoredEventHandler 注入は保持されます。これは、エラー処理コールバックから出力されるイベントの実装を簡素化するためです。

Driver と SkyFunctions へのブリッジ

Driver は、指定されたルート StateMachine から始まる StateMachine の実行を管理します。StateMachine はサブタスク StateMachine を再帰的にキューに追加できるため、1 つの Driver で複数のサブタスクを管理できます。これらのサブタスクは、構造化された同時実行の結果としてツリー構造を作成します。Driver は、サブタスク間で SkyValue ルックアップを一括処理して効率を高めます。

Driver を中心に構築されたクラスはいくつかあり、次の API があります。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver は、単一のルート StateMachine をパラメータとして受け取ります。Driver.drive を呼び出すと、Skyframe を再起動せずに StateMachine が実行されます。StateMachine が完了した場合は true を返し、完了しない場合は false を返します。これは、すべての値が使用可能ではなかったことを示します。

DriverStateMachine の同時状態を維持できるため、SkyKeyComputeState への埋め込みに適しています。

Driver を直接インスタンス化する

StateMachine の実装では、通常、コールバックを介して結果が通知されます。次の例に示すように、Driver を直接インスタンス化できます。

Driver は、後で定義する対応する ResultSink の実装とともに、SkyKeyComputeState の実装に埋め込まれます。最上位レベルでは、State オブジェクトは Driver よりも長く存続することが保証されているため、計算結果の適切なレシーバーです。

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

以下のコードでは、ResultProducer をスケッチしています。

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

  … // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

この場合、結果を遅延計算するコードは次のようになります。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

Driver の埋め込み

StateMachine が値を生成し、例外を発生させない場合は、次の例に示すように、Driver を埋め込むこともできます。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
}

SkyFunction には、次のようなコードが含まれている場合があります(ここで、StateSkyKeyComputeState の関数固有の型です)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

DriverStateMachine 実装に埋め込むことは、Skyframe の同期コーディング スタイルに適しています。

例外を生成する可能性がある StateMachine

それ以外の場合は、SkyKeyComputeState に埋め込める ValueOrExceptionProducer クラスと ValueOrException2Producer クラスがあり、同期 SkyFunction コードに対応する同期 API があります。

ValueOrExceptionProducer 抽象クラスには次のメソッドが含まれています。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
    …  // Implementation.
  }

  protected final void setValue(V value)  {  … // Implementation. }
  protected final void setException(E exception) {  … // Implementation. }
}

埋め込まれた Driver インスタンスが含まれており、埋め込みドライバResultProducer クラスによく似ており、SkyFunction と同様の方法でインターフェースします。実装では、ResultSink を定義する代わりに、setValue または setException のいずれかが発生したときに呼び出します。両方が発生した場合は、例外が優先されます。tryProduceValue メソッドは、非同期コールバック コードを同期コードにブリッジし、例外が設定されている場合は例外をスローします。

前述のように、エラーバブリング中は、すべての入力が使用可能ではないため、マシンがまだ完了していなくてもエラーが発生する可能性があります。これに対応するため、tryProduceValue は、マシンが完了する前でも、設定された例外をスローします。

エピローグ: コールバックの削除

StateMachine は、非同期計算を実行するための非常に効率的な方法ですが、ボイラープレートが大量に必要になります。継続(特に ListenableFuture に渡される Runnable の形式)は Bazel コードの特定の部分で広く使用されていますが、分析 SkyFunctions では一般的ではありません。分析のほとんどは CPU の制約を受けており、ディスク I/O 用の効率的な非同期 API はありません。コールバックは習得に時間がかかり、可読性を損なうため、最終的には最適化して削除することをおすすめします。

最も有望な選択肢の一つは、Java 仮想スレッドです。コールバックを記述する代わりに、すべてが同期ブロッキング呼び出しに置き換えられます。これは、仮想スレッド リソースはプラットフォーム スレッドとは異なり、コストが低いと想定されているためです。ただし、仮想スレッドを使用しても、単純な同期オペレーションをスレッド作成と同期プリミティブに置き換えると、コストが高すぎます。StateMachine から Java 仮想スレッドへの移行を実行したところ、桁違いに速度が低下し、エンドツーエンドの分析レイテンシが約 3 倍増加しました。仮想スレッドはまだプレビュー版の機能であるため、パフォーマンスが向上した後でこの移行を実行することもできます。

検討すべきもう 1 つの方法は、Loom コルーチンが利用可能になったら待機することです。この方法の利点は、協調的マルチタスクを使用して同期のオーバーヘッドを削減できることです。

他のどれでもうまくいかない場合は、低レベルのバイトコードの書き換えを行うこともできます。十分な最適化を行うことで、手書きのコールバック コードに近いパフォーマンスを実現できる場合があります。

付録

コールバック地獄

コールバック ヘルは、コールバックを使用する非同期コードでよく見られる問題です。これは、後続のステップの継続が前のステップ内にネストされていることに起因しています。ステップが多い場合、このネストはかなり深くなる可能性があります。制御フローと組み合わせると、コードが管理不能になります。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

ネストされた実装の利点の一つは、外側のステップのスタック フレームを保持できることです。Java では、キャプチャされたラムダ変数は実質的に最終的である必要があるため、このような変数を使用するのは面倒です。次のように、ラムダではなく継続としてメソッド参照を返すことで、深いネストを回避できます。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

runAfter インジェクション パターンが高密度に使用された場合にもコールバック HEll が発生することがありますが、これは、連続するステップを注入することで回避できます。

例: 連鎖した SkyValue ルックアップ

アプリケーション ロジックで、2 番目の SkyKey が最初の SkyValue に依存している場合など、SkyValue ルックアップの依存チェーンが必要なことがよくあります。単純に考えると、複雑でネストされたコールバック構造が作成されます。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

ただし、継続はメソッド参照として指定されるため、コードは状態遷移をまたいで手続き型に見えるため、step2step1 に従います。ここでは、ラムダを使用して value2 を割り当てています。これにより、コードの順序が上から下の計算の順序と一致するようになります。

その他のヒント

読みやすさ: 実行順序

可読性を高めるには、StateMachine.step の実装を実行順に保ち、コールバックの実装はコード内で渡される場所の直後に配置します。これは、制御フローが分岐する場所では常に実現できるとは限りません。このような場合は、追加のコメントが役立つ場合があります。

例: 連鎖された SkyValue ルックアップでは、この目的で中間メソッド参照が作成されます。これにより、パフォーマンスがわずかに低下しますが、読みやすさが向上します。

世代仮説

中程度の存続期間の Java オブジェクトは、非常に短い存続期間のオブジェクトまたは永続的に存続するオブジェクトを処理するように設計された Java ガベージ コレクタの世代仮説を破ります。定義上、SkyKeyComputeState 内のオブジェクトはこの仮説に違反します。Driver をルートとして、実行中のすべての StateMachine の構成ツリーを含むこのようなオブジェクトは、非同期計算の完了を待機する中間の存続期間を持ちます。

JDK19 では問題が軽減されているようですが、StateMachine を使用すると、生成される実際のガベージが大幅に減少しても、GC 時間が長くなることがあります。StateMachine は中程度の存続期間があるため、オールド ジェネレーションに昇格し、オールド ジェネレーションがより速くいっぱいになる可能性があります。そのため、クリーンアップにメジャー GC またはフル GC が必要になります。

最初の予防策は StateMachine 変数の使用を最小限に抑えることですが、複数の状態で値が必要な場合などは、常に実現できるとは限りません。可能な場合、ローカル スタックの step 変数は新しい世代の変数であり、効率的に GC されます。

StateMachine 変数の場合は、サブタスクに分割し、StateMachine 間で値を伝播する推奨パターンに従うことも役立ちます。このパターンに従う場合、子 StateMachine のみが親 StateMachine を参照しており、その逆はないことに注目してください。つまり、子が完了して結果コールバックを使用して親を更新すると、子は自然にスコープから外れ、GC の対象になります。

最後に、以前の状態では StateMachine 変数が必要で、後の状態では必要ない場合があります。大きなオブジェクトが不要であることが判明したら、そのオブジェクトの参照を null にするのが有益な場合があります。

状態に名前を付ける

メソッドに名前を付けるときは、通常、そのメソッド内で発生する動作にメソッド名を付けることができます。スタックが存在しないため、StateMachine で行う方法はわかりにくい。たとえば、メソッド foo がサブメソッド bar を呼び出すとします。StateMachine では、これを状態シーケンス foo、その後に bar に変換できます。foobar の動作が含まれなくなりました。その結果、状態のメソッド名はスコープが狭くなり、ローカル動作を反映する可能性があります。

同時実行のツリー図

次の図は、構造化同時実行の図の別のビューで、ツリー構造をより適切に示しています。ブロックが小さな木を形成しています。

構造化された同時実行 3D


  1. 値が使用できない場合に最初から再開するという Skyframe の規則とは対照的です。 

  2. stepInterruptedException をスローできますが、この例では省略しています。Bazel コードには、この例外をスローする低レベルのメソッドがいくつかあり、StateMachine を実行する Driver(後述)まで伝播します。必要ない場合は、スローを宣言しなくてもかまいません。

  3. 同時実行サブタスクは、依存関係ごとに独立した処理を行う ConfiguredTargetFunction を基に作成されました。すべての依存関係を一度に処理する複雑なデータ構造を操作して非効率性を導入するのではなく、各依存関係に独自の独立した StateMachine があります。 

  4. 1 つのステップ内の複数の tasks.lookUp 呼び出しはまとめてバッチ処理されます。追加のバッチ処理は、同時実行サブタスク内で発生するルックアップによって作成できます。 

  5. これは、Java の構造化コンカレンシーの jeps/428 に概念的に似ています。 

  6. これは、スレッドを生成して結合してシーケンシャル コンポジションを実現する方法と似ています。