Skip to main content
A Task is the smallest unit of work, designed to perform a specific operation. Each task represents a distinct operation or process that can be executed, such as processing data, performing calculations, or managing resources. Tasks can operate independently or as components of a more complex set of connected tasks known as a Workflow. Tasks are defined by their code, inputs, and dependencies on other tasks. To create tasks, you need to define the input parameters and specify the action to be performed during execution.

Creating a Task

To create a task in Tilebox, define a class that extends the Task base class and implements the execute method. The execute method is the entry point for the task where its logic is defined. It’s called when the task is executed.
from tilebox.workflows import Task, ExecutionContext

class MyFirstTask(Task):
    def execute(self, context: ExecutionContext):
        print("Hello World!")
This example demonstrates a simple task that prints “Hello World!” to the console. For python, the key components of this task are:
MyFirstTask is a subclass of the Task class, which serves as the base class for all defined tasks. It provides the essential structure for a task. Inheriting from Task automatically makes the class a dataclass, which is useful for specifying inputs. Additionally, by inheriting from Task, the task is automatically assigned an identifier based on the class name.
The execute method is the entry point for executing the task. This is where the task’s logic is defined. It’s invoked by a task runner when the task runs and performs the task’s operation.
The context argument is an ExecutionContext instance that provides access to an API for submitting new tasks as part of the same job and features like shared caching.
For Go, the key components are:
MyFirstTask is a struct that implements the Task interface. It represents the task to be executed.
The Execute method is the entry point for executing the task. This is where the task’s logic is defined. It’s invoked by a task runner when the task runs and performs the task’s operation.
The code samples on this page do not illustrate how to execute the task. That will be covered in the next section on task runners. The reason for that is that executing tasks is a separate concern from implementing tasks.

Input Parameters

Tasks often require input parameters to operate. These inputs can range from simple values to complex data structures. By inheriting from the Task class, the task is treated as a Python dataclass, allowing input parameters to be defined as class attributes.
Tasks must be serializable to JSON or to protobuf because they may be distributed across a cluster of task runners.
In Go, task parameters must be exported fields of the task struct (starting with an uppercase letter), otherwise they will not be serialized to JSON.
Supported types for input parameters include:
  • Basic types such as str, int, float, bool
  • Lists and dictionaries of basic types
  • Nested data classes that are also JSON-serializable or protobuf-serializable
class ParametrizableTask(Task):
    message: str
    number: int
    data: dict[str, str]

    def execute(self, context: ExecutionContext):
        print(self.message * self.number)

task = ParametrizableTask("Hello", 3, {"key": "value"})

Task Composition and subtasks

Until now, tasks have performed only a single operation. But tasks can be more powerful. Tasks can submit other tasks as subtasks. This allows for a modular workflow design, breaking down complex operations into simpler, manageable parts. Additionally, the execution of subtasks is automatically parallelized whenever possible.
class ParentTask(Task):
    num_subtasks: int

    def execute(self, context: ExecutionContext) -> None:
        for i in range(self.num_subtasks):
            context.submit_subtask(ChildTask(i))

class ChildTask(Task):
    index: int

    def execute(self, context: ExecutionContext) -> None:
        print(f"Executing ChildTask {self.index}")

# after submitting this task, a task runner may pick it up and execute it
# which will result in 5 ChildTasks being submitted and executed as well
task = ParentTask(5)
In this example, a ParentTask submits ChildTask tasks as subtasks. The number of subtasks to be submitted is based on the num_subtasks attribute of the ParentTask. The submit_subtask method takes an instance of a task as its argument, meaning the task to be submitted must be instantiated with concrete parameters first. Parent task do not have access to results of subtasks, instead, tasks can use shared caching to share data between tasks.
By submitting a task as a subtask, its execution is scheduled as part of the same job as the parent task. Compared to just directly invoking the subtask’s execute method, this allows the subtask’s execution to occur on a different machine or in parallel with other subtasks. To learn more about how tasks are executed, see the section on task runners.

