At the core of every DE engine is some form of continuous execution log. You can think of it somewhat like a write-ahead log of a database. It captures the intent to execute a given flow step, making it possible to retry that step if it fails using the same parameter values. Once executed successfully, the result of the step will also be recorded in the log, so that it can be replayed from there if needed, without actually executing the step again.
DE logs broadly come in two flavours; One is in the form of an external state store that is accessed through some kind of SDK. Example frameworks that take this approach include Temporal, Restate, Resonate, and Ingest. The second option is to persist DE state in the local database of a given application or (micro)service. One solution in this category is DBOS, which implements DE on top of Postgres.
To keep things simple, I went with the local database model for Persistasaurus, using SQLite to store the execution logs. But as we’ll see later, depending on your specific use case, SQLite can actually be a great choice for a production scenario as well, for example when building a self-contained agent system.
The structure of the execution log table in SQLite is straight-forward. It contains an entry for each sustainable execution phase:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE IF NOT EXISTS execution_log (
flowId TEXT NOT NULL, (1)
step INTEGER NOT NULL, (2)
timestamp INTEGER NOT NULL, (3)
class_name TEXT NOT NULL, (4)
method_name TEXT NOT NULL, (5)
delay INTEGER, (6)
status TEXT (7)
CHECK( status IN ('PENDING','WAITING_FOR_SIGNAL','COMPLETE') )
NOT NULL,
attempts INTEGER NOT NULL DEFAULT 1, (8)
parameters BLOB, (9)
return_value BLOB, (10)
PRIMARY KEY (flowId, step)
)
| 1 | UUID of flow |
| 2 | Sequence number of the step within the flow, in order of execution |
| 3 | Timestamp of the first time this step was run |
| 4 | Name of the class that defines the step method |
| 5 | Step method name (currently ignoring overloaded methods for this PoC) |
| 6 | For delayed steps, delay in milli-seconds |
| 7 | Current state of phase |
| 8 | A counter to keep track of how many times the step has been tried |
| 9 | Serialized form of input parameters of the step, if any |
| 10 | Sequential form of result of step, if any |
This log table stores all the information needed to capture execution intent and persist the results. More details on the concepts of delay and signaling are given further.
When running a flow, the engine needs to know when a given step is executed so that it can be logged. A common way to do this is through explicit API calls to the engine, for example with DBOSTransact:
1
2
3
4
5
@Workflow
public void workflow() {
DBOS.runStep(() -> stepOne(), "stepOne");
DBOS.runStep(() -> stepTwo(), "stepTwo");
}
This works, but tightly coupled the workflow to the DE engine’s API. My goal for Persistaurus was to avoid this dependency as much as possible. Instead, the idea is to transparently intercept invocations of all step methods and track them in the execution log, allowing very concise flow expressions, without any API dependencies:
1
2
3
4
5
@Flow
public void workflow() {
stepOne();
stepTwo();
}
To let the DE engine know when a flow or step method is invoked, the proxy pattern is being used: a proxy actually wraps the flow object and handles each of its method invocations, updating the state in the execution log before and after passing the call to the flow. Thanks to the dynamic nature of Java, creating such a proxy is relatively easy, requiring just a little bytecode generation. Not surprisingly, I’m using the ByteBuddy library for this purpose:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static <T> T getFlowProxy(Class<T> clazz, UUID id) {
try {
return new ByteBuddy()
.subclass(clazz) (1)
.method(ElementMatchers.any()) (2)
.intercept( (3)
MethodDelegation.withDefaultConfiguration()
.withBinders(
Morph.Binder.install(OverrideCallable.class))
.to(new Interceptor(id)))
.make()
.load(Persistasaurus.class.getClassLoader()) (4)
.getLoaded()
.getDeclaredConstructor()
.newInstance(); (5)
}
catch (Exception e) {
throw new RuntimeException("Couldn't instantiate flow", e);
}
}
| 1 | Create a subclass proxy for the flow type |
| 2 | Block all method invocations on this proxy… |
| 3 | …and hand them over to a Interceptor object |
| 4 | Load generated proxy class |
| 5 | accelerate flow proxy |
On one hand, Cloud Code does an excellent job of generating code using the ByteBuddy API, which is not always self-explanatory. Now, whenever a method is invoked on the Flow proxy, the call is handed over to Interceptor Class, which will record the step in the execution log before invoking the actual flow method. I’m going to give you a full description of the method interceptor implementation (you can find it here on GitHub), but the high-level logic looks like this:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public Object intercept(@This Object instance,
@Origin Method method,
@AllArguments Object[] args,
@Morph OverrideCallable callable) throws Throwable {
if (!isFlowOrStep(method)) {
return callable.call(args);
}
Invocation loggedInvocation = executionLog.getInvocation(id, step);
if (loggedInvocation != null &&
loggedInvocation.status() == InvocationStatus.COMPLETE) { (1)
step++;
return loggedInvocation.returnValue();
}
else {
executionLog.logInvocationStart(
id, step, method.getName(), InvocationStatus.PENDING, args); (2)
int currentStep = step;
step++;
Object result = callable.call(args); (3)
executionLog.logInvocationCompletion(id, currentStep, result); (4)
return result;
}
}
| 1 | Rerun complete step if present |
| 2 | log invocation |
| 3 | execute actual step method |
| 4 | log results |
It is necessary to replay completed steps from the log to ensure deterministic execution. Each step typically runs exactly once, capturing non-deterministic values such as the current time or random numbers while doing so.
However, there is one important failure mode: if the system crashes after A step has been executed but First The result can be recorded in a log, that step will be repeated when the flow is run again. This is very unlikely to happen, but whether it is acceptable depends on the particular use case. When executing steps that have side-effects, such as remote API calls, it may be a good idea to add an Idempotency key to the requests, which lets the invoked services detect and ignore any potential duplicate calls.
The actual execution log implementation is not that interesting, you can find its source code here. It simply continues the invocation of the steps and their positions. execution_log The SQLite table is shown above.