Реализовать compression сообщений

ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать compression сообщений

1. Цель задачи

Разработать механизм сжатия (compression) для больших сообщений (>1KB), которыми обмениваются агенты в распределённой AI-системе. Внедрение должно быть прозрачным для логики агентов и приводить к снижению сетевого трафика не менее чем на 70% для сообщений, размер которых превышает порог. Вы научитесь выбирать алгоритм сжатия (gzip / zstd), интегрировать его в транспортный слой и измерять эффект на реальном трафике.

Ключевой результат Снижение объёма передаваемых данных по сети на ≥70% для сообщений размером >1KB при времени распаковки <10 мс на стороне получателя (на CPU средней производительности).


2. Исходные данные

Перед началом необходимо иметь:

Что нужноОткуда взять
Распределённая система с несколькими агентами (микросервисы)Пет-проект / эмулятор агентов (см. ниже)
Логи/метрики текущего трафика между агентамиPrometheus + Grafana / CSV-логеры
Тестовые сообщения разных размеров (0.5KB, 1KB, 10KB, 100KB, 1MB)Сгенерировать скриптом на Python (JSON с текстом/числами)
Инструмент для замера сетевого трафикаtcpdump / Wireshark / netstat / библиотека psutil

Если нет реальной системы агентов — симулируем:

  1. Напишите простой эмулятор на Python (asyncio), который:
    • Запускает 2-3 виртуальных агента, обменивающихся сообщениями через HTTP (или gRPC).
    • Каждый агент раз в N секунд генерирует сообщение фиксированного или случайного размера (1–100 KB).
    • Сообщение — JSON с полями sender, timestamp, payload (строка длиной, симулирующей данные).
  2. Зафиксируйте текущий трафик (без сжатия) за 10 минут работы эмулятора.
  3. Трафик измеряйте с помощью psutil.net_io_counters() или анализа пакетов через scapy.

3. Технологический стек

КомпонентИнструментыНазначение
Язык программированияPython 3.10+Реализация логики сжатия и эмулятор
Транспорт (симуляция)aiohttp / Flask + requestsОбмен сообщениями между агентами
Сжатиеgzip (stdlib) + zstandard (PyPI)Основные алгоритмы для сравнения
Измерение трафикаpsutil, Wireshark, tcpdumpСбор статистики до/после сжатия
МониторингPrometheus client (Python) + GrafanaВизуализация объёма трафика в реальном времени
Тестированиеpytest + locustНагрузочное тестирование и проверка корректности
ВерсионированиеGitХранение кода и результатов

4. Этапы выполнения

Этап 1: Анализ и измерение базового трафика (30 минут)

Действия

  1. Разверните эмулятор агентов (или возьмите существующую систему).
  2. Запустите эмулятор на 10 минут с типичными сообщениями.
    • Установите порог >1KB: все сообщения должны превышать его (генерируйте payload длиной от 2000 символов).
  3. Замерьте:
    • Общий объём переданных байт (суммарно по всем агентам) – total_traffic_before.
    • Средний/максимальный размер сообщения.
    • Процент сообщений >1KB (должно быть 100% для этого теста).
  4. Сохраните метрики в CSV-файл traffic_before.csv с колонками: timestamp, sender, receiver, message_size_bytes, duration_ms.

Ожидаемый результат этапа CSV-файл с базовым трафиком и понимание текущего объёма данных.

Этап 2: Выбор алгоритма сжатия и интеграция в транспортный слой (1 час)

Действия

  1. Сравните gzip и zstd на датасете тестовых сообщений (размеры 0.5, 1, 10, 100, 1000 KB):
    • Напишите скрипт benchmark_compress.py, который для каждого размера:
      • Генерирует 1000 случайных JSON-сообщений.
      • Сжимает каждое gzip (level=6) и zstd (level=3).
      • Замеряет: коэффициент сжатия, время сжатия, время распаковки.
    • Запишите результаты в таблицу:
