Part 1: Scala with Spark Legacy Code Conversion
Part 2: Running within a Kubeflow Pipeline (KFP)
Part 3: Platform Engineering for Scala with Spark
Running within a Kubeflow Pipeline (KFP)
Written by: Alessandro – Data Engineer NStarX
Contributor: Yassir – Data Scientist NStarX
Preface
Many companies today are looking to migrate away from managed cloud platforms like Databricks in favour of more customizable, cost-effective options like Kubeflow. One of the challenges that arises from a move of this kind is the learning curve data engineers and scientists face when working with a new platform.
Previously in this three-part series, we explored some of the solutions we found for dealing with a legacy code base, specifically one that was written in Scala, leveraged Spark, and was locked into the Databricks infrastructure. In this second part, we outline the more technical aspects that can pose learning-curve issues for users switching from Databricks to Kubeflow. More specifically, we detail the problems and solutions involved in packaging scripts into reliable and repeatable Kubeflow pipelines.
Introduction
Companies that decide to switch from Databricks to Kubeflow are probably doing so for cost optimization, customizability, or vendor independence, or because they want granular control over their deployments and scaling. That being said, there can be a lot of internal pushback from data engineers and scientists who have become entrenched in Databricks’ operational practices.
When working with Databricks over a prolonged period of time, it is not only the code base that becomes locked into the Databricks infrastructure. Developers who are accustomed to Databricks’ tools and operational practices may be resistant to adapting to Kubeflow at first. Kubeflow does have parallel operational practices, but these can be met with disdain from developers who have become complacent in the status quo.
For those unfamiliar with Kubeflow’s operational practices, Kubeflow can seem to have a steep learning curve, but once they have adapted, users will find developing workflows in Kubeflow as routine as in Databricks. In this paper, we address some of the pain points Databricks users may face when switching to Kubeflow as the platform to execute their workflows, and the steps new users can take to flatten this learning curve.
Real-World Example
In the first part of this series, we described the challenge of decoupling Scala code leveraging Spark from Databricks and how we resolved it. Once that code had been decoupled, we had to package it into a stable and repeatable pipeline. This pipeline had to match the specifications and steps that were present within Databricks. Unlike in the first part, we were unable to simply update existing code. Kubeflow Pipelines and Databricks Pipelines are set up differently, but once a user understands the framework of a KFP (Kubeflow Pipeline), the setup becomes very routine.
Kubeflow pipelines are far more customizable than their Databricks counterparts. As a result, they require additional attention to detail and a deeper understanding of the mechanics behind pipeline execution, especially when execution is distributed. When we were tasked with replicating a Databricks pipeline within Kubeflow, we had to do so under the same resource limits employed in Databricks, whilst maintaining the same scalability, reliability, code immutability, and execution times.
Challenges of Real World Example: Replicating a Databricks pipeline in Kubeflow
When recreating a Databricks pipeline in Kubeflow, the primary concern of developers is that the KFP be essentially identical to the Databricks Pipeline, in terms of execution, reliability, scalability and immutability. This means that developers need an equivalent environment to Databricks for both development and for production within Kubeflow.
What becomes challenging for developers conditioned to the Databricks modus operandi is separating their own assumptions about how a pipeline’s lifecycle should progress from development to production from how this lifecycle actually works within Kubeflow. These challenges, while rooted in a lack of understanding of the new platform, tend to be less about the platform and more about individual user bias. That being said, there can still be pain points when learning a new system or platform.
In typical Databricks fashion, a pipeline will first be created in a personal or shared volume (storage space), where the code is easily accessible and customizable. Once the code is ready for production, it is no longer referenced from this common, readily editable place. Within a production pipeline, a Databricks developer will typically have their pipeline reference code from a git repository, thus ensuring its immutability. Within Kubeflow, development typically happens on a shared volume, and code immutability is ensured through version-controlled Docker images.
Furthermore, within Databricks, computational requirements for a job within a pipeline are set through a UI, where users select the machine type best suited to their execution needs. In Databricks, machines are selected; in Kubeflow, the sizes of drivers and executors are determined based on previously selected machines. While these actions might seem different, they are indeed parallel, with drawbacks and benefits on both platforms.
Best Practices
Docker Suggestions
The first concept or tool a developer switching from Databricks to Kubeflow will have to grasp is Docker. While not a repository in its own right, a Docker image is a self-contained, lightweight, standalone software package that holds everything required to run a workflow job, including libraries, code, environment variables, and configuration files. In this light, Docker takes the place of the repository previously used to ensure production code is immutable. This means that Docker images are tagged and versioned to ensure production pipelines are stable and secure. The code for the jobs run within these Kubeflow Pipelines is accessed directly from within these Docker images.
When creating Docker images, it is important to be mindful of their size. The larger the image, the slower it will be to build, push and pull. If many parts of your pipeline require various large packages that are not common between jobs, it is advisable to create a separate Docker image for each job, keeping each image as lean as possible. Additionally, as with any company repository, Docker images should be hosted on a private company registry, typically the one offered by the cloud platform being used.
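The tagging and versioning practice described above can be enforced with a small helper when image references are assembled in pipeline code. The following is a minimal sketch, not part of our pipeline: the function name, the list of tags treated as mutable, and the registry and image names are all hypothetical. The tag check follows Docker's documented tag rules (letters, digits, underscores, periods and hyphens, at most 128 characters, not starting with a period or hyphen).

```python
import re

# Assumption for illustration: tags that are commonly re-pushed and therefore not immutable.
MUTABLE_TAGS = {"latest", "dev", "main"}


def image_reference(registry: str, name: str, version: str) -> str:
    """Build a pinned image reference such as <registry>/<name>:<version>."""
    if version in MUTABLE_TAGS:
        raise ValueError(f"'{version}' is a mutable tag; pin an explicit version")
    # Docker tag rules: letters, digits, '_', '.', '-'; max 128 characters;
    # the first character may not be '.' or '-'.
    if not re.fullmatch(r"[A-Za-z0-9_][A-Za-z0-9_.-]{0,127}", version):
        raise ValueError(f"'{version}' is not a valid image tag")
    return f"{registry.rstrip('/')}/{name}:{version}"


# Hypothetical registry and image names.
print(image_reference("registry.example.com/data", "scala-spark-job", "1.4.2"))
```

Rejecting tags such as latest at this point means a production pipeline cannot silently pick up a rebuilt image.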
KFP Suggestions
For a detailed breakdown of KFP functionality, Kubeflow provides documentation at https://www.kubeflow.org/docs/components/pipelines/. We will therefore not outline every step of building a pipeline, but rather some of the additional best practices we came across in our own experience with the platform. Basic pipelines are very simple, but the integration of Scala and distributed Spark can prove a little complex if it is a developer’s first time using the platform.
To start, when developing or testing, it is advisable to use a common path, mounted to the job, so that code is easily configurable and customizable as needed. The common path is a volume within your Kubernetes cluster, and the pipeline jobs (in KFP named components) can access it as long as it is mounted to the specified job.
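from kfp import kubernetes

# Mount the shared code volume (a PersistentVolumeClaim) into a component.
def mount_pvc_to_component(component, pvc_name='my-storage', mount_path='/app/directory/sub-directory/'):
    kubernetes.mount_pvc(
        component,
        pvc_name=pvc_name,
        mount_path=mount_path,
    )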
In this case, we have a single common location for the code referenced within the pipeline; therefore, we mount the volume to each job/component within the pipeline. File locations are then specified relative to the mount path of this volume. This, again, is only for development purposes. Once code has reached a stage where it can be put into production, we remove this mount and access the scripts directly from within the Docker images themselves. Below is a code snippet showing how we mounted the volume containing the code to each component/job within the pipeline, here named c0 and c1.
# Components
components = [
    c0, c1
]
# Mount PVC to all components
for component in components:
    mount_pvc_to_component(component)
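Because the mount is a development-only step, it can help to gate it behind an explicit setting rather than deleting the loop by hand before a production release. The sketch below is one way to do that under stated assumptions: configure_code_source and the environment names are hypothetical, and the mount function is passed in as a parameter so the logic can be exercised without a cluster. The environment must be an ordinary Python value known when the pipeline is compiled, not a runtime pipeline parameter.

```python
from typing import Callable, Iterable, List


def configure_code_source(
    components: Iterable[object],
    environment: str,
    mount_fn: Callable[[object], None],
) -> List[object]:
    """Mount the shared code volume in development only.

    In production nothing is mounted, so the scripts baked into the
    Docker image are the only code a component can run.
    Returns the components that received the mount.
    """
    if environment not in {"development", "production"}:
        raise ValueError(f"unknown environment: {environment!r}")
    mounted: List[object] = []
    if environment == "development":
        for component in components:
            mount_fn(component)
            mounted.append(component)
    return mounted
```

Within the pipeline definition this would be called as configure_code_source([c0, c1], "development", mount_pvc_to_component).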
When defining the function that runs the Scala scripts, any environment variable that a developer wants to be editable in the Kubeflow UI should be defined as a parameter of the function itself. In other words, every parameter of the defined function is editable at the time of pipeline creation. An example of this can be seen below. If a developer prefers to keep their environment variables immutable, these can instead be defined directly within the Docker image. If the script is written correctly, it will override whatever default value was set, in favour of this preferred environment variable.
from kfp.dsl import ContainerSpec

def run_scala(scala_file_name: str, bucket_uri: str, ingestion_period_days: str, core_limit: str,
              executor_minimum: str, executor_maximum: str, executor_memory: str, driver_memory: str, system_environment: str):
    return ContainerSpec(
        image='[Registry]/[Image Name]:[Version]',
        # With `bash -c`, the first entry of args is bound to $0, the next to $1, and so on.
        command=['bash', '-c',
                 'set -o pipefail ; '
                 'export Bucket_URI=$0 ; '
                 'export Ingestion_Period_Days=$1 ; '
                 'export Core_Limit=$2 ; '
                 'export Executor_Minimum=$3 ; '
                 'export Executor_Maximum=$4 ; '
                 'export Executor_Memory=$5 ; '
                 'export Driver_Memory=$6 ; '
                 'export System_Environment=$7 ; '
                 '/usr/local/bin/python /app/kubeflow_pipeline/add_pod_label.py ; '
                 'export K8S_SPARK_SERVICE=$(cat /tmp/selector.txt) ; '
                 'cat /tmp/selector.txt ; '
                 '/app/kubeflow_pipeline/scala_executor.sh $8 ; '
                 '/usr/local/bin/python /app/kubeflow_pipeline/clean_svc.py; '],
        args=[f"{bucket_uri}", f"{ingestion_period_days}", f"{core_limit}", f"{executor_minimum}",
              f"{executor_maximum}", f"{executor_memory}", f"{driver_memory}", f"{system_environment}",
              f"{scala_file_name}"]
    )
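The export lines in run_scala rely on positional parameters, so the order of the export statements and the order of args must stay in step by hand. As a hedged alternative, the preamble can be generated from a single ordered list of names. This is a sketch rather than the code we ran: export_preamble and ENV_NAMES are hypothetical names. It also writes the parameters in braced form, since in bash $10 is read as $1 followed by a literal 0, which matters once a function has more than ten arguments.

```python
from typing import Sequence

# Same variable names and order as in run_scala above.
ENV_NAMES = [
    "Bucket_URI", "Ingestion_Period_Days", "Core_Limit", "Executor_Minimum",
    "Executor_Maximum", "Executor_Memory", "Driver_Memory", "System_Environment",
]


def export_preamble(env_names: Sequence[str]) -> str:
    """Return 'export NAME="${i}"' statements, one per name, joined by ' ; '.

    With `bash -c <script> a b c`, bash binds a to $0, b to $1 and c to $2,
    so the i-th name is paired with the i-th entry of args.
    """
    return " ; ".join(f'export {name}="${{{i}}}"' for i, name in enumerate(env_names))


# The first positional parameter not consumed by an export holds the script name.
script_arg = f'"${{{len(ENV_NAMES)}}}"'

script = " ; ".join([
    "set -o pipefail",
    export_preamble(ENV_NAMES),
    f"/app/kubeflow_pipeline/scala_executor.sh {script_arg}",
])
command = ["bash", "-c", script]
print(command[2])
```

Adding a new variable then only requires appending its name to the list and its value to args, in the same position.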
In the above example, we have a single function for running a Scala script, but it should be noted that this function uses the same Docker image for every job. If a developer wishes to use multiple Docker images or sets of environment variables, they should define one such function per combination and name each accordingly.
From here, if the function is to be shared across multiple jobs/components, a developer defines each individual component to be executed on top of it. If the function is only used once within the pipeline, a developer can simply define that function as the container component itself.
from kfp.dsl import container_component

@container_component
def myScalaScript(scala_file_name: str, bucket_uri: str, ingestion_period_days: str, core_limit: str, executor_minimum: str, executor_maximum: str, executor_memory: str, driver_memory: str, system_environment: str):
    return run_scala(scala_file_name, bucket_uri, ingestion_period_days, core_limit, executor_minimum, executor_maximum, executor_memory, driver_memory, system_environment)
Stability Suggestions
With regard to the pipeline itself, stability and scalability are major concerns for developers who want reliability in production. In our experience, there are some additional methods, not specified within the documentation, that a developer can employ to help ensure that pipelines, and the environment itself, remain reliable.
Firstly, the script named add_pod_label.py in the run_scala function above sets up the environment for the pod being provisioned. It adds a specific label to the current pod, sets up the Spark service, and saves the name of that service to a temp file called selector.txt, so that the Spark executor is able to identify the service. This way, a Spark executor is never left in limbo or without a service due to anomalies or interruptions within the cluster itself. The script adds stability to the pipeline by ensuring the Spark executor always has a specifically defined Spark service.
Similarly, scala_executor.sh, as seen in the function defined above, was also written to provide stability. In the current iteration of KFP v2, the retry function from v1 has not yet been ported over. Our solution was to create a simple script that retries the job if it fails. The number of retries and the time between retries can be altered. The one caveat is that if the driver pod gets evicted, the job itself will fail, as it no longer exists. The retry function can only work as long as the driver pod remains active. In practice, this feature is better suited to testing and development, but it is still advisable to employ it in production in case of a failure due to some unforeseen anomaly.
Finally, the clean_svc.py script cleans up any zombie Kubernetes services that may be lingering after the Spark job is complete. It reads the environment variables to find the Spark service name and deletes that service from the namespace. It also attempts to remove any associated Istio virtual services, so that no unnecessary network configuration is left behind.
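The retry behaviour described above can be sketched as follows. Our scala_executor.sh is a shell script; this Python version is only an illustration of the same idea, and run_with_retries, submit_job, the default attempt count and the default delay are all hypothetical. The sleep function is injectable so the logic can be exercised without waiting.

```python
import subprocess
import time
from typing import Callable, Sequence


def run_with_retries(
    run_once: Callable[[], int],
    max_attempts: int = 3,
    delay_seconds: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call run_once until it returns exit code 0 or the attempts run out.

    Returns 0 on success, otherwise the exit code of the last attempt.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    exit_code = 1
    for attempt in range(1, max_attempts + 1):
        exit_code = run_once()
        if exit_code == 0:
            return 0
        print(f"attempt {attempt}/{max_attempts} failed with exit code {exit_code}")
        if attempt < max_attempts:
            sleep(delay_seconds)  # wait before the next attempt
    return exit_code


def submit_job(command: Sequence[str]) -> int:
    """Run the job command as a child process and report its exit code."""
    return subprocess.run(list(command), check=False).returncode
```

A caller would wrap whatever command launches the Scala job, for example run_with_retries(lambda: submit_job(job_command)), where job_command is a placeholder for that command. As noted above, this only helps while the pod running the wrapper stays alive; an evicted driver pod takes the retry loop down with it.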
Spark Suggestions
Another major consideration, not mentioned in the documentation, is that the flags used to define a Spark session (such as the number of cores and the allotted memory) alter only the executors of that session, not the driver, when they are defined within the script itself. To size the driver, resources must be set at the component level and applied to each component, in the same way the volumes were mounted to each component in the earlier example. Furthermore, it is advisable to have a dedicated node group just for pipeline executions, so that no other jobs or services take resources away from the pipeline attempting to execute. Especially in a production environment, this adds stability and efficiency, as no other deployments consume node resources allocated to the production pipelines.
# `cores` and `executor_memory` are values defined elsewhere in the pipeline.
for component in components:
    kubernetes.add_node_selector(component, 'asg-type', 'drivers-pipeline')
    component.set_cpu_limit(cores)
    component.set_cpu_request(cores)
    component.set_memory_limit(executor_memory)
    component.set_memory_request(executor_memory)
In order to maximize the use of a node, node size must be considered when determining the size of drivers and executors, keeping in mind that, whatever the node size, developers must also account for the overhead of that node. Maximizing the utilization of each node ensures that the pipeline runs at an optimal level, using every core and gigabyte it can to process the data in a given job. This aids both pipeline efficiency and cost optimization.
In this way, Kubeflow differs from Databricks, but the difference can be a benefit as well. As mentioned in Part 1 of this series, developers want to avoid moving data across the wire. If they must, moving data between pods within the same node is preferable to moving data across nodes in a node pool. In Kubeflow, data can move from pod to pod within the same machine, reducing latency and overhead, instead of from machine to machine as it does in Databricks. So, if there is ever an absolute need to move data across the wire, doing so within the same node is the best solution.
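The sizing arithmetic can be made concrete with a short calculation. The following is a sketch under stated assumptions: the node shape and the resources reserved for the system are made-up figures, and the class and function names are hypothetical. The memory overhead uses Spark's documented defaults for JVM jobs (10% of executor memory, with a floor of 384 MiB), which is added on top of the executor memory when the pod is requested.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    cores: int
    memory_gib: float
    reserved_cores: float  # assumed: kubelet, system daemons, sidecars
    reserved_gib: float    # assumed: memory held back for the same


def pod_memory_gib(executor_memory_gib: float, overhead_factor: float = 0.10,
                   min_overhead_gib: float = 0.375) -> float:
    """Executor memory plus Spark's memory overhead (10%, at least 384 MiB)."""
    overhead = max(executor_memory_gib * overhead_factor, min_overhead_gib)
    return executor_memory_gib + overhead


def executors_per_node(node: Node, executor_cores: int, executor_memory_gib: float) -> int:
    """How many executor pods of this size fit on one node."""
    usable_cores = node.cores - node.reserved_cores
    usable_gib = node.memory_gib - node.reserved_gib
    by_cpu = math.floor(usable_cores / executor_cores)
    by_memory = math.floor(usable_gib / pod_memory_gib(executor_memory_gib))
    return max(0, min(by_cpu, by_memory))


# Hypothetical 16-core, 64 GiB node with 1 core and 4 GiB held back.
node = Node(cores=16, memory_gib=64, reserved_cores=1, reserved_gib=4)
print(executors_per_node(node, executor_cores=5, executor_memory_gib=16))  # prints 3
```

In this example both limits agree: 15 usable cores fit three 5-core executors, and 60 usable GiB fit three pods of 17.6 GiB each. When the two limits disagree, the gap shows which resource is being left idle on every node.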
Conclusion
When deciding to move from Databricks to Kubeflow, management may face pushback from developers who have become entrenched in Databricks’ operational practices. Movement away from the status quo can create friction, not necessarily because of learning curves, but because of the biases developers have ingrained over their years of working with a particular platform or tool.
Whenever a new tool or platform is adopted, a learning curve is expected, and it is needed so that developers can become familiar with the new way of working and eventually leverage these tools to their full potential. Those coming from Databricks to Kubeflow will face a learning curve, especially if they are dealing with Spark, but the best practices above should ease some of the pain associated with it.
Initially, developers should become familiar with Docker and how to build Docker images. Within a KFP, this is how all code and dependencies are supplied. They should also be mindful of the size of the Docker images they build, learning how to keep them as lean as possible.
From here, a lot of documentation is available on how to set up a basic KFP; but, when dealing with development and production environments, how code will be referenced within a KFP will differ. In the development stage, common volumes should be mounted so that code changes can happen on the fly for testing. Once code is ready for production, it should be included in the referenced Docker image, saved in a private registry, and versioned to keep track of changes.
When dealing with Spark, developers need to understand the differences between Databricks and Kubeflow. In Databricks, you choose the machine type and each driver/executor is an individual machine. In Kubeflow, you have the ability to segment these machines down into smaller drivers and executors to increase parallelism of tasks, increasing efficiency. While it is slightly more difficult to change machine type in Kubeflow, it is far easier to avoid moving data across machines, which would add computational time and expense.
As mentioned in Part 1 of this series, there is no definitive answer as to which platform is better overall. The answer depends on the needs of individual companies and use cases. Be that as it may, if a company wants more control over its pipelines, the size of drivers and executors, its Spark sessions, and the underlying infrastructure that powers its workloads, Kubeflow offers unmatched flexibility, customization, and cost efficiency, allowing organizations to tailor their pipelines to their exact needs.