Julia Demo on REPL
This demonstrates interacting with a running Prefect DB from the Julia REPL. If you don't have a Prefect Server instance running, see the Prefect Installation doc first. Block information is usually pulled from the Prefect DB, but Prefect Block types can also be created manually with their constructors.
See the files in the test folder for examples of Block usage and of loading data from the Prefect DB; some tests require a running Prefect DB, while others simply construct dummy objects without connecting to Prefect.
Entering the Julia REPL with the just julia command will inject the .env variables. Execute the just commands from the appropriate directory as shown.
$ cd ./prefect/
$ just launch
# to verify the server is running, and to check the API port:
just view main
# CTRL-b, d to exit
$ cd ../julia-demo
# start julia in the current project; env vars will be loaded as well
$ just julia
Examples
- Call the PrefectAPI function
- Access the secret string in an AWS Credentials block via the .secret field
julia> using PrefectInterfaces
# returns the current endpoint from env
PrefectAPI().url
# "http://127.0.0.1:4300/api"
# PrefectAPI is called by various functions to retrieve the current API env value
ENV["PREFECT_API_URL"] = "http://127.0.0.1:4301/api";
PrefectAPI().url
# "http://127.0.0.1:4301/api"
# Construct an example manually; normally this would be pulled from the DB
# with PrefectBlock("aws-credentials/subdivisions"), if such a block exists
creds = AWSCredentialsBlock(
"aws-credentials/subdivisions"
, "aws-credentials"
, "us-west-2"
, "AKIAXXX999XXX999"
, "GUUxx87987xxPXH")
# AWSCredentialsBlock("aws-credentials/subdivisions", "aws-credentials", "us-west-2", "AKIAXXX999XXX999", ####Secret####)
creds.aws_secret_access_key
# ####Secret####
creds.aws_secret_access_key.secret
# "GUUxx87987xxPXH"
The secret is obfuscated to prevent it from being visible in logs.
There are other AbstractPrefectBlock types; see the list below. These facilitate interactions with Blocks in your Prefect instance, which are the primary organizing abstractions in the Prefect world.
julia> names(PrefectInterfaces);
subtypes(PrefectInterfaces.AbstractPrefectBlock)
# AWSCredentialsBlock
# CredentialPairBlock
# LocalFSBlock
# PrefectBlock
# S3BucketBlock
# StringBlock
# SecretBlock
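Any block already registered on the server can be pulled by its "block-type/block-name" slug using the generic PrefectBlock constructor. A quick sketch, assuming a local-file-system block named willowdata is registered in your Prefect DB and the server is running:
fs = PrefectBlock("local-file-system/willowdata")
# fetches the block definition from the Prefect DB over the REST API;
# the result is an AbstractPrefectBlock wrapping the stored block fields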
Shut down the server after exiting julia.
$ cd ../prefect
$ just kill
Dataset Type
This type is an opinionated means of organizing data artifacts by name. It is not part of the Prefect API and can be disregarded; Dataset is not a dependency of the Prefect types that are meant to constitute an unofficial 'Prefect Julia SDK'.
This is a lightweight organizational construct for reading/writing data artifacts as part of orchestrated data pipelines. The type merely holds metadata about named data sets and where they should be found or placed in a file system that is defined by a Prefect Block. The data files are arranged in a hive-ish file structure that allows tracking experiment results or daily extracts. The layout assumes partitions of daily data; adding additional partitions to the struct definition wouldn't be difficult.
The fields of the Dataset type are populated by env variables (loaded from a .env file) or defined in the constructor. The env variables PREFECT_DATA_BLOCK_REMOTE and PREFECT_DATA_BLOCK_LOCAL are used by PrefectDatastoreNames() to return the names of the Prefect blocks which define remote and local storage.
ENV["PREFECT_API_URL"] = "http://127.0.0.1:4300/api"
ENV["PREFECT_DATA_BLOCK_LOCAL"] = "local-file-system/willowdata"
ENV["PREFECT_DATA_BLOCK_REMOTE"] = "local-file-system/willowdata"
# NOTE: both are set to the same block here; change REMOTE if you have a remote storage block registered
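# PrefectDatastoreNames() reads these two variables and returns the configured
# local/remote datastore block names (illustrative call; output not shown)
PrefectDatastoreNames()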
ds = Dataset(dataset_name="limelight_moving_pictures", datastore_type="local")
using DataFrames
df = DataFrame(
flag = [false, true, false, true, false, true]
, amt = [19.00, 11.00, 35.50, 32.50, 5.99, 5.99]
, qty = [1, 4, 1, 3, 21, 109]
, item = ["B001", "B001", "B020", "B020", "BX00", "BX00"]
, day = ["2021-01-01", "2021-01-01", "2112-12-12", "2020-10-20", "2021-05-04", "1984-07-04"]
);
write(ds, df)
# "$HOME/willowdata/main/extracts/csv/latest/dataset=limelight_moving_pictures/data.csv"
# "$HOME/willowdata/main/extracts/csv/dataset=limelight_moving_pictures/rundate=2023-08-14/data.csv"
dfr = read(ds)
# 6×5 DataFrame
# Row │ flag amt qty item day
# ... etc
The read and write functions call the Prefect Server API to retrieve block information, in this case the basepath attribute of the local-file-system block.
Notice the write function writes to two locations unless rundate_type="specific". This supports the use case of backfilling historical daily data without affecting the 'latest' path. The 'latest' folder is a convenience that avoids needing a module that reads file metadata to find the most recent run.
tree $HOME/willowdata/main/extracts
$HOME/willowdata/main/extracts
└── csv
├── dataset=limelight_moving_pictures
│ └── rundate=2023-08-14
│ └── data.csv
└── latest
└── dataset=limelight_moving_pictures
└── data.csv
Reading/writing a specific rundate:
# writing a specific rundate
ds1 = Dataset(dataset_name="test_dataset_specific", datastore_type="local", rundate=Date("2112-03-15"))
# Dataset
# dataset_name: String "test_dataset_specific"
# datastore_type: String "local"
# dataset_type: String "extracts"
# file_format: String "csv"
# rundate: Date
# rundate_type: String "specific"
# dataset_path: String "extracts/csv/dataset=test_dataset_specific/rundate=2112-03-15/data.csv"
# latest_path: String "extracts/csv/latest/dataset=test_dataset_specific/data.csv"
# image_path: String "extracts/dataset=test_dataset_specific/rundate=2112-03-15"
write(ds1, df)
# "$HOME/willowdata/main/extracts/csv/dataset=test_dataset_specific/rundate=2112-03-15/data.csv"
# note: only one path was written; the 'latest_path' was not
shell> ls -la "$HOME/willowdata/main/$(ds1.latest_path)"
# ls: cannot access '$HOME/willowdata/main/extracts/csv/latest/dataset=test_dataset_specific/data.csv': No such file or directory
shell> ls -la "$HOME/willowdata/main/$(ds1.dataset_path)"
# -rw-r--r-- 1 segovia staff 196 Aug 14 15:45 '$HOME/willowdata/main/extracts/csv/dataset=test_dataset_specific/rundate=2112-03-15/data.csv'
# the 'read' function knows to read the correct path
df1 = read(ds1);
df1 == dfr
# true
The datastore now looks like this:
/Users/segovia/willowdata/main/extracts/
└── csv
├── dataset=test_dataset_specific
│ └── rundate=2112-03-15
│ └── data.csv
├── dataset=limelight_moving_pictures
│ └── rundate=2023-08-14
│ └── data.csv
└── latest
└── dataset=limelight_moving_pictures
└── data.csv
Environment
The Prefect types pull information from a running Prefect DB by calling the REST API endpoint stored in PREFECT_API_URL. If the Julia REPL session is started from a just command, the .env variables will be exported into the environment. In application code you need to either set ENV["PREFECT_API_URL"] = "http://127.0.0.1:4300/api" (for example) or use the ConfigEnv package as shown below to load the .env file from the Julia application.
The Dataset read/write functions depend on the local and remote data block names being defined in environment variables.
# .env file imported with ConfigEnv.dotenv(), or just assignment:
using ConfigEnv
dotenv(".env", overwrite=false)
# all the Prefect env variables are now loaded into the Julia environment
ENV["PREFECT_DATA_BLOCK_REMOTE"]
# "s3-bucket/willowdata"
# or just set them manually
begin
ENV["PREFECT_API_URL"] = "http://127.0.0.1:4300/api"
ENV["PREFECT_DATA_BLOCK_LOCAL"] = "local-file-system/willowdata"
ENV["PREFECT_DATA_BLOCK_REMOTE"] = "s3-bucket/willowdata"
end
For interactive work, entering the Julia REPL with the just julia command will inject the .env variables.
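Putting the pieces together, a minimal application script might look like the sketch below. It assumes a .env file defining the variables above and a registered local-file-system/willowdata block, and it reuses the example dataset name from the Dataset section.
using ConfigEnv, PrefectInterfaces
dotenv(".env")  # loads PREFECT_API_URL and the data block names into ENV
ds = Dataset(dataset_name="limelight_moving_pictures", datastore_type="local")
df = read(ds)   # retrieves the block's basepath from the Prefect API, then reads the csv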