Skyframe StateMachines 指南

回報問題 查看來源

總覽

Skyframe StateMachine 是位於堆積上的未建構函式物件。如果無法立即提供必要值,但以非同步的方式進行計算,則可支援沒有彈性的評估方式1StateMachine 在等待時無法連結執行緒資源,但必須暫停並繼續。刪除作業會公開明確的重新進入點,讓使用者可以略過先前的計算結果。

StateMachine 可用來表示序列、分支、結構化邏輯並行,並且專為 SkyFrame 互動而量身打造。StateMachine 可構成更大的 StateMachine,並共用子 StateMachine。並行一律以建構和純邏輯為階層結構。每個並行子工作都是在單一共用父項 SkyFunction 執行緒中執行。

簡介

本節將概略說明,並介紹 java.com.google.devtools.build.skyframe.state 套件中的 StateMachine

SkyFrame 重新啟動簡介

Skyframe 是一種可對依附元件圖表進行平行評估的架構。圖表中每個節點都與一個 SkyKey 的評估結果對應,SkyKey 會指定其參數,並用 SkyValue 指定其結果。計算模型讓 SkyFunction 能夠依據 SkyKey 查詢 SkyValue,觸發以遞迴方式平行計算的額外 SkyFunctions。當既有的 SkyValue 尚未進行要求的計算作業不完整時,而不封鎖執行緒 (意味著執行緒不可用),SkyFunction 會觀測 null getValue 回應,且應傳回 null (而不是 SkyValue),指出回應因缺少輸入內容而不完整。所有先前要求取得的 SkyValue 時,Skyframe 就會重新啟動 SkyFunctions。

在啟動 SkyKeyComputeState 之前,處理重新啟動重新啟動的傳統方法是使用完整運算。雖然這種形式的複雜度較高,但每次以此方式編寫的函式最終都會完成,因為查詢每次傳回時都會傳回 null。使用 SkyKeyComputeState 時,您可以將指定檢查點資料與 SkyFunction 建立關聯,以節省大量重新計算作業。

StateMachine 是位於 SkyKeyComputeState 中的物件,在 SkyFunction 重新啟動 (假定 SkyKeyComputeState 未快取的情況下) 時,可公開暫停並繼續執行執行掛鉤,藉此幾乎可以完全重新計算。

SkyKeyComputeState 中的有狀態運算

從物件導向的設計的角度,建議您考慮將運算物件儲存在 SkyKeyComputeState 中,而不是純資料值。在 Java 中,導入物件的物件基本最小值描述為功能介面,並足以滿足這項需求。StateMachine 有下列遞迴遞迴的定義、定義2

@FunctionalInterface
public interface StateMachine {
  StateMachine step(Tasks tasks) throws InterruptedException;
}

Tasks 介面與 SkyFunction.Environment 相似,但專為非同步作業所設計,並支援邏輯並行子工作3

step 的傳回值是另一個 StateMachine,可讓您指定一系列步驟的規格。StateMachine 完成後,step 會傳回 DONE。例如:

class HelloWorld implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    System.out.println("hello");
    return this::step2;  // The next step is HelloWorld.step2.
  }

  private StateMachine step2(Tasks tasks) {
     System.out.println("world");
     // DONE is special value defined in the `StateMachine` interface signaling
     // that the computation is done.
     return DONE;
  }
}

具有以下輸出內容的 StateMachine

hello
world

請注意,由於 step2 符合 StateMachine 的功能介面定義,因此方法參考資料 this::step2 也是 StateMachine。在 StateMachine 中指定下一個狀態時,方法參照是最常見的方法。

暫停和繼續

一開始,將運算細分為 StateMachine 步驟 (而非單體函式),提供暫停繼續計算所需的掛鉤。傳回 StateMachine.step 時,系統會有一個明確的暫停點。傳回的 StateMachine 值指定的接續是明確的 resume 點。因此,您可以避免重新計算作業,因為運算可能會從中斷的地方接續處理。

回呼、接續和非同步運算

在技術術語中,StateMachine 會做為「連續」,以判斷要執行的後續計算。除了不封鎖區塊,StateMachine 也可從 step 函式傳回,藉此將控制權移回 Driver 執行個體,藉此突然暫停。接著,Driver 可以切換至就緒的 StateMachine,或停止使用 SkyFrame。

