PrefectInterfaces
Documentation for PrefectInterfaces.
PrefectInterfaces.PrefectInterfaces
PrefectInterfaces.Datasets.Dataset
PrefectInterfaces.Datasets.PrefectDatastoreNames
PrefectInterfaces.LocalFSBlock
PrefectInterfaces.PrefectAPI
PrefectInterfaces.PrefectBlock
PrefectInterfaces.S3BucketBlock
PrefectInterfaces.SecretBlock
PrefectInterfaces.SecretString
Base.read
Base.write
PrefectInterfaces.Datasets.block_selector
PrefectInterfaces.getblock
PrefectInterfaces.ls
PrefectInterfaces.makeblock
PrefectInterfaces
PrefectInterfaces.PrefectInterfaces
— ModulePrefectInterfaces.jl provides composite types to enable:
- Interactive REPL connections to a Prefect server.
- Prefect deployed julia processes that can be called from the python API with parameters.
Use of this package requires a running Prefect server to make connections to the Prefect server REST API, configured with the environment variable PREFECT_API_URL
or by specifying the endpoint url in constructor calls. Calling PrefectBlock(blockname::String)
retrieves Prefect block information by name, and thus julia modules can be built to connect to resources defined by those blocks.
Examples
julia> using PrefectInterfaces
julia> ENV["PREFECT_API_URL"] = "http://127.0.0.1:4300/api";
julia> ls()
5-element Vector{Any}:
"aws-credentials/subdivisions"
"docker-container/lamneth"
"local-file-system/willowdata"
"process/red-barchetta"
"string/syrinx"
julia> dump(PrefectBlock("local-file-system/willowdata"))
PrefectBlock
blockname: String "local-file-system/willowdata"
block: LocalFSBlock
blockname: String "local-file-system/willowdata"
blocktype: String "local-file-system"
basepath: String "/Users/mahiki/willowdata/dev"
read_path: #4 (function of type PrefectInterfaces.var"#4#5"{String})
basepath: String "/Users/mahiki/willowdata/dev"
PrefectInterfaces.LocalFSBlock
— TypeLocalFSBlock(blockname::String, blocktype::String, basepath::String)
Corresponds with the Prefect LocalFileSystem block. Attached functions:
read_path("path/to/file.csv")
write_path("path/to/file.csv", df::AbstractDataFrame)
Returns or writes a DataFrame file at "LocalFSBlock.basepath/path/to/file.csv".
Examples:
julia> fsblock = PrefectBlock("local-file-system/willowdata");
julia> df = fsblock.block.read_path("extracts/csv/dataset=test_table/rundate=2023-05-25/data.csv")
1×2 DataFrame
Row │ column1 result
│ String String7
─────┼────────────────────────────────────────────
1 │ If you can select this table you… PASSED
PrefectInterfaces.PrefectAPI
— TypePrefectAPI(url::String, key::SecretString) <:AbstractPrefectInterface
PrefectAPI(url::String)
PrefectAPI()
Mutable struct tha stores the Prefect server api endpoint. All PrefectInterface
operations depend on connecting to a running Prefect server to pull block information. Constructor with no arguments assigns env variables PREFECT_API_URL
, PREFECT_API_KEY
If PREFECT_API_KEY
does not exist then an empty string is assigned to key
. For Prefect Server with no authentication (or with auth managed by connection string) the empty key will not interfere with API calls.
Examples:
julia> using PrefectInterfaces
julia> ENV["PREFECT_API_URL"] = "http://127.0.0.1:4300/api"; ENV["PREFECT_API_KEY"] = "zyxw4321";
julia> api = PrefectAPI()
PrefectAPI("http://127.0.0.1:4300/api", ####Secret####)
julia> api.url
"http://127.0.0.1:4300/api"
julia> api.key.secret
"zyxw4321"
julia> api.url = "https://api.prefect.cloud/api/accounts/0eEXAMPLE";
julia> api.key = SecretString("abcd1234")
####Secret####
julia> api = PrefectAPI("https://api.prefect.cloud/api/accounts/0eEXAMPLE", "abcd1234")
PrefectAPI("https://api.prefect.cloud/api/accounts/0eEXAMPLE", ####Secret####)
PrefectInterfaces.PrefectBlock
— TypePrefectBlock <: AbstractPrefectBlock
Constructors:
PrefectBlock(blockname::String)
PrefectBlock(blockname::String, api::PrefectAPI)
PrefectBlock(blockname::String, block::AbstractPrefectBlock)
Returns a Prefect Block from the Prefect server, the block data is stored in the block
field. Prefect Block names are strings called 'slugs', formatted as block-type-name/block-name
. A Prefect Block is uniquely specified by its name and the Prefect DB where it is stored, therefore the API URL is necessary for the constructor. The single-argument constructor takes the default PrefectAPI() as the Prefect Server endpoint.
A non-server block can be constructed by supplying an AbstractPrefectBlock object.
The AbstractPrefectBlock types are meant to mirror the functionality defined in the Prefect Python API, for example LocalFSBlock
has a write_path()
method attached which only writes to paths relative from the block basepath.
Examples:
julia> using PrefectInterfaces
julia> spec_fsblock = LocalFSBlock("local-file-system/xanadu", "local-file-system", "/usr/mahiki/xanadu/dev");
julia> fsblock = PrefectBlock("local-file-system/xanadu", spec_fsblock);
julia> fsblock.blockname
"local-file-system/xanadu"
julia> propertynames(fsblock.block)
(:blockname, :blocktype, :basepath, :read_path, :write_path)
julia> fsblock.block.basepath
"/usr/mahiki/xanadu/dev"
PrefectInterfaces.S3BucketBlock
— TypeS3BucketBlock(
blockname, blocktype, bucket_name, bucket_folder
, region_name, aws_access_key_id, aws_secret_access_key)
Corresponds with the Prefect S3Bucket block in the prefect-aws integration. Attached functions:
read_path("path/to/object.csv")
write_path("path/to/object.csv", df::AbstractDataFrame)
Returns or writes a DataFrame csv object at a relative key from the block-defined s3:://bucket_name/bucket_folder/path/to/object.csv
.
Examples:
# pull hypothetical existing block from Prefect DB server
julia> s3block = PrefectBlock("s3-bucket/willowdata")
S3BucketBlock("s3-bucket/willowdata", "s3-bucket", "willowdata", "data-folder/dev", "us-west-2"
, "AKIAEXAMPLEXXX", ####Secret####, ...)
julia> df = s3block.block.read_path("extracts/csv/dataset=test_table/rundate=2023-05-25/data.csv");
julia> s3block.block.write_path("testfolder/xanadu-test.csv", df)
p"s3://willowdata/data-folder/dev/testfolder/xanadu-test.csv"
PrefectInterfaces.SecretBlock
— TypeSecretBlock(blockname::String, blocktype::String, value::SecretString) <: AbstractPrefectBlock
A struct for storing a Prefect Block with a SecretString as value field. This permits retrieving secrets from the Prefect Server/Prefect Cloud. The secret field is accessible via the value.secret
field.
NOTE: This is not an ecrypted secrets store, it is a log obfuscator.
Example:
julia> using PrefectInterfaces
julia> secretblock = SecretBlock("secret", "necromancer", "abcd1234")
SecretBlock("secret", "necromancer", ####Secret####)
julia> dump(secretblock, maxdepth = 10)
SecretBlock
blockname: String "secret"
blocktype: String "necromancer"
value: SecretString
julia> secretblock.value.secret
"abcd1234"
PrefectInterfaces.SecretString
— TypeSecretString(secret::String) <:Any
A struct for storing secret values with overrides of show
and dump
to prevent its field from being exposed in plaintext in logs. The secret field is accessible via the secret
field.
NOTE: This is not an ecrypted secrets store, it is a log obfuscator.
Example:
julia> using PrefectInterfaces
julia> password = SecretString("abcd1234")
####Secret####
julia> show(password)
####Secret####
julia> password.secret
"abcd1234"
PrefectInterfaces.getblock
— Methodgetblock(blockname::String; api::AbstractPrefectConfig = PrefectAPI())
Makes an HTTP.get()
call to provided URL endpoint, default endpoint constructed by PrefectAPI
. Returns a Dict containing the Prefect Block specification.
PrefectInterfaces.ls
— Methodls(; type="block", api::PrefectAPI = PrefectAPI())
Calls the Prefect server and returns a list of all defined blocks as Vector{String}. Default is to list all blocks, later implementation could include "flows", "deployments", "work-pool" etc. See PrefectAPI
docs for details about authentication if needed.
Examples:
julia> ENV["PREFECT_API_URL"] = "http://127.0.0.1:4300/api";
julia> ls()
11-element Vector{Any}:
"aws-credentials/subdivisions"
"docker-container/lamneth"
"github/dev"
"github/main"
"local-file-system/willowdata"
"process/red-barchetta"
"s3/necromancer"
"s3-bucket/willowdata"
"secret/necromancer"
"slack-webhook/bytor-alert"
"string/environment"
PrefectInterfaces.makeblock
— Methodmakeblock(block_api_data::Dict)
Instantiates a new PrefectBlock
, choosing the concrete type based on block data returned from the api call for block documents.
Datasets
PrefectInterfaces.Datasets.Dataset
— TypeDataset(dataset_name=str::String; kwargs...)
An object that stores configuration, and file path locations. Assertions constrain valid field values. If rundate
is not the current date, the latest_path
will not be used.
NOTE: No positional arguments allowed b/c of @kw_def, keyworded args only.
Supported keyword arguments (default show first):
dataset_name::String (required)
datastore_type ∈ ["local", "remote"]
dataset_type ∈ ["extracts", "reports", "images"]
file_format ∈ ["csv"]
rundate::Date = Datest.today()
rundate_type ∈ ["latest", "specific"]
Read/write behavior depends on rundate/rundate_type combination as follows:
rundate_type| rundate || read | write
------------|------------||---------|-----------------------------------------
latest | == today || latest | [latest, rundate] default option
latest | != today || latest | [latest] dont write to date partition (rare)
specific | == today || rundate | [rundate]
specific | != today || rundate | [rundate]
Examples
julia> begin
ENV["PREFECT_DATA_BLOCK_LOCAL"] = "local-file-system/willowdata"
ENV["PREFECT_API_URL"] = "http://127.0.0.1:4300/api"
end;
julia> ds = Dataset(dataset_name="test_table", datastore_type="local")
Dataset
dataset_name: String "test_table"
datastore_type: String "local"
dataset_type: String "extracts"
file_format: String "csv"
rundate: Dates.Date
rundate_type: String "latest"
dataset_path: String "extracts/csv/dataset=test_table/rundate=2023-07-24/data.csv"
latest_path: String "extracts/csv/latest/dataset=test_table/data.csv"
image_path: String "extracts/dataset=test_table/rundate=2023-07-24"
julia> df = read(ds)
1×2 DataFrame
Row │ column1 result
│ String String7
─────┼────────────────────────────────────────────
1 │ If you can select this table you… PASSED
PrefectInterfaces.Datasets.PrefectDatastoreNames
— TypePrefectDatastoreNames(remote::String, local::String, default::String) <: AbstractPrefectInterface
A struct to store the names of Prefect blocks which reference local and remote file storage. The default constructor pulls the names from ENV variables, and default field default is "local".
PREFECT_DATA_BLOCK_REMOTE
PREFECT_DATA_BLOCK_LOCAL
PREFECT_DATASTORE_DEFAULT_TYPE ∈ ["local", "remote"]
Base.read
— Methodread(ds::Dataset)
Returns a DataFrame
by calling CSV.read
on a filepath defined by the Dataset type.
NOTE: A prefect server must be available to use Dataset read function.
Examples
julia> begin
ENV["PREFECT_API_URL"] = "http://127.0.0.1:4300/api"
ENV["PREFECT_DATA_BLOCK_LOCAL"] = "local-file-system/willowdata"
end;
julia> df = read(Dataset(dataset_name="test_table", datastore_type="local"))
1×2 DataFrame
Row │ column1 result
│ String String7
─────┼────────────────────────────────────────────
1 │ If you can select this table you… PASSED
Base.write
— Methodwrite(ds::Dataset, df::DataFrame)
Writes a DataFrame
via CSV.write
to a filepath defined by the Dataset
type.
NOTE: A prefect server must be available to use Dataset read function.
PrefectInterfaces.Datasets.block_selector
— Methodblock_selector(ds::Dataset)
Returns the PrefectBlock corresponding to the remote or local datastore type. This block will be used in read/write of dataset paths.