27 нояб. 2012 г.

java: Adaptive throttling, Part 1


Типичная проблема, которая возникает при обработке большого потока сообщений:

- нельзя пропихнуть большого слона через маленькую трубу, т.е. обработка сообщений не успевает «проглотить» все сообщения


При этом существуют некоторые ограничения на поток данных :

  • поток не равномерный и состоит из событий разного типа
  • количество типов событий заранее не известно, но конечное число
  • каждый тип события имеет свою актуальность во времени
  • все типы событий имеют равный приоритет

На диаграмме приведён пример разрешения проблемы: нагребатор™ работает на нитке T1, а следовательно разгребатор™ на нитке T2
  • за время обработки события типа A успевают прийти новые события как типа B, так и A
  • после обработки события типа B необходимо обработать наиболее актуальное событие типа A

Проблема осложняется ещё тем, что может быть несколько нагребаторов™, при этом каждый нагребатор™ может порождать только события одного типа; так и есть потребность в нескольких разгребаторах™ - при этом

Терминология. Stream есть поток данных, тогда как thread есть нитка или нить выполнения. И не стоит путать потоки с нитками.

На мой взгляд задача очень интересная, вполне практическая и более того - из нашей специфики работы.

Именно поэтому, мы с Русланом задаём нашим кандидатам на собеседовании - т.е не буквально закодить всё от и до, но хотя бы построить дизайн решения, освещая ключевые моменты решения кусками кода.

Небольшое лирическое отступление: я не считаю, что стоит задавать на собеседовании вопросы типа «какие есть методы у java.lang.Object ?» или «какова сложность алгоритма поиска в hash структуре?» - комплексные практические задачи, подобные вышеописанной, позволяют обсудить широкий спектр вопросов - от сложности алгоритмов и concurrency, до проектирования и архитектуры в целом.

Поскольку всё уже закожено, и быть может осталось навести небольшой марафет, опишу ключевые моменты решения
  • open addressing: структура данных hash ассоциативный массив с открытой адресацией:
    • одномерный массив сущностей
    • сущность наследует atomic reference: дополнительное свойство - ключ, тип сообщения, определяет индекс в массиве. Значение (само событие) хранится по atomic reference, причём оно будет всегда самым актуальным значением, при обработке достаточно сделать getAndSet(null), тем самым пометив, как обработанное
    • ограниченное небольшое количество проб для устранения hash коллизий, например, 3-4
    • необходимые расширения массива, размер которого можно поддерживать как 2K (быстрое получение индекса), и заполнение ячеек происходят под монитором в начальной фазе
  • card marking: каждый изменённый элемент массива помечает соответствующий ему сегмент как dirty. Например, сегмент = 1/64 часть массива, маркер изменений - соответствующий бит в atomic long.
  • waiting stategy: не стоит сразу впадать в состояние пассивного ожидания (на мониторе) если нет изменений, вполне возможно, что удастся получить изменение активно poll'я флаг card marking'а.

Т.о. решена ещё и проблема нескольких нагребаторов™ - каждый их них попадает в разные элементы массива и не создают contention между собой. Так же решена проблема и разгребаторов™ - пока один будет обрабатывать одну серию изменений, второй может не только обрабатать новые задачи, но и стащить задачи из списка задач, предназначенных для других, т.о. получается ещё на сдачу и load balancing.

Any ideas?

4 комментария:

Nodir Gulyamov комментирует...

Володь,
Если я правильно понял задачу, то тут классическое применение online sorting, в частности например соритировка слиянием (merge sort) событий, которые приходят. Надо сортировать их по актуальности, то есть по наименьшему времени актуальности. В этом случае очередь событий будет "всегда" (условно) в актуальном состоянии и можно их по очереди обрабатывать разгребатором.

Наверное еще можно их складывать в граф, но на мой взгляд это уже из пушек по воробьям.

Vladimir Dolzhenko комментирует...

@Nodir

Как небольшой итог нашего общения в jabber'е, и видимо, я не вполне чётко описал проблематику задачи - мало того, что количество типов сообщений может быть от нескольких сотен до нескольких десятков тысяч, так и промедление обработки сообщения смерти подобно.

Т.е нельзя копить пачку сообщений, которую после как-то обработать, клиенту необходимо отдавать как можно быстрее, но и так, чтобы он успевал проглатывать их.

Мало того, хочется это делать с наименьшими накладными расходами, и, традиционно garbage free ;)

if комментирует...

Владимир,
есть такое предположение что Вы будете получать постоянные cache miss-ы из-за false sharing-а при использовании Binary heap card mark(о котором вы пишите в вашей статье). решали ли Вы как-то эту проблему?

Vladimir Dolzhenko комментирует...

скажем так - binary heap card mark не даёт стабильного лучшего решения, и по моим ощущениям это связанно не только с false sharing. simple card mark требует меньше операций, проще код и стабильней результаты