一般來說,「回呼」和「連續」會合併為一個概念。不過,StateMachine 會區分兩者。

  • 回呼 - 說明非同步計算結果的儲存位置。
  • 繼續 - 指定下一個執行狀態。

叫用非同步作業時需要回呼,亦即呼叫方法時不會立即執行實際作業,就像在 SkyValue 查詢時一樣。回呼應盡可能簡單。

接續StateMachineStateMachine 傳回值,並封裝所有非同步運算後都會執行的複雜執行作業。此結構化方法有助於維持回呼的複雜性。

工作

Tasks 介面提供 StateMachine 搭配 API,可依據 SkyKey 查詢 SkyValue 並安排並行子工作。

interface Tasks {
  void enqueue(StateMachine subtask);

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

  <E extends Exception>
  void lookUp(SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  // lookUp overloads for 2 and 3 exception types exist, but are elided here.
}
StateMachine

SkyValue 查詢

StateMachine 會使用 Tasks.lookUp 超載來查詢 SkyValues。與 SkyFunction.Environment.getValueSkyFunction.Environment.getValueOrThrow 相似,且有類似的例外狀況處理語意。實作內容不會立即執行查詢,而是在執行前盡可能多次批次處理4。舉例來說,可能無法立即取得該值,例如需要使用 SkyFrame 重新啟動,以及呼叫端如何處理使用回呼處理結果值。

StateMachine 處理器 (Driver 和橋接至 SkyFrame) 保證可在下一個狀態之前可用。範例如下所示。

class DoesLookup implements StateMachine, Consumer<SkyValue> {
  private Value value;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key(), (Consumer<SkyValue>) this);
    return this::processValue;
  }

  // The `lookUp` call in `step` causes this to be called before `processValue`.
  @Override  // Implementation of Consumer<SkyValue>.
  public void accept(SkyValue value) {
    this.value = (Value)value;
  }

  private StateMachine processValue(Tasks tasks) {
    System.out.println(value);  // Prints the string representation of `value`.
    return DONE;
  }
}

在上述範例中,第一個步驟會執行 new Key() 查詢,並將 this 做為消費者傳遞。這可能是因為 DoesLookup 實作了 Consumer<SkyValue>

依合約,在下一個狀態 DoesLookup.processValue 之前,DoesLookup.step 的所有查詢都已完成。因此,在 processValue 中存取此 value 時可以使用。

子工作

Tasks.enqueue 要求執行邏輯並行的子工作。子工作也是 StateMachine,可以執行一般 StateMachine 執行的所有操作,包括遞迴建立更多子工作,或查詢 SkyValues。與 lookUp 類似,狀態機器驅動程式可確保所有子工作都已完成,再進行下一個步驟。範例如下所示。

class Subtasks implements StateMachine {
  private int i = 0;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new Subtask1());
    tasks.enqueue(new Subtask2());
    // The next step is Subtasks.processResults. It won't be called until both
    // Subtask1 and Subtask 2 are complete.
    return this::processResults;
  }

  private StateMachine processResults(Tasks tasks) {
    System.out.println(i);  // Prints "3".
    return DONE;  // Subtasks is done.
  }

  private class Subtask1 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 1;
      return DONE;  // Subtask1 is done.
    }
  }

  private class Subtask2 implements StateMachine {
    @Override
    public StateMachine step(Tasks tasks) {
      i += 2;
      return DONE;  // Subtask2 is done.
    }
  }
}

雖然 Subtask1Subtask2 在邏輯上同時執行,但所有執行緒都會在單一執行緒中執行,因此 i 的「並行」更新不需要任何同步處理。

結構化並行

由於每個 lookUpenqueue 都必須在達到下一個狀態之前解決,因此並行自然僅限於樹狀結構。您可以建立階層式5 並行,如以下範例所示。

結構化並行

很難從 UML 判斷並行結構是否形成樹狀結構。我們提供替代檢視畫面,以更準確的樹狀結構結構。

非結構化並行

結構化並行很容易詢問。

組合與控制流程模式

本節提供多個 StateMachine 的組合範例,以及特定控制流程問題的解決方法。

依序狀態

這是最簡單易懂的控制流程模式,相關範例請參閱「SkyKeyComputeState 中的有狀態計算」一節。

分支版本

使用一般的 Java 控制流程傳回不同值,即可達到 StateMachine 中的分支版本狀態,如以下範例所示。