АлгоритмРазмер (KB)Коэфф. сжатияВремя сжатия (ms)Время распаковки (ms)
gzip105.20.80.3
zstd106.80.40.2
  1. Примите решение выберите алгоритм, который даёт >70% сжатия и время распаковки <10ms. Рекомендуется zstd как более быстрый и эффективный.
  2. Интеграция
    • Создайте модуль compression.py с функциями:
      import zstandard as zstd
      def compress(data: bytes) -> bytes:
          cctx = zstd.ZstdCompressor(level=3)
          return cctx.compress(data)
      
      def decompress(compressed: bytes) -> bytes:
          dctx = zstd.ZstdDecompressor()
          return dctx.decompress(compressed)
      
    • В транспортный слой (например, в aiohttp middleware или декоратор) добавьте логику:
      • Если тело сообщения > 1024 байт, сжать его и добавить заголовок Content-Encoding: zstd (или кастомный флаг).
      • На стороне получателя проверять наличие заголовка и распаковывать.

Ожидаемый результат этапа Модуль сжатия/распаковки, интегрированный в транспорт; таблица сравнения алгоритмов.

Этап 3: Реализация сжатия и распаковки в эмуляторе (2–3 часа)

Действия

  1. Модифицируйте эмулятор

    • Добавьте флаг --compress для включения сжатия.
    • В функции отправки сообщения:
      async def send_message(session, url, data):
          body = json.dumps(data).encode('utf-8')
          if len(body) > 1024:
              body = compress(body)
              headers = {'X-Compressed': 'zstd'}
          else:
              headers = {}
          async with session.post(url, data=body, headers=headers) as resp:
              return resp
      
    • В функции приёма:
      async def receive(request):
          body = await request.read()
          if request.headers.get('X-Compressed') == 'zstd':
              body = decompress(body)
          data = json.loads(body.decode('utf-8'))
          return web.json_response({'status': 'ok'})
      
  2. Добавьте метрики Prometheus

    • Счётчики: message_size_bytes_total (по агентам), compressed_bytes_total, compression_ratio.
    • Гистограмма: message_size_bytes до и после сжатия.
    • Запустите Prometheus client на порту 8000.
  3. Проверьте корректность

    • Отправьте несколько сообщений разного размера, убедитесь, что после распаковки данные идентичны исходным.
    • Напишите unit-тесты на функции compress/decompress.

Ожидаемый результат этапа Эмулятор со сжатием, Prometheus-метрики, юнит-тесты.

Этап 4: Тестирование и измерение эффекта (1 час)

Действия

  1. Запустите эмулятор со сжатием на тех же параметрах, что и на этапе 1 (10 минут, те же сообщения).
  2. Соберите метрики
    • Общий объём переданных байт после сжатия – total_traffic_after.
    • Средний коэффициент сжатия (отношение compressed_size / original_size).
    • Время обработки (сжатие + распаковка) для каждого сообщения.
  3. Рассчитайте снижение трафика
    reduction = (total_traffic_before - total_traffic_after) / total_traffic_before * 100
    
    Если reduction < 70%, настройте параметры (level сжатия, порог включения) и повторите тест.
  4. Проверьте влияние на latency
    • Измерьте среднее время ответа (RTT) до и после сжатия. Оно не должно увеличиться более чем на 15% (за счёт времени сжатия). Если увеличивается сильнее, оптимизируйте.

Ожидаемый результат этапа Отчёт с цифрами: «Traffic reduction: 82%», «Average compression time: 1.2ms», «RTT increase: 8%».

Этап 5: Оптимизация и финальная документация (30 минут)

Действия

  1. Если reduction < 70%
    • Увеличьте уровень сжатия (zstd level 5–10).
    • Попробуйте сжимать только сообщения >10KB (если 70% достигается за счёт крупных).
  2. Добавьте кэширование сжатых фрагментов (если одинаковые payloads повторяются).
  3. Напишите README с инструкцией по запуску, результатами тестов и принятыми решениями.

