supervisor-go: add ledger package with Append/Read APIs, tests; refactor CLI to use package; update README
This commit is contained in:
@@ -3,14 +3,30 @@
|
||||
Runnable stub demonstrating CognitiveOS control-plane concepts without
|
||||
an actual LLM.
|
||||
|
||||
## Run
|
||||
This directory now contains a small ledger package (ledger/) with Append
|
||||
and Read APIs and unit tests, plus a tiny CLI demo (main.go) that
|
||||
appends route/session/output events to a JSONL ledger.
|
||||
|
||||
go run . -ledger ./ledger.jsonl
|
||||
## Run (demo)
|
||||
|
||||
Then type requests and observe: - deterministic route decision -
|
||||
session_spawn + child_output_received - final_output_emitted
|
||||
Build or run the CLI:
|
||||
|
||||
go run . -ledger ./ledger.jsonl
|
||||
|
||||
Then type requests and observe the deterministic route decision,
|
||||
spawn/output events, and final output. The ledger file will be created
|
||||
as JSONL at the path you provide.
|
||||
|
||||
## Tests
|
||||
|
||||
Run the unit tests for the ledger package:
|
||||
|
||||
go test ./ledger
|
||||
|
||||
The tests create a temporary ledger file, append events, and validate
|
||||
read-back and hash chaining.
|
||||
|
||||
## Notes
|
||||
|
||||
- This is intentionally minimal and deterministic.
|
||||
- Replace `childWorkerStub` with real model calls later.
|
||||
- This is intentionally minimal and deterministic.
|
||||
- Replace `childWorkerStub` with real model calls later.
|
||||
|
||||
144
kernel/supervisor-go/ledger/ledger.go
Normal file
144
kernel/supervisor-go/ledger/ledger.go
Normal file
@@ -0,0 +1,144 @@
|
||||
package ledger
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Event is a single ledger entry stored as JSON per-line (JSONL).
|
||||
type Event struct {
|
||||
ID string `json:"id"`
|
||||
TS string `json:"ts"`
|
||||
StreamID string `json:"stream_id"`
|
||||
Type string `json:"type"`
|
||||
Body map[string]any `json:"body"`
|
||||
PrevHash *string `json:"prev_hash"`
|
||||
Hash string `json:"hash"`
|
||||
}
|
||||
|
||||
// Ledger manages append-only JSONL ledger for a single stream.
|
||||
// It is intentionally minimal and safe for the demo (not highly concurrent).
|
||||
type Ledger struct {
|
||||
Path string
|
||||
StreamID string
|
||||
LastHash *string
|
||||
}
|
||||
|
||||
// New returns a Ledger instance. It does not create the file yet.
|
||||
func New(path, streamID string) *Ledger {
|
||||
return &Ledger{Path: path, StreamID: streamID, LastHash: nil}
|
||||
}
|
||||
|
||||
func sha256Hex(b []byte) string {
|
||||
h := sha256.Sum256(b)
|
||||
return hex.EncodeToString(h[:])
|
||||
}
|
||||
|
||||
func computeHash(id, ts, stream, typ string, body map[string]any, prev *string) (string, error) {
|
||||
env := map[string]any{
|
||||
"id": id,
|
||||
"ts": ts,
|
||||
"stream_id": stream,
|
||||
"type": typ,
|
||||
"body": body,
|
||||
"prev_hash": prev,
|
||||
}
|
||||
// Best-effort deterministic ordering for body keys.
|
||||
if body != nil {
|
||||
keys := make([]string, 0, len(body))
|
||||
for k := range body {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
ordered := make(map[string]any, len(body))
|
||||
for _, k := range keys {
|
||||
ordered[k] = body[k]
|
||||
}
|
||||
env["body"] = ordered
|
||||
}
|
||||
b, err := json.Marshal(env)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return sha256Hex(b), nil
|
||||
}
|
||||
|
||||
// Append writes a new event to the ledger file and updates LastHash.
|
||||
func (l *Ledger) Append(typ string, body map[string]any) (*Event, error) {
|
||||
if l == nil {
|
||||
return nil, errors.New("nil ledger")
|
||||
}
|
||||
id := fmt.Sprintf("%d", time.Now().UnixNano())
|
||||
ts := time.Now().UTC().Format(time.RFC3339Nano)
|
||||
h, err := computeHash(id, ts, l.StreamID, typ, body, l.LastHash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ev := &Event{
|
||||
ID: id,
|
||||
TS: ts,
|
||||
StreamID: l.StreamID,
|
||||
Type: typ,
|
||||
Body: body,
|
||||
PrevHash: l.LastHash,
|
||||
Hash: h,
|
||||
}
|
||||
f, err := os.OpenFile(l.Path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
enc := json.NewEncoder(f)
|
||||
if err := enc.Encode(ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l.LastHash = &ev.Hash
|
||||
return ev, nil
|
||||
}
|
||||
|
||||
// ReadAll reads all events from the ledger file. If the file does not exist, returns empty slice.
|
||||
func (l *Ledger) ReadAll() ([]*Event, error) {
|
||||
f, err := os.Open(l.Path)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return []*Event{}, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
var out []*Event
|
||||
sc := bufio.NewScanner(f)
|
||||
for sc.Scan() {
|
||||
line := sc.Bytes()
|
||||
var ev Event
|
||||
if err := json.Unmarshal(line, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, &ev)
|
||||
}
|
||||
if err := sc.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// LoadLastHash sets the ledger LastHash to the last entry in the file (if any).
|
||||
func (l *Ledger) LoadLastHash() error {
|
||||
evs, err := l.ReadAll()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(evs) == 0 {
|
||||
l.LastHash = nil
|
||||
return nil
|
||||
}
|
||||
l.LastHash = &evs[len(evs)-1].Hash
|
||||
return nil
|
||||
}
|
||||
56
kernel/supervisor-go/ledger/ledger_test.go
Normal file
56
kernel/supervisor-go/ledger/ledger_test.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package ledger
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAppendAndRead(t *testing.T) {
|
||||
f, err := os.CreateTemp("", "ledger-test-*.jsonl")
|
||||
if err != nil {
|
||||
t.Fatalf("tempfile: %v", err)
|
||||
}
|
||||
path := f.Name()
|
||||
f.Close()
|
||||
defer os.Remove(path)
|
||||
|
||||
l := New(path, "stream-1")
|
||||
// ensure empty read
|
||||
evs, err := l.ReadAll()
|
||||
if err != nil {
|
||||
t.Fatalf("read empty: %v", err)
|
||||
}
|
||||
if len(evs) != 0 {
|
||||
t.Fatalf("expected 0 events, got %d", len(evs))
|
||||
}
|
||||
|
||||
e1, err := l.Append("route_decision", map[string]any{"tier": "fast"})
|
||||
if err != nil {
|
||||
t.Fatalf("append1: %v", err)
|
||||
}
|
||||
if l.LastHash == nil || *l.LastHash != e1.Hash {
|
||||
t.Fatalf("lasthash not set after append1")
|
||||
}
|
||||
|
||||
e2, err := l.Append("final_output_emitted", map[string]any{"result": "ok"})
|
||||
if err != nil {
|
||||
t.Fatalf("append2: %v", err)
|
||||
}
|
||||
if l.LastHash == nil || *l.LastHash != e2.Hash {
|
||||
t.Fatalf("lasthash not set after append2")
|
||||
}
|
||||
if e2.PrevHash == nil || *e2.PrevHash != e1.Hash {
|
||||
t.Fatalf("prevhash chain broken")
|
||||
}
|
||||
|
||||
evs, err = l.ReadAll()
|
||||
if err != nil {
|
||||
t.Fatalf("read: %v", err)
|
||||
}
|
||||
if len(evs) != 2 {
|
||||
t.Fatalf("expected 2 events, got %d", len(evs))
|
||||
}
|
||||
if evs[0].Hash != e1.Hash || evs[1].Hash != e2.Hash {
|
||||
t.Fatalf("readback hashes mismatch")
|
||||
}
|
||||
}
|
||||
@@ -4,121 +4,20 @@ import (
|
||||
"bufio"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"cognitiveos/supervisor/ledger"
|
||||
)
|
||||
|
||||
// This is a runnable *stub* for the deterministic control plane.
|
||||
// It does NOT call real LLMs. Instead it demonstrates:
|
||||
// - deterministic routing (fast/default/heavy)
|
||||
// - append-only, hash-chained ledger per "stream"
|
||||
// - must-delegate enforcement: router cannot emit without a spawn+child output proof
|
||||
|
||||
type Event struct {
|
||||
ID string `json:"id"`
|
||||
TS string `json:"ts"`
|
||||
StreamID string `json:"stream_id"`
|
||||
Type string `json:"type"`
|
||||
Body map[string]any `json:"body"`
|
||||
PrevHash *string `json:"prev_hash"`
|
||||
Hash string `json:"hash"`
|
||||
}
|
||||
|
||||
func canonicalJSON(v any) ([]byte, error) {
|
||||
// Simple canonicalization: marshal with sorted keys for map[string]any at top-level envelope.
|
||||
// For a real system, use a stricter canonical JSON spec.
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
func sha256Hex(b []byte) string {
|
||||
h := sha256.Sum256(b)
|
||||
return hex.EncodeToString(h[:])
|
||||
}
|
||||
|
||||
func newID() string {
|
||||
// Not cryptographically strong; good enough for a stub.
|
||||
return fmt.Sprintf("%d-%d", time.Now().UnixNano(), os.Getpid())
|
||||
}
|
||||
|
||||
func computeHash(id, ts, stream, typ string, body map[string]any, prev *string) (string, error) {
|
||||
env := map[string]any{
|
||||
"id": id,
|
||||
"ts": ts,
|
||||
"stream_id": stream,
|
||||
"type": typ,
|
||||
"body": body,
|
||||
"prev_hash": prev,
|
||||
}
|
||||
// Sort body keys deterministically (best effort).
|
||||
if body != nil {
|
||||
keys := make([]string, 0, len(body))
|
||||
for k := range body {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
ordered := make(map[string]any, len(body))
|
||||
for _, k := range keys {
|
||||
ordered[k] = body[k]
|
||||
}
|
||||
env["body"] = ordered
|
||||
}
|
||||
b, err := canonicalJSON(env)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return sha256Hex(b), nil
|
||||
}
|
||||
|
||||
type Ledger struct {
|
||||
Path string
|
||||
StreamID string
|
||||
LastHash *string
|
||||
}
|
||||
|
||||
func (l *Ledger) Append(typ string, body map[string]any) (*Event, error) {
|
||||
id := newID()
|
||||
ts := time.Now().UTC().Format(time.RFC3339Nano)
|
||||
h, err := computeHash(id, ts, l.StreamID, typ, body, l.LastHash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ev := &Event{
|
||||
ID: id,
|
||||
TS: ts,
|
||||
StreamID: l.StreamID,
|
||||
Type: typ,
|
||||
Body: body,
|
||||
PrevHash: l.LastHash,
|
||||
Hash: h,
|
||||
}
|
||||
f, err := os.OpenFile(l.Path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
enc := json.NewEncoder(f)
|
||||
if err := enc.Encode(ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l.LastHash = &ev.Hash
|
||||
return ev, nil
|
||||
}
|
||||
|
||||
// Deterministic routing rules (reference):
|
||||
// - explicit @fast/@heavy
|
||||
// - keywords: one-liner, regex, jq, bash, quick => FAST
|
||||
// - multi-file/architecture/restructure/rewrite => HEAVY
|
||||
// - else DEFAULT
|
||||
func decideRoute(req string) (tier string, agent string, reason string) {
|
||||
l := strings.ToLower(req)
|
||||
if strings.Contains(l, "@fast") {
|
||||
@@ -141,7 +40,7 @@ func decideRoute(req string) (tier string, agent string, reason string) {
|
||||
}
|
||||
|
||||
func childWorkerStub(agent string, req string) string {
|
||||
// Placeholder for actual specialist call. This returns deterministic output for demo.
|
||||
// Deterministic placeholder for demo.
|
||||
return fmt.Sprintf("[stub:%s] Received request (%d chars).", agent, len(req))
|
||||
}
|
||||
|
||||
@@ -150,11 +49,12 @@ func main() {
|
||||
flag.Parse()
|
||||
|
||||
routerStream := "router-session-1"
|
||||
ledger := &Ledger{Path: *ledgerPath, StreamID: routerStream, LastHash: nil}
|
||||
l := ledger.New(*ledgerPath, routerStream)
|
||||
// initialize last-hash from existing file if present
|
||||
_ = l.LoadLastHash()
|
||||
|
||||
fmt.Println("CognitiveOS Supervisor (stub). Type a request, then press Enter. Ctrl-D to exit.")
|
||||
fmt.Println("CognitiveOS Supervisor (demo). Type a request, then press Enter. Ctrl-D to exit.")
|
||||
sc := bufio.NewScanner(os.Stdin)
|
||||
|
||||
for sc.Scan() {
|
||||
req := sc.Text()
|
||||
if strings.TrimSpace(req) == "" {
|
||||
@@ -162,35 +62,32 @@ func main() {
|
||||
}
|
||||
|
||||
tier, agent, reason := decideRoute(req)
|
||||
_, _ = ledger.Append("route_decision", map[string]any{
|
||||
_, _ = l.Append("route_decision", map[string]any{
|
||||
"tier": tier,
|
||||
"target_agent": agent,
|
||||
"reason": reason,
|
||||
})
|
||||
|
||||
// session_spawn (must-delegate proof step)
|
||||
childSession := "child-" + newID()
|
||||
childSession := "child-" + fmt.Sprintf("%d", time.Now().UnixNano())
|
||||
payloadHash := sha256Hex([]byte(req))
|
||||
_, _ = ledger.Append("session_spawn", map[string]any{
|
||||
_, _ = l.Append("session_spawn", map[string]any{
|
||||
"child_session": childSession,
|
||||
"child_agent": agent,
|
||||
"payload_hash": payloadHash,
|
||||
})
|
||||
|
||||
// child_output_received
|
||||
childOut := childWorkerStub(agent, req)
|
||||
outHash := sha256Hex([]byte(childOut))
|
||||
_, _ = ledger.Append("child_output_received", map[string]any{
|
||||
_, _ = l.Append("child_output_received", map[string]any{
|
||||
"child_session": childSession,
|
||||
"output_hash": outHash,
|
||||
"output_len": len(childOut),
|
||||
})
|
||||
|
||||
// emit_guarded (enforced)
|
||||
preamble := fmt.Sprintf("Delegating to %s because %s.", agent, reason)
|
||||
final := preamble + "\n" + childOut
|
||||
_, _ = ledger.Append("final_output_emitted", map[string]any{
|
||||
"proof_ref": "proof-" + newID(),
|
||||
_, _ = l.Append("final_output_emitted", map[string]any{
|
||||
"proof_ref": "proof-" + fmt.Sprintf("%d", time.Now().UnixNano()),
|
||||
"output_hash": sha256Hex([]byte(final)),
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user