class Branch implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    // Returns different state machines, depending on condition.
    if (shouldUseA()) {
      return this::performA;
    }
    return this::performB;
  }
  …
}

某些分支版本很早就傳回 DONE,以便提早完成。

進階依序組合

StateMachine 控制項結構沒有記憶體,因此共用 StateMachine 定義有時可能會成為子工作。讓 M1M2StateMachine,這些執行個體會共用 StateMachineS,其中 M1M2 則依序為 <A, S, B><X, S, Y>。問題是,「S」無法確定作業完成後要繼續前往「B」或「Y」,而 StateMachine 也「不太」呼叫堆疊。本節會探討一些達成這個目標的技巧。

StateMachine 設為終端機序列元素

這個方法無法解決初步問題。只有在共用 StateMachine 為序列中的終端機時,系統才會顯示依序組合。

// S is the shared state machine.
class S implements StateMachine { … }

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    return new S();
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    return new S();
  }
}

即使 S 本身也是複雜的狀態機器,也能正常運作。

子組合的子工作

佇列中的子工作一定會在下一個狀態之前完成,因此有時也可以稍微濫用6子工作機制。

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // S starts after `step` returns and by contract must complete before `doB`
    // begins. It is effectively sequential, inducing the sequence < A, S, B >.
    tasks.enqueue(new S());
    return this::doB;
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Similarly, this induces the sequence < X, S, Y>.
    tasks.enqueue(new S());
    return this::doY;
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

注入runAfter

有時候,停用 Tasks.enqueue 是不可能的,因為還有其他平行處理子工作或 Tasks.lookUp 呼叫必須在 S 執行之前完成。在這個範例中,將 runAfter 參數插入 S 可以用來告知 S 後續作業。

class S implements StateMachine {
  // Specifies what to run after S completes.
  private final StateMachine runAfter;

  @Override
  public StateMachine step(Tasks tasks) {
    … // Performs some computations.
    return this::processResults;
  }

  @Nullable
  private StateMachine processResults(Tasks tasks) {
    … // Does some additional processing.

    // Executes the state machine defined by `runAfter` after S completes.
    return runAfter;
  }
}

class M1 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performA();
    // Passes `this::doB` as the `runAfter` parameter of S, resulting in the
    // sequence < A, S, B >.
    return new S(/* runAfter= */ this::doB);
  }

  private StateMachine doB(Tasks tasks) {
    performB();
    return DONE;
  }
}

class M2 implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks) {
    performX();
    // Passes `this::doY` as the `runAfter` parameter of S, resulting in the
    // sequence < X, S, Y >.
    return new S(/* runAfter= */ this::doY);
  }

  private StateMachine doY(Tasks tasks) {
    performY();
    return DONE;
  }
}

這個做法比暫停子工作更加簡潔。不過,如果過度套用這種做法 (例如以 runAfter 建立多個 StateMachine,就必須是 Callback Hell)。最好將連續 runAfter 分割為一般循序狀態。

  return new S(/* runAfter= */ new T(/* runAfter= */ this::nextStep))

可以替換成以下內容。

  private StateMachine step1(Tasks tasks) {
     doStep1();
     return new S(/* runAfter= */ this::intermediateStep);
  }

  private StateMachine intermediateStep(Tasks tasks) {
    return new T(/* runAfter= */ this::nextStep);
  }

禁止替代方案:runAfterUnlessError

在先前的草稿中,我們已判定 runAfterUnlessError 會在發生錯誤時取消。之所以會出現這項結果,是因為錯誤錯誤通常會發生兩次檢查,分別由含有 runAfter 參照的 StateMachinerunAfter 機器自行檢查。

修正後,我們認為程式碼的一致性比複製錯誤更重要。如果 runAfter 機制未與 tasks.enqueue 機制不一致,而這或機制總是需要檢查錯誤,就會混淆。

直接委派

每次進行狀態轉換時,主要的 Driver 迴圈就會向前提升。如依合約規定,升級狀態表示所有先前納入佇列的 SkyValue 查詢和子工作,皆會在下次狀態執行前解決。有時,委派 StateMachine 的邏輯會使階段變得不必要或有損。舉例來說,如果委派的第一個 step 執行 SkyKey 查詢,而這些查詢可對委派狀態的查詢進行平行處理,則階段階段會將其設為依序。執行直接委派是較合理的做法,如以下範例所示。

