sparqlin is a Spark SQL framework designed to simplify job creation and management in Databricks environments. It integrates with Spark SQL and PySpark for a streamlined development experience.
The framework was specifically created to empower data analysts who may not have deep development skills. It provides a streamlined approach to adopting standard software development life cycles, enabling analysts to focus on working with data without the need to master complex programming paradigms. By leveraging familiar tools like SQL scripts and YAML files, the framework simplifies tasks such as data configuration, transformation, and testing.
This enables teams to standardize how analytical jobs are developed, tested, and deployed. The framework provides testing via pytest, Git integration via GitPython, and system monitoring via psutil.
You can install sparqlin directly from PyPI using pip:
pip install sparqlin
sparqlin requires Python 3.11 or higher. Ensure you have the following dependencies installed:
pyspark>=3.5.0
pytest
pyyaml
psutil
gitpython
To use sparqlin for creating and running Spark SQL jobs in Databricks, follow these steps:
Create .sql files with the queries that will run on Databricks.
Here is a typical layout of an analytical project:
databricks_default_project/
|-- README.md
|-- sql/
| |-- query1.sql
| |-- query2.sql
| |-- ...
|-- tests/
| |-- __init__.py
| |-- test_query1.py
| |-- test_query2.py
| |-- ...
|-- resources/
| |-- databricks_default_project.job.yml
|-- databricks.yml
Here is an example job configuration (databricks.yml):
...
# Example job configuration for databricks_default_project.
resources:
  jobs:
    databricks_default_python_job:
      trigger:
        # Run this job every day, exactly one day from the last run;
        # see https://docs.databricks.com/api/workspace/jobs/create#trigger
        periodic:
          interval: 1
          unit: DAYS
      email_notifications:
        on_failure:
          - some_email@some_domain.com
      job_clusters:
        - job_cluster_key: job_cluster
          new_cluster:
            spark_version: 15.4.x-scala2.12
            node_type_id: i3.xlarge
            autoscale:
              min_workers: 1
              max_workers: 1
      tasks:
        - task_key: query1_sparqlin
          # existing_cluster_id: ${var.cluster_id} # An existing cluster can be used instead
          job_cluster_key: job_cluster
          libraries:
            - pypi:
                package: sparqlin==0.1.11 # Install your package via PyPI (or a custom repository)
            # - whl: ../dist/*.whl # Alternatively, you can upload the sparqlin wheel into a Volume
          python_wheel_task:
            package_name: "sparqlin" # Package name as defined in your setup.py or pyproject.toml
            entry_point: "sparqlin" # Entry point defined in the package
            named_parameters:
              sql-query-path: "${workspace.root_path}/files/sql/query1.sql"
              table-name: "sparkdev.default.taxi_top_five"
        - task_key: query2_sparqlin
          depends_on:
            - task_key: query1_sparqlin
          # existing_cluster_id: ${var.cluster_id} # An existing cluster can be used instead
          job_cluster_key: job_cluster
          libraries:
            - pypi:
                package: sparqlin==0.1.11 # Install your package via PyPI (or a custom repository)
            # - whl: ../dist/*.whl # Alternatively, you can upload the sparqlin wheel into a Volume
          python_wheel_task:
            package_name: "sparqlin" # Package name as defined in your setup.py or pyproject.toml
            entry_point: "sparqlin" # Entry point defined in the package
            named_parameters:
              sql-query-path: "${workspace.root_path}/files/sql/query2.sql"
              table-name: "sparkdev.default.taxi_count"
# You can also extend `tasks` with other frameworks or task types that Databricks supports
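For orientation, here is a minimal Python sketch of what each python_wheel_task above conceptually does with its named_parameters: read the SQL file, run it on the Spark session, and save the result under the target table name. The function name, the overwrite mode, and the logic below are illustrative assumptions, not sparqlin's actual entry-point implementation.

from pyspark.sql import SparkSession

def run_sql_job(sql_query_path: str, table_name: str) -> None:
    # Illustrative sketch only; the real logic is provided by the sparqlin entry point.
    spark = SparkSession.builder.getOrCreate()
    with open(sql_query_path) as f:
        query = f.read()
    result_df = spark.sql(query)  # execute the query from the .sql file
    result_df.write.mode("overwrite").saveAsTable(table_name)  # save under the target table name

run_sql_job("sql/query1.sql", "sparkdev.default.taxi_top_five")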
This example tests loading datasets into Spark DataFrames from YAML configuration files. It uses pytest fixtures to dynamically provide the datasets_path.
import pytest

from sparqlin.testing.helpers import get_spark_dataframe


@pytest.mark.parametrize("datasets_path", ["tests/testing/datasets_test/datasets.yml"], indirect=True)
def test_base_test_config(spark_session, datasets_path):
    # spark_session and datasets_path are provided as pytest fixtures
    # Load test tables as DataFrames
    test_table_df = get_spark_dataframe(spark_session, datasets_path, "testdb.test_table")
    second_table_df = get_spark_dataframe(spark_session, datasets_path, "testdb.second_table")

    # Validate record counts
    assert test_table_df.count() == 3
    assert second_table_df.count() == 2
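Because datasets_path is parametrized with indirect=True, the value is routed through the datasets_path fixture, so the same test body can be pointed at a different datasets YAML file simply by changing the parametrize argument.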
This example demonstrates how to use BaseTestConfig to register tables as temporary datasets in Spark and perform SQL operations.
from sparqlin.testing.base_test_config import BaseTestConfig


def test_hive_table_operations(hive_data_yaml, tmp_path_factory):
    datasets_file, tmp_path = hive_data_yaml

    # Initialize BaseTestConfig
    config = BaseTestConfig(tmp_path_factory)

    # Set datasets location
    config.DATASETS_LOCATION = datasets_file

    # Create Spark session
    spark = config.create_spark_session()

    # Register tables from YAML file
    config.register_tables(spark)

    # Verify table registration
    test_table_df = spark.sql("SELECT * FROM testdb.test_table")
    second_table_df = spark.sql("SELECT * FROM testdb.second_table")
    assert test_table_df.count() == 3

    # Perform join operation
    joined_df = test_table_df.join(second_table_df, test_table_df.id == second_table_df.id)
    joined_results = joined_df.select("name", "value").collect()
    assert len(joined_results) == 2
    assert any(row.name == "Alice" and row.value == 100 for row in joined_results)
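Note that tmp_path_factory is pytest's built-in temporary-directory fixture, while hive_data_yaml is assumed here to be a project-defined fixture (for example in conftest.py) that yields the path to a datasets YAML file together with a temporary directory.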
To set up a development environment, clone the repository and install its dependencies:
git clone https://gitlab.com/rokorolev/sparqlin.git
cd sparqlin
pip install -r requirements.txt
The framework uses pytest for testing. You can run the test suite as follows:
pytest
Install Build Tools:
pip install setuptools wheel
Build the Package:
rm -rf build dist *.egg-info
python setup.py sdist bdist_wheel
Install Twine:
pip install twine
Generate an API token for your PyPI account.
Upload the Package:
twine upload dist/*
This project is licensed under the MIT License. See the LICENSE file for details.
Contributions are welcome! Feel free to fork the repository, create a feature branch, and submit a pull request. Please ensure proper test coverage for new functionality.
If you encounter a bug or have a feature request, please open an issue on the project's GitLab repository.
Developed and maintained by Roman Korolev.