fedflow.core package
Process communication
The classes in this source file are used for communication among processes; users should not use them directly. The listener's start and stop actions should only be called by the fedflow framework.
- class fedflow.core.message.Message(source, cmd, data)
Bases: tuple
The message data structure used for communication among processes.
- property cmd
the command of this message
- property data
the payload data of this message
- property source
where the message comes from
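Because Message derives from tuple and exposes source, cmd, and data as properties, it appears to behave like a named tuple. The following is a minimal stand-in sketch under that assumption; it does not import fedflow, and the field values ("task-1", "report", the loss figure) are purely illustrative.

```python
from collections import namedtuple

# Stand-in for fedflow.core.message.Message (assumption: namedtuple-like).
Message = namedtuple("Message", ["source", "cmd", "data"])

# Illustrative values only; real sources and commands are defined by fedflow.
msg = Message(source="task-1", cmd="report", data={"loss": 0.25})

# Fields can be read by name or by position, and unpack like a plain tuple.
source, cmd, data = msg
```

A tuple-based message is immutable, which suits values passed between processes.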
- class fedflow.core.message.MessageListener
Bases: object
- logger = <Logger fedflow.msglistener (INFO)>
- classmethod register_handler(source: str, handler: fedflow.core.message.Handler, overwrite=False) → None
Register a handler for the specified source.
- Parameters
source – every handler needs a source, and it cannot be the same as the source of MessageListener
handler – an instance of a subclass of Handler
overwrite – whether to overwrite the handler if one already exists
- Returns
- classmethod register_default_handler(default_handler)
Register the default handler; this overwrites any previous default handler.
- Parameters
default_handler – an instance of a subclass of Handler; it handles all messages that have no specific handler. Initially, the default handler does nothing.
- Returns
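The registration rules above (one handler per source, an optional overwrite, and a default fallback) can be illustrated with a small stand-in registry. This is only a sketch and not the fedflow implementation: the `handle` method, the `dispatch` helper, and the `RegistrySketch` and `EchoHandler` names are hypothetical, and silently keeping the existing handler when `overwrite` is False is an assumption.

```python
class Handler:
    """Hypothetical handler base; the real Handler interface is not shown here."""

    def handle(self, source, cmd, data):  # hypothetical method name
        raise NotImplementedError


class RegistrySketch:
    """Stand-in illustrating per-source handlers with a default fallback."""

    _handlers = {}
    _default = None

    @classmethod
    def register_handler(cls, source, handler, overwrite=False):
        # Assumption: without overwrite=True an existing handler is kept.
        if source in cls._handlers and not overwrite:
            return
        cls._handlers[source] = handler

    @classmethod
    def register_default_handler(cls, default_handler):
        # Always replaces the previous default handler.
        cls._default = default_handler

    @classmethod
    def dispatch(cls, source, cmd, data):
        # Hypothetical helper: use the specific handler, else the default.
        handler = cls._handlers.get(source, cls._default)
        if handler is None:
            return None
        return handler.handle(source, cmd, data)


class EchoHandler(Handler):
    """Hypothetical handler that returns a tag with the message routing info."""

    def __init__(self, tag):
        self.tag = tag

    def handle(self, source, cmd, data):
        return (self.tag, source, cmd)


RegistrySketch.register_handler("task-1", EchoHandler("specific"))
RegistrySketch.register_handler("task-1", EchoHandler("ignored"))  # overwrite=False
RegistrySketch.register_default_handler(EchoHandler("default"))
```

Messages from "task-1" reach the first registered handler, while messages from any other source fall through to the default handler.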
Schedule core
The core of fedflow scheduling.
GroupScheduler is responsible for scheduling tasks to run in parallel.
Generally, users should not use GroupScheduler directly.
- class fedflow.core.scheduler.GroupScheduler
Bases: object
The scheduler of TaskGroup.
- logger = <Logger fedflow.scheduler (INFO)>
- classmethod schedule(group: fedflow.core.taskgroup.TaskGroup) → None
The entry point of scheduling.
This method is blocking.
- Parameters
group – the task group waiting to be scheduled.
- Returns
- classmethod memory_free(require_memory: Optional[Union[int, str]] = None) → bool
Check memory utilization.
- Parameters
require_memory – the memory the current task requires. It can be an int (in bytes) or a str (number + unit, for example, ‘123KB’, ‘456 MB’, ‘789MiB’).
- Returns
a bool value
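The two accepted forms of require_memory (an int in bytes, or a "number + unit" string) can be normalised to bytes as sketched below. The `parse_memory` helper is hypothetical and is not part of fedflow; treating KB/MB/GB as powers of 1000 and KiB/MiB/GiB as powers of 1024 is an assumption, since the unit convention is not stated here.

```python
import re

# Assumption: decimal units are powers of 1000, "i" units are powers of 1024.
_UNITS = {
    "B": 1,
    "KB": 1000, "MB": 1000**2, "GB": 1000**3,
    "KIB": 1024, "MIB": 1024**2, "GIB": 1024**3,
}

# A number, optional whitespace, then a unit made of letters.
_PATTERN = re.compile(r"^\s*(\d+(?:\.\d+)?)\s*([A-Za-z]+)\s*$")


def parse_memory(value):
    """Convert an int (bytes) or a 'number + unit' string to bytes."""
    if isinstance(value, int):
        return value
    match = _PATTERN.match(value)
    if match is None:
        raise ValueError(f"unrecognised memory size: {value!r}")
    number, unit = match.groups()
    try:
        factor = _UNITS[unit.upper()]
    except KeyError:
        raise ValueError(f"unknown unit: {unit!r}") from None
    return int(float(number) * factor)
```

With this convention, the string forms from the parameter description parse with or without a space between the number and the unit.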
Task
The base class of tasks.
Users define a task by inheriting from the Task class and overriding the load and train methods.
- class fedflow.core.task.Task(task_id: Optional[Union[int, str]] = None, *, estimate_memory: Optional[Union[int, str]] = None, estimate_cuda_memory: Optional[Union[int, str]] = None, device=None)
Bases: object
The base class of all user tasks.
- main_logger = <Logger fedflow.task.main (INFO)>
- sub_logger = <Logger fedflow.task.sub (INFO)>
- __init__(task_id: Optional[Union[int, str]] = None, *, estimate_memory: Optional[Union[int, str]] = None, estimate_cuda_memory: Optional[Union[int, str]] = None, device=None)
Construct a task instance.
- Parameters
task_id – unique task id; defaults to a UUID string.
estimate_memory – maximum memory the task is expected to use.
estimate_cuda_memory – maximum CUDA memory the task is expected to use.
device – the device the task uses; if None, the scheduler decides the device.
- property workdir: str
The working directory of the task.
This property can only be used after the task process has started.
- Returns
- property status: fedflow.core.task.TaskStatus
The status of the task.
- Returns
- start_train(device: str) → None
Start training. Users must not call this method.
- Parameters
device – the device this task will use.
- Returns
- run(pipe, mq) → None
Entry point of the subprocess code.
- Parameters
pipe – connection pipe between the main-process task and the subprocess task
mq – connection queue between the main-process scheduler and the subprocess tasks
- Returns
- abstract load() → None
Users must override this method in a subclass. Put all loading actions (such as loading datasets) in this method.
- Returns
- abstract train(device: str) → dict
Users must override this method in a subclass. Put all computation (such as training or prediction) in this method.
- Parameters
device – the device this task will use.
- Returns
a dict of properties used for reporting.
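The split between load and train can be sketched with a stand-in base class that mirrors the documented constructor and the two abstract methods. The sketch deliberately does not import fedflow: the `Task` class below is a reduced stand-in, and `MeanTask`, its data, and the report keys are hypothetical.

```python
from abc import ABC, abstractmethod


class Task(ABC):
    """Stand-in for fedflow.core.task.Task, reduced to the two abstract hooks."""

    def __init__(self, task_id=None, *, estimate_memory=None,
                 estimate_cuda_memory=None, device=None):
        self.task_id = task_id
        self.estimate_memory = estimate_memory
        self.estimate_cuda_memory = estimate_cuda_memory
        self.device = device

    @abstractmethod
    def load(self):
        """Load datasets and other resources."""

    @abstractmethod
    def train(self, device):
        """Run the computation and return a dict of report properties."""


class MeanTask(Task):
    """Hypothetical task that 'trains' by averaging a small in-memory dataset."""

    def load(self):
        # All loading work goes here; a real task might read files instead.
        self.samples = [1.0, 2.0, 3.0, 4.0]

    def train(self, device):
        # All computation goes here; the returned dict is used for reporting.
        mean = sum(self.samples) / len(self.samples)
        return {"device": device, "mean": mean}


task = MeanTask("mean-1", estimate_memory="64MB")
task.load()
report = task.train("cpu")
```

Here load and train are called by hand only to show the order of the two phases; with the real class, the framework is expected to invoke them in the task subprocess.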
- class fedflow.core.task.TaskStatus(value)
Bases: enum.Enum
The task status enum.
- UNKNOWN = 0
the error status
- INIT = 1
the task instance has been constructed but scheduling has not started
- AVAILABLE = 2
the task subprocess has started
- LOADING = 3
loading has started
- WAITING = 4
loaded successfully and waiting for training
- TRAINING = 5
training has started
- FINISHED = 6
training finished successfully
- EXITED = 7
the subprocess has exited
- EXCEPTION = 8
an exception was caught while running
- INTERRUPT = 9
an OOM (or CUDA OOM) exception was caught while running
TaskGroup
All tasks in one group are executed in no particular order.
- class fedflow.core.taskgroup.TaskGroup(group_name: Optional[str] = None, *, estimate_memory: Optional[Union[int, str]] = None, estimate_cuda_memory: Optional[Union[int, str]] = None, device=None)
Bases: object
Generally, tasks in one group should be similar, meaning all tasks are instances of the same class.
This is not mandatory, however; you only need to ensure that there are no dependencies between tasks.
- global_ids = {}
- __init__(group_name: Optional[str] = None, *, estimate_memory: Optional[Union[int, str]] = None, estimate_cuda_memory: Optional[Union[int, str]] = None, device=None)
Construct a task group.
- Parameters
group_name – the group name; it is only used to create the group directory and for display in the report.
estimate_memory – maximum memory each task in this group is expected to use.
estimate_cuda_memory – maximum CUDA memory each task in this group is expected to use.
device – the device the tasks in this group use; if None, the scheduler decides the device.
- property device: str
- property group_name: str
Only used for the group directory name.
- Returns
a string representing the group name.
- add_task(task: fedflow.core.task.Task) → None
Add a task to this group.
- Parameters
task – the task to be added to this group
- Returns
- get_task(task_id: Union[int, str]) → Optional[fedflow.core.task.Task]
Get the specified task in the group.
- Parameters
task_id – the unique task id.
- Returns
an instance of Task, or None if not found.
- move_task(task_id: Union[int, str], _from: fedflow.core.task.TaskStatus, _to: fedflow.core.task.TaskStatus) → None
Move a task from one container to another. An exception is raised if the task does not exist in the _from container. After a successful move, this method updates the status of the task.
- Parameters
task_id – the id of the task
_from – the status to move from
_to – the status to move to
- Returns
- report_finish(task_id: Union[int, str], data=None) → None
Report that a task has finished.
- Parameters
task_id – the id of the finished task
data – extra report data
- Returns
- report_exception(task_id: Union[int, str], stage: str, message: str) → None
Report that a task caught an exception.
- Parameters
task_id – the id of the task that caught the exception.
stage – the stage in which the exception was caught (‘load’ or ‘train’).
message – the exception message
- Returns
- finished() → bool
Whether all tasks in this group have finished or caught an exception.
- Returns
a bool value
- numbers()
The task counts of this group.
- Returns
a tuple (process_number, waiting_number, training_number)
- retrieve_task(status) → Optional[fedflow.core.task.Task]
Randomly retrieve a task that has the given status.
- Parameters
status – the status the task must have
- Returns
the task retrieved or None if not found.
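The move_task and retrieve_task descriptions suggest that a group keeps one container of tasks per status. The following stand-in sketch illustrates that idea; it is not the fedflow implementation. `GroupSketch` and `FakeTask` are hypothetical, only a subset of the documented statuses is reproduced, and raising KeyError for a missing task is an assumption about the exception type.

```python
import random
from enum import Enum


class TaskStatus(Enum):
    # Subset of the documented statuses, with the documented values.
    INIT = 1
    WAITING = 4
    TRAINING = 5


class FakeTask:
    """Hypothetical minimal task holding only an id and a status."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.status = TaskStatus.INIT


class GroupSketch:
    """Stand-in that keeps one container (a dict) per status."""

    def __init__(self):
        self.containers = {status: {} for status in TaskStatus}

    def add_task(self, task):
        self.containers[task.status][task.task_id] = task

    def move_task(self, task_id, _from, _to):
        # Raises KeyError if the task is not in the _from container.
        task = self.containers[_from].pop(task_id)
        self.containers[_to][task_id] = task
        task.status = _to  # update the status only after a successful move

    def retrieve_task(self, status):
        # Pick a random task with the given status, or None if there is none.
        candidates = list(self.containers[status].values())
        return random.choice(candidates) if candidates else None


group = GroupSketch()
group.add_task(FakeTask("a"))
group.move_task("a", TaskStatus.INIT, TaskStatus.WAITING)
picked = group.retrieve_task(TaskStatus.WAITING)
```

Keying the containers by status makes retrieval by status a single lookup, and keeps each task's status field consistent with the container that holds it.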