class Parent implements StateMachine {
  @Override
  public StateMachine step(Tasks tasks ) {
    tasks.lookUp(new Key1(), this);
    // Directly delegates to `Delegate`.
    //
    // The (valid) alternative:
    //   return new Delegate(this::afterDelegation);
    // would cause `Delegate.step` to execute after `step` completes which would
    // cause lookups of `Key1` and `Key2` to be sequential instead of parallel.
    return new Delegate(this::afterDelegation).step(tasks);
  }

  private StateMachine afterDelegation(Tasks tasks) {
    …
  }
}

class Delegate implements StateMachine {
  private final StateMachine runAfter;

  Delegate(StateMachine runAfter) {
    this.runAfter = runAfter;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new Key2(), this);
    return …;
  }

  // Rest of implementation.
  …

  private StateMachine complete(Tasks tasks) {
    …
    return runAfter;
  }
}

資料流程

先前的討論的重點是管理控制流程。本節將說明資料值的傳播。

實作 Tasks.lookUp 回呼

範例是在 SkyValue 查詢中實作 Tasks.lookUp 回呼。本節提供原因,並說明處理多個 SkyValue 的方法。

Tasks.lookUp 回呼

Tasks.lookUp 方法會採用回呼 sink 做為參數。

  void lookUp(SkyKey key, Consumer<SkyValue> sink);

慣用式是使用 Java lambda 來實作這樣:

  tasks.lookUp(key, value -> myValue = (MyValueClass)value);

其中 myValue 是執行查詢的 StateMachine 執行個體成員變數。不過,相較於在 StateMachine 實作中實作 Consumer<SkyValue> 介面,lambda 需要額外的記憶體配置。如有多項查詢會導致混淆,lambda 仍然非常實用。

還有處理 Tasks.lookUp 的超載,該錯誤與 SkyFunction.Environment.getValueOrThrow 相似。

  <E extends Exception> void lookUp(
      SkyKey key, Class<E> exceptionClass, ValueOrExceptionSink<E> sink);

  interface ValueOrExceptionSink<E extends Exception> {
    void acceptValueOrException(@Nullable SkyValue value, @Nullable E exception);
  }

以下為實作範例。

class PerformLookupWithError extends StateMachine, ValueOrExceptionSink<MyException> {
  private MyValue value;
  private MyException error;

  @Override
  public StateMachine step(Tasks tasks) {
    tasks.lookUp(new MyKey(), MyException.class, ValueOrExceptionSink<MyException>) this);
    return this::processResult;
  }

  @Override
  public acceptValueOrException(@Nullable SkyValue value, @Nullable MyException exception) {
    if (value != null) {
      this.value = (MyValue)value;
      return;
    }
    if (exception != null) {
      this.error = exception;
      return;
    }
    throw new IllegalArgumentException("Both parameters were unexpectedly null.");
  }

  private StateMachine processResult(Tasks tasks) {
    if (exception != null) {
      // Handles the error.
      …
      return DONE;
    }
    // Processes `value`, which is non-null.
    …
  }
}

與在沒有錯誤處理的情況下處理查詢一樣,使用 StateMachine 類別直接實作回呼可儲存 lamba 的記憶體配置。

錯誤處理雖然有點細節,但基本上,錯誤性和一般值之間並沒有差異。

取用多個 SkyValue

通常需要多個 SkyValue 查詢。大部分時間都有效的做法是開啟 SkyValue 類型。以下範例已從原型正式版程式碼簡化簡化。

  @Nullable
  private StateMachine fetchConfigurationAndPackage(Tasks tasks) {
    var configurationKey = configuredTarget.getConfigurationKey();
    if (configurationKey != null) {
      tasks.lookUp(configurationKey, (Consumer<SkyValue>) this);
    }

    var packageId = configuredTarget.getLabel().getPackageIdentifier();
    tasks.lookUp(PackageValue.key(packageId), (Consumer<SkyValue>) this);

    return this::constructResult;
  }

  @Override  // Implementation of `Consumer<SkyValue>`.
  public void accept(SkyValue value) {
    if (value instanceof BuildConfigurationValue) {
      this.configurationValue = (BuildConfigurationValue) value;
      return;
    }
    if (value instanceof PackageValue) {
      this.pkg = ((PackageValue) value).getPackage();
      return;
    }
    throw new IllegalArgumentException("unexpected value: " + value);
  }

