AACFlow

Python

Официальный Python SDK для AACFlow позволяет выполнять рабочие процессы программно из ваших Python-приложений с использованием официального Python SDK.

Python SDK поддерживает Python 3.8+ с поддержкой асинхронного выполнения, автоматическим ограничением скорости с экспоненциальной задержкой и отслеживанием использования.

Установка

Установите SDK с помощью pip:

pip install aacflow-sdk

Быстрый старт

Вот простой пример, чтобы начать работу:

from aacflow import AACFlowClient

# Инициализируем клиент
client = AACFlowClient(
    api_key="your-api-key-here",
    base_url="https://aacflow.io"  # опционально, по умолчанию https://aacflow.io
)

# Выполняем рабочий процесс
try:
    result = client.execute_workflow("workflow-id")
    print("Рабочий процесс выполнен успешно:", result)
except Exception as error:
    print("Выполнение рабочего процесса не удалось:", error)

Справочник по API

AACFlowClient

Конструктор

AACFlowClient(api_key: str, base_url: str = "https://aacflow.io")

Параметры:

  • api_key (str): Ваш API-ключ AACFlow
  • base_url (str, опционально): Базовый URL для API AACFlow

Методы

execute_workflow()

Выполняет рабочий процесс с опциональными входными данными.

result = client.execute_workflow(
    "workflow-id",
    input={"message": "Hello, world!"},
    timeout=30.0  # 30 секунд
)

Параметры:

  • workflow_id (str): ID рабочего процесса для выполнения
  • input (dict, опционально): Входные данные для передачи в рабочий процесс
  • timeout (float, опционально): Таймаут в секундах (по умолчанию: 30.0)
  • stream (bool, опционально): Включить потоковые ответы (по умолчанию: False)
  • selected_outputs (list[str], опционально): Выходные данные блоков для потоковой передачи в формате blockName.attribute (например, ["agent1.content"])
  • async_execution (bool, опционально): Выполнять асинхронно (по умолчанию: False)

Возвращает: WorkflowExecutionResult | AsyncExecutionResult

Когда async_execution=True, возвращает немедленно с ID задачи для опроса. В противном случае ожидает завершения.

get_workflow_status()

Получает статус рабочего процесса (статус развертывания и т.д.).

status = client.get_workflow_status("workflow-id")
print("Развернут:", status.is_deployed)

Параметры:

  • workflow_id (str): ID рабочего процесса

Возвращает: WorkflowStatus

validate_workflow()

Проверяет, готов ли рабочий процесс к выполнению.

is_ready = client.validate_workflow("workflow-id")
if is_ready:
    # Рабочий процесс развернут и готов
    pass

Параметры:

  • workflow_id (str): ID рабочего процесса

Возвращает: bool

get_job_status()

Получает статус асинхронного выполнения задачи.

status = client.get_job_status("task-id-from-async-execution")
print("Статус:", status["status"])  # 'queued', 'processing', 'completed', 'failed'
if status["status"] == "completed":
    print("Выходные данные:", status["output"])

Параметры:

  • task_id (str): ID задачи, возвращенный из асинхронного выполнения

Возвращает: Dict[str, Any]

Поля ответа:

  • success (bool): Успешен ли запрос
  • taskId (str): ID задачи
  • status (str): Один из 'queued', 'processing', 'completed', 'failed', 'cancelled'
  • metadata (dict): Содержит startedAt, completedAt и duration
  • output (any, опционально): Выходные данные рабочего процесса (при завершении)
  • error (any, опционально): Детали ошибки (при сбое)
  • estimatedDuration (int, опционально): Предполагаемая продолжительность в миллисекундах (при обработке/в очереди)
execute_with_retry()

Выполняет рабочий процесс с автоматической повторной попыткой при ошибках ограничения скорости с использованием экспоненциальной задержки.

result = client.execute_with_retry(
    "workflow-id",
    input={"message": "Hello"},
    timeout=30.0,
    max_retries=3,           # Максимальное количество повторных попыток
    initial_delay=1.0,       # Начальная задержка в секундах
    max_delay=30.0,          # Максимальная задержка в секундах
    backoff_multiplier=2.0   # Множитель экспоненциальной задержки
)

Параметры:

  • workflow_id (str): ID рабочего процесса для выполнения
  • input (dict, опционально): Входные данные для передачи в рабочий процесс
  • timeout (float, опционально): Таймаут в секундах
  • stream (bool, опционально): Включить потоковые ответы
  • selected_outputs (list, опционально): Выходные данные блоков для потоковой передачи
  • async_execution (bool, опционально): Выполнять асинхронно
  • max_retries (int, опционально): Максимальное количество повторных попыток (по умолчанию: 3)
  • initial_delay (float, опционально): Начальная задержка в секундах (по умолчанию: 1.0)
  • max_delay (float, опционально): Максимальная задержка в секундах (по умолчанию: 30.0)
  • backoff_multiplier (float, опционально): Множитель задержки (по умолчанию: 2.0)

