Типичная проблема, которая возникает при обработке большого потока сообщений:
- нельзя пропихнуть большого слона через маленькую трубу, т.е. обработка сообщений не успевает «проглотить» все сообщения
При этом существуют некоторые ограничения на поток данных :
- поток не равномерный и состоит из событий разного типа
- количество типов событий заранее не известно, но конечное число
- каждый тип события имеет свою актуальность во времени
- все типы событий имеют равный приоритет
На диаграмме приведён пример разрешения проблемы: нагребатор работает на нитке T1, а следовательно разгребатор на нитке T2
- за время обработки события типа A успевают прийти новые события как типа B, так и A
- после обработки события типа B необходимо обработать наиболее актуальное событие типа A
Проблема осложняется ещё тем, что может быть несколько нагребаторов, при этом каждый нагребатор может порождать только события одного типа; так и есть потребность в нескольких разгребаторах - при этом
Терминология. Stream есть поток данных, тогда как thread есть нитка или нить выполнения. И не стоит путать потоки с нитками.
На мой взгляд задача очень интересная, вполне практическая и более того - из нашей специфики работы.
Именно поэтому, мы с Русланом задаём нашим кандидатам на собеседовании - т.е не буквально закодить всё от и до, но хотя бы построить дизайн решения, освещая ключевые моменты решения кусками кода.
Небольшое лирическое отступление: я не считаю, что стоит задавать на собеседовании вопросы типа «какие есть методы у java.lang.Object ?» или «какова сложность алгоритма поиска в hash структуре?» - комплексные практические задачи, подобные вышеописанной, позволяют обсудить широкий спектр вопросов - от сложности алгоритмов и concurrency, до проектирования и архитектуры в целом.
Поскольку всё уже закожено, и быть может осталось навести небольшой марафет, опишу ключевые моменты решения
- open addressing: структура данных hash ассоциативный массив с открытой адресацией:
- одномерный массив сущностей
- сущность наследует atomic reference: дополнительное свойство - ключ, тип сообщения, определяет индекс в массиве. Значение (само событие) хранится по atomic reference, причём оно будет всегда самым актуальным значением, при обработке достаточно сделать getAndSet(null), тем самым пометив, как обработанное
- ограниченное небольшое количество проб для устранения hash коллизий, например, 3-4
- необходимые расширения массива, размер которого можно поддерживать как 2K (быстрое получение индекса), и заполнение ячеек происходят под монитором в начальной фазе
- card marking: каждый изменённый элемент массива помечает соответствующий ему сегмент как dirty. Например, сегмент = 1/64 часть массива, маркер изменений - соответствующий бит в atomic long.
- waiting stategy: не стоит сразу впадать в состояние пассивного ожидания (на мониторе) если нет изменений, вполне возможно, что удастся получить изменение активно poll'я флаг card marking'а.
Т.о. решена ещё и проблема нескольких нагребаторов - каждый их них попадает в разные элементы массива и не создают contention между собой. Так же решена проблема и разгребаторов - пока один будет обрабатывать одну серию изменений, второй может не только обрабатать новые задачи, но и стащить задачи из списка задач, предназначенных для других, т.о. получается ещё на сдачу и load balancing.
Any ideas?
4 комментария:
Володь,
Если я правильно понял задачу, то тут классическое применение online sorting, в частности например соритировка слиянием (merge sort) событий, которые приходят. Надо сортировать их по актуальности, то есть по наименьшему времени актуальности. В этом случае очередь событий будет "всегда" (условно) в актуальном состоянии и можно их по очереди обрабатывать разгребатором.
Наверное еще можно их складывать в граф, но на мой взгляд это уже из пушек по воробьям.
@Nodir
Как небольшой итог нашего общения в jabber'е, и видимо, я не вполне чётко описал проблематику задачи - мало того, что количество типов сообщений может быть от нескольких сотен до нескольких десятков тысяч, так и промедление обработки сообщения смерти подобно.
Т.е нельзя копить пачку сообщений, которую после как-то обработать, клиенту необходимо отдавать как можно быстрее, но и так, чтобы он успевал проглатывать их.
Мало того, хочется это делать с наименьшими накладными расходами, и, традиционно garbage free ;)
Владимир,
есть такое предположение что Вы будете получать постоянные cache miss-ы из-за false sharing-а при использовании Binary heap card mark(о котором вы пишите в вашей статье). решали ли Вы как-то эту проблему?
скажем так - binary heap card mark не даёт стабильного лучшего решения, и по моим ощущениям это связанно не только с false sharing. simple card mark требует меньше операций, проще код и стабильней результаты
Отправить комментарий