Consumer<SkyValue> 回呼實作方式不同,因為值類型不同。如果不是,請改用 lambda 實作,或執行適當的回呼的完整內部類別執行個體。

傳播 StateMachine 之間的值

目前為止,本文件僅說明如何在子工作中安排工作,但子工作也需要向呼叫端回報值。由於子工作邏輯是非同步的,因此其結果會透過回呼傳回呼叫端。如要執行這項工作,子工作會定義透過其建構函式插入的接收器介面。

class BarProducer implements StateMachine {
  // Callers of BarProducer implement the following interface to accept its
  // results. Exactly one of the two methods will be called by the time
  // BarProducer completes.
  interface ResultSink {
    void acceptBarValue(Bar value);
    void acceptBarError(BarException exception);
  }

  private final ResultSink sink;

  BarProducer(ResultSink sink) {
     this.sink = sink;
  }

  … // StateMachine steps that end with this::complete.

  private StateMachine complete(Tasks tasks) {
    if (hasError()) {
      sink.acceptBarError(getError());
      return DONE;
    }
    sink.acceptBarValue(getValue());
    return DONE;
  }
}

呼叫端 StateMachine 應如下所示。

class Caller implements StateMachine, BarProducer.ResultSink {
  interface ResultSink {
    void acceptCallerValue(Bar value);
    void acceptCallerError(BarException error);
  }

  private final ResultSink sink;

  private Bar value;

  Caller(ResultSink sink) {
    this.sink = sink;
  }

  @Override
  @Nullable
  public StateMachine step(Tasks tasks) {
    tasks.enqueue(new BarProducer((BarProducer.ResultSink) this));
    return this::processResult;
  }

  @Override
  public void acceptBarValue(Bar value) {
    this.value = value;
  }

  @Override
  public void acceptBarError(BarException error) {
    sink.acceptCallerError(error);
  }

  private StateMachine processResult(Tasks tasks) {
    // Since all enqueued subtasks resolve before `processResult` starts, one of
    // the `BarResultSink` callbacks must have been called by this point.
    if (value == null) {
      return DONE;  // There was a previously reported error.
    }
    var finalResult = computeResult(value);
    sink.acceptCallerValue(finalResult);
    return DONE;
  }
}

上例示範了一些做法,Caller 必須傳播結果,並定義自己的 Caller.ResultSinkCaller 會實作 BarProducer.ResultSink 回呼。恢復時,processResult 會檢查 value 是否為空值,以判斷是否發生錯誤。這是從子工作或 SkyValue 查詢接受輸出後,常見的行為模式。

請注意,實作 acceptBarError 會快速將結果轉送至 Caller.ResultSink,如 錯誤對話框

如需頂層 StateMachine 的替代方案,請參閱 Driver,並橋接至 SkyFunctions

處理錯誤

以下列舉幾個 Tasks.lookUp 回呼StateMachines 之間值之間對應的錯誤處理方式。系統不會擲回 InterruptedException 以外的例外狀況,而是透過回呼做為值來傳遞。這類回呼通常有專屬的或語意,並剛好傳送一個值或錯誤。

下一節將說明與 SkyFrame 錯誤處理機制細微但重要的互動。

錯誤對話框 (--nokeep_going)

發生錯誤時,即使並非所有要求的 SkyValue 可用,也可能會重新啟動 SkyFunction。在這類情況下,由於 Tasks API 合約,後續狀態都不會進入。不過,StateMachine 仍會傳播例外狀況。

無論是否達到下一個狀態,都必須進行傳播,因此錯誤處理回呼必須執行這項工作。針對內部 StateMachine,您可以叫用父項回呼來完成。

只要在頂層 StateMachine (提供 SkyFunction 介面) 中呼叫 ValueOrExceptionProducersetException 方法,即可完成這項作業。即使缺少觀測值,ValueOrExceptionProducer.tryProduceValue 也會擲回例外狀況。

如果直接使用 Driver,請務必檢查 SkyFunction 是否有傳播錯誤,即使機器尚未完成處理也一樣。

事件處理

