Build AI agents as decentralized, event-driven microservices.
Agents, tools, and consumers are independently deployable nodes that dynamically route messages to each other and communicate asynchronously. Build and compose AI workflows as a distributed network of interconnected AI agents.
$ pip install calfkit
Status: Alpha (pre-1.0). Calfkit is under active development and the public API can change between versions. Pin a version and check the CHANGELOG before big version bumps.
Calfkit shifts agent development from orchestration (centralized control) to choreography (decoupled interactions). Building agent teams with traditional agent frameworks relies on tightly coupled, hard-coded interactions. Calfkit lets you build agents into an interconnected mesh out of the box.
- Distributed by default: Every agent, tool, and consumer is its own node in a connected network, so agents and tools form a distributed system you deploy and scale piece by piece, not a monolith.
- Stream-native: Agents communicate on event streams, so consuming live data (market feeds, IoT sensors, user activity) is the default, not a bolted-on integration. Plug into any upstream source and publish to any downstream system: CRMs, warehouses, even other agents.
- Composable without coupling: Build multi-agent teams simply by dropping in agents on shared topics, with no extra wiring and no edits to existing code.
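To make the choreography idea concrete, here is a minimal sketch in plain Python. It does not use Calfkit at all: `InMemoryBus`, the topic names, and both handlers are hypothetical stand-ins, and delivery is synchronous and in-process, whereas a real deployment is asynchronous and goes through a broker. The point it illustrates is that a second participant can be attached to an output topic without changing the first.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Handler = Callable[[str], None]


class InMemoryBus:
    """Hypothetical stand-in for a broker: maps topic names to subscriber callbacks."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: str) -> None:
        # Deliver to every subscriber; the publisher does not know who they are.
        for handler in list(self._subscribers[topic]):
            handler(message)


bus = InMemoryBus()
log: List[str] = []


def weather_agent(message: str) -> None:
    # Reads a location from its input topic and publishes a reply to its output topic.
    bus.publish("weather.output", f"forecast for {message}: sunny")


def audit_consumer(message: str) -> None:
    # Added later; weather_agent is not edited to accommodate it.
    log.append(message)


bus.subscribe("weather.input", weather_agent)
bus.subscribe("weather.output", audit_consumer)

bus.publish("weather.input", "Tokyo")
print(log)
```

Nothing in `weather_agent` names `audit_consumer`; the only shared contract is the topic name, which is the decoupling the list above describes.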
$ pip install calfkit
The command-line tools (calfkit run, calfkit topics) live behind an optional extra:
$ pip install "calfkit[cli]"
Prerequisites:
- Python 3.10 or later
- Docker installed and running (for running with a local Calfkit broker)
- An LLM provider API key
Every Calfkit deployment needs a running broker.
Option A: Local Broker (Requires Docker)
Clone the calfkit-broker repo and start a local broker:
$ git clone https://github.com/calf-ai/calfkit-broker && cd calfkit-broker && make dev-up
Once it's ready, open a new terminal to continue.
Option B: ☁️ Calfkit Cloud (In Beta)
Skip the infrastructure. Calfkit Cloud is a fully managed broker for Calfkit agents: nothing to self-host, with built-in observability and agent-event tracing. You deploy against a hosted broker endpoint instead of running one locally.
A complete, runnable version of this walkthrough lives in examples/quickstart/.
A tool is an independent service, not owned by any agent. Define one with @agent_tool; once deployed, any agent can invoke it. Deploy once, use everywhere.
# weather_tool.py
from calfkit.nodes import agent_tool

# Define a tool: the @agent_tool decorator turns any function into a deployable tool node
@agent_tool
def get_weather(location: str) -> str:
    """Get the current weather at a location"""
    return f"It's sunny in {location}"

calfkit run points at a module:attr target and starts the tool for you, with no extra wiring required. This requires the [cli] extra.
$ calfkit run weather_tool:get_weather
Provide the agent with a reference to the tool using the same tool definition. This agent listens on the weather_agent.input topic.
# agent_service.py
from calfkit.nodes import Agent
from calfkit.providers import OpenAIResponsesModelClient
from weather_tool import get_weather  # Import the tool definition (reusable)

agent = Agent(
    "weather_agent",
    system_prompt="You are a helpful assistant.",
    subscribe_topics="weather_agent.input",
    publish_topic="weather_agent.output",  # Stream every agent output here; other agents and consumers can listen to this stream
    model_client=OpenAIResponsesModelClient(model_name="gpt-5.4-nano"),
    tools=[get_weather],  # Register tool definitions with the agent
)

Set your OpenAI API key:
$ export OPENAI_API_KEY=sk-...
Start the agent:
$ calfkit run agent_service:agent
Send a message to the agent.
# execute.py
import asyncio
from calfkit.client import Client

async def main():
    client = Client.connect("localhost:9092")  # Connect to the broker
    # Send a request and await the response
    result = await client.execute(
        "What's the weather in Tokyo?",
        "weather_agent.input",  # The topic the agent is listening to
    )
    print(f"Assistant: {result.output}")

if __name__ == "__main__":
    asyncio.run(main())

Run the file to invoke the agent:
$ python execute.py
You've completed the quickstart: a tool and an agent deployed as separate services and invoked over Kafka. The rest of this section covers common things to do next.
Set final_output_type to enforce structured output from the LLM; it's deserialized into your type automatically on the client side.
from dataclasses import dataclass
from calfkit.nodes import Agent
from calfkit.providers import OpenAIResponsesModelClient

@dataclass
class WeatherReport:
    location: str
    summary: str

agent = Agent(
    "weather_agent",
    system_prompt="You are a helpful assistant.",
    subscribe_topics="weather_agent.input",
    model_client=OpenAIResponsesModelClient(model_name="gpt-5.4-nano"),
    final_output_type=WeatherReport,  # Enforce structured output
)

When invoking, pass the matching output_type to deserialize the response:
result = await client.execute(
    "What's the weather in Tokyo?",
    "weather_agent.input",
    output_type=WeatherReport,
)
print(result.output.location)  # "Tokyo"
print(result.output.summary)  # "It's sunny in Tokyo"

For production, you can deploy each node with an explicit Worker, keeping startup, scaling, and lifecycle management under your control:
# serve_tool.py: deploy the tool as its own service
import asyncio
from calfkit.client import Client
from calfkit.worker import Worker
from weather_tool import get_weather

async def main():
    client = Client.connect("localhost:9092")  # Connect to Kafka broker
    worker = Worker(client, nodes=[get_weather])  # One service per node
    await worker.run()  # (Blocking) serve until stopped

if __name__ == "__main__":
    asyncio.run(main())

$ python serve_tool.py
The full public surface is re-exported from the top-level calfkit package:
from calfkit import (
    Client, InvocationHandle, NodeResult,  # client
    Agent, agent_tool, consumer,  # node authoring
    NodeDef, ToolNodeDef, ConsumerNodeDef, BaseNodeDef,  # node types
    ConsumerFn, GateFunction,  # node typing helpers
    ToolContext,  # tool-side context
    OpenAIModelClient, OpenAIResponsesModelClient, AnthropicModelClient,  # providers
    Worker, LifecycleContext, ServingContext, ResourceSetupContext,  # worker + lifecycle
    ProvisioningConfig,  # provisioning (config only)
    DeserializationError, LifecycleConfigError, ToolExecutionError,  # exceptions
)

Key entry points:
| Symbol | Purpose |
|---|---|
| Client.connect(server_urls=None, reply_topic=None, reply_ttl=None, *, provisioning=None, ...) | Connect to the broker. Defaults to $CALF_HOST_URL, then localhost. |
| Client.execute(prompt, topic, *, output_type=..., deps=..., message_history=..., timeout=None, ...) | Request/reply: publish and await the NodeResult. |
| Client.start(...) / Client.send(...) | Async-handle variant, and one-way send (no reply future; optional reply_to return address). |
| Agent(node_id, *, system_prompt=..., subscribe_topics, publish_topic=None, model_client, tools=None, gates=None, final_output_type=str, model_settings=None, ...) | An agent node. |
| @agent_tool / @consumer(...) | Decorators that turn a function into a tool node / consumer node. |
| Worker(client, nodes=None, ...) → run() / start() / stop() | Host one or more nodes against the broker. |
Full signatures and behavior live in the source docstrings and the documentation below.
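The table says Client.connect falls back to $CALF_HOST_URL and then to localhost when no server URL is given. The sketch below shows one way that precedence could be expressed in plain Python. It is not Calfkit's implementation: `resolve_broker_url` and `DEFAULT_BROKER_URL` are hypothetical names, and the `localhost:9092` default is an assumption borrowed from the quickstart, since the exact fallback address is not stated here.

```python
import os
from typing import Optional

# Assumption: the same host and port the quickstart connects to.
DEFAULT_BROKER_URL = "localhost:9092"


def resolve_broker_url(explicit: Optional[str] = None) -> str:
    """Hypothetical helper: explicit argument, then $CALF_HOST_URL, then the local default."""
    if explicit:
        return explicit
    # An unset or empty variable falls through to the local default.
    return os.environ.get("CALF_HOST_URL") or DEFAULT_BROKER_URL


os.environ.pop("CALF_HOST_URL", None)
no_env = resolve_broker_url()

os.environ["CALF_HOST_URL"] = "broker.internal:9092"
from_env = resolve_broker_url()
from_arg = resolve_broker_url("other-host:9092")

print(no_env, from_env, from_arg)
```

A helper like this can be handy in scripts that run both locally and against a hosted broker, since the same code picks up the endpoint from the environment.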
In-repo documentation lives under docs/.
How-to guides (goal-oriented walkthroughs):
- How to call nodes from a client: the three invocation patterns (execute/start/send), multi-turn conversations, runtime dependency injection (deps), temporary instructions, fire-and-forget, and bounding reply memory with reply_ttl.
- How to tap a topic with a consumer node: terminal sinks that run arbitrary Python against every event on a topic; tap an agent's publish_topic to log, persist, or fan out.
- How to gate node invocations: predicate gate stacks that let a node decline an inbound event before run() runs (e.g. when agents share an input topic).
- How to give agents MCP tools: deploy an MCPToolbox fronting an MCP server and pass it to agents like a tool node; tools are discovered and kept fresh across processes automatically.
- Worker lifecycle & embedding: open long-lived resources at startup and close them on shutdown, publish presence events, and run with run(), the embeddable start()/stop(), or async with worker:.
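The gating guide above describes predicate stacks that let a node decline an event before it runs. As a rough illustration of that idea only, the following sketch uses plain Python: `Event`, `dispatch`, and both predicates are hypothetical, the "every gate must pass" rule is an assumption, and none of this reflects the actual GateFunction signature, which is documented in the guide itself.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Event:
    topic: str
    text: str


# Hypothetical gate type: a predicate over the inbound event.
Gate = Callable[[Event], bool]


def is_non_empty(event: Event) -> bool:
    return bool(event.text.strip())


def mentions_weather(event: Event) -> bool:
    return "weather" in event.text.lower()


def dispatch(event: Event, gates: List[Gate], run: Callable[[Event], str]) -> Optional[str]:
    # Assumption: every gate in the stack must accept, otherwise the node declines.
    if not all(gate(event) for gate in gates):
        return None
    return run(event)


def weather_run(event: Event) -> str:
    return f"weather agent handled: {event.text}"


gates = [is_non_empty, mentions_weather]
accepted = dispatch(Event("shared.input", "What's the weather in Tokyo?"), gates, weather_run)
declined = dispatch(Event("shared.input", "Book me a flight"), gates, weather_run)
print(accepted)
print(declined)
```

With several agents subscribed to the same input topic, each one applying its own stack in this way means only the relevant agent does any work for a given event.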
Reference:
- CLI reference: the calfkit run and calfkit topics commands.
- Topic provisioning: the experimental, opt-in topic-creation helper for dev/CI.
Design & background:
- Design documents: accepted and proposed designs.
See ROADMAP.md for what's under consideration; a listing there isn't a commitment.
Issues and pull requests are welcome. Please open an issue to discuss substantial changes before sending a PR.
See CONTRIBUTING.md for development setup, the quality gates (make fix / make check / make test), PR conventions, and how to write and run tests, including the real-broker integration lane.
If you found this project interesting or useful, please consider:
- ⭐ Starring the repository (it helps others discover it!)
- Reporting issues
- Submitting PRs
This project is licensed under the Apache License 2.0. See the LICENSE file for details.