Возвращает: WorkflowExecutionResult | AsyncExecutionResult

Логика повторных попыток использует экспоненциальную задержку (1с → 2с → 4с → 8с...) с ±25% случайным отклонением для предотвращения "стадного эффекта". Если API предоставляет заголовок retry-after, он будет использован вместо этого.

get_rate_limit_info()

Получает текущую информацию об ограничении скорости из последнего ответа API.

rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
    print("Лимит:", rate_limit_info.limit)
    print("Осталось:", rate_limit_info.remaining)
    print("Сброс:", datetime.fromtimestamp(rate_limit_info.reset))

Возвращает: RateLimitInfo | None

get_usage_limits()

Получает текущие лимиты использования и информацию о квоте для вашей учетной записи.

limits = client.get_usage_limits()
print("Осталось синхронных запросов:", limits.rate_limit["sync"]["remaining"])
print("Осталось асинхронных запросов:", limits.rate_limit["async"]["remaining"])
print("Текущая стоимость периода:", limits.usage["currentPeriodCost"])
print("Тариф:", limits.usage["plan"])

Возвращает: UsageLimits

Структура ответа:

{
    "success": bool,
    "rateLimit": {
        "sync": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "async": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "authType": str  # 'api' или 'manual'
    },
    "usage": {
        "currentPeriodCost": float,
        "limit": float,
        "plan": str  # например, 'free', 'pro'
    }
}
set_api_key()

Обновляет API-ключ.

client.set_api_key("new-api-key")
set_base_url()

Обновляет базовый URL.

client.set_base_url("https://my-custom-domain.com")
close()

Закрывает базовую HTTP-сессию.

client.close()

Классы данных

WorkflowExecutionResult