針對需要發出事件的 SkyFunction,請將 StoredEventHandler 插入 SkyKeyComputeState,並進一步插入需要這些事件的 StateMachine。過去,如果使用的是 Skyframe 特定事件的重播事件,就必須使用 StoredEventHandler,但後續情況已經修正。系統會保留 StoredEventHandler 插入,因為這樣可簡化錯誤處理回呼發出的事件實作。

Driver 和橋接函式

Driver 負責管理 StateMachine 的執行作業,從指定根 StateMachine 開始。由於 StateMachine 會遞迴將子工作 StateMachine 加入佇列,因此單一 Driver 可以管理多項子工作。這些子工作會建立樹狀結構結構,並形成結構化並行Driver 會批次處理子工作中的 SkyValue 查詢,藉此提高效率。

以下類別根據 Driver 建構而成多個類別。

public final class Driver {
  public Driver(StateMachine root);
  public boolean drive(SkyFunction.Environment env) throws InterruptedException;
}

Driver 會使用單一根 StateMachine 做為參數。呼叫 Driver.drive 時,會盡可能執行 StateMachine,而不需要重新啟動畫面。當 StateMachine 完成後,其會傳回 True,否則會傳回所有值。

Driver 會保留 StateMachine 的並行狀態,且適合嵌入 SkyKeyComputeState 中。

直接執行個體化 Driver

StateMachine 實作通常會透過回呼傳達結果。您可以直接將 Driver 執行個體化,如以下範例所示。

Driver 嵌入 SkyKeyComputeState 中,並導入對應的 ResultSink,以便進一步定義。在頂層,State 物件是適合用於計算結果的正確接收器,它保證保證會過 Driver

class State implements SkyKeyComputeState, ResultProducer.ResultSink {
  // The `Driver` instance, containing the full tree of all `StateMachine`
  // states. Responsible for calling `StateMachine.step` implementations when
  // asynchronous values are available and performing batched SkyFrame lookups.
  //
  // Non-null while `result` is being computed.
  private Driver resultProducer;

  // Variable for storing the result of the `StateMachine`
  //
  // Will be non-null after the computation completes.
  //
  private ResultType result;

  // Implements `ResultProducer.ResultSink`.
  //
  // `ResultProducer` propagates its final value through a callback that is
  // implemented here.
  @Override
  public void acceptResult(ResultType result) {
    this.result = result;
  }
}

以下程式碼會繪製 ResultProducer

class ResultProducer implements StateMachine {
  interface ResultSink {
    void acceptResult(ResultType value);
  }

  private final Parameters parameters;
  private final ResultSink sink;

  … // Other internal state.

  ResultProducer(Parameters parameters, ResultSink sink) {
    this.parameters = parameters;
    this.sink = sink;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
    return this::complete;
  }

  private StateMachine complete(Tasks tasks) {
    sink.acceptResult(getResult());
    return DONE;
  }
}

然後延遲計算的程式碼看起來會像這樣。

@Nullable
private Result computeResult(State state, Skyfunction.Environment env)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new Driver(new ResultProducer(
      new Parameters(), (ResultProducer.ResultSink)state));
  }
  if (state.resultProducer.drive(env)) {
    // Clears the `Driver` instance as it is no longer needed.
    state.resultProducer = null;
  }
  return state.result;
}

嵌入Driver

如果 StateMachine 會產生值且沒有例外狀況,嵌入 Driver 是另一個可行的實作方式,如以下範例所示。

class ResultProducer implements StateMachine {
  private final Parameters parameters;
  private final Driver driver;

  private ResultType result;

  ResultProducer(Parameters parameters) {
    this.parameters = parameters;
    this.driver = new Driver(this);
  }

  @Nullable  // Null when a Skyframe restart is needed.
  public ResultType tryProduceValue( SkyFunction.Environment env)
      throws InterruptedException {
    if (!driver.drive(env)) {
      return null;
    }
    return result;
  }

