diff --git a/kernel/supervisor-go/README.md b/kernel/supervisor-go/README.md index 1443ca7..11dc7a3 100644 --- a/kernel/supervisor-go/README.md +++ b/kernel/supervisor-go/README.md @@ -3,14 +3,30 @@ Runnable stub demonstrating CognitiveOS control-plane concepts without an actual LLM. -## Run +This directory now contains a small ledger package (ledger/) with Append +and Read APIs and unit tests, plus a tiny CLI demo (main.go) that +appends route/session/output events to a JSONL ledger. -go run . -ledger ./ledger.jsonl +## Run (demo) -Then type requests and observe: - deterministic route decision - -session_spawn + child_output_received - final_output_emitted +Build or run the CLI: + + go run . -ledger ./ledger.jsonl + +Then type requests and observe the deterministic route decision, +spawn/output events, and final output. The ledger file will be created +as JSONL at the path you provide. + +## Tests + +Run the unit tests for the ledger package: + + go test ./ledger + +The tests create a temporary ledger file, append events, and validate +read-back and hash chaining. ## Notes -- This is intentionally minimal and deterministic. -- Replace `childWorkerStub` with real model calls later. +- This is intentionally minimal and deterministic. +- Replace `childWorkerStub` with real model calls later. diff --git a/kernel/supervisor-go/ledger/ledger.go b/kernel/supervisor-go/ledger/ledger.go new file mode 100644 index 0000000..487f800 --- /dev/null +++ b/kernel/supervisor-go/ledger/ledger.go @@ -0,0 +1,144 @@ +package ledger + +import ( + "bufio" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "os" + "sort" + "time" +) + +// Event is a single ledger entry stored as JSON per-line (JSONL). +type Event struct { + ID string `json:"id"` + TS string `json:"ts"` + StreamID string `json:"stream_id"` + Type string `json:"type"` + Body map[string]any `json:"body"` + PrevHash *string `json:"prev_hash"` + Hash string `json:"hash"` +} + +// Ledger manages append-only JSONL ledger for a single stream. +// It is intentionally minimal and safe for the demo (not highly concurrent). +type Ledger struct { + Path string + StreamID string + LastHash *string +} + +// New returns a Ledger instance. It does not create the file yet. +func New(path, streamID string) *Ledger { + return &Ledger{Path: path, StreamID: streamID, LastHash: nil} +} + +func sha256Hex(b []byte) string { + h := sha256.Sum256(b) + return hex.EncodeToString(h[:]) +} + +func computeHash(id, ts, stream, typ string, body map[string]any, prev *string) (string, error) { + env := map[string]any{ + "id": id, + "ts": ts, + "stream_id": stream, + "type": typ, + "body": body, + "prev_hash": prev, + } + // Best-effort deterministic ordering for body keys. + if body != nil { + keys := make([]string, 0, len(body)) + for k := range body { + keys = append(keys, k) + } + sort.Strings(keys) + ordered := make(map[string]any, len(body)) + for _, k := range keys { + ordered[k] = body[k] + } + env["body"] = ordered + } + b, err := json.Marshal(env) + if err != nil { + return "", err + } + return sha256Hex(b), nil +} + +// Append writes a new event to the ledger file and updates LastHash. +func (l *Ledger) Append(typ string, body map[string]any) (*Event, error) { + if l == nil { + return nil, errors.New("nil ledger") + } + id := fmt.Sprintf("%d", time.Now().UnixNano()) + ts := time.Now().UTC().Format(time.RFC3339Nano) + h, err := computeHash(id, ts, l.StreamID, typ, body, l.LastHash) + if err != nil { + return nil, err + } + ev := &Event{ + ID: id, + TS: ts, + StreamID: l.StreamID, + Type: typ, + Body: body, + PrevHash: l.LastHash, + Hash: h, + } + f, err := os.OpenFile(l.Path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) + if err != nil { + return nil, err + } + defer f.Close() + enc := json.NewEncoder(f) + if err := enc.Encode(ev); err != nil { + return nil, err + } + l.LastHash = &ev.Hash + return ev, nil +} + +// ReadAll reads all events from the ledger file. If the file does not exist, returns empty slice. +func (l *Ledger) ReadAll() ([]*Event, error) { + f, err := os.Open(l.Path) + if err != nil { + if os.IsNotExist(err) { + return []*Event{}, nil + } + return nil, err + } + defer f.Close() + var out []*Event + sc := bufio.NewScanner(f) + for sc.Scan() { + line := sc.Bytes() + var ev Event + if err := json.Unmarshal(line, &ev); err != nil { + return nil, err + } + out = append(out, &ev) + } + if err := sc.Err(); err != nil { + return nil, err + } + return out, nil +} + +// LoadLastHash sets the ledger LastHash to the last entry in the file (if any). +func (l *Ledger) LoadLastHash() error { + evs, err := l.ReadAll() + if err != nil { + return err + } + if len(evs) == 0 { + l.LastHash = nil + return nil + } + l.LastHash = &evs[len(evs)-1].Hash + return nil +} diff --git a/kernel/supervisor-go/ledger/ledger_test.go b/kernel/supervisor-go/ledger/ledger_test.go new file mode 100644 index 0000000..a0a40a2 --- /dev/null +++ b/kernel/supervisor-go/ledger/ledger_test.go @@ -0,0 +1,56 @@ +package ledger + +import ( + "os" + "testing" +) + +func TestAppendAndRead(t *testing.T) { + f, err := os.CreateTemp("", "ledger-test-*.jsonl") + if err != nil { + t.Fatalf("tempfile: %v", err) + } + path := f.Name() + f.Close() + defer os.Remove(path) + + l := New(path, "stream-1") + // ensure empty read + evs, err := l.ReadAll() + if err != nil { + t.Fatalf("read empty: %v", err) + } + if len(evs) != 0 { + t.Fatalf("expected 0 events, got %d", len(evs)) + } + + e1, err := l.Append("route_decision", map[string]any{"tier": "fast"}) + if err != nil { + t.Fatalf("append1: %v", err) + } + if l.LastHash == nil || *l.LastHash != e1.Hash { + t.Fatalf("lasthash not set after append1") + } + + e2, err := l.Append("final_output_emitted", map[string]any{"result": "ok"}) + if err != nil { + t.Fatalf("append2: %v", err) + } + if l.LastHash == nil || *l.LastHash != e2.Hash { + t.Fatalf("lasthash not set after append2") + } + if e2.PrevHash == nil || *e2.PrevHash != e1.Hash { + t.Fatalf("prevhash chain broken") + } + + evs, err = l.ReadAll() + if err != nil { + t.Fatalf("read: %v", err) + } + if len(evs) != 2 { + t.Fatalf("expected 2 events, got %d", len(evs)) + } + if evs[0].Hash != e1.Hash || evs[1].Hash != e2.Hash { + t.Fatalf("readback hashes mismatch") + } +} diff --git a/kernel/supervisor-go/main.go b/kernel/supervisor-go/main.go index 4dfa28a..1c2f9af 100644 --- a/kernel/supervisor-go/main.go +++ b/kernel/supervisor-go/main.go @@ -4,121 +4,20 @@ import ( "bufio" "crypto/sha256" "encoding/hex" - "encoding/json" "flag" "fmt" "os" - "sort" "strings" "time" + + "cognitiveos/supervisor/ledger" ) -// This is a runnable *stub* for the deterministic control plane. -// It does NOT call real LLMs. Instead it demonstrates: -// - deterministic routing (fast/default/heavy) -// - append-only, hash-chained ledger per "stream" -// - must-delegate enforcement: router cannot emit without a spawn+child output proof - -type Event struct { - ID string `json:"id"` - TS string `json:"ts"` - StreamID string `json:"stream_id"` - Type string `json:"type"` - Body map[string]any `json:"body"` - PrevHash *string `json:"prev_hash"` - Hash string `json:"hash"` -} - -func canonicalJSON(v any) ([]byte, error) { - // Simple canonicalization: marshal with sorted keys for map[string]any at top-level envelope. - // For a real system, use a stricter canonical JSON spec. - b, err := json.Marshal(v) - if err != nil { - return nil, err - } - return b, nil -} - func sha256Hex(b []byte) string { h := sha256.Sum256(b) return hex.EncodeToString(h[:]) } -func newID() string { - // Not cryptographically strong; good enough for a stub. - return fmt.Sprintf("%d-%d", time.Now().UnixNano(), os.Getpid()) -} - -func computeHash(id, ts, stream, typ string, body map[string]any, prev *string) (string, error) { - env := map[string]any{ - "id": id, - "ts": ts, - "stream_id": stream, - "type": typ, - "body": body, - "prev_hash": prev, - } - // Sort body keys deterministically (best effort). - if body != nil { - keys := make([]string, 0, len(body)) - for k := range body { - keys = append(keys, k) - } - sort.Strings(keys) - ordered := make(map[string]any, len(body)) - for _, k := range keys { - ordered[k] = body[k] - } - env["body"] = ordered - } - b, err := canonicalJSON(env) - if err != nil { - return "", err - } - return sha256Hex(b), nil -} - -type Ledger struct { - Path string - StreamID string - LastHash *string -} - -func (l *Ledger) Append(typ string, body map[string]any) (*Event, error) { - id := newID() - ts := time.Now().UTC().Format(time.RFC3339Nano) - h, err := computeHash(id, ts, l.StreamID, typ, body, l.LastHash) - if err != nil { - return nil, err - } - ev := &Event{ - ID: id, - TS: ts, - StreamID: l.StreamID, - Type: typ, - Body: body, - PrevHash: l.LastHash, - Hash: h, - } - f, err := os.OpenFile(l.Path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) - if err != nil { - return nil, err - } - defer f.Close() - - enc := json.NewEncoder(f) - if err := enc.Encode(ev); err != nil { - return nil, err - } - l.LastHash = &ev.Hash - return ev, nil -} - -// Deterministic routing rules (reference): -// - explicit @fast/@heavy -// - keywords: one-liner, regex, jq, bash, quick => FAST -// - multi-file/architecture/restructure/rewrite => HEAVY -// - else DEFAULT func decideRoute(req string) (tier string, agent string, reason string) { l := strings.ToLower(req) if strings.Contains(l, "@fast") { @@ -141,7 +40,7 @@ func decideRoute(req string) (tier string, agent string, reason string) { } func childWorkerStub(agent string, req string) string { - // Placeholder for actual specialist call. This returns deterministic output for demo. + // Deterministic placeholder for demo. return fmt.Sprintf("[stub:%s] Received request (%d chars).", agent, len(req)) } @@ -150,11 +49,12 @@ func main() { flag.Parse() routerStream := "router-session-1" - ledger := &Ledger{Path: *ledgerPath, StreamID: routerStream, LastHash: nil} + l := ledger.New(*ledgerPath, routerStream) + // initialize last-hash from existing file if present + _ = l.LoadLastHash() - fmt.Println("CognitiveOS Supervisor (stub). Type a request, then press Enter. Ctrl-D to exit.") + fmt.Println("CognitiveOS Supervisor (demo). Type a request, then press Enter. Ctrl-D to exit.") sc := bufio.NewScanner(os.Stdin) - for sc.Scan() { req := sc.Text() if strings.TrimSpace(req) == "" { @@ -162,35 +62,32 @@ func main() { } tier, agent, reason := decideRoute(req) - _, _ = ledger.Append("route_decision", map[string]any{ + _, _ = l.Append("route_decision", map[string]any{ "tier": tier, "target_agent": agent, "reason": reason, }) - // session_spawn (must-delegate proof step) - childSession := "child-" + newID() + childSession := "child-" + fmt.Sprintf("%d", time.Now().UnixNano()) payloadHash := sha256Hex([]byte(req)) - _, _ = ledger.Append("session_spawn", map[string]any{ + _, _ = l.Append("session_spawn", map[string]any{ "child_session": childSession, "child_agent": agent, "payload_hash": payloadHash, }) - // child_output_received childOut := childWorkerStub(agent, req) outHash := sha256Hex([]byte(childOut)) - _, _ = ledger.Append("child_output_received", map[string]any{ + _, _ = l.Append("child_output_received", map[string]any{ "child_session": childSession, "output_hash": outHash, "output_len": len(childOut), }) - // emit_guarded (enforced) preamble := fmt.Sprintf("Delegating to %s because %s.", agent, reason) final := preamble + "\n" + childOut - _, _ = ledger.Append("final_output_emitted", map[string]any{ - "proof_ref": "proof-" + newID(), + _, _ = l.Append("final_output_emitted", map[string]any{ + "proof_ref": "proof-" + fmt.Sprintf("%d", time.Now().UnixNano()), "output_hash": sha256Hex([]byte(final)), })