@dataclass
class WorkflowExecutionResult:
    success: bool
    output: Optional[Any] = None
    error: Optional[str] = None
    logs: Optional[List[Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    trace_spans: Optional[List[Any]] = None
    total_duration: Optional[float] = None

AsyncExecutionResult

@dataclass
class AsyncExecutionResult:
    success: bool
    task_id: str
    status: str  # 'queued'
    created_at: str
    links: Dict[str, str]  # например, {"status": "/api/jobs/{taskId}"}

WorkflowStatus

@dataclass
class WorkflowStatus:
    is_deployed: bool
    deployed_at: Optional[str] = None
    needs_redeployment: bool = False

RateLimitInfo

@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset: int
    retry_after: Optional[int] = None

UsageLimits

@dataclass
class UsageLimits:
    success: bool
    rate_limit: Dict[str, Any]
    usage: Dict[str, Any]

AACFlowError

class AACFlowError(Exception):
    def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
        super().__init__(message)
        self.code = code
        self.status = status

Распространенные коды ошибок:

  • UNAUTHORIZED: Неверный API-ключ
  • TIMEOUT: Превышено время ожидания запроса
  • RATE_LIMIT_EXCEEDED: Превышено ограничение скорости
  • USAGE_LIMIT_EXCEEDED: Превышен лимит использования
  • EXECUTION_ERROR: Сбой выполнения рабочего процесса

Примеры

Базовое выполнение рабочего процесса

Настройте AACFlowClient с вашим API-ключом.

Проверьте, развернут ли рабочий процесс и готов ли к выполнению.

Запустите рабочий процесс с вашими входными данными.

Обработайте результат выполнения и обработайте любые ошибки.

import os
from aacflow import AACFlowClient

client = AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY"))

def run_workflow():
    try:
        # Проверяем, готов ли рабочий процесс
        is_ready = client.validate_workflow("my-workflow-id")
        if not is_ready:
            raise Exception("Рабочий процесс не развернут или не готов")

        # Выполняем рабочий процесс
        result = client.execute_workflow(
            "my-workflow-id",
            input={
                "message": "Обработайте эти данные",
                "user_id": "12345"
            }
        )

        if result.success:
            print("Выходные данные:", result.output)
            print("Продолжительность:", result.metadata.get("duration") if result.metadata else None)
        else:
            print("Рабочий процесс не удался:", result.error)
            
    except Exception as error:
        print("Ошибка:", error)

run_workflow()

Обработка ошибок

Обрабатывайте различные типы ошибок, которые могут возникнуть во время выполнения рабочего процесса:

from aacflow import AACFlowClient, AACFlowError
import os

client = AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY"))

def execute_with_error_handling():
    try:
        result = client.execute_workflow("workflow-id")
        return result
    except AACFlowError as error:
        if error.code == "UNAUTHORIZED":
            print("Неверный API-ключ")
        elif error.code == "TIMEOUT":
            print("Превышено время выполнения рабочего процесса")
        elif error.code == "USAGE_LIMIT_EXCEEDED":
            print("Превышен лимит использования")
        elif error.code == "INVALID_JSON":
            print("Неверный JSON в теле запроса")
        else:
            print(f"Ошибка рабочего процесса: {error}")
        raise
    except Exception as error:
        print(f"Неожиданная ошибка: {error}")
        raise

Использование контекстного менеджера

Используйте клиент как контекстный менеджер для автоматической очистки ресурсов:

from aacflow import AACFlowClient
import os

# Использование контекстного менеджера для автоматического закрытия сессии
with AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY")) as client:
    result = client.execute_workflow("workflow-id")
    print("Результат:", result)
# Сессия автоматически закрывается здесь

Пакетное выполнение рабочих процессов

Эффективно выполняйте несколько рабочих процессов:

from aacflow import AACFlowClient
import os

client = AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY"))   

def execute_workflows_batch(workflow_data_pairs):
    """Выполняет несколько рабочих процессов с разными входными данными."""
    results = []
    
    for workflow_id, input_data in workflow_data_pairs:
        try:
            # Проверяем рабочий процесс перед выполнением
            if not client.validate_workflow(workflow_id):
                print(f"Пропускаем {workflow_id}: не развернут")
                continue
                
            result = client.execute_workflow(workflow_id, input_data)
            results.append({
                "workflow_id": workflow_id,
                "success": result.success,
                "output": result.output,
                "error": result.error
            })
            
        except Exception as error:
            results.append({
                "workflow_id": workflow_id,
                "success": False,
                "error": str(error)
            })
    
    return results

# Пример использования
workflows = [
    ("workflow-1", {"type": "analysis", "data": "sample1"}),
    ("workflow-2", {"type": "processing", "data": "sample2"}),
]

results = execute_workflows_batch(workflows)
for result in results:
    print(f"Рабочий процесс {result['workflow_id']}: {'Успех' if result['success'] else 'Не удалось'}")

Асинхронное выполнение рабочего процесса

Выполняйте рабочие процессы асинхронно для длительных задач:

import os
import time
from aacflow import AACFlowClient

client = AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY"))

def execute_async():
    try:
        # Начинаем асинхронное выполнение
        result = client.execute_workflow(
            "workflow-id",
            input={"data": "большой набор данных"},
            async_execution=True  # Выполнять асинхронно
        )

        # Проверяем, является ли результат асинхронным выполнением
        if hasattr(result, 'task_id'):
            print(f"ID задачи: {result.task_id}")
            print(f"Конечная точка статуса: {result.links['status']}")

            # Опрашиваем на завершение
            status = client.get_job_status(result.task_id)

            while status["status"] in ["queued", "processing"]:
                print(f"Текущий статус: {status['status']}")
                time.sleep(2)  # Ждем 2 секунды
                status = client.get_job_status(result.task_id)

            if status["status"] == "completed":
                print("Рабочий процесс завершен!")
                print(f"Выходные данные: {status['output']}")
                print(f"Продолжительность: {status['metadata']['duration']}")
            else:
                print(f"Рабочий процесс не удался: {status['error']}")

    except Exception as error:
        print(f"Ошибка: {error}")

execute_async()

Ограничение скорости и повторные попытки

Автоматически обрабатывайте ограничения скорости с экспоненциальной задержкой:

import os
from aacflow import AACFlowClient, AACFlowError

client = AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY"))

def execute_with_retry_handling():
    try:
        # Автоматически повторяет при ограничении скорости
        result = client.execute_with_retry(
            "workflow-id",
            input={"message": "Обработайте это"},
            max_retries=5,
            initial_delay=1.0,
            max_delay=60.0,
            backoff_multiplier=2.0
        )

        print(f"Успех: {result}")
    except AACFlowError as error:
        if error.code == "RATE_LIMIT_EXCEEDED":
            print("Превышено ограничение скорости после всех повторных попыток")

            # Проверяем информацию об ограничении скорости
            rate_limit_info = client.get_rate_limit_info()
            if rate_limit_info:
                from datetime import datetime
                reset_time = datetime.fromtimestamp(rate_limit_info.reset)
                print(f"Ограничение скорости сбросится в: {reset_time}")

execute_with_retry_handling()

Мониторинг использования

Отслеживайте использование вашей учетной записи и лимиты:

import os
from aacflow import AACFlowClient

client = AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY"))

def check_usage():
    try:
        limits = client.get_usage_limits()

        print("=== Ограничения скорости ===")
        print("Синхронные запросы:")
        print(f"  Лимит: {limits.rate_limit['sync']['limit']}")
        print(f"  Осталось: {limits.rate_limit['sync']['remaining']}")
        print(f"  Сброс в: {limits.rate_limit['sync']['resetAt']}")
        print(f"  Ограничено: {limits.rate_limit['sync']['isLimited']}")

        print("\nАсинхронные запросы:")
        print(f"  Лимит: {limits.rate_limit['async']['limit']}")
        print(f"  Осталось: {limits.rate_limit['async']['remaining']}")
        print(f"  Сброс в: {limits.rate_limit['async']['resetAt']}")
        print(f"  Ограничено: {limits.rate_limit['async']['isLimited']}")

        print("\n=== Использование ===")
        print(f"Текущая стоимость периода: ${limits.usage['currentPeriodCost']:.2f}")
        print(f"Лимит: ${limits.usage['limit']:.2f}")
        print(f"Тариф: {limits.usage['plan']}")

        percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
        print(f"Использование: {percent_used:.1f}%")

        if percent_used > 80:
            print("⚠️  Внимание: Вы приближаетесь к лимиту использования!")

    except Exception as error:
        print(f"Ошибка проверки использования: {error}")

check_usage()

Потоковое выполнение рабочего процесса

Выполняйте рабочие процессы с потоковыми ответами в реальном времени:

from aacflow import AACFlowClient
import os

client = AACFlowClient(api_key=os.getenv("AACFLOW_API_KEY"))

def execute_with_streaming():
    """Выполняет рабочий процесс с включенной потоковой передачей."""
    try:
        # Включаем потоковую передачу для конкретных выходных данных блоков
        result = client.execute_workflow(
            "workflow-id",
            input={"message": "Посчитайте до пяти"},
            stream=True,
            selected_outputs=["agent1.content"]  # Используйте формат blockName.attribute
        )

        print("Результат рабочего процесса:", result)
    except Exception as error:
        print("Ошибка:", error)

execute_with_streaming()

Потоковый ответ следует формату Server-Sent Events (SSE):

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}

data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}