  @Override
  public StateMachine step(Tasks tasks) {
    …  // Implementation.
}

SkyFunction 的程式碼大致如下所示 (State 是函式專用的 SkyKeyComputeState 類型)。

@Nullable  // Null when a Skyframe restart is needed.
Result computeResult(SkyFunction.Environment env, State state)
    throws InterruptedException {
  if (state.result != null) {
    return state.result;
  }
  if (state.resultProducer == null) {
    state.resultProducer = new ResultProducer(new Parameters());
  }
  var result = state.resultProducer.tryProduceValue(env);
  if (result == null) {
    return null;
  }
  state.resultProducer = null;
  return state.result = result;
}

StateMachine 實作中嵌入 Driver 更適合使用 Skyframe 的同步程式碼樣式。

可能會產生例外狀況的 StateMachine

或者,有些 SkyKeyComputeState 可嵌入的 ValueOrExceptionProducerValueOrException2Producer 類別,具有同步 API 來比對同步 SkyFunction 程式碼。

ValueOrExceptionProducer 抽象類別包含下列方法。

public abstract class ValueOrExceptionProducer<V, E extends Exception>
    implements StateMachine {
  @Nullable
  public final V tryProduceValue(Environment env)
      throws InterruptedException, E {
    …  // Implementation.
  }

  protected final void setValue(V value)  {  … // Implementation. }
  protected final void setException(E exception) {  … // Implementation. }
}

其內含嵌入的 Driver 執行個體,類似於類似嵌入驅動程式中的 ResultProducer 類別,以及與 SkyFunction 類似的介面。如果有實作 ResultSinksetException,會改為在發生上述任一情況時實作 setValue。當這兩種情況發生時,系統都會優先採用例外狀況。tryProduceValue 方法會將非同步回呼程式碼橋接至同步程式碼,並在設定時擲回例外狀況。

如前文所述,即使發生錯誤,即使機器尚未提供所有輸入資料,只要裝置尚未完成,也可能會發生錯誤。為因應此情況,即使機器已完成,tryProduceValue 也會擲回任何已設定的例外狀況。

Epilogue:最終移除回呼

StateMachine 是非常有效率的做法,但會密集執行非同步運算。接續 (尤其是以 ListenableFuture 傳遞的 Runnable) 形式,在 Bazel 程式碼的特定部分會廣泛使用,但在分析 SkyFunctions 時並不是。分析作業大多受到 CPU 限制,且磁碟 I/O 沒有有效的非同步 API。最後,如果回呼有學習曲線且容易判讀,就適合最佳化回呼。

另一個最可行的替代方案為 Java 虛擬執行緒。您不必使用回呼,而是以同步的封鎖呼叫取代所有項目。之所以會這麼做,原因在於連結虛擬執行緒資源與平台執行緒不同,這種做法的費用應較低。不過,即使是使用虛擬執行緒,如果用執行緒建立和同步處理基本項目取代簡單的同步處理作業,則代價太昂貴。我們執行了從 StateMachine 遷移至 Java 虛擬執行緒遷移作業,且速度大幅加快,將端對端分析延遲時間提高了 3 倍。由於虛擬執行緒目前仍為預先發布版功能,因此在效能提升時,較晚可以執行這項遷移作業。

另一個還要考慮的 Loom 協同程式 (如果有) 可供使用。這樣做的好處是,您可以使用合作的多工處理減少同步處理負擔。

如果其他方法都失敗,則低階位元碼重新編寫或許是可行的替代選項。如果經過充足的最佳化作業,您或許可以獲得使用手動回呼回呼的效能。

附錄

回呼堆

回呼修復是使用回呼的非同步程式碼,是嚴重的問題。事實上,上一個步驟的下一步是繼續下一步。如果有許多步驟,這個巢狀結構可能會相當深層。如果搭配使用控制流程,程式碼就會變得無法管理。

class CallbackHell implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return (t, l) -> {
      doB();
      return (t1, l2) -> {
        doC();
        return DONE;
      };
    };
  }
}

巢狀實作的其中一項優點是,您可以保留外部步驟的堆疊框架。在 Java 中,擷取的 lambda 變數必須有效,因此使用這類變數可能相當麻煩。傳回方法參照為連續值 (而非 lambda),如下所示,避免巢狀層級巢狀結構傳回。

class CallbackHellAvoided implements StateMachine {
  @Override
  public StateMachine step(Tasks task) {
    doA();
    return this::step2;
  }

  private StateMachine step2(Tasks tasks) {
    doB();
    return this::step3;
  }

