English translation is not available yet. Showing Russian content.
Реализовать compression сообщений
ТЕХНИЧЕСКОЕ ЗАДАНИЕ: Реализовать compression сообщений
1. Цель задачи
Разработать механизм сжатия (compression) для больших сообщений (>1KB), которыми обмениваются агенты в распределённой AI-системе. Внедрение должно быть прозрачным для логики агентов и приводить к снижению сетевого трафика не менее чем на 70% для сообщений, размер которых превышает порог. Вы научитесь выбирать алгоритм сжатия (gzip / zstd), интегрировать его в транспортный слой и измерять эффект на реальном трафике.
Ключевой результат Снижение объёма передаваемых данных по сети на ≥70% для сообщений размером >1KB при времени распаковки <10 мс на стороне получателя (на CPU средней производительности).
2. Исходные данные
Перед началом необходимо иметь:
| Что нужно | Откуда взять |
|---|---|
| Распределённая система с несколькими агентами (микросервисы) | Пет-проект / эмулятор агентов (см. ниже) |
| Логи/метрики текущего трафика между агентами | Prometheus + Grafana / CSV-логеры |
| Тестовые сообщения разных размеров (0.5KB, 1KB, 10KB, 100KB, 1MB) | Сгенерировать скриптом на Python (JSON с текстом/числами) |
| Инструмент для замера сетевого трафика | tcpdump / Wireshark / netstat / библиотека psutil |
Если нет реальной системы агентов — симулируем:
- Напишите простой эмулятор на Python (asyncio), который:
- Зафиксируйте текущий трафик (без сжатия) за 10 минут работы эмулятора.
- Трафик измеряйте с помощью psutil.net_io_counters() или анализа пакетов через scapy.
3. Технологический стек
| Компонент | Инструменты | Назначение |
|---|---|---|
| Язык программирования | Python 3.10+ | Реализация логики сжатия и эмулятор |
| Транспорт (симуляция) | aiohttp / Flask + requests | Обмен сообщениями между агентами |
| Сжатие | gzip (stdlib) + zstandard (PyPI) | Основные алгоритмы для сравнения |
| Измерение трафика | psutil, Wireshark, tcpdump | Сбор статистики до/после сжатия |
| Мониторинг | Prometheus client (Python) + Grafana | Визуализация объёма трафика в реальном времени |
| Тестирование | pytest + locust | Нагрузочное тестирование и проверка корректности |
| Версионирование | Git | Хранение кода и результатов |
4. Этапы выполнения
Этап 1: Анализ и измерение базового трафика (30 минут)
Действия
- Разверните эмулятор агентов (или возьмите существующую систему).
- Запустите эмулятор на 10 минут с типичными сообщениями.
- Установите порог >1KB: все сообщения должны превышать его (генерируйте payload длиной от 2000 символов).
- Замерьте:
- Общий объём переданных байт (суммарно по всем агентам) –
total_traffic_before. - Средний/максимальный размер сообщения.
- Процент сообщений >1KB (должно быть 100% для этого теста).
- Общий объём переданных байт (суммарно по всем агентам) –
- Сохраните метрики в CSV-файл traffic_before.csv с колонками: timestamp, sender, receiver,
message_size_bytes,duration_ms.
Ожидаемый результат этапа CSV-файл с базовым трафиком и понимание текущего объёма данных.
Этап 2: Выбор алгоритма сжатия и интеграция в транспортный слой (1 час)
Действия
- Сравните gzip и zstd на датасете тестовых сообщений (размеры 0.5, 1, 10, 100, 1000 KB):
| Алгоритм | Размер (KB) | Коэфф. сжатия | Время сжатия (ms) | Время распаковки (ms) |
|---|---|---|---|---|
| gzip | 10 | 5.2 | 0.8 | 0.3 |
| zstd | 10 | 6.8 | 0.4 | 0.2 |
- Примите решение выберите алгоритм, который даёт >70% сжатия и время распаковки <10ms. Рекомендуется zstd как более быстрый и эффективный.
- Интеграция
- Создайте модуль compression.py с функциями:
import zstandard as zstd def compress(data: bytes) -> bytes: cctx = zstd.ZstdCompressor(level=3) return cctx.compress(data) def decompress(compressed: bytes) -> bytes: dctx = zstd.ZstdDecompressor() return dctx.decompress(compressed) - В транспортный слой (например, в aiohttp middleware или декоратор) добавьте логику:
- Если тело сообщения > 1024 байт, сжать его и добавить заголовок Content-Encoding: zstd (или кастомный флаг).
- На стороне получателя проверять наличие заголовка и распаковывать.
- Создайте модуль compression.py с функциями:
Ожидаемый результат этапа Модуль сжатия/распаковки, интегрированный в транспорт; таблица сравнения алгоритмов.
Этап 3: Реализация сжатия и распаковки в эмуляторе (2–3 часа)
Действия
-
Модифицируйте эмулятор
- Добавьте флаг
--compressдля включения сжатия. - В функции отправки сообщения:
async def send_message(session, url, data): body = json.dumps(data).encode('utf-8') if len(body) > 1024: body = compress(body) headers = {'X-Compressed': 'zstd'} else: headers = {} async with session.post(url, data=body, headers=headers) as resp: return resp - В функции приёма:
async def receive(request): body = await request.read() if request.headers.get('X-Compressed') == 'zstd': body = decompress(body) data = json.loads(body.decode('utf-8')) return web.json_response({'status': 'ok'})
- Добавьте флаг
-
Добавьте метрики Prometheus
- Счётчики:
message_size_bytes_total(по агентам),compressed_bytes_total,compression_ratio. - Гистограмма:
message_size_bytesдо и после сжатия. - Запустите Prometheus client на порту 8000.
- Счётчики:
-
Проверьте корректность
- Отправьте несколько сообщений разного размера, убедитесь, что после распаковки данные идентичны исходным.
- Напишите unit-тесты на функции compress/decompress.
Ожидаемый результат этапа Эмулятор со сжатием, Prometheus-метрики, юнит-тесты.
Этап 4: Тестирование и измерение эффекта (1 час)
Действия
- Запустите эмулятор со сжатием на тех же параметрах, что и на этапе 1 (10 минут, те же сообщения).
- Соберите метрики
- Общий объём переданных байт после сжатия –
total_traffic_after. - Средний коэффициент сжатия (отношение
compressed_size / original_size). - Время обработки (сжатие + распаковка) для каждого сообщения.
- Общий объём переданных байт после сжатия –
- Рассчитайте снижение трафика
Если reduction < 70%, настройте параметры (level сжатия, порог включения) и повторите тест.reduction = (total_traffic_before - total_traffic_after) / total_traffic_before * 100 - Проверьте влияние на latency
- Измерьте среднее время ответа (RTT) до и после сжатия. Оно не должно увеличиться более чем на 15% (за счёт времени сжатия). Если увеличивается сильнее, оптимизируйте.
Ожидаемый результат этапа Отчёт с цифрами: «Traffic reduction: 82%», «Average compression time: 1.2ms», «RTT increase: 8%».
Этап 5: Оптимизация и финальная документация (30 минут)
Действия
- Если reduction < 70%
- Увеличьте уровень сжатия (zstd level 5–10).
- Попробуйте сжимать только сообщения >10KB (если 70% достигается за счёт крупных).
- Добавьте кэширование сжатых фрагментов (если одинаковые payloads повторяются).
- Напишите README с инструкцией по запуску, результатами тестов и принятыми решениями.
Ожидаемый результат этапа Финальная версия кода, README с метриками.
5. Критерии приемки (Definition of Done)
- Снижение сетевого трафика для сообщений >1KB не менее чем на 70%.
- Время распаковки на стороне получателя не превышает 10 мс (p99 < 15 мс).
- Распаковка даёт точно исходные данные (проверено unit-тестами).
- Сжатие прозрачно: агенты не изменяют свою бизнес-логику.
- Реализована поддержка хотя бы одного алгоритма (zstd или gzip).
- Собран отчёт с метриками до/после (CSV, графики Prometheus).
- Код проходит линтер (flake8) и тесты (pytest).
- README содержит инструкцию по запуску и результаты.
6. Ожидаемый результат
Основной артефакт Папка compression-agent с:
compression.py– модуль сжатия/распаковки.agent_simulator.py– эмулятор с поддержкой флага--compress.benchmark_compress.py– скрипт для сравнения алгоритмов.tests/– юнит-тесты (pytest).traffic_before.csv,traffic_after.csv– метрики трафика.README.md– отчёт с таблицами и графиками.
Дополнительные артефакты (опционально):
- Дашборд Grafana (экспорт JSON) для визуализации трафика.
- Конфигурация Prometheus (
prometheus.yml).
7. Возможные сложности и их решение
| Сложность | Решение |
|---|---|
| Недостаточное снижение трафика (<70%) | Увеличить уровень сжатия; сжимать только сообщения >10KB; проверить, что payload содержит повторяющиеся данные (если нет – сжатие может быть слабым, но для >1KB этого достаточно) |
| Рост latency из-за сжатия | Использовать zstd (быстрее gzip); распаковывать асинхронно (в thread pool); отключить сжатие для small messages (<5KB) |
| Ошибки распаковки (битые данные) | Добавить CRC-проверку (zstd встроенную); логировать ошибки; при нераспаковке передавать оригинал без сжатия (fallback) |
| Заголовки не проходят через прокси/балансировщик | Использовать стандартный Content-Encoding: zstd вместо кастомного заголовка |
| Prometheus метрики не отображаются | Проверить endpoint /metrics; настроить pushgateway если эмулятор недолговечен |
8. Бюджет времени (оценка)
| Этап | Время |
|---|---|
| Этап 1: Анализ базового трафика | 30 мин |
| Этап 2: Выбор алгоритма и интеграция | 1 час |
| Этап 3: Реализация сжатия в эмуляторе | 2,5 часа |
| Этап 4: Тестирование и измерение эффекта | 1 час |
| Этап 5: Оптимизация и документация | 30 мин |
| Итого | 5,5 часов |
Примечание Для первого раза рекомендуется заложить +2 часа на отладку интеграции и тонкую настройку параметров сжатия.
9. Связанные вопросы из базы знаний
| Вопрос | Тема |
|---|---|
| 42 | Выбор алгоритма сжатия для сетевых протоколов |
| 57 | Протоколы меж-агентного обмена (gRPC vs HTTP) |
| 89 | Мониторинг сетевого трафика с Prometheus |
| 134 | Оптимизация сериализации (JSON vs Protobuf) |
| 201 | Латенти при передаче больших сообщений |
| 256 | Автоматическое определение порога сжатия |
| 312 | Асинхронная обработка данных (asyncio) |
| 418 | Метрики производительности: p99, throughput |
| 509 | Кэширование сжатых фрагментов |
| 601 | Тестирование устойчивости (fault tolerance) при сбоях декомпрессии |
10. Чек-лист самопроверки
- Я сравнил минимум два алгоритма (gzip и zstd) и выбрал на основе метрик.
- Сжатие включается только для сообщений размером > 1KB (проверено кодом).
- Я измерил трафик до и после на одинаковых данных.
- Все unit-тесты проходят, распаковка возвращает байт-в-байт оригинал.
- В README указан итоговый процент снижения трафика и время обработки.
- Код соответствует стандартам PEP8 (проверено flake8).
- Я убедился, что latency увеличилась не более чем на 15% (или задокументировал отклонение).