fedflow.core package

Process communication

The classes in this source file are used for communication among processes; users should not use them directly. The listener's start and stop actions should only be called by the fedflow framework.

class fedflow.core.message.Handler

Bases: object

The base class for message handlers.

abstract handle(source: str, cmd: str, data: dict) → None

Handle a message from another process.

Parameters
  • source – where the message comes from; generally a UUID string.

  • cmd – the command, which represents the action to be performed.

  • data – the payload data of the message.

Returns
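
A handler is written by subclassing Handler and implementing handle. The sketch below is illustrative only: so that it runs without fedflow installed, it declares a stand-in Handler with the documented signature, and the CollectingHandler class, the "report" command and the payload keys are hypothetical.

```python
from abc import ABC, abstractmethod


class Handler(ABC):
    """Stand-in for fedflow.core.message.Handler (assumption: same signature)."""

    @abstractmethod
    def handle(self, source: str, cmd: str, data: dict) -> None:
        ...


class CollectingHandler(Handler):
    """Hypothetical handler that records every message it receives."""

    def __init__(self) -> None:
        self.received = []

    def handle(self, source: str, cmd: str, data: dict) -> None:
        # Store the message; a real handler would act on cmd here.
        self.received.append((source, cmd, data))


handler = CollectingHandler()
handler.handle("task-1", "report", {"loss": 0.5})
```

In fedflow itself the listener would call handle; it is invoked directly here only to show the contract.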

class fedflow.core.message.Message(source, cmd, data)

Bases: tuple

The message data structure used for communication among processes.

property cmd

the command of this message

property data

the payload data of this message

property source

where the message comes from
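
Because Message derives from tuple and exposes source, cmd and data as properties, it behaves like a named tuple. A minimal sketch, assuming a collections.namedtuple definition (the actual fedflow definition may differ); the field values are made up:

```python
from collections import namedtuple

# Assumed equivalent of fedflow.core.message.Message, not the library's code.
Message = namedtuple("Message", ["source", "cmd", "data"])

msg = Message(source="task-1", cmd="finish", data={"accuracy": 0.9})

# Fields are readable by name or by tuple unpacking.
source, cmd, data = msg
print(msg.cmd, data["accuracy"])
```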

class fedflow.core.message.MessageListener

Bases: object

logger = <Logger fedflow.msglistener (INFO)>
classmethod start() → None

Start listening for messages.

Returns

classmethod run() → None
classmethod register_handler(source: str, handler: fedflow.core.message.Handler, overwrite=False) → None

Register a handler for a specific source.

Parameters
  • source – every handler needs a source, which cannot be the same as the source of the MessageListener.

  • handler – an instance of a subclass of Handler.

  • overwrite – whether to overwrite the handler if one already exists.

Returns

classmethod register_default_handler(default_handler)

Register the default handler. This overwrites any previous default handler.

Parameters

default_handler – an instance of a subclass of Handler that handles every message with no specific handler. Initially, the default handler does nothing.

Returns
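
The registration rules described above (one handler per source, an overwrite flag, and a default handler for unmatched sources) can be sketched as a small registry. This is not MessageListener's implementation: HandlerRegistry, dispatch, and the choice to raise ValueError on a duplicate registration are assumptions made for illustration, and handlers are plain callables here rather than Handler instances.

```python
from typing import Callable, Dict

HandlerFn = Callable[[str, str, dict], None]


class HandlerRegistry:
    """Hypothetical registry mimicking the documented registration rules."""

    def __init__(self) -> None:
        self._handlers: Dict[str, HandlerFn] = {}
        # The default handler initially does nothing, as documented.
        self._default: HandlerFn = lambda source, cmd, data: None

    def register_handler(self, source: str, handler: HandlerFn,
                         overwrite: bool = False) -> None:
        if source in self._handlers and not overwrite:
            # Assumption: a duplicate registration without overwrite is an error.
            raise ValueError(f"handler already registered for {source!r}")
        self._handlers[source] = handler

    def register_default_handler(self, handler: HandlerFn) -> None:
        # Always replaces the previous default handler.
        self._default = handler

    def dispatch(self, source: str, cmd: str, data: dict) -> None:
        # Fall back to the default handler when no handler matches the source.
        self._handlers.get(source, self._default)(source, cmd, data)


log = []
registry = HandlerRegistry()
registry.register_handler("task-1", lambda s, c, d: log.append(("specific", c)))
registry.register_default_handler(lambda s, c, d: log.append(("default", c)))
registry.dispatch("task-1", "finish", {})  # handled by the specific handler
registry.dispatch("task-2", "finish", {})  # no match, goes to the default
```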

classmethod stop()

Stop listening.

Returns

classmethod mq() → multiprocessing.context.BaseContext.Queue

Get the message queue.

Returns

Schedule core

The core of fedflow scheduling.

GroupScheduler is responsible for scheduling tasks to run in parallel. Generally, users should not use GroupScheduler directly.

class fedflow.core.scheduler.GroupScheduler

Bases: object

The scheduler of TaskGroup.

logger = <Logger fedflow.scheduler (INFO)>
classmethod schedule(group: fedflow.core.taskgroup.TaskGroup) → None

The entry point of scheduling.

This method blocks.

Parameters

group – the task group waiting to be scheduled.

Returns

classmethod cpu_free() → bool

Check CPU utilization.

Returns

a bool value.

classmethod memory_free(require_memory: Optional[Union[int, str]] = None) → bool

Check memory utilization.

Parameters

require_memory – the memory required by the current task. It can be an int (in bytes) or a str (number + unit, for example ‘123KB’, ‘456 MB’, ‘789MiB’).

Returns

a bool value.

classmethod assign_cuda(require_cuda_memory=None, device: Optional[str] = None)

Assign a CUDA device.

Parameters
  • require_cuda_memory – the CUDA memory required by the current task.

  • device – a specific device to use; if given, all other devices are ignored.

Returns

an integer representing the CUDA device id.

classmethod parse_memory_value(value)
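
parse_memory_value has no description, but the require_memory formats listed for memory_free ('123KB', '456 MB', '789MiB', or a plain int in bytes) suggest what such a parser has to handle. The sketch below is an assumption, not fedflow's code: parse_memory is a hypothetical name, and treating KB/MB/GB as powers of 1000 and KiB/MiB/GiB as powers of 1024 is a guess the library may not share.

```python
import re
from typing import Union

# Assumed unit table: decimal for KB/MB/GB, binary for KiB/MiB/GiB.
_UNITS = {
    "B": 1,
    "KB": 1000, "MB": 1000 ** 2, "GB": 1000 ** 3,
    "KIB": 1024, "MIB": 1024 ** 2, "GIB": 1024 ** 3,
}

# A number, optional whitespace, then an alphabetic unit.
_PATTERN = re.compile(r"^\s*(\d+(?:\.\d+)?)\s*([A-Za-z]+)\s*$")


def parse_memory(value: Union[int, str]) -> int:
    """Convert an int (bytes) or a 'number + unit' string to bytes."""
    if isinstance(value, int):
        return value
    match = _PATTERN.match(value)
    if match is None:
        raise ValueError(f"cannot parse memory value: {value!r}")
    number, unit = match.groups()
    try:
        factor = _UNITS[unit.upper()]
    except KeyError:
        raise ValueError(f"unknown memory unit: {unit!r}") from None
    return int(float(number) * factor)
```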

Task

The base class of tasks.

Users define a task by inheriting from the Task class and overriding the load and train methods.

class fedflow.core.task.Task(task_id: Optional[Union[int, str]] = None, *, estimate_memory: Optional[Union[int, str]] = None, estimate_cuda_memory: Optional[Union[int, str]] = None, device=None)

Bases: object

The base class of all user tasks.

main_logger = <Logger fedflow.task.main (INFO)>
sub_logger = <Logger fedflow.task.sub (INFO)>
__init__(task_id: Optional[Union[int, str]] = None, *, estimate_memory: Optional[Union[int, str]] = None, estimate_cuda_memory: Optional[Union[int, str]] = None, device=None)

Construct a task instance.

Parameters
  • task_id – the unique task id; defaults to a UUID string.

  • estimate_memory – the maximum memory the task is expected to use.

  • estimate_cuda_memory – the maximum CUDA memory the task is expected to use.

  • device – the device the task uses; if None, the scheduler decides the device.

property workdir: str

The working directory of the task.

This property can only be used after the task process has started.

Returns

property status: fedflow.core.task.TaskStatus

The status of task.

Returns

start() → None

Start the task process. This method is not meant to be called by the user.

Returns

start_load() → None

Start loading. This method is not meant to be called by the user.

Returns

start_train(device: str) → None

Start training. This method is not meant to be called by the user.

Parameters

device – the device this task will use.

Returns

exit() → None

Exit the task process. This method is not meant to be called by the user.

Returns

is_alive() → bool

Whether the task process is alive.

Returns

a bool value

run(pipe, mq) → None

The entry point of the subprocess code.

Parameters
  • pipe – the connection pipe between the main-process task and the subprocess task.

  • mq – the message queue connecting the main-process scheduler and the subprocess tasks.

Returns

abstract load() → None

Users must override this method in a subclass. When implementing a subclass, put all loading actions (such as loading datasets) in this method.

Returns

abstract train(device: str) → dict

Users must override this method in a subclass. When implementing a subclass, put all computation (such as training or prediction) in this method.

Parameters

device – the device this task will use.

Returns

a dict of properties used for reporting.
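
The load/train contract can be sketched as follows. To stay runnable without fedflow, the example uses a stand-in Task that declares only the two abstract methods (the real class also takes the constructor arguments documented above); MeanTask, its sample data and the keys of the returned dict are hypothetical.

```python
from abc import ABC, abstractmethod


class Task(ABC):
    """Stand-in for fedflow.core.task.Task exposing only load and train."""

    @abstractmethod
    def load(self) -> None:
        ...

    @abstractmethod
    def train(self, device: str) -> dict:
        ...


class MeanTask(Task):
    """Hypothetical task: 'loads' a small dataset and 'trains' by averaging it."""

    def load(self) -> None:
        # All loading work (datasets, models) belongs here.
        self.samples = [1.0, 2.0, 3.0, 4.0]

    def train(self, device: str) -> dict:
        # All computation belongs here; the returned dict is used for reporting.
        mean = sum(self.samples) / len(self.samples)
        return {"device": device, "mean": mean}


task = MeanTask()
task.load()
report = task.train("cpu")
```

In fedflow the scheduler drives these calls through start_load and start_train; they are called directly here only to show the order and the return value.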

class fedflow.core.task.TaskStatus(value)

Bases: enum.Enum

The task status enum.

UNKNOWN = 0

the error status

INIT = 1

the task instance has been constructed but scheduling has not started

AVAILABLE = 2

start task subprocess

LOADING = 3

start loading

WAITING = 4

loaded successfully, and waiting for training

TRAINING = 5

start training

FINISHED = 6

training finished successfully

EXITED = 7

subprocess exited

EXCEPTION = 8

an exception was caught while running

INTERRUPT = 9

an OOM (or CUDA OOM) exception was caught while running

TaskGroup

All tasks in one group are executed in no particular order.

class fedflow.core.taskgroup.TaskGroup(group_name: Optional[str] = None, *, estimate_memory: Optional[Union[int, str]] = None, estimate_cuda_memory: Optional[Union[int, str]] = None, device=None)

Bases: object

Generally, the tasks in one group should be similar, meaning that all tasks are instances of the same class.

This is not mandatory, however; you only need to ensure that there are no dependencies between tasks.

global_ids = {}
__init__(group_name: Optional[str] = None, *, estimate_memory: Optional[Union[int, str]] = None, estimate_cuda_memory: Optional[Union[int, str]] = None, device=None)

Construct a task group.

Parameters
  • group_name – the group name; it is only used to create the group directory and for display in the report.

  • estimate_memory – the maximum memory each task in this group is expected to use.

  • estimate_cuda_memory – the maximum CUDA memory each task in this group is expected to use.

  • device – the device the tasks in this group use; if None, the scheduler decides the device.

property device: str
property group_name: str

Only used for the group directory name.

Returns

a string representing the group name.

add_task(task: fedflow.core.task.Task) → None

Add a task to this group.

Parameters

task – the task to be added to this group

Returns

get_task(task_id: Union[int, str]) → Optional[fedflow.core.task.Task]

Get a specific task in the group.

Parameters

task_id – the unique task id.

Returns

an instance of Task or None if not found.

move_task(task_id: Union[int, str], _from: fedflow.core.task.TaskStatus, _to: fedflow.core.task.TaskStatus) → None

Move a task from one container to another. An exception is raised if the task does not exist in the _from container. After a successful move, this method updates the status of the task.

Parameters
  • task_id – the id of the task.

  • _from – the status to move from.

  • _to – the status to move to.

Returns
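
The move semantics described above (one container per status, an error when the task is missing from _from, and a status update after the move) can be sketched like this. It is a sketch under stated assumptions, not TaskGroup's code: StatusContainers and add are hypothetical, tasks are plain dicts, only a subset of TaskStatus is mirrored, and KeyError is a guess because the exception type is not documented.

```python
from enum import Enum
from typing import Dict, Union

TaskId = Union[int, str]


class TaskStatus(Enum):
    """Subset of the documented TaskStatus values."""
    INIT = 1
    AVAILABLE = 2
    LOADING = 3


class StatusContainers:
    """Hypothetical per-status containers, keyed by task id."""

    def __init__(self) -> None:
        self._containers: Dict[TaskStatus, Dict[TaskId, dict]] = {
            status: {} for status in TaskStatus
        }

    def add(self, task_id: TaskId, task: dict) -> None:
        # New tasks start in the INIT container.
        task["status"] = TaskStatus.INIT
        self._containers[TaskStatus.INIT][task_id] = task

    def move_task(self, task_id: TaskId, _from: TaskStatus, _to: TaskStatus) -> None:
        if task_id not in self._containers[_from]:
            # Assumption: the exception type is not documented; KeyError is a guess.
            raise KeyError(f"task {task_id!r} is not in the {_from.name} container")
        task = self._containers[_from].pop(task_id)
        self._containers[_to][task_id] = task
        # Update the status only after the move has succeeded.
        task["status"] = _to


containers = StatusContainers()
task = {}
containers.add("t1", task)
containers.move_task("t1", TaskStatus.INIT, TaskStatus.AVAILABLE)
```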

report_finish(task_id: Union[int, str], data=None) → None

Report that a task has finished.

Parameters
  • task_id – the id of the finished task.

  • data – extra report data.

Returns

report_exception(task_id: Union[int, str], stage: str, message: str) → None

Report that a task caught an exception.

Parameters
  • task_id – the id of the task that caught the exception.

  • stage – the stage in which the exception was caught (‘load’ or ‘train’).

  • message – the exception message.

Returns

finished() → bool

Whether all tasks in this group have finished or caught an exception.

Returns

a bool value

numbers()

The task counts of this group.

Returns

a tuple (process_number, waiting_number, training_number)

retrieve_task(status) → Optional[fedflow.core.task.Task]

Randomly retrieve a task that has the given status.

Parameters

status – the status the retrieved task must have.

Returns

the task retrieved or None if not found.