Python
Официальный Python SDK для AACFlow позволяет выполнять рабочие процессы программно из ваших Python-приложений с использованием официального Python SDK.
Python SDK поддерживает Python 3.8+ с поддержкой асинхронного выполнения, автоматическим ограничением скорости с экспоненциальной задержкой и отслеживанием использования.
Установка
Установите SDK с помощью pip:
pip install aacflow-sdkБыстрый старт
Вот простой пример, чтобы начать работу:
from aacflow import AACFlowClient
# Инициализируем клиент
client = AACFlowClient(
api_key="your-api-key-here",
base_url="https://aacflow.io" # опционально, по умолчанию https://aacflow.io
)
# Выполняем рабочий процесс
try:
result = client.execute_workflow("workflow-id")
print("Рабочий процесс выполнен успешно:", result)
except Exception as error:
print("Выполнение рабочего процесса не удалось:", error)Справочник по API
AACFlowClient
Конструктор
AACFlowClient(api_key: str, base_url: str = "https://aacflow.io")Параметры:
api_key(str): Ваш API-ключ AACFlowbase_url(str, опционально): Базовый URL для API AACFlow
Методы
execute_workflow()
Выполняет рабочий процесс с опциональными входными данными.
result = client.execute_workflow(
"workflow-id",
input={"message": "Hello, world!"},
timeout=30.0 # 30 секунд
)Параметры:
workflow_id(str): ID рабочего процесса для выполненияinput(dict, опционально): Входные данные для передачи в рабочий процессtimeout(float, опционально): Таймаут в секундах (по умолчанию: 30.0)stream(bool, опционально): Включить потоковые ответы (по умолчанию: False)selected_outputs(list[str], опционально): Выходные данные блоков для потоковой передачи в форматеblockName.attribute(например,["agent1.content"])async_execution(bool, опционально): Выполнять асинхронно (по умолчанию: False)
Возвращает: WorkflowExecutionResult | AsyncExecutionResult
Когда async_execution=True, возвращает немедленно с ID задачи для опроса. В противном случае ожидает завершения.
get_workflow_status()
Получает статус рабочего процесса (статус развертывания и т.д.).
status = client.get_workflow_status("workflow-id")
print("Развернут:", status.is_deployed)Параметры:
workflow_id(str): ID рабочего процесса
Возвращает: WorkflowStatus
validate_workflow()
Проверяет, готов ли рабочий процесс к выполнению.
is_ready = client.validate_workflow("workflow-id")
if is_ready:
# Рабочий процесс развернут и готов
passПараметры:
workflow_id(str): ID рабочего процесса
Возвращает: bool
get_job_status()
Получает статус асинхронного выполнения задачи.
status = client.get_job_status("task-id-from-async-execution")
print("Статус:", status["status"]) # 'queued', 'processing', 'completed', 'failed'
if status["status"] == "completed":
print("Выходные данные:", status["output"])Параметры:
task_id(str): ID задачи, возвращенный из асинхронного выполнения
Возвращает: Dict[str, Any]
Поля ответа:
success(bool): Успешен ли запросtaskId(str): ID задачиstatus(str): Один из'queued','processing','completed','failed','cancelled'metadata(dict): СодержитstartedAt,completedAtиdurationoutput(any, опционально): Выходные данные рабочего процесса (при завершении)error(any, опционально): Детали ошибки (при сбое)estimatedDuration(int, опционально): Предполагаемая продолжительность в миллисекундах (при обработке/в очереди)
execute_with_retry()
Выполняет рабочий процесс с автоматической повторной попыткой при ошибках ограничения скорости с использованием экспоненциальной задержки.
result = client.execute_with_retry(
"workflow-id",
input={"message": "Hello"},
timeout=30.0,
max_retries=3, # Максимальное количество повторных попыток
initial_delay=1.0, # Начальная задержка в секундах
max_delay=30.0, # Максимальная задержка в секундах
backoff_multiplier=2.0 # Множитель экспоненциальной задержки
)Параметры:
workflow_id(str): ID рабочего процесса для выполненияinput(dict, опционально): Входные данные для передачи в рабочий процессtimeout(float, опционально): Таймаут в секундахstream(bool, опционально): Включить потоковые ответыselected_outputs(list, опционально): Выходные данные блоков для потоковой передачиasync_execution(bool, опционально): Выполнять асинхронноmax_retries(int, опционально): Максимальное количество повторных попыток (по умолчанию: 3)initial_delay(float, опционально): Начальная задержка в секундах (по умолчанию: 1.0)max_delay(float, опционально): Максимальная задержка в секундах (по умолчанию: 30.0)backoff_multiplier(float, опционально): Множитель задержки (по умолчанию: 2.0)
Возвращает: WorkflowExecutionResult | AsyncExecutionResult
Логика повторных попыток использует экспоненциальную задержку (1с → 2с → 4с → 8с...) с ±25% случайным отклонением для предотвращения "стадного эффекта". Если API предоставляет заголовок retry-after, он будет использован вместо этого.
get_rate_limit_info()
Получает текущую информацию об ограничении скорости из последнего ответа API.
rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
print("Лимит:", rate_limit_info.limit)
print("Осталось:", rate_limit_info.remaining)
print("Сброс:", datetime.fromtimestamp(rate_limit_info.reset))Возвращает: RateLimitInfo | None
get_usage_limits()
Получает текущие лимиты использования и информацию о квоте для вашей учетной записи.
limits = client.get_usage_limits()
print("Осталось синхронных запросов:", limits.rate_limit["sync"]["remaining"])
print("Осталось асинхронных запросов:", limits.rate_limit["async"]["remaining"])
print("Текущая стоимость периода:", limits.usage["currentPeriodCost"])
print("Тариф:", limits.usage["plan"])Возвращает: UsageLimits
Структура ответа:
{
"success": bool,
"rateLimit": {
"sync": {
"isLimited": bool,
"limit": int,
"remaining": int,
"resetAt": str
},
"async": {
"isLimited": bool,
"limit": int,
"remaining": int,
"resetAt": str
},
"authType": str # 'api' или 'manual'
},
"usage": {
"currentPeriodCost": float,
"limit": float,
"plan": str # например, 'free', 'pro'
}
}set_api_key()
Обновляет API-ключ.
client.set_api_key("new-api-key")set_base_url()
Обновляет базовый URL.
client.set_base_url("https://my-custom-domain.com")close()
Закрывает базовую HTTP-сессию.
client.close()Классы данных
WorkflowExecutionResult
@dataclass
class WorkflowExecutionResult:
success: bool
output: Optional[Any] = None
error: Optional[str] = None
logs: Optional[List[Any]] = None
metadata: Optional[Dict[str, Any]] = None
trace_spans: Optional[List[Any]] = None
total_duration: Optional[float] = NoneAsyncExecutionResult
@dataclass
class AsyncExecutionResult:
success: bool
task_id: str
status: str # 'queued'
created_at: str
links: Dict[str, str] # например, {"status": "/api/jobs/{taskId}"}WorkflowStatus
@dataclass
class WorkflowStatus:
is_deployed: bool
deployed_at: Optional[str] = None
needs_redeployment: bool = FalseRateLimitInfo
@dataclass
class RateLimitInfo:
limit: int
remaining: int
reset: int
retry_after: Optional[int] = NoneUsageLimits
@dataclass
class UsageLimits:
success: bool
rate_limit: Dict[str, Any]
usage: Dict[str, Any]AACFlowError
class AACFlowError(Exception):
def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
super().__init__(message)
self.code = code
self.status = statusРаспространенные коды ошибок:
UNAUTHORIZED: Неверный API-ключTIMEOUT: Превышено время ожидания запросаRATE_LIMIT_EXCEEDED: Превышено ограничение скоростиUSAGE_LIMIT_EXCEEDED: Превышен лимит использованияEXECUTION_ERROR: Сбой выполнения рабочего процесса
Примеры
Базовое выполнение рабочего процесса
Настройте AACFlowClient с вашим API-ключом.
Проверьте, развернут ли рабочий процесс и готов ли к выполнению.
Запустите рабочий процесс с вашими входными данными.
Обработайте результат выполнения и обработайте любые ошибки.
import os
from aacflow import AACFlowClient
client = AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY"))
def run_workflow():
try:
# Проверяем, готов ли рабочий процесс
is_ready = client.validate_workflow("my-workflow-id")
if not is_ready:
raise Exception("Рабочий процесс не развернут или не готов")
# Выполняем рабочий процесс
result = client.execute_workflow(
"my-workflow-id",
input={
"message": "Обработайте эти данные",
"user_id": "12345"
}
)
if result.success:
print("Выходные данные:", result.output)
print("Продолжительность:", result.metadata.get("duration") if result.metadata else None)
else:
print("Рабочий процесс не удался:", result.error)
except Exception as error:
print("Ошибка:", error)
run_workflow()Обработка ошибок
Обрабатывайте различные типы ошибок, которые могут возникнуть во время выполнения рабочего процесса:
from aacflow import AACFlowClient, AACFlowError
import os
client = AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY"))
def execute_with_error_handling():
try:
result = client.execute_workflow("workflow-id")
return result
except AACFlowError as error:
if error.code == "UNAUTHORIZED":
print("Неверный API-ключ")
elif error.code == "TIMEOUT":
print("Превышено время выполнения рабочего процесса")
elif error.code == "USAGE_LIMIT_EXCEEDED":
print("Превышен лимит использования")
elif error.code == "INVALID_JSON":
print("Неверный JSON в теле запроса")
else:
print(f"Ошибка рабочего процесса: {error}")
raise
except Exception as error:
print(f"Неожиданная ошибка: {error}")
raiseИспользование контекстного менеджера
Используйте клиент как контекстный менеджер для автоматической очистки ресурсов:
from aacflow import AACFlowClient
import os
# Использование контекстного менеджера для автоматического закрытия сессии
with AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY")) as client:
result = client.execute_workflow("workflow-id")
print("Результат:", result)
# Сессия автоматически закрывается здесьПакетное выполнение рабочих процессов
Эффективно выполняйте несколько рабочих процессов:
from aacflow import AACFlowClient
import os
client = AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY"))
def execute_workflows_batch(workflow_data_pairs):
"""Выполняет несколько рабочих процессов с разными входными данными."""
results = []
for workflow_id, input_data in workflow_data_pairs:
try:
# Проверяем рабочий процесс перед выполнением
if not client.validate_workflow(workflow_id):
print(f"Пропускаем {workflow_id}: не развернут")
continue
result = client.execute_workflow(workflow_id, input_data)
results.append({
"workflow_id": workflow_id,
"success": result.success,
"output": result.output,
"error": result.error
})
except Exception as error:
results.append({
"workflow_id": workflow_id,
"success": False,
"error": str(error)
})
return results
# Пример использования
workflows = [
("workflow-1", {"type": "analysis", "data": "sample1"}),
("workflow-2", {"type": "processing", "data": "sample2"}),
]
results = execute_workflows_batch(workflows)
for result in results:
print(f"Рабочий процесс {result['workflow_id']}: {'Успех' if result['success'] else 'Не удалось'}")Асинхронное выполнение рабочего процесса
Выполняйте рабочие процессы асинхронно для длительных задач:
import os
import time
from aacflow import AACFlowClient
client = AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY"))
def execute_async():
try:
# Начинаем асинхронное выполнение
result = client.execute_workflow(
"workflow-id",
input={"data": "большой набор данных"},
async_execution=True # Выполнять асинхронно
)
# Проверяем, является ли результат асинхронным выполнением
if hasattr(result, 'task_id'):
print(f"ID задачи: {result.task_id}")
print(f"Конечная точка статуса: {result.links['status']}")
# Опрашиваем на завершение
status = client.get_job_status(result.task_id)
while status["status"] in ["queued", "processing"]:
print(f"Текущий статус: {status['status']}")
time.sleep(2) # Ждем 2 секунды
status = client.get_job_status(result.task_id)
if status["status"] == "completed":
print("Рабочий процесс завершен!")
print(f"Выходные данные: {status['output']}")
print(f"Продолжительность: {status['metadata']['duration']}")
else:
print(f"Рабочий процесс не удался: {status['error']}")
except Exception as error:
print(f"Ошибка: {error}")
execute_async()Ограничение скорости и повторные попытки
Автоматически обрабатывайте ограничения скорости с экспоненциальной задержкой:
import os
from aacflow import AACFlowClient, AACFlowError
client = AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY"))
def execute_with_retry_handling():
try:
# Автоматически повторяет при ограничении скорости
result = client.execute_with_retry(
"workflow-id",
input={"message": "Обработайте это"},
max_retries=5,
initial_delay=1.0,
max_delay=60.0,
backoff_multiplier=2.0
)
print(f"Успех: {result}")
except AACFlowError as error:
if error.code == "RATE_LIMIT_EXCEEDED":
print("Превышено ограничение скорости после всех повторных попыток")
# Проверяем информацию об ограничении скорости
rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
from datetime import datetime
reset_time = datetime.fromtimestamp(rate_limit_info.reset)
print(f"Ограничение скорости сбросится в: {reset_time}")
execute_with_retry_handling()Мониторинг использования
Отслеживайте использование вашей учетной записи и лимиты:
import os
from aacflow import AACFlowClient
client = AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY"))
def check_usage():
try:
limits = client.get_usage_limits()
print("=== Ограничения скорости ===")
print("Синхронные запросы:")
print(f" Лимит: {limits.rate_limit['sync']['limit']}")
print(f" Осталось: {limits.rate_limit['sync']['remaining']}")
print(f" Сброс в: {limits.rate_limit['sync']['resetAt']}")
print(f" Ограничено: {limits.rate_limit['sync']['isLimited']}")
print("\nАсинхронные запросы:")
print(f" Лимит: {limits.rate_limit['async']['limit']}")
print(f" Осталось: {limits.rate_limit['async']['remaining']}")
print(f" Сброс в: {limits.rate_limit['async']['resetAt']}")
print(f" Ограничено: {limits.rate_limit['async']['isLimited']}")
print("\n=== Использование ===")
print(f"Текущая стоимость периода: ${limits.usage['currentPeriodCost']:.2f}")
print(f"Лимит: ${limits.usage['limit']:.2f}")
print(f"Тариф: {limits.usage['plan']}")
percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
print(f"Использование: {percent_used:.1f}%")
if percent_used > 80:
print("⚠️ Внимание: Вы приближаетесь к лимиту использования!")
except Exception as error:
print(f"Ошибка проверки использования: {error}")
check_usage()Потоковое выполнение рабочего процесса
Выполняйте рабочие процессы с потоковыми ответами в реальном времени:
from aacflow import AACFlowClient
import os
client = AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY"))
def execute_with_streaming():
"""Выполняет рабочий процесс с включенной потоковой передачей."""
try:
# Включаем потоковую передачу для конкретных выходных данных блоков
result = client.execute_workflow(
"workflow-id",
input={"message": "Посчитайте до пяти"},
stream=True,
selected_outputs=["agent1.content"] # Используйте формат blockName.attribute
)
print("Результат рабочего процесса:", result)
except Exception as error:
print("Ошибка:", error)
execute_with_streaming()Потоковый ответ следует формату Server-Sent Events (SSE):
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}
data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}
data: [DONE]Пример потоковой передачи Flask:
from flask import Flask, Response, stream_with_context
import requests
import json
import os
app = Flask(__name__)
@app.route('/stream-workflow')
def stream_workflow():
"""Потоковая передача выполнения рабочего процесса клиенту."""
def generate():
response = requests.post(
'https://aacflow.io/api/workflows/WORKFLOW_ID/execute',
headers={
'Content-Type': 'application/json',
'X-API-Key': os.getenv('AACFLOW_API_KEY')
},
json={
'message': 'Сгенерируйте историю',
'stream': True,
'selectedOutputs': ['agent1.content']
},
stream=True
)
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith('data: '):
data = decoded_line[6:] # Удаляем префикс 'data: '
if data == '[DONE]':
break
try:
parsed = json.loads(data)
if 'chunk' in parsed:
yield f"data: {json.dumps(parsed)}\n\n"
elif parsed.get('event') == 'done':
yield f"data: {json.dumps(parsed)}\n\n"
print("Выполнение завершено:", parsed.get('metadata'))
except json.JSONDecodeError:
pass
return Response(
stream_with_context(generate()),
mimetype='text/event-stream'
)
if __name__ == '__main__':
app.run(debug=True)Конфигурация окружения
Настройте клиент с использованием переменных окружения:
import os
from aacflow import AACFlowClient
# Конфигурация для разработки
client = AACFlowClient(
api_key=os.getenv("AACFLOW_API_KEY")
base_url=os.getenv("SIM_BASE_URL", "https://aacflow.io")
)import os
from aacflow import AACFlowClient
# Продакшн-конфигурация с обработкой ошибок
api_key = os.getenv("AACFLOW_API_KEY")
if not api_key:
raise ValueError("Требуется переменная окружения AACFLOW_API_KEY")
client = AACFlowClient(
api_key=api_key,
base_url=os.getenv("SIM_BASE_URL", "https://aacflow.io")
)Получение вашего API-ключа
Перейдите на AACFlow и войдите в свою учетную запись.
Перейдите к рабочему процессу, который хотите выполнить программно.
Нажмите "Развернуть", чтобы развернуть ваш рабочий процесс, если он еще не развернут.
Во время процесса развертывания выберите или создайте API-ключ.
Скопируйте API-ключ для использования в вашем Python-приложении.
Требования
- Python 3.8+
- requests >= 2.25.0
Лицензия
Apache-2.0