Ожидаемый результат этапа Финальная версия кода, README с метриками.


5. Критерии приемки (Definition of Done)

  • Снижение сетевого трафика для сообщений >1KB не менее чем на 70%.
  • Время распаковки на стороне получателя не превышает 10 мс (p99 < 15 мс).
  • Распаковка даёт точно исходные данные (проверено unit-тестами).
  • Сжатие прозрачно: агенты не изменяют свою бизнес-логику.
  • Реализована поддержка хотя бы одного алгоритма (zstd или gzip).
  • Собран отчёт с метриками до/после (CSV, графики Prometheus).
  • Код проходит линтер (flake8) и тесты (pytest).
  • README содержит инструкцию по запуску и результаты.

6. Ожидаемый результат

Основной артефакт Папка compression-agent с:

  • compression.py – модуль сжатия/распаковки.
  • agent_simulator.py – эмулятор с поддержкой флага --compress.
  • benchmark_compress.py – скрипт для сравнения алгоритмов.
  • tests/ – юнит-тесты (pytest).
  • traffic_before.csv, traffic_after.csv – метрики трафика.
  • README.md – отчёт с таблицами и графиками.

Дополнительные артефакты (опционально):

  • Дашборд Grafana (экспорт JSON) для визуализации трафика.
  • Конфигурация Prometheus (prometheus.yml).

7. Возможные сложности и их решение

СложностьРешение
Недостаточное снижение трафика (<70%)Увеличить уровень сжатия; сжимать только сообщения >10KB; проверить, что payload содержит повторяющиеся данные (если нет – сжатие может быть слабым, но для >1KB этого достаточно)
Рост latency из-за сжатияИспользовать zstd (быстрее gzip); распаковывать асинхронно (в thread pool); отключить сжатие для small messages (<5KB)
Ошибки распаковки (битые данные)Добавить CRC-проверку (zstd встроенную); логировать ошибки; при нераспаковке передавать оригинал без сжатия (fallback)
Заголовки не проходят через прокси/балансировщикИспользовать стандартный Content-Encoding: zstd вместо кастомного заголовка
Prometheus метрики не отображаютсяПроверить endpoint /metrics; настроить pushgateway если эмулятор недолговечен

8. Бюджет времени (оценка)

ЭтапВремя
Этап 1: Анализ базового трафика30 мин
Этап 2: Выбор алгоритма и интеграция1 час
Этап 3: Реализация сжатия в эмуляторе2,5 часа
Этап 4: Тестирование и измерение эффекта1 час
Этап 5: Оптимизация и документация30 мин
Итого5,5 часов

Примечание Для первого раза рекомендуется заложить +2 часа на отладку интеграции и тонкую настройку параметров сжатия.


9. Связанные вопросы из базы знаний

ВопросТема
42Выбор алгоритма сжатия для сетевых протоколов
57Протоколы меж-агентного обмена (gRPC vs HTTP)
89Мониторинг сетевого трафика с Prometheus
134Оптимизация сериализации (JSON vs Protobuf)
201Латенти при передаче больших сообщений
256Автоматическое определение порога сжатия
312Асинхронная обработка данных (asyncio)
418Метрики производительности: p99, throughput
509Кэширование сжатых фрагментов
601Тестирование устойчивости (fault tolerance) при сбоях декомпрессии

10. Чек-лист самопроверки

  • Я сравнил минимум два алгоритма (gzip и zstd) и выбрал на основе метрик.
  • Сжатие включается только для сообщений размером > 1KB (проверено кодом).
  • Я измерил трафик до и после на одинаковых данных.
  • Все unit-тесты проходят, распаковка возвращает байт-в-байт оригинал.
  • В README указан итоговый процент снижения трафика и время обработки.
  • Код соответствует стандартам PEP8 (проверено flake8).
  • Я убедился, что latency увеличилась не более чем на 15% (или задокументировал отклонение).