Larger subtasks example

A practical workflow example showcasing task composition might help illustrate the capabilities of tasks. Below is an example of a set of tasks forming a workflow capable of downloading a set number of random dog images from the internet. The Dog API can be used to get the image URLs, and then download them. Implementing this using Task Composition could look like this:
import httpx  # pip install httpx
from pathlib import Path

class DownloadRandomDogImages(Task):
    num_images: int

    def execute(self, context: ExecutionContext) -> None:
        url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}"
        response = httpx.get(url)
        for dog_image in response.json():
            context.submit_subtask(DownloadImage(dog_image["url"]))

class DownloadImage(Task):
    url: str

    def execute(self, context: ExecutionContext) -> None:
        file = Path("dogs") / self.url.split("/")[-1]
        response = httpx.get(self.url)
        with file.open("wb") as file:
            file.write(response.content)
This example consists of the following tasks:
DownloadRandomDogImages fetches a specific number of random dog image URLs from an API. It then submits a DownloadImage task for each received image URL.
DownloadImage downloads an image from a specified URL and saves it to a file.
Together, these tasks create a workflow that downloads random dog images from the internet. The relationship between the two tasks and their formation as a workflow becomes clear when DownloadRandomDogImages submits DownloadImage tasks as subtasks. Visualizing the execution of such a workflow is akin to a tree structure where the DownloadRandomDogImages task is the root, and the DownloadImage tasks are the leaves. For instance, when downloading five random dog images, the following tasks are executed.
from tilebox.workflows import Client

client = Client()
jobs = client.jobs()
job = jobs.submit(
    "download-dog-images",
    DownloadRandomDogImages(5),
)

# now our deployed task runners will pick up the task and execute it

jobs.display(job)
Download Dog Images Workflow
In total, six tasks are executed: the DownloadRandomDogImages task and five DownloadImage tasks. The DownloadImage tasks can execute in parallel, as they are independent. If more than one task runner is available, the Tilebox Workflow Orchestrator automatically parallelizes the execution of these tasks.
Check out job_client.display to learn how this visualization was automatically generated from the task executions.

Task States

Every task goes through a set of states during its lifetime. When submitted, either as a job or as a subtask, it starts in the QUEUED state and transitions to RUNNING when a task runner picks it up. If the task executes successfully, it transitions to COMPUTED. If the task fails, it transitions to FAILED. As soon as all subtasks of a task are also COMPUTED, the task is considered COMPLETED, allowing dependent tasks to be executed. The table below summarizes the different task states and their meanings.
Task StateDescription
QueuedThe task is queued and waiting for execution. Any eligible task runner can pick it up and execute it, as soon as it’s parent task is COMPUTED and all it’s dependencies are COMPLETED.
RunningThe task is currently being executed by a task runner.
ComputedThe task has successfully been computed, but still has outstanding subtasks.
CompletedThe task has successfully been computed, and all it’s subtasks are also computed, making it COMPLETED. This is the final state of a task. Only once a task has been COMPLETED, dependent tasks can be executed.
FailedThe task has been executed but encountered an error.
Task States

Map-Reduce Pattern

Often times the input to a task is a list, with elements that should then be mapped to individual subtasks, whose results are later aggregated in a reduce step. This pattern is commonly known as MapReduce and a common pattern in workflows. In Tilebox, the reduce step is typically defined as a separate task that depends on all the map tasks. For example, the workflow below applies this pattern to a list of numbers to calculate the sum of all squares of the numbers. The Square task takes a single number and squares it, and the Sum task reduces the list of squared numbers to a single sum.
class SumOfSquares(Task):
    numbers: list[int]

    def execute(self, context: ExecutionContext) -> None:
		# 1. Map
        square_tasks = context.submit_subtasks(
            [Square(num) for num in self.numbers]
        )
        # 2. Reduce
        sum_task = context.submit_subtask(Sum(), depends_on=square_tasks)