data: [DONE]

Пример потоковой передачи Flask:

from flask import Flask, Response, stream_with_context
import requests
import json
import os

app = Flask(__name__)

@app.route('/stream-workflow')
def stream_workflow():
    """Потоковая передача выполнения рабочего процесса клиенту."""

    def generate():
        response = requests.post(
            'https://aacflow.io/api/workflows/WORKFLOW_ID/execute',
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': os.getenv('AACFLOW_API_KEY')
            },
            json={
                'message': 'Сгенерируйте историю',
                'stream': True,
                'selectedOutputs': ['agent1.content']
            },
            stream=True
        )

        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith('data: '):
                    data = decoded_line[6:]  # Удаляем префикс 'data: '

                    if data == '[DONE]':
                        break

                    try:
                        parsed = json.loads(data)
                        if 'chunk' in parsed:
                            yield f"data: {json.dumps(parsed)}\n\n"
                        elif parsed.get('event') == 'done':
                            yield f"data: {json.dumps(parsed)}\n\n"
                            print("Выполнение завершено:", parsed.get('metadata'))
                    except json.JSONDecodeError:
                        pass

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

if __name__ == '__main__':
    app.run(debug=True)

Конфигурация окружения

Настройте клиент с использованием переменных окружения:

import os
from aacflow import AACFlowClient

# Конфигурация для разработки
client = AACFlowClient(
    api_key=os.getenv("AACFLOW_API_KEY")
    base_url=os.getenv("SIM_BASE_URL", "https://aacflow.io")
)
import os
from aacflow import AACFlowClient

# Продакшн-конфигурация с обработкой ошибок
api_key = os.getenv("AACFLOW_API_KEY")
if not api_key:
    raise ValueError("Требуется переменная окружения AACFLOW_API_KEY")

client = AACFlowClient(
    api_key=api_key,
    base_url=os.getenv("SIM_BASE_URL", "https://aacflow.io")
)

Получение вашего API-ключа

Перейдите на AACFlow и войдите в свою учетную запись.

Перейдите к рабочему процессу, который хотите выполнить программно.

Нажмите "Развернуть", чтобы развернуть ваш рабочий процесс, если он еще не развернут.

Во время процесса развертывания выберите или создайте API-ключ.

Скопируйте API-ключ для использования в вашем Python-приложении.

Требования

  • Python 3.8+
  • requests >= 2.25.0

Лицензия

Apache-2.0

Common Questions

On this page

Начните создавать сегодня
Нам доверяют более 100 000 разработчиков.
SaaS-платформа для создания AI-агентов и управления агентным workforce.
Начать