  private StateMachine step3(Tasks tasks) {
    doC();
    return DONE;
  }
}

如果過度使用 runAfter 插入模式,可能也會使用回呼堆積,但透過依序略過插入程式碼來避免這種情況。

範例:鏈結 SkyValue 查詢

應用程式邏輯往往需要多個與 SkyValue 查詢的相依關係,例如第二個第二個 KeyKey 依附於第一個 SkyValue 時。經過仔細考慮,這會導致複雜的複雜回呼結構。

private ValueType1 value1;
private ValueType2 value2;

private StateMachine step1(...) {
  tasks.lookUp(key1, (Consumer<SkyValue>) this);  // key1 has type KeyType1.
  return this::step2;
}

@Override
public void accept(SkyValue value) {
  this.value1 = (ValueType1) value;
}

private StateMachine step2(...) {
  KeyType2 key2 = computeKey(value1);
  tasks.lookup(key2, this::acceptValueType2);
  return this::step3;
}

private void acceptValueType2(SkyValue value) {
  this.value2 = (ValueType2) value;
}

不過,由於連續接續指定方法參照,因此程式碼會在狀態轉換期間進行程序:step2step1。請注意,這裡的 lambda 用於指派 value2。這會讓程式碼的順序符合從上到下的順序。

其他提示

可讀性:執行順序

如要提升可讀性,請將 StateMachine.step 實作的執行順序持續傳遞,並在回呼中傳入其回呼後立即實作。控制流程分支版本不一定訂閱項目。在這種情況下,其他註解或許會有幫助。

範例:鏈結 SkyValue 查詢中,會建立中繼方法的參照以達成此目標。這樣會稍微少量效能進行可讀,而在這裡或許值得。

新一代假設

中型的 Java 物件會破壞 Java 垃圾收集器的產生假設,而這類物件是專門用於處理極短時間的物件或永久存在的物件。根據定義,SkyKeyComputeState 中的物件已違反這項假設。這類物件,包含所有處於執行中 StateMachine 的建構樹狀結構,而在 Driver 中取得的根層級會暫停,在等待非同步運算完成之前。

JDK19 看起來似乎不太好,但使用 StateMachine 時,即使實際產生的垃圾次數大幅降低,有時還是能觀察 GC 時間增加。由於 StateMachine 具有中繼壽命,因此也可以升級至舊版,因此可更快完成填入作業,因此需要耗費較多且較昂貴的主要或完整 GC。

初始的注意事項是盡可能減少使用 StateMachine 變數,但有時不適用於多個狀態,可以的話,本機堆疊 step 變數是年輕世代的變數,有效提升 GCd。

對於 StateMachine 變數,將工作細分為子工作,並遵循建議StateMachine 之間產生值的建議模式。請注意,當遵循模式時,只有子項 StateMachine 會參照父項 StateMachine,反之亦然。這表示當子項使用結果回呼來完成並更新父項時,子項會自然超出範圍,成為 GC 使用資格。

最後,在某些情況下,在之前的狀態中需要 StateMachine 變數,但稍後並沒有狀態。如果已知大型物件已不再需要使用,此時可為空值傳回空值。

命名狀態

命名方法時,您通常可以為該方法內特定行為的名稱命名。這麼做並不清楚在 StateMachine 中執行此操作,因為沒有堆疊。舉例來說,假設 foo 方法會呼叫子方法 bar。在 StateMachine 中,這可以轉譯為狀態序列 foo,後面加上 barfoo 不再包含 bar 行為。因此,狀態的方法名稱通常較窄,可能會反映本機行為。

並行情況樹狀結構圖

以下為結構化並行中的圖表替代說明樹狀結構的結構。 積木形成一個小的樹木。

結構化並行 3D


  1. 相對於 SkyFrame 在沒有值時從頭重新啟動資料慣例的慣例。 

  2. 請注意,step 可以擲回 InterruptedException,但範例省略此項。Bazel 程式碼中有幾種方法可以擲回此例外狀況,並傳遞至 Driver,稍後會執行 StateMachine。您可以視需要宣告不需要。

  3. 並行子工作受到 ConfiguredTargetFunction 啟發,其會針對每個依附元件執行「獨立」工作。每個依附元件都有其專屬的 StateMachine,而非操控一次處理所有依附元件的複雜資料結構。

  4. 系統會將同一個步驟中的多個 tasks.lookUp 呼叫批次處理。並行作業可於並行子工作內進行的查詢建立。 

  5. 這個概念與 Java 的結構化並行 jeps/428 類似。 

  6. 這類似於產生執行緒並加入執行緒,以達成依序組合。