class Square(Task):  # The map step
    num: int

    def execute(self, context: ExecutionContext) -> None:
        result = self.num ** 2
        # typically the output of a task is a large dataset,
        # so we save individual results into a shared cache
        context.job_cache.group("squares")[str(self.num)] = str(result).encode()
        context.current_task.display = f"Square({self.num})"

class Sum(Task):  # The reduce step
    def execute(self, context: ExecutionContext) -> None:
        result = 0
        # access our cached results from the map step
        squares = context.job_cache.group("squares")
        for key in squares:
            result += int(squares[key].decode())

        print("Sum of squares is:", result)
Submitting a job of the SumOfSquares task and running it with a task runner can be done as follows:
from tilebox.workflows import Client
from tilebox.workflows.cache import InMemoryCache

client = Client()
jobs = client.jobs()
job = jobs.submit(
    "sum-of-squares",
    SumOfSquares([12, 345, 453, 21, 45, 98]),
)

client.runner(tasks=[SumOfSquares, Square, Sum], cache=InMemoryCache()).run_all()

jobs.display(job)
Output
Sum of squares is: 336448
Sum of squares workflow using the map-reduce pattern

Recursive subtasks

Tasks can not only submit other tasks as subtasks, but also instances of themselves. This allows for a recursive breakdown of a task into smaller chunks. Such recursive decomposition algorithms are referred to as divide and conquer algorithms. For example, the RecursiveTask below is a valid task that submits smaller instances of itself as subtasks.
When implementing a recursive task, it’s important to define a base case that stops the recursion. Otherwise, the task will keep submitting subtasks indefinitely, resulting in an infinite loop.
class RecursiveTask(Task):
    num: int

    def execute(self, context: ExecutionContext) -> None:
        print(f"Executing RecursiveTask with num={self.num}")
		# if num < 2, we reached the base case and stop recursion
        if self.num >= 2:
            context.submit_subtask(RecursiveTask(self.num // 2))

Recursive subtask example

An example for this is the random dog images workflow mentioned earlier. In the previous implementation, downloading images was already parallelized. But the initial orchestration of the individual download tasks was not parallelized, because DownloadRandomDogImages was responsible for fetching all random dog image URLs and only submitted the individual download tasks once all URLs were retrieved. For a large number of images this setup can bottleneck the entire workflow. To improve this, recursive subtask submission decomposes a DownloadRandomDogImages task with a high number of images into two smaller DownloadRandomDogImages tasks, each fetching half. This process can be repeated until a specified threshold is met, at which point the Dog API can be queried directly for image URLs. That way, image downloads start as soon as the first URLs are retrieved, without initial waiting. An implementation of this recursive submission may look like this:
class DownloadRandomDogImages(Task):
    num_images: int

    def execute(self, context: ExecutionContext) -> None:
        if self.num_images > 4:
            half = self.num_images // 2
            remaining = self.num_images - half  # account for odd numbers
            context.submit_subtask(DownloadRandomDogImages(half))
            context.submit_subtask(DownloadRandomDogImages(remaining))
        else:
            url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}"
            response = httpx.get(url)
            for dog_image in response.json()[:self.num_images]:
                context.submit_subtask(DownloadImage(dog_image["url"]))
With this implementation, downloading a large number of images (for example, 9) results in the following tasks being executed:
Download Dog Images Workflow implemented recursively

Retry Handling

By default, when a task fails to execute, it’s marked as failed. In some cases, it may be useful to retry the task multiple times before marking it as a failure. This is particularly useful for tasks dependent on external services that might be temporarily unavailable. Tilebox Workflows allows you to specify the number of retries for a task using the max_retries argument of the submit_subtask method. Check out the example below to see how this might look like in practice.
A failed task may be picked up by any available runner and not necessarily the same one that it failed on.
import random

class RootTask(Task):
    def execute(self, context: ExecutionContext) -> None:
        context.submit_subtask(FlakyTask(), max_retries=5)

class FlakyTask(Task):
    def execute(self, context: ExecutionContext) -> None:
        print(f"Executing FlakyTask")

        if random.random() < 0.1:
            raise Exception("FlakyTask failed randomly")

Dependencies

Tasks often rely on other tasks. For example, a task that processes data might depend on a task that fetches that data. Tasks can express their dependencies on other tasks by using the depends_on argument of the submit_subtask method. This means that a dependent task will only execute after the task it relies on has successfully completed.
The depends_on argument accepts a list of tasks, enabling a task to depend on multiple other tasks.
A workflow with dependencies might look like this:
class RootTask(Task):
    def execute(self, context: ExecutionContext) -> None:
        first_task = context.submit_subtask(
          PrintTask("Executing first")
        )
        second_task = context.submit_subtask(
          PrintTask("Executing second"), 
          depends_on=[first_task],
        )
        third_task = context.submit_subtask(
          PrintTask("Executing last"),
          depends_on=[second_task],
        )

class PrintTask(Task):
    message: str

    def execute(self, context: ExecutionContext) -> None:
        print(self.message)
The RootTask submits three PrintTask tasks as subtasks. These tasks depend on each other, meaning the second task executes only after the first task has successfully completed, and the third only executes after the second completes. The tasks are executed sequentially.
If a task upon which another task depends submits subtasks, those subtasks must also execute before the dependent task begins execution.

Dependencies Example

A practical example is a workflow that fetches news articles from an API and processes them using the News API.
from pathlib import Path
import json
from collections import Counter
import httpx  # pip install httpx

class NewsWorkflow(Task):
    category: str
    max_articles: int

    def execute(self, context: ExecutionContext) -> None:
        fetch_task = context.submit_subtask(FetchNews(self.category, self.max_articles))
        context.submit_subtask(PrintHeadlines(), depends_on=[fetch_task])
        context.submit_subtask(MostFrequentAuthors(), depends_on=[fetch_task])

class FetchNews(Task):
    category: str
    max_articles: int

    def execute(self, context: ExecutionContext) -> None:
        url = f"https://newsapi.org/v2/top-headlines?category={self.category}&pageSize={self.max_articles}&country=us&apiKey=API_KEY"
        news = httpx.get(url).json()
        # check out our documentation page on caches to learn
        # about a better way of passing data between tasks
        Path("news.json").write_text(json.dumps(news))

class PrintHeadlines(Task):
    def execute(self, context: ExecutionContext) -> None:
        news = json.loads(Path("news.json").read_text())
        for article in news["articles"]:
            print(f"{article['publishedAt'][:10]}: {article['title']}")

class MostFrequentAuthors(Task):
    def execute(self, context: ExecutionContext) -> None:
        news = json.loads(Path("news.json").read_text())
        authors = [article["author"] for article in news["articles"]]
        for author, count in Counter(authors).most_common():
            print(f"Author {author} has written {count} articles")

# now submit a job, and then visualize it
job = job_client.submit("process-news",
    NewsWorkflow(category="science", max_articles=5),
)
Output
2024-02-15: NASA selects ultraviolet astronomy mission but delays its launch two years - SpaceNews
2024-02-15: SpaceX launches Space Force mission from Cape Canaveral - Orlando Sentinel
2024-02-14: Saturn's largest moon most likely uninhabitable - Phys.org
2024-02-14: AI Unveils Mysteries of Unknown Proteins' Functions - Neuroscience News
2024-02-14: Anthropologists' research unveils early stone plaza in the Andes - Phys.org
Author Jeff Foust has written 1 articles
Author Richard Tribou has written 1 articles
Author Jeff Renaud has written 1 articles
Author Neuroscience News has written 1 articles
Author Science X has written 1 articles
Process News Workflow
This workflow consists of four tasks:
TaskDependenciesDescription
NewsWorkflow-The root task of the workflow. It spawns the other tasks and sets up the dependencies between them.
FetchNews-A task that fetches news articles from the API and writes the results to a file, which is then read by dependent tasks.
PrintHeadlinesFetchNewsA task that prints the headlines of the news articles to the console.
MostFrequentAuthorsFetchNewsA task that counts the number of articles each author has written and prints the result to the console.
An important aspect is that there is no dependency between the PrintHeadlines and MostFrequentAuthors tasks. This means they can execute in parallel, which the Tilebox Workflow Orchestrator will do, provided multiple task runners are available.
In this example, the results from FetchNews are stored in a file. This is not the recommended method for passing data between tasks. When executing on a distributed cluster, the existence of a file written by a dependent task cannot be guaranteed. Instead, it’s better to use a shared cache.

Task Identifiers

A task identifier is a unique string used by the Tilebox Workflow Orchestrator to identify the task. It’s used by task runners to map submitted tasks to a task class and execute them. It also serves as the default name in execution visualizations as a tree of tasks. If unspecified, the identifier of a task defaults to the class name. For instance, the identifier of the PrintHeadlines task in the previous example is "PrintHeadlines". This is good for prototyping, but not recommended for production, as changing the class name also changes the identifier, which can lead to issues during refactoring. It also prevents different tasks from sharing the same class name. To address this, Tilebox Workflows offers a way to explicitly specify the identifier of a task. This is done by overriding the identifier method of the Task class. This method should return a unique string identifying the task. This decouples the task’s identifier from its class name, allowing you to change the identifier without renaming the class. It also allows tasks with the same class name to have different identifiers. The identifier method can also specify a version number for the task—see the section on semantic versioning below for more details.
class MyTask(Task):
    def execute(self, context: ExecutionContext) -> None:
        pass

# MyTask has the identifier "MyTask" and the default version of "v0.0"

class MyTask2(Task):
    @staticmethod
    def identifier() -> tuple[str, str]:
        return "tilebox.com/example_workflow/MyTask", "v1.0"

    def execute(self, context: ExecutionContext) -> None:
        pass

# MyTask2 has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.0"
In python, the identifier method must be defined as either a classmethod or a staticmethod, meaning it can be called without instantiating the class.

Semantic Versioning

As seen in the previous section, the identifier method can return a tuple of two strings, where the first string is the identifier and the second string is the version number. This allows for semantic versioning of tasks. Versioning is important for managing changes to a task’s execution method. It allows for new features, bug fixes, and changes while ensuring existing workflows operate as expected. Additionally, it enables multiple versions of a task to coexist, enabling gradual rollout of changes without interrupting production deployments. You assign a version number by overriding the identifier method of the task class. It must return a tuple of two strings: the first is the identifier and the second is the version number, which must match the pattern vX.Y (where X and Y are non-negative integers). X is the major version number and Y is the minor version. For example, this task has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.3":
class MyTask(Task):
    @staticmethod
    def identifier() -> tuple[str, str]:
        return "tilebox.com/example_workflow/MyTask", "v1.3"

    def execute(self, context: ExecutionContext) -> None:
        pass
When a task is submitted as part of a job, the version from which it’s submitted is recorded and may differ from the version on the task runner executing the task. When task runners execute a task, they require a registered task with a matching identifier and compatible version number. A compatible version is where the major version number on the task runner matches that of the submitted task, and the minor version number on the task runner is equal to or greater than that of the submitted task. Examples of compatible version numbers include:
  • MyTask is submitted as part of a job. The version is "v1.3".
  • A task runner with version "v1.3" of MyTask would executes this task.
  • A task runner with version "v1.5" of MyTask would also executes this task.
  • A task runner with version "v1.2" of MyTask would not execute this task, as its minor version is lower than that of the submitted task.
  • A task runner with version "v2.5" of MyTask would not execute this task, as its major version differs from that of the submitted task.

Conclusion

Tasks form the foundation of Tilebox Workflows. By understanding how to create and manage tasks, you can leverage Tilebox’s capabilities to automate and optimize your workflows. Experiment with defining your own tasks, utilizing subtasks, managing dependencies, and employing semantic versioning to develop robust and efficient workflows.