Turquoise Water, Nifi, Greece

seen from Czechia

seen from Norway
seen from Netherlands
seen from United States

seen from Czechia
seen from Japan
seen from United States
seen from Russia

seen from Malaysia
seen from Japan
seen from Lithuania
seen from China
seen from Japan

seen from Germany
seen from United States
seen from Japan
seen from Malaysia
seen from China

seen from Malaysia
seen from United States
Turquoise Water, Nifi, Greece
Best practices and lessons learnt from Running Apache NiFi at Renault
No real-time insight without real-time data ingestion. No real-time data ingestion without NiFi ! Apache NiFi is an integrated … source
Недавно мы писали об анонсированных новинках Apache NiFi 2.0. Наконец, 25 ноября 2023 года этот долгожданный мажорный релиз опубликован. Знакомимся с главными новостями версии 2.0, в которой более 900 обновлений, включая новые функции, улучшения и исправления ошибок. ТОП-7 новинок в Apache NiFi 2.0Прежде всего, важной новинкой NiFi 2.0 является поддержка нативного Python API для процессоров, о чем мы недавно писали здесь и здесь. Скриптовые процессоры активно используются для преобразования данных в NiFi, выполняя сценарии на основе Jython. Однако Jython запускается на JVM и не поддерживает библиотеки CPython. Будучи совместимым по синтаксису, он не использует богатство библиотек Python. Кроме того, использование множества процессоров на основе сценариев вредит UX, поскольку их сложно настраивать с помощью файлов сценариев и/или их тел. В результате получается поток данных, который трудно понять, поскольку вместо процессоров с понятными названиями. Ими также сложно поделиться. Чтобы устранить это, в NiFi 2.0 реализованы новые API на основе Python для разработки процессоров, использующие библиотеку Py4J, которая позволяет программам Python, работающим в интерпретаторе Python, динамически получать доступ к объектам Java на виртуальной машине Java. Методы вызываются так, будто объекты Java находятся в интерпретаторе Python, а к коллекциям Java можно получить доступ через стандартные методы коллекций Python. Библиотека Py4J также позволяет программам Java вызывать объекты Python. Таким образом, Java-фреймворк наконец дата-инженеру позволяет использовать простой и популярный язык программирования для создания собственных компонентов. А новые Python-процессоры обеспечивают взаимодействие с генеративными нейросетями типа ChatGPT и векторными базами данных. Они могут принимать неструктурированный текст, разбивать его на фрагменты и обрабатывать.Также добавлен stateless-режим выполнения для групп процессов, что мы разбирали в этом материале. Это повышает производительность за счет прямой интеграции с репозиториями без необходимости сохранять поток и настраивать отдельный процессор, чтобы указать на него. Также обеспечивается видимость происходящего и улучшаются возможности Provenance, поскольку ExecuteStateless показывает все события так, будто их выполняет один процессор.Реализован API правил анализа потока данных, который позволяет проверить, находится ли поток в корректном состоянии, определенном пользователями путем создания правил анализа потока. Эти правила представляют собой новый тип компонентов, у них есть свойства, их можно добавлять в NiFi как расширения через nars, они версионируются, сохраняются в flow.xml и т.д. Фреймворк сам заботится о выполнении правил по мере необходимости. Каждому правилу предоставляется представление потока или его части, определяемое интерфейсом, и оно может анализировать его любым подходящим способом. Результат анализа представлен нулем или более объектами нарушения правил, которые с этого момента платформа принимает и обрабатывает. Эффект результата анализа по тому или иному правилу зависит от типа правила, которое может быть либо рекомендацией или политикой. Рекомендации — это информация, отображаемая пользователю. Политика более строгая, которая не только доступна пользователю для просмотра. Политики делают недействительными компоненты, нарушившие ранее определенное правило. Результаты анализа можно отображать, а также отключать (игнорировать) или повторно включать.Реализованы службы реестра схем для Amazon Glue и Apicurio – инструмента проектирования API, как REST API в виде спецификаций OpenAPI, так и AsyncAPI, пример которой мы приводили здесь, и провайдер параметров для менеджера паролей 1Password Vault. Также в Apache NiFi 2.0 добавлена cлужба контроллера YamlTreeReader для синтаксического анализа YAML-записей, которая разбирает их на отдельные объекты Record. Если встречается массив, каждый элемент этого массива будет рассматриваться как отдельная запись. Если настроенная схема содержит поле, которого нет в YAML, будет использоваться нулевое значение. Если YAML содержит поле, которого нет в схеме, это поле будет пропущено. Напомним, Конфигурация потока описывается в файле flow.json, а не в YAML-документе как было раньше.Новые процессоры для дата-инженера и миграция БД метаданныхДата-инженеру также будут полезны возможности, обеспечиваемые новыми процессорами в версии 2.0. Например, добавлен процессор ListenOTLP для сбора OpenTelemetry – платформы и набора инструментов для обеспечения наблюдаемости, т.е. создания данных телеметрии, таких как трассировки, метрики и журналы программного и аппаратного обеспечения, а также управления ими. OpenTelemetry не зависит от поставщика и инструмента, поэтому может использоваться с различными open-source и коммерческими бэкендами. Также добавлены процессоры ListenSlack и ConsumeSlack для обработки сообщений из мессенджера Slack. Процессор PackageFlowFile обеспечивает запись файлов потока и атрибутов в FlowFile версии 3.А процессоры EncryptContentAge и DecryptContentAge поддерживают спецификацию age-encryption.org – простого и безопасного инструмента шифрования файлов. Этот формат и библиотека Go имеет небольшие явные ключи, отсутствие опций конфигурации и возможность компоновки в стиле UNIX. Спецификация age-encryption.org/v1 предоставляет современную альтернативу таким протоколам, как OpenPGP, для шифрования и дешифрования файлов. Спецификация age использует алгоритм ChaCha20-Poly1305 для аутентификации шифрования полезных данных файлов и поддерживает асимметричные пары ключей с использованием алгоритма Curve25519 с обменом ключами Диффи-Хеллмана, известного как X25519. Тип получателя алгоритм X25519 в age представляет открытый и закрытый ключи с использованием удобочитаемой кодировки Bech32. Команда age доступна во всех современных операционных системах. Эти свойства безопасности и удобства использования делают его отличным решением для сценариев использования шифрования файлов, чем текущие стратегии пользовательской обработки в процессорах, таких как EncryptContent, работу которого мы разбирали здесь. Проект Jagged обеспечивает реализацию спецификации age-encryption для Java. Новые процессоры EncryptContentAge и DecryptContentAge поддерживают тип получателя X25519 с опциями для ключей на основе свойств или файлов. Стандарт age-encryption поддерживает бронированную кодировку ASCII, что полезно, когда не удается обрабатывать необработанные двоичные файлы. Java 11 и 17 поддерживают алгоритм ChaCha20-Poly1305 и X25519, но для Java 8 требуется другой поставщик безопасности, например Bouncy Castle. Поддержка криптографических алгоритмов проверяется во время выполнения, чтобы обеспечить прозрачный возврат к Bouncy Castle.Также в новой Apache NiFi 2.0 разрешено документирование конкретных вариантов использования служб процессоров и контроллеров в аннотациях. Ранее разработчик мог документировать созданные расширения с помощью аннотации @CapabilityDescription. Однако, этого недостаточно для поддержки всех вариантов описания сценариев использования точек расширения. Поэтому введены несколько новых аннотаций:- @UseCase для описания конкретного варианта использования, который может быть реализован с помощью определенного расширения, для включения очень краткого описания в виде пары предложений и любых дополнительных примечаний, а также ключевых слов для этого варианта использования и способа настройки компонент для его реализации; - @MultiProcessorUseCase для предоставления одинаковых подробностей о вариантах использования, которые включают несколько процессоров.Еще для разработчика NiFi важно переносить конфигурацию созданных расширений, включая процессоры, службы контроллеров и задачи отчетности. Для этого в релизе 2.0 добавлены новые механизмы, обеспечивающие простую миграцию конфигураций UDF-компонентов, которые развиваются. Администратору кластера NiFi важно знать о переходе с реляционной базы данных метаданных H2 на JetBrains Xodus. Эта миграция с ядра базы данных H2 на JetBrains Xodus нужна для оптимизации хранения истории конфигурации потока. Напомним, H2 – это открытая кроссплатформенная реляционная СУБД, полностью написанная на языке Java. В Apache NiFi она используется в качестве базы данных метаданных и хранит информацию о том, какие сегменты существуют, какие им принадлежат элементы и с версиями, а также историю версий для каждого элемента. Эта база данных настраивается в файле nifi-registry.properties по умолчанию, а ее содержимое хранится в файле в локальной файловой системе. JetBrains Xodus — это встроенная база данных без транзакционных схем, написанная на Java и Kotlin. Изначально она была разработана для JetBrains YouTrack, инструмента отслеживания проблем и управления проектами. Xodus также используется в JetBrains Hub, платформе управления пользователями для командных инструментов JetBrains, а также в некоторых внутренних проектах JetBrains. Xodus является транзакционным и полностью поддерживает ACID-требования к транзакциям. Также эта СУБД отлично поддерживает параллелизм и неблокирующие чтения благодаря MVCC-концепции моментальных снимков и настоящей изоляции снимков. Xodus не требует схемы данных, а потому очень гибок. Также эта СУБД не требует миграции схемы или рефакторинга. Будучи встроенной базой данных, Xodus не требует установки или администрирования. СУБД написана на языках Java и Kotlin, бесплатна и распространяется под лицензией Apache 2.0.Наконец, добавлены расширения для выбора лидеров кластера и управления состоянием на базе Kubernetes.Освойте администрирование и использование Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:- Эксплуатация Apache NIFIИсточники1. https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version2.0.0-M1 2. https://cwiki.apache.org/confluence/display/NIFI/Migrating+Deprecated+Components+and+Features+for+2.0.0 3. https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12316020&version=12339599
Как расширить возможности Apache NiFi, используя Python: знакомимся с библиотекой NiPyAPI. Возможности, принципы работы и примеры использования NiPyAPI в управлении средой NiFi: очистка от неиспользуемых компонентов.Python в Apache NiFiХотя официальная поддержка Python ожидается в релизе 2.0, о чем мы писали здесь, использовать этот язык программирования в Apache NiFi можно уже давно. Например, процессор ExecuteScript позволяет использовать язык сценариев для использования API NiFi для выполнения следующих задач:- чтение содержимого и/или атрибутов из входящего FlowFile;- создание нового FlowFile (с родительским элементом или без него);- запись содержимого и/или атрибутов в исходящий FlowFile;- взаимодействие с ProcessSession для передачи FlowFiles в отношения;- чтение/запись в State Manager для отслеживания переменных во время выполнения процессора.Справедливости ради, стоит отметить, что Python-движок, указанный в списке доступных скриптовых движков процессора ExecuteScript, на самом деле является Jython, а не Python. Поэтому при его использовании нельзя импортировать чистые CPython-модули, такие как pandas.Поэтому для Python-скрипта, который использует другие библиотеки и выдает выходные данные, можно использовать Execute Process, который выполнит сценарий Python на компьютере, используя полную python-библиотеку, и выходные данные станут файлом потока. Оператор устанавливает тип скрипта на Python и в разделе тело скрипта вводит скрипт.При этом можно использовать Nifi-Python-Api - богатый клиентский SDK Apache NiFi Python, который имеет 3 уровня поддержки Python для работы с Apache NiFi:- Высокоуровневые демонстрации и примеры сценариев;- Клиентский SDK среднего уровня для типичных сложных задач;- Клиентские SDK низкого уровня для полной реализации API NiFi и отдельных подпроектов.По сути, NiPyAPI – это Python-библиотека для взаимодействия с инстансами NiFi Она имеет следующие функциональные возможности:- Подробная документация полного SDK на всех уровнях;- Оболочки CRUD-операций для общих областей задач, таких как группы процессоров, процессоры, шаблоны, клиенты реестра, сегменты реестра, потоки реестра и т. д.- Удобные функции для задач инвентаризации, такие как рекурсивное получение всего холста или плоского списка всех групп процессов;- Поддержка планирования и очистки потоков, служб контроллера и соединений;- Поддержка выборки и обновления реестров переменных;- Поддержка импорта/экспорта версионных потоков из NiFi-Registry;- Конфигурации Docker Compose для тестирования и развертывания;- Развертывание интерактивной среды по сценарию и защищенная конфигурация для целей тестирования и демонстрации.Познакомившись с NiPyAPI, далее рассмотрим примеры использования этой библиотеки.Примеры использования NiPyAPIЧтобы использовать NiPyAPI, его сперва следует установить с помощью менеджера пакетов pip:pip install nipyapiСледующий пример показывает, как автоматизировать поиск неиспользуемых параметров, например, после рефакторинга, когда они были перемещены в другой контекст параметра или по-другому реализованы. При вводе параметров вновь вводимые чувствительные значения необходимо устанавливать вручную. Чтобы найти их, не проверяя вручную, можно запустить следующий Python-скрипт, использующий методы библиотеки NiPyAPI:def get_unused_parameters(app):app_pg_groups = nipyapi.canvas.list_all_process_groups(name_to_id)# Enable controllers and process groupsfor pg in app_pg_groups:if (pg.parameter_context is not None):#print("Processing: "+ str(pg.parameter_context.component.name))param_context = nipyapi.nifi.ParameterContextsApi().get_parameter_context(pg.parameter_context.component.id)params=param_context.component.parametersfor param in params:if len(param.parameter.referencing_components) == 0:print("Context: " + str(pg.parameter_context.component.name)+". Parameter: " + str(param.parameter.name) + " is not being used")if param.parameter.sensitive is True and param.parameter.value is None:print("Context: " + str(pg.parameter_context.component.name)+". Sensitive parameter " + str(param.parameter.name) + " has no value set")else:print("Process group: " + str(pg.component.name) + " has no parameter context assigned")А неиспользуемые службы контроллера NiFi можно найти с помощью следующего метода:def get_unused_controller_services(app):app_pg = nipyapi.canvas.get_process_group(identifier_type='id',identifier=name_to_id)# Enable controllers and process groupscs = nipyapi.nifi.FlowApi().get_controller_services_from_group(id=app_pg.id, include_ancestor_groups=False, include_descendant_groups=True).controller_servicesfor service in cs:if len(service.component.referencing_components)==0:print("Controller service " + str(service.component.name) + " has no referencing components. Parent pg: "+id_to_name)Таким образом, Python-скрипты с методами библиотеки NiPyAPI позволяют улучшить работу с NiFi: автоматизировать развертывание и сократить усилия, необходимые для очистки среды от компонентов, которые больше не используются.Освойте администрирование и использование Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:- Эксплуатация Apache NIFIИсточники1. - https://technology.amis.nl/big-data-database/apache-nifi-automating-tasks-using-nipyapi/2. - https://nipyapi.readthedocs.io/en/latest/3. - https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-scripting-nar/1.17.0/org.apache.nifi.processors.script.ExecuteScript/additionalDetails.html4. - https://community.cloudera.com/t5/Community-Articles/Python-Script-in-NiFi/ta-p/246406
Недавно мы писали, что такое Apache NiFi без сохранения состояния и чем он отличается от классического приложения потокового конвейера обработки данных. Сегодня рассмотрим особенности и ограничения Stateless-механизма и наилучшие сценарии использования в сравнении с классическим движком.Особенности и ограничения Stateless-движкаНапомним, классический NiFi предназначен для запуска большого многопользовательского приложения, в полной мере использует все предоставленные ресурсы, обеспечивая надежность потокового конвейера за счет сохранения данных на каждом этапе. Stateless-движок поддерживает тот же API, позволяя работать со всеми процессорами и определениями потоков базового фремворка, но выполняет фиксацию данных только после успешного завершения всего потока.Поэтому Stateless-механизм требует, чтобы источник данных был надежным и воспроизводимым, что гарантируют не все системы. Кроме того, каждый поток данных, запускаемый в Stateless-режиме, должен храниться в одном источнике и одном приемнике или пункте назначения, чтобы избежать дублирования данных. Поскольку данные в NiFi Stateless проходят через поток данных синхронно от начала до конца, использовать процессоры, которым требуется несколько FlowFile, не получится. Например, MergeContent и MergeRecord, которым нужны все данные для выполнения слияния. Если процессор имеет данные в очереди и запускается, но не может добиться какого-либо прогресса, Stateless-механизм снова запускает исходный процессор, чтобы ему дополнительные данные. Иногда это может привести к ситуации, когда данные будут поступать постоянно, в зависимости от поведения процессора. Чтобы избежать этого, объем данных, которые могут быть переданы при одном вызове потока данных, ограничивается с помощью настроек конфигурации.Например, если конфигурация потока данных ограничивает объем данных на один вызов, а процессор MergeContent настроен так, что ожидает определенного количества данных, поток будет продолжать запускать MergeContent без какого-либо прогресса до тех пор, пока истечет максимальный срок хранения или время потока данных.Кроме того, в зависимости от контекста, в котором выполняется Stateless, запуск исходных компонентов может не предоставить дополнительные данные. Например, если Stateless запускается в среде, где данные ставятся в очередь во входном порту, а затем запускается поток данных, последующий запуск входного порта не приведет к созданию дополнительных данных. Поэтому дата-инженер должен убедиться, что все потоки данных, содержащие логику для объединения FlowFiles, настроены с использованием максимального срока хранения для MergeContent и MergeRecord. В стандартном развертывании NiFi в этой ситуации обычно происходит зацикливание ошибочного соединения от исходного процессора обратно к нему же. Это приводит к тому, что процессор постоянно пытается обработать FlowFile, пока не добьется успеха. В классическом приложении NiFi получает данные и отвечает за владение ими, храня их до тех пор, пока нижестоящие службы не смогут получить эти данные. Однако, в случае с Stateless NiFi хранение не реализуется, а источник данных считается надежным и воспроизводимым, что не всегда соответствует действительности.Кроме того, Stateless-движок не сохраняет данные после перезапуска, поэтому алгоритмы обработки сбоев могут быть разными. При использовании Stateless-механизма в случае невозможности доставить данные в нижестоящую систему следует направить FlowFile на выходной порт, а затем пометить его как порт сбоя.Наконец, при использовании механизма без сохранения состояния потоки не должны загружать большие файлы, поскольку, в отличие от классического NiFi, содержимое FlowFile хранится не на диске, а в памяти, в куче JVM. А память не предназначена для долговременного хранения больших объемов данных. Поэтому не рекомендуется загружать большие файлы, такие как набор данных размером 100 ГБ, в NiFi Stateless. Это приводит к ошибке OutOfMemoryError или к значительной сборке мусора, что сильно снижает производительность. Впрочем, частично обойти это ограничение можно, настроив Stateless-движок на использование репозитория контента с диска.Классический Apache NiFi vs Stateless-механизм: ключевые отличияЧтобы резюмировать отличия классического механизма Apache NiFi от Statless-движка, сравним их по следующим критериям:- Ключевое назначение; - Долговечность хранения данных; - Порядок обработки данных; - Работа с памятью (кучей JVM); - Работа в режиме клиента или сервера; - Особенности потребления ресурсов; - Надежность происхождения данных; - Варианты использования.Для наглядности сравнения составим таблицу.Освойте администрирование и использование Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:- Эксплуатация Apache NIFIИсточники1. https://github.com/apache/nifi/blob/main/nifi-stateless/nifi-stateless-assembly/README.md 2. https://bryanbende.com/development/2021/11/10/apache-nifi-stateless
Чем Stateless-движок отличается от классического механизма потоковой обработки данных Apache NiFi, каковы его ключевые принципы работы и почему здесь особенно важна надежность источника. Классический Apache NiFi: основные понятия Приложение Apache NiFi можно рассматривать как два отдельных, но взаимосвязанных компонента: подлинности потока и его движок. Объединив их в одном приложении, NiFi позволяет дата-инженерам создавать поток данных и запускать его в режиме реального времени в одном пользовательском интерфейсе. Однако эти два понятия можно разделить. NiFi можно использовать для создания потоков, которые затем могут запускаться не только NiFi, но и другими совместимыми механизмами потоков данных: MiNiFi Java и C++ (подпроекты Apache NiFi) и NiFi Stateless. Каждый из этих механизмов потока данных имеет свои сильные и слабые стороны, обусловливающие их варианты использования.Классический NiFi предназначен для запуска как большого многопользовательского приложения, поэтому стремится в полной мере использовать все предоставленные ему ресурсы, включая диски/хранилище и множество потоков. Обычно один экземпляр NiFi кластеризуется по множеству разных узлов, образуя большой связный поток данных, который может состоять из множества различных подпотоков. NiFi полностью отвечает за данные, которые ему доставляются: надежно хранит их на диске до тех пор, пока они не будут доставлены во все пункты назначения. Доставке этих данных может быть присвоен приоритет в разных точках потока, чтобы более важные для конкретного пункта назначения данные были доставлены туда прежде всего. Это обеспечивает целостное представление о том, как данные обрабатываются и проходят через все предприятие.Однако, бывают сценарии, для которых можно использовать более легкое приложение, способное взаимодействовать со всеми различными конечными точками, с которыми может взаимодействовать NiFi, выполнять все преобразования, маршрутизацию, фильтрацию и обработку. Но при этом приложение работает с относительно небольшими потоками данных, которые не требуют сохранения состояния.Напомним, основной концепцией в NiFi является FlowFile – информационный объект, который движется по потоку данных и состоит из атрибутов и содержимого. В классическом фреймворке каждый узел имеет набор внутренних репозиториев, которые хранятся на локальном диске. Репозиторий FlowFile содержит состояние каждого потокового файла, включая его атрибуты и расположение в потоке, а репозиторий содержимого хранит его содержимое. Подробнее про репозитории NiFi мы писали здесь. При каждом выполнении процессора дается ссылка на сеанс, который действует как транзакция для работы с FlowFile. Если все операции завершены успешно и сеанс зафиксирован, все обновления сохраняются в репозиториях NiFi. В случае перезапуска приложения все данные сохраняются в репозиториях, и поток начнет обработку с последнего зафиксированного состояния. В Stateless-движке подобное сохранение не обеспечивается. Как это работает, рассмотрим далее.Stateless-движок: основные принципы работыМногие концепции в Stateless-движка отличаются от концепций типичного движка Apache NiFi. Stateless обеспечивает механизм потока данных с меньшим размером. Он не включает пользовательский интерфейс для создания или мониторинга потоков данных, а вместо этого запускает потоки данных, созданные с помощью приложения NiFi. Хотя NiFi работает лучше всего при наличии доступа к быстрому хранилищу, такому как SSD и диски NVMe, Stateless может хранить все данные в памяти. В качестве альтернативы используется дисковый репозиторий для содержимого FlowFile, чтобы избежать необходимости использования очень больших куч Java. Однако, как и следует из названия, Stateless не сохраняет состояние. Поэтому данные не будут восстановлены после перезапуска: если Stateless-механизм будет остановлен, у него больше не будет прямого доступа к данным, которые находились в процессе передачи. Таким образом, Stateless можно использовать только для потоков данных, где источник данных является одновременно надежным и воспроизводимым, или в сценариях, где потеря данных не является критической проблемой. Например, чтение данных из Apache Kafka или JMS-брокеров, а затем выполнение некоторой маршрутизации/фильтрации/манипулирования и, наконец, доставка данных в другое место назначения. Если запускать такой поток данных в классическом NiFi, данные потребляются из источника, записываются во внутренние репозитории и попадают под ответственность NiFi, который будет отвечать за доставку по всем пунктам назначения, даже при перезапуске приложения. В случае с Stateless данные будут потребляться, а затем передаваться следующему процессору в потоке. Данные не записываются во внутренние репозитории и передаются следующим процессорам по потоку. Данные, полученные от источника, будут подтверждены только после того, как они достигнут конца всего потокового конвейера. Если Stateless перезапускается до завершения обработки, данные не подтверждены, а потому используются снова. Это позволяет обрабатывать данные в памяти, не опасаясь потери данных, но также возлагает на источник данных ответственность за их надежное хранение и обеспечение возможности их воспроизведения.Stateless-движок придерживается того же API-интерфейса, что и классический фреймворк, позволяя работать с теми же процессорами и определениями потоков, но обеспечивает другую реализацию базового движка. Для этого в Stateless есть объект StatelessDataFlow, запускаемый для выполнения потока. Каждое выполнение потока дает результат, который можно считать успешным или неудачным. Сбой может произойти из-за того, что процессор выдает исключение, или из-за явной маршрутизации файлов потока на поименованный порт сбоя. Ключевое отличие Stateless-движка заключается в фиксации сеанса NiFi с помощью метода ProcessSession с сигнатурой void commitAsync(Runnable onSuccess). Это дает реализации сеанса контроль над тем, когда выполнять данный обратный вызов. В классическом NiFi сеанс может выполнить обратный вызов на последнем этапе commitAsync(), обеспечивая сохранность данных при перезапуске даже без фиксации того, что они достигли конечной точки потокового конвейера. Сеанс NiFi Stateless может удерживать обратный вызов и выполнять его только после успешного завершения всего потока.Еще в выпуске 1.15.0 был представлен процессор ExecuteStateless для запуска движка Stateless из классического NiFi. Это позволяет управлять выполнением потока без сохранения состояния с помощью традиционного пользовательского интерфейса фреймворка, а также подключать выход Stateless-потока для последующей обработки в классическом потоке NiFi.Чтобы использовать процессор ExecuteStateless, следует сначала в классическом NiFi создать группу процессов, содержащую поток, который надо выполнить с помощью Stateless-механизма. Затем надо загрузить определение потока или зафиксировать его в экземпляре реестра NiFi, а потом настроить для процессора ExecuteStateless местоположение определения потока.Stateless-механизм можно использовать в качестве библиотеки и встраивать в другие приложения, а также запустить непосредственно из командной строки из сборки NiFi с помощью скрипта bin/nifi.sh. Для этого необходимы три файла: файл свойств конфигурации механизма, файл свойств конфигурации потока данных и сам поток данных, который может существовать в виде файла или указывать на поток в реестре NiFi. Stateless-движок принимает два отдельных файла конфигурации: файл конфигурации механизма и файл конфигурации потока данных, поскольку обычно конфигурация механизма одинакова для всех выполняемых потоков, поэтому ее можно создать только один раз. А вот конфигурация потока данных будет отличаться для каждого потока данных, который необходимо запустить.Ключевым аспектом NiFi является разделение платформы и расширений, работающих с данными, поэтому должен быть механизм определения расположения этих расширений. При запуске Stateless-движка он анализирует предоставленный поток данных и определяет, какие пакеты/расширения необходимы для запуска потока данных. Если расширение недоступно или версия, на которую ссылается поток, недоступна, Stateless пытается загрузить расширения автоматически, используя предварительно настроенные клиенты расширения или те расширения, которые уже доступны, например, загружены вручную и скопированы в автономном режиме в каталогах, указанных в свойствах nifi.stateless.extensions.directory и nifi.stateless.readonly.extensions.directory.Конфигурация потока данных предоставляет Stateless-движку всю информацию о том, какой поток запускать. Местоположение потока указывается в URL-адресе реестра NiFi, идентификаторе сегмента и идентификаторе потока или в JSON-определении потока. На практике проще всего экспортировать поток из NiFi на локальный диск для использования Stateless-движком в GUI, через контекстное меню группы процессов или загрузить поток сразу на холст.Разобравшись с основными принципами работы Stateless-движка, в следующей статье рассмотрим, его особенности и ограничения.Узнайте больше про администрирование и использование Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:- Эксплуатация Apache NIFIИсточники1. https://github.com/apache/nifi/blob/main/nifi-stateless/nifi-stateless-assembly/README.md 2. https://bryanbende.com/development/2021/11/10/apache-nifi-stateless
Опубликованная впервые в 2016 году 1-ая версия Apache NiFi дополняется новыми минорными релизами, последним из которых стал 1.23.2, исправляющий ошибки предыдущих выпусков. Однако, в обозримом будущем ожидается мажорный релиз 2.0 со множеством новых возможностей. Разбираемся с его наиболее перспективными предложениями.ТОП-10 целей Apache NiFi 2.0Чтобы повысить безопасность, снизить сложность и внедрить новые функции, NiFi 2.0 сфокусирован на сокращении технического долга. Согласно принципам семантического управления версиями, мажорный выпуск предоставляет возможность внесения критических изменений, которые исключают обратную совместимость с предыдущими версиями. 15 декабря 2022 года комитет по управлению проектом Apache NiFi проголосовал за принятие следующих основных целей нового мажорного релиза:- внедрить поддержку Java 11 вместо Java 8. В частности, для Jetty 10 требуется и OpenSAML 4 требуется Java 11. Поддержка Java 8 в Kafka 3 прекращена в сентябре 2021 года, а в Spring 6 – в ноябре 2022 г.- удалить устаревшие компоненты и свойства компонентов, включая процессоры и службы контроллеров, помеченные как неподдерживаемые или замененные лучшими альтернативами. Удаление устаревших свойств компонента позволит сохранить существующие процессоры, устраняя при этом дублирующиеся параметры конфигурации. Например, свойства PGP в EncryptContent и связанные с ними возможности теперь реализованы в EncryptContentPGP и DecryptContentPGP, о чем мы писали здесь. Свойства Keytab непосредственно в процессорах, поддерживающих Kerberos, а KeytabCredentialService заменены на KerberosUserService.- удалить компоненты, интегрирующиеся с необслуживаемыми сервисами и/или их устаревшими версиями без технической поддержки;- удалить ненужные классы и методы, например, PersistentProvenanceRepository, Стандартный RecordWriter NiFiLegacyCipherProvider и связанные с ним методы шифрования supportExpressionLanguage() без аргументов, пользовательские классы InputStream и OutputStream.- заменить представление внутренней конфигурации потока в файле flow.xml.gz на flow.json.gz. В NiFi 1.16.0 представлен flow.json.gz для хранения конфигурации потока с использованием классов модели версионных компонентов. Более простой JSON-формат снижает затраты на сохранение конфигурации и синхронизацию нескольких представлений и уже используется в NiFi, NiFi Stateless и NiFi Registry.- удалить дублирующиеся функции и элементы, например, шаблоны XML, замененные JSON-представлениями и реестр переменных, замененный контекстом параметров;- обновить внутренние ссылки на API Java - перенос внутреннего использования классов util.Date в java.time обеспечит большую точность анализа и форматирования. Это предполагает рефакторинг использования классов DateTimeFormatter и java.time в компонентах, ориентированных на записи.- Реорганизация стандартных компонентов для уменьшения размера и объема процессоров и пакетов NAR Standard Services. Компоненты со специализированными зависимостями будут перемещены из стандартных процессоров (SFTP, HTTP, JSON, Netty) в отдельные пакеты.- Внедрение инструментов миграции для обновления потоков, включая автоматическую миграцию для переназначения свойств и функций, преобразования шаблонов XML в определения потока JSON.Важные возможности и ключевые измененияВ Apache NiFi 2.0 ожидается поддержка Python, что очень порадует многих дата-инженеров и разработчиков. Также будет добавлена возможность запуска группы процессов с использованием NiFi Stateless – компонента, который обеспечивает механизм потока данных с меньшим масштабом и набором функций. Он не включает пользовательский интерфейс для создания или мониторинга потоков данных, а просто запускает потоки данных, созданные с помощью приложения NiFi. NiFi Stateless может хранить все данные в памяти или использовать дисковый репозиторий для содержимого FlowFile, чтобы избежать необходимости использования очень больших куч Java. Однако, данные не будут восстановлены после перезапуска, что и обусловливает название компонента – без сохранения состояния. Использование NiFi Stateless на уровне группы процессов позволит реализовать транзакционные сценарии потокового конвейера обработки данных, когда поток следует рассматривать как единую транзакцию. Это пригодится в случае захвата измененных данных (CDC, Change Data Capture).Также ожидается улучшение движка обработки правил (Rules Engine), который дает рекомендации проектировщику потока данных по настройке компонентов, опираясь на лучшие практики современной дата-инженерии.Как уже было отмечено, вместо XML-представлений будут использоваться JSON-структуры. Шаблоны XML хранятся в памяти NiFi, а также в постоянном определении потока (файлы flow.xml.gz и flow.json.gz), и это вызвало множество проблем у пользователей с десятками или сотнями массивных шаблонов с тысячами компонентов. Удаление всего этого повысит стабильность NiFi и улучшит использование памяти. При переходе на новую версию придется экспортировать шаблоны в виде определений JSON или создать версию шаблонов в экземпляре реестра NiFi. Лучше использовать реестр NiFi вместе с самим фреймворком, чтобы контролировать версии и определения потока данных для совместного и повторного использования. Для этого, если шаблон представляет собой группу процессов, можно перетащить его на холст, а затем через контекстное меню экспортировать его как определение потока (файл JSON) или запустить контроль версий в реестре NiFi. Если шаблон не является группой процессов, а представляет собой непосредственно поток с компонентами, нужно перетащить группу процессов, затем перейти в эту группу процессов и перетащить туда шаблон. Далее можно вернуться к родительской группе процессов, содержащей шаблон, и экспортировать ее как определение потока или запустить для нее контроль версий.Переменные и реестр переменных исчезнут, поскольку они имеют множество ограничений, таких как необходимость поддержки языка выражений в свойстве для ссылки на переменную и невозможность хранения конфиденциальных значений. Вместо этого будут использоваться контексты параметров, которые поддерживают концепцию провайдеров. Например, провайдер контекста параметров для получения значений параметров из внешних хранилищ, таких как HashiCorp, хранилища облачных провайдеров и пр.Стратегия планирования на основе событий была экспериментальной опцией, доступной на некоторых процессорах. Поскольку она не принесла каких-либо существенных улучшений производительности, в версии NiFi 2.0 ее не будет. Вместо этой стратегии следует использовать стратегию планирования на основе таймера. Для более простого отслеживания удаленных и устаревших компонентов в NiFi есть специальный файл журнала nifi-deprecation.log, который содержит их перечисления.Наконец, для обновления зависимостей в пользовательских компонентах, в набор CLI-инструментов NiFi 2.0 будет добавлена команда рекурсивного изменения текущих версии всех экземпляров компонента на более новые.Узнайте больше про администрирование и использование Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:- Эксплуатация Apache NIFIИсточники1. https://medium.com/cloudera-inc/getting-ready-for-apache-nifi-2-0-5a5e6a67f4502. https://cwiki.apache.org/confluence/display/NIFI/NiFi+2.0+Release+Goals
Недавно мы писали про спецификацию OpenLineage, которая позволяет обеспечить мониторинг происхождения данных в Apache AirFlow. Сегодня рассмотрим, в чем разница Data Lineage и Data Provenance, а также, как потоковый маршрутизатор Apache NiFi организует отслеживание событий генерации и изменения данных.Data Lineage vs Data ProvenanceСначала рассмотрим, чем отличается Data Provenance от Data Lineage. Хотя на русский оба этих выражения переводятся одинаково, они имеют разный смысл. Data Lineage показывает, откуда взялись данные и как они развивались на протяжении своего жизненного цикла. А Data Provenance больше ориентировано на историческую запись данных, фиксируя историю происхождения продукта данных, начиная с его изначальных источников.Data Lineage прослеживается до источников, из которых они были получены, и этапов трансформации, которые они прошли. Это относится к жизненному циклу данных, включая их происхождение и то, куда они перемещаются с течением времени. Data Lineage обеспечивает наглядность аналитического конвейера и предоставляет схему для понимания того, как данные доставляются от источника к конечному пользователю. Сюда входят процессы, через которые проходят данные, такие как преобразования, агрегации и другие манипуляции. Эту информацию можно использовать для анализа первопричин, отслеживания ошибок, улучшения качества данных и обеспечения соответствия нормативным требованиям. Data Lineage отвечает на следующие вопросы о данных:- Откуда взялись данные?- Какие преобразования они претерпели?- Куда они движутся дальше?- Кто их использует и с какой целью?Data Provenance больше ориентировано на историческую запись данных, фиксируя историю происхождения продукта данных, начиная с его первоначальных источников. Это включает в себя такие сведения, как кто создал данные, когда они были созданы, какие изменения были внесены в них и кем. Можно сказать, что Data Lineage обеспечивает высокоуровневое представление о пути данных, а Data Provenance глубже погружается в особенности их истории, подлинность и целостность данных. Data Provenance отвечает на следующие вопросы о данных:- Кто создал данные и когда?- Какие изменения были внесены в данные и кем?- Каково качество и надежность данных?- Каков первоначальный источник данных?Таким образом, оба термина направлены на понимание истории данных, но отличаются по глубине и направленности. Data Lineage ориентировано на перемещение и преобразование данных, тогда как Data Provenance больше связано с историей и подлинностью данных. Для комплексного управления данными и стратегии управления данными необходимо поддерживать и использовать обе концепции. Будучи потоковым маршрутизатором, Apache NiFi поддерживает именно Data Provenance. Как именно, рассмотрим далее.Как отследить создание и изменение данных в Apache NiFiПри мониторинге потока данных пользователям часто требуется способ определить, что произошло с конкретным объектом данных (FlowFile). В Apache NiFi эта информация представлена на странице происхождения данных Data Provenance. Поскольку NiFi записывает и индексирует детали происхождения данных по мере прохождения объектов через систему, пользователи могут выполнять поиск, устранять неполадки и оценивать такие вещи, как соответствие и оптимизация потока данных, в режиме реального времени. По умолчанию NiFi обновляет эту информацию каждые пять минут, но это можно настроить.Данные о происхождении потока в NiFi позволяют воспроизводить потоковый конвейер. Пока данные о происхождении не устарели и указанный контент все еще доступен в репозитории контента, любой FlowFile можно воспроизвести из любой точки потока. Это значительно сокращает жизненный цикл разработки потокового конвейера, экономя время дата-инженера.Все сведения по происхождению данных в Apache NiFi отображаются в GUI на вкладке с названием Data Provinance. Это диалоговое окно позволяет просмотреть самую последнюю доступную информацию о происхождении данных, выполнить поиск информации для конкретных элементов и отфильтровать результаты поиска. Также можно открыть дополнительные диалоговые окна, чтобы просмотреть подробную информацию о событии, воспроизвести данные в любой точке потока данных и просмотреть графическое представление происхождения данных или пути в потоке. Когда авторизация включена, для доступа к информации о происхождении данных требуется глобальная политика запрос происхождения, а также политика компонента просмотр происхождения для компонента, который сгенерировал событие. Кроме того, для доступа к сведениям о событии, которые включают атрибуты и содержимое FlowFile, требуется политика компонента просмотр данных для компонента, сгенерировавшего событие.Реестр событий происхождения данных в Apache NiFiКаждая точка потока данных, где FlowFile каким-либо образом обрабатывается, считается событием происхождения. В зависимости от структуры потока данных происходят различные типы событий происхождения. Например, когда данные вводятся в поток, возникает событие RECEIVE, а когда данные отправляются из потока, возникает событие SEND. Могут возникать другие типы событий обработки, например, если данные клонируются (событие CLONE), маршрутизируются (событие ROUTE), изменяются (событие CONTENT_MODIFIED или ATTRIBUTES_MODIFIED), разделяются (событие FORK), объединяются с другими объектами данных (событие JOIN), и в конечном итоге удаляется из потока (событие DROP).Apache NiFi поддерживает следующие типы событий:- ADDINFO - событие происхождения, когда добавляется дополнительная информация, такая как новая связь с новым URI или UUID;- ATTRIBUTES_MODIFIED – изменение атрибутов FlowFile;- CLONE - указывает, что FlowFile является точной копией своего родительского FlowFile;- CONTENT_MODIFIED – изменение содержимого FlowFile;- CREATE – создание FlowFile на основе данных, которые не были получены от удаленной системы или внешнего процесса;- DOWNLOAD - содержимое FlowFile загружено пользователем или внешним объектом;- DROP - завершение жизни объекта по какой-либо причине, кроме истечения срока действия объекта;- EXPIRE - завершение срока службы объекта из-за того, что объект не был обработан своевременно;- FETCH - содержимое FlowFile перезаписано содержимым какого-либо внешнего ресурса;- FORK - один или несколько FlowFile были производными от родительского FlowFile;- JOIN - один FlowFile получен в результате объединения нескольких родительских FlowFile;- RECEIVE - получение данных из внешнего процесса;- REMOTE_INVOCATION – регистрация удаленного вызова к внешней конечной точке;- REPLAY - воспроизведение FlowFile;- ROUTE – направление FlowFile к указанному отношению с приведением причины;- SEND - отправка данных во внешний процесс;- UNKNOWN - тип события происхождения данных неизвестен для конкретного пользователя по причине отсутствия у него прав на просмотр этой информации.Одна из наиболее распространенных задач, выполняемых на странице Data Provenance, — это поиск заданного FlowFile, чтобы определить, что с ним произошло. Это также можно найти в GUI фреймворка. В диалоговом окне пользователь может определить для поиска, включая интересующее событие обработки, отличительные характеристики FlowFile или компонента, вызвавшего событие, временной интервал, в течение которого выполняется поиск, и размер FlowFile.Например, чтобы определить, был ли получен конкретный FlowFile, можно найти тип события RECEIVE и включить идентификатор FlowFile, например его uuid или имя файла. Звездочку (*) можно использовать в качестве подстановочного знака для любого количества символов. По каждому событию происхождения данных для каждого события можно просмотреть его подробности: детали, атрибуты и содержимое.Детали отображают различные сведения о событии, например, когда оно произошло, тип события и компонент, вызвавший это событие. Также отображается информация об обработанном FlowFile с указанием его UUID, UUID любых родительских или дочерних файлов FlowFile, связанных с текущим. Атрибуты показывают атрибуты, существующие в FlowFile на данный момент в потоке. Также можно настроить отображение только тех атрибутов, которые были изменены в результате обработки.Можно увидеть графическое представление происхождения или пути, который FlowFile прошел в потоке данных на графе, который отображает его сам и различные произошедшие с ним события обработки. Также можно увидеть, как происхождение развивалось с течением времени.Граф событий происхождения данных в Apache NiFiУзнайте больше про администрирование и использование Apache NiFi для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:- Эксплуатация Apache NIFIИсточники1. https://atlan.com/data-lineage-vs-data-provenance/2. https://nifi.apache.org/docs.html3. https://blogsarchive.apache.org/nifi/entry/encrypted-provenance-repositories-in-apache