Welcome to PiePline’s documentation!

Getting started guide

First, look at the main classes of PiePline:

Training stages are used to customize the training process. With them, Trainer works according to this scheme (data flow for a single epoch):

_images/data_flow.svg

Implement dataset class

In PiePline, a dataset is an iterable class. This means that the class needs to contain the __getitem__ and __len__ methods.

For every i-th output, the dataset needs to produce a Python dict with the keys ‘data’ and ‘target’.

Let’s create an MNIST dataset based on the built-in torchvision dataset:

from torchvision import datasets, transforms
from piepline.data_producer.datasets import AbstractDataset

class MNISTDataset(AbstractDataset):
    # define transforms
    transforms = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])

    def __init__(self, data_dir: str, is_train: bool):
        # instantiate PyTorch dataset
        self.dataset = datasets.MNIST(data_dir, train=is_train, download=True)

    # define the method that returns the dataset length
    def __len__(self):
        return len(self.dataset)

    # define the method that returns a single item by index
    def __getitem__(self, item):
        data, target = self.dataset[item]
        return {'data': self.transforms(data), 'target': target}
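Before wrapping a dataset, it can help to check the contract described above: the class has __getitem__ and __len__, and each item is a dict with the keys 'data' and 'target'. The following is a minimal sketch of such a check. The names check_dataset_contract, ToyDataset and BrokenDataset are hypothetical and are not part of PiePline.

```python
def check_dataset_contract(dataset, index=0):
    """Return a list of problems found; an empty list means the contract holds."""
    problems = []
    for method in ("__getitem__", "__len__"):
        if not hasattr(dataset, method):
            problems.append("missing method " + method)
    if problems or len(dataset) == 0:
        return problems  # nothing more can be checked
    item = dataset[index]
    if not isinstance(item, dict):
        problems.append("item is not a dict")
    else:
        for key in ("data", "target"):
            if key not in item:
                problems.append("missing key " + key)
    return problems


class ToyDataset:
    """Stand-in dataset (hypothetical), used only to exercise the check."""

    def __init__(self, values):
        self.values = values

    def __len__(self):
        return len(self.values)

    def __getitem__(self, item):
        return {"data": self.values[item], "target": self.values[item] % 2}


class BrokenDataset(ToyDataset):
    def __getitem__(self, item):
        return {"data": self.values[item]}  # the 'target' key is missing


print(check_dataset_contract(ToyDataset([1, 2, 3])))
print(check_dataset_contract(BrokenDataset([1])))
```

The same function could be called on MNISTDataset before passing it to DataProducer.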

To work with this dataset, we need to wrap it in a DataProducer:

from piepline import DataProducer

# create train and validation data producers
train_dataset = DataProducer([MNISTDataset('data/dataset', True)], batch_size=4, num_workers=2)
validation_dataset = DataProducer([MNISTDataset('data/dataset', False)], batch_size=4, num_workers=2)

Create TrainConfig

Now let’s define a TrainConfig that will contain the training hyperparameters.

In this tutorial we use the predefined stages TrainStage and ValidationStage. TrainStage iterates over a DataProducer and trains the model in train() mode. ValidationStage does the same, but in eval() mode.

import torch
from piepline import TrainConfig, TrainStage, ValidationStage

# define train stages
train_stages = [TrainStage(train_dataset), ValidationStage(validation_dataset)]

loss = torch.nn.NLLLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-4, momentum=0.5)

# define TrainConfig
train_config = TrainConfig(train_stages, loss, optimizer)

Create Trainer

First of all, we need to specify the model that will be trained:

import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4 * 4 * 50, 500)
        self.fc2 = nn.Linear(500, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 4 * 4 * 50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

# instantiate the model used by the optimizer and Trainer
model = Net()

Now we need to build our training process. This is done by instantiating the Trainer class:

from piepline import FileStructManager, Trainer

# define file structure for experiment
fsm = FileStructManager(base_dir='data', is_continue=False)

# create trainer
trainer = Trainer(model, train_config, fsm, torch.device('cuda:0'))

# specify training epochs number
trainer.set_epoch_num(50)

The last parameter of the Trainer constructor is the target device that will be used for training.

Start training

Now we can start the training process:

trainer.train()

That’s all. The console output will look like this:

Epoch: [1]; train: [0.004141, 1.046422, 3.884116]; validation: [0.002027, 0.304710, 2.673034]
Epoch: [2]; train: [0.000519, 0.249564, 4.938250]; validation: [0.000459, 0.200972, 2.594026]
Epoch: [3]; train: [0.000182, 0.180328, 5.218509]; validation: [0.000135, 0.155546, 2.512275]
train: 31%|███ | 4651/15000 [00:31<01:07, 154.06it/s, loss=[0.154871]]

The first 3 lines are the standard output of ConsoleMonitor. This monitor is included in MonitorHub by default. Each line shows the loss values of the corresponding stage in the format [min, mean, max].

The last line is built by tqdm and comes from TrainStage and ValidationStage. It shows the current mean value of the metrics for the stage that is currently running.
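To make the [min, mean, max] format concrete, here is a minimal sketch that builds a line of the same shape from per-stage loss values. It is not ConsoleMonitor's implementation. The function names and the sample loss values are made up for illustration.

```python
def format_stage_losses(losses):
    """Format a list of loss values as '[min, mean, max]' with 6 decimals."""
    lowest, mean, highest = min(losses), sum(losses) / len(losses), max(losses)
    return "[{:.6f}, {:.6f}, {:.6f}]".format(lowest, mean, highest)


def format_epoch_line(epoch, stage_losses):
    """Join the per-stage summaries into one line per epoch."""
    parts = ["{}: {}".format(name, format_stage_losses(values))
             for name, values in stage_losses.items()]
    return "Epoch: [{}]; ".format(epoch) + "; ".join(parts)


# hypothetical loss values collected during one epoch
line = format_epoch_line(1, {"train": [0.5, 1.0, 1.5], "validation": [0.25, 0.75]})
print(line)
```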

Add Tensorboard monitor

To get more useful information about training, we can connect Tensorboard.

To do this, we need to connect the built-in TensorboardMonitor to Trainer before training:

from piepline.builtin.monitors.tensorboard import TensorboardMonitor

trainer.monitor_hub.add_monitor(TensorboardMonitor(fsm, is_continue=False))

Now the Tensorboard output will look like this:

_images/tensorboard_loss.jpg _images/tensorboard_hist.jpg

Continue training

If we need to run some more training epochs but no longer have the previously defined objects, we need to do this:

# define again all from previous steps
# ...

# define FileStructManager with parameter is_continue=True
fsm = FileStructManager(base_dir='data', is_continue=True)

# create trainer
trainer = Trainer(model, train_config, fsm, torch.device('cuda:0'))

# specify training epochs number
trainer.set_epoch_num(50)

# add TensorboardMonitor with parameter is_continue=True
trainer.monitor_hub.add_monitor(TensorboardMonitor(fsm, is_continue=True))

# set Trainer to resume mode and run training
trainer.resume(from_best_checkpoint=False).train()

The parameter from_best_checkpoint=False tells Trainer that it needs to continue from the last checkpoint. PiePline can save best checkpoints by a specified rule. For more information about it, read about the enable_lr_decaying method of Trainer.

Don’t worry about the training history being displayed incorrectly. If a history already exists, monitors just add new data to it.

After this tutorial, look at the segmentation example to explore how to work with specific metrics.

API

Trainer

The main module for the training process

class piepline.train.Trainer(train_config: piepline.train_config.train_config.BaseTrainConfig, fsm: piepline.utils.fsm.FileStructManager, device: torch.device = None)[source]

Class that drives the training process.

Trainer gets a list of training stages and loops over them every epoch.

Training process looks like:

for epoch in range(epochs_num):
    for stage in training_stages:
        stage.run()
        monitor_hub.update_metrics(stage.metrics_processor().get_metrics())
    save_state()
    on_epoch_end_callback()
Parameters:
  • train_config – TrainConfig object
  • fsm – FileStructManager object
  • device – device for training process
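The loop above can be made concrete with stub objects. The sketch below only illustrates the order of calls within an epoch. StubStage, run_training and the callbacks are hypothetical stand-ins, not PiePline classes.

```python
class StubStage:
    """Hypothetical stage that records when it runs."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def run(self):
        self.log.append("run " + self.name)

    def get_metrics(self):
        return {"stage": self.name}


def run_training(stages, epochs_num, update_metrics, save_state, on_epoch_end):
    for epoch in range(epochs_num):
        for stage in stages:
            stage.run()
            update_metrics(stage.get_metrics())
        # state is saved once per epoch, after all stages have run
        save_state(epoch)
        on_epoch_end(epoch)


log = []
run_training(
    [StubStage("train", log), StubStage("validation", log)],
    epochs_num=2,
    update_metrics=lambda metrics: log.append("metrics " + metrics["stage"]),
    save_state=lambda epoch: log.append("save {}".format(epoch)),
    on_epoch_end=lambda epoch: log.append("end {}".format(epoch)),
)
print("\n".join(log))
```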
exception TrainerException(msg)[source]
cur_epoch_id() → int[source]

Get current epoch index

data_processor() → piepline.data_processor.data_processor.TrainDataProcessor[source]

Get data processor object

Returns:data processor
enable_lr_decaying(coeff: float, patience: int, target_val_clbk: callable) → piepline.train.Trainer[source]

Enable learning rate decay. The learning rate decays when the value returned by target_val_clbk does not update its minimum for patience steps

Parameters:
  • coeff – lr decay coefficient
  • patience – number of steps
  • target_val_clbk – callback which returns the value that is used for lr decaying
Returns:

self object
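The decay rule described here can be sketched in plain Python. This is an illustration of patience-based decay in general, not the library's code. The class name PatienceLRDecay is hypothetical. The sketch assumes that decaying means multiplying the learning rate by coeff and that the patience counter resets after each decay.

```python
class PatienceLRDecay:
    """Decay lr when the monitored value has not improved for `patience` steps."""

    def __init__(self, lr, coeff, patience):
        self.lr = lr
        self.coeff = coeff
        self.patience = patience
        self.best = None
        self.steps_without_improvement = 0

    def step(self, value):
        if self.best is None or value < self.best:
            # new minimum: remember it and reset the counter
            self.best = value
            self.steps_without_improvement = 0
        else:
            self.steps_without_improvement += 1
            if self.steps_without_improvement >= self.patience:
                self.lr *= self.coeff  # assumption: multiplicative decay
                self.steps_without_improvement = 0
        return self.lr


decay = PatienceLRDecay(lr=1.0, coeff=0.5, patience=2)
# hypothetical validation losses, one per epoch
history = [decay.step(value) for value in [1.0, 0.9, 0.95, 0.92, 0.8]]
print(history)
```

In this run the learning rate is halved at the fourth value, after two consecutive steps without a new minimum.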

set_epoch_num(epoch_number: int) → piepline.train.Trainer[source]

Define the number of epochs for training. One epoch is one iteration over all train stages

Parameters:epoch_number – number of training epochs
Returns:self object
train() → None[source]

Run training process

train_config() → piepline.train_config.train_config.BaseTrainConfig[source]

Get train config

Returns:TrainConfig object

Train Config

class piepline.train_config.train_config.BaseTrainConfig(model: torch.nn.modules.module.Module, train_stages: [], loss: torch.nn.modules.module.Module, optimizer: torch.optim.optimizer.Optimizer)[source]

Storage for training process settings

Parameters:
  • train_stages – list of stages for train loop
  • loss – loss criterion
  • optimizer – optimizer object
loss() → torch.nn.modules.module.Module[source]

Get loss object

Returns:loss object
optimizer() → torch.optim.optimizer.Optimizer[source]

Get optimizer object

Returns:optimizer object
stages() → List[piepline.train_config.stages.AbstractStage][source]

Get list of stages

Returns:list of stages

Data Producer

class piepline.data_producer.data_producer.DataProducer(dataset: piepline.data_producer.datasets.AbstractDataset, batch_size: int = 1, num_workers: int = 0)[source]

Data Producer. Accumulates one or more datasets and passes their data in batches for processing. It uses the PyTorch built-in DataLoader to increase data delivery performance.

Parameters:
  • dataset – dataset object. Every dataset must be iterable (contain the methods __getitem__ and __len__)
  • batch_size – size of the output batch
  • num_workers – number of processes that load data from the datasets and pass it to the output

get_data(data_idx: int) → object[source]

Get a single data item by dataset idx and data_idx

Parameters:data_idx – index of data in this dataset
Returns:dataset output

get_loader(indices: [<class 'str'>] = None) → torch.utils.data.dataloader.DataLoader[source]

Get the PyTorch DataLoader object that aggregates this DataProducer. If indices are specified, the DataLoader will output data only for these indices. In this case the indices will not be passed.

Parameters:indices – list of indices. Each item of the list is a string in the format ‘{}_{}’.format(dataset_idx, data_idx)
Returns:DataLoader object
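The index string format can be illustrated with two small helpers. This is only a sketch of the '{}_{}'.format(dataset_idx, data_idx) convention. The helper names make_index and parse_index are hypothetical and do not exist in PiePline.

```python
def make_index(dataset_idx, data_idx):
    """Build an index string in the '{dataset_idx}_{data_idx}' format."""
    return "{}_{}".format(dataset_idx, data_idx)


def parse_index(index):
    """Split an index string back into (dataset_idx, data_idx)."""
    dataset_idx, data_idx = index.split("_")
    return int(dataset_idx), int(data_idx)


# indices for the first three items of two datasets
indices = [make_index(d, i) for d in range(2) for i in range(3)]
print(indices)
print(parse_index("1_2"))
```

A list built this way has the shape that the indices parameter of get_loader is documented to expect.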

global_shuffle(is_need: bool) → piepline.data_producer.data_producer.DataProducer[source]

Set whether global shuffling is needed. If global shuffling is enabled, batches are compiled from random indices of all datasets. In this case, shuffling of the datasets order is ignored

Parameters:is_need – whether global shuffling is needed
Returns:self object

pass_indices(need_pass: bool) → piepline.data_producer.data_producer.DataProducer[source]

Pass the indices of data in every batch. Disabled by default

Parameters:need_pass – whether indices need to be passed

pin_memory(is_need: bool) → piepline.data_producer.data_producer.DataProducer[source]

Set whether memory needs to be pinned on loading. Pinning memory increases data loading performance (especially when data is loaded to GPU) but is incompatible with swap

Parameters:is_need – whether pinning is needed
Returns:self object

File structure management utils

This module contains all classes that work with the file structure

  • FileStructManager provides registration of all modules
  • CheckpointsManager provides checkpoints management
class piepline.utils.fsm.FileStructManager(base_dir: str, is_continue: bool, exists_ok: bool = False)[source]

Class that provides registration of directories in the base directory.

All modules that use the file structure under the base directory should register their paths in this class by passing the module to the register_dir() method. If the directory is already registered, the registration method raises FSMException

Parameters:
  • base_dir – path to directory with checkpoints
  • is_continue – whether FileStructManager is used to continue training or to predict
  • exists_ok – if True, all checks for existing directories are disabled
exception FSMException(message: str)[source]
get_path(obj: piepline.utils.fsm.FolderRegistrable, create_if_non_exists: bool = False, check: bool = True) → str[source]

Get path of registered object

Parameters:
  • obj – object
  • create_if_non_exists – whether to create the object’s directory if it doesn’t exist
  • check – whether to check that the object’s directory exists
Returns:

path to directory

Raises:

FSMException – if directory exists and check == True

in_continue_mode() → bool[source]

Check whether FileStructManager is in continue mode

Returns:True if in continue mode
register_dir(obj: piepline.utils.fsm.FolderRegistrable, check_name_registered: bool = False, check_dir_registered: bool = True) → None[source]

Register directory in file structure

Parameters:
  • obj – object to register
  • check_name_registered – whether to check if the object name is already registered
  • check_dir_registered – whether to check if the object path is already registered
Raises:

FileStructManager – if the path or object name is already registered, or if the path already exists (depending on the optional parameter values)

class piepline.utils.fsm.FolderRegistrable(fsm: piepline.utils.fsm.FileStructManager)[source]

Abstract class for implementing classes that use folders

Parameters:fsm – FileStructManager class instance
class piepline.utils.fsm.MultipleFSM(base_dir: str, is_continue: bool, exists_ok: bool = False)[source]

Monitoring

Data Processor

class piepline.data_processor.data_processor.DataProcessor(model: torch.nn.modules.module.Module, device: torch.device = None)[source]

DataProcessor manages the model, data processing and device selection

Args:
model (Module): model that will be used to process data
device (torch.device): device to which data is passed for processing
model() → torch.nn.modules.module.Module[source]

Get the current model

predict(data: torch.Tensor) → object[source]

Make a prediction from data

Parameters:data – data as torch.Tensor or dict with key data
Returns:processed output
Return type:the model output type
set_pick_model_input(pick_model_input: callable) → piepline.data_processor.data_processor.DataProcessor[source]

Set the callback that gets the output from DataLoader and returns the model input.

Default mode:


lambda data: data['data']

Args:
pick_model_input (callable): callable that picks the model input. This callback must take one parameter: the dataset output
Returns:
self object

Examples:

data_processor.set_pick_model_input(lambda data: data['data'])
data_processor.set_pick_model_input(lambda data: data[0])
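What such a callback receives and returns can be shown without the library. In the sketch below the batches are hypothetical plain Python containers standing in for DataLoader output, and the function names are made up for illustration.

```python
def pick_from_dict(data):
    """Matches the default mode: the batch is a dict with a 'data' key."""
    return data["data"]


def pick_from_tuple(data):
    """For datasets whose output is a (data, target) tuple."""
    return data[0]


def pick_nested_image(data):
    """For a hypothetical batch whose 'data' value is itself a dict."""
    return data["data"]["image"]


dict_batch = {"data": [[0.1, 0.2]], "target": [1]}
tuple_batch = ([[0.1, 0.2]], [1])
nested_batch = {"data": {"image": [[0.1, 0.2]], "meta": ["a"]}, "target": [1]}

print(pick_from_dict(dict_batch))
print(pick_from_tuple(tuple_batch))
print(pick_nested_image(nested_batch))
```

Any of these functions could be passed to set_pick_model_input in place of the lambdas shown above.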
class piepline.data_processor.data_processor.TrainDataProcessor(train_config: piepline.train_config.train_config.BaseTrainConfig, device: torch.device = None)[source]

TrainDataProcessor does everything that DataProcessor does, and also performs the training process.

Parameters:train_config – train config
exception TDPException(msg)[source]
get_lr() → float[source]

Get learning rate from optimizer

get_state() → {}[source]

Get model and optimizer state dicts

Returns:dict with keys [weights, optimizer]
predict(data, is_train=False) → torch.Tensor[source]

Make a prediction from data. If is_train is True, this operation computes gradients. If is_train is False, it works with model.eval() and torch.no_grad

Parameters:
  • data – data in dict
  • is_train – whether the data processor needs to train on the data or just predict
Returns:

processed output

Return type:

model return type

process_batch(batch: {}, is_train: bool) → Tuple[torch.Tensor, torch.Tensor, torch.Tensor][source]

Process one batch of data

Args:
batch (dict): contains ‘data’ and ‘target’ keys. The value for each key must be an instance of torch.Tensor or dict
is_train (bool): whether the batch is processed for training
Returns:
tuple of torch.Tensor of losses, predictions and targets with shape (N, …) where N is the batch size
save_state(path: str) → None[source]

Save the state of the optimizer and the number of performed epochs

set_data_preprocess(data_preprocess: callable) → piepline.data_processor.data_processor.DataProcessor[source]

Set the callback that gets the output from DataLoader and returns preprocessed data. For example, it may be used to pass data to a device.

Default mode:


_pass_data_to_device()

Args:
data_preprocess (callable): preprocess callable. This callback must take one parameter: the dataset output
Returns:
self object

Examples:

from piepline.utils import dict_recursive_bypass
data_processor.set_data_preprocess(lambda data: dict_recursive_bypass(data, lambda v: v.cuda()))
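The example above applies a function to every value of a possibly nested dict. The sketch below shows that idea with a stand-alone function. It is not the source of dict_recursive_bypass, whose exact behavior is assumed here. The name recursive_apply and the sample batch are hypothetical.

```python
def recursive_apply(dictionary, func):
    """Return a new dict with func applied to every non-dict value."""
    result = {}
    for key, value in dictionary.items():
        if isinstance(value, dict):
            # descend into nested dicts
            result[key] = recursive_apply(value, func)
        else:
            result[key] = func(value)
    return result


# plain numbers stand in for tensors, multiplying stands in for v.cuda()
batch = {"data": {"image": 1, "mask": 2}, "target": 3}
moved = recursive_apply(batch, lambda v: v * 10)
print(moved)
```

The input dict is left unchanged, and the result has the same nesting as the input.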
set_pick_target(pick_target: callable) → piepline.data_processor.data_processor.DataProcessor[source]

Set the callback that gets the output from DataLoader and returns the target.

Default mode:


lambda data: data['target']

Args:
pick_target (callable): callable that picks the target. This callback must take one parameter: the dataset output
Returns:
self object

Examples:

data_processor.set_pick_target(lambda data: data['target'])
data_processor.set_pick_target(lambda data: data[1])
update_lr(lr: float) → None[source]

Update the learning rate directly in the optimizer

Parameters:lr – target learning rate

Model

Predictor

The main module for running inference

class piepline.predict.Predictor(model: torch.nn.modules.module.Module, checkpoints_manager: piepline.utils.checkpoints_manager.CheckpointsManager)[source]

Predictor runs inference using trained parameters

Parameters:
  • model – model object, used for prediction
  • fsm – FileStructManager object
predict(data: torch.Tensor)[source]

Predict on data

Parameters:data – data as torch.Tensor or dict with key data
Returns:processed output
Return type:model output type

Builtin modules

The builtin module contains all modules that can’t be tested or that have a specific field of application.

Tensorboard

This module contains Tensorboard monitor interface

class piepline.builtin.monitors.tensorboard.TensorboardMonitor(fsm: piepline.utils.fsm.FileStructManager, is_continue: bool, network_name: str = None)[source]

Class that manages monitoring of metrics and events. It works with Tensorboard. The monitor gets metrics after each epoch ends and visualizes them. Metrics may be float or np.array values. If a metric is an np.array, it is shown as a histogram and as a scalar (scalar plots contain the mean value of the array).

Parameters:
  • fsm – file structure manager
  • is_continue – whether training is being continued
  • network_name – network name
update_losses(losses: {}) → None[source]

Update monitor

Parameters:losses – losses values with keys ‘train’ and ‘validation’
update_metrics(metrics: {}) → None[source]

Update monitor

Parameters:metrics – metrics dict with keys ‘metrics’ and ‘groups’
update_scalar(name: str, value: float, epoch_idx: int = None) → None[source]

Update scalar on tensorboard

Parameters:
  • name – the classic tag for TensorboardX
  • value – scalar value
  • epoch_idx – epoch index. If not set, the last epoch index stored in this class is used
visualize_model(model: torch.nn.modules.module.Module, tensor) → None[source]

Visualize model graph

Parameters:
  • model – torch.nn.Module object
  • tensor – dummy input for trace model
write_to_txt_log(text: str, tag: str = None) → None[source]

Write to txt log

Parameters:
  • text – text that will be written
  • tag – tag

Matplotlib

This module contains Matplotlib monitor interface

class piepline.builtin.monitors.mpl.MPLMonitor[source]

This monitor shows all data in Matplotlib plots

realtime(is_realtime: bool) → piepline.builtin.monitors.mpl.MPLMonitor[source]

Set whether data updates need to be shown in real time

Parameters:is_realtime – whether real-time updates are needed
Returns:self object
update_metrics(metrics: {}) → None[source]

Update metrics on monitor

Parameters:metrics – metrics dict with keys ‘metrics’ and ‘groups’

DVC

Portrait segmentation network.

It is based on PyTorch, PiePline and a high-level pipeline built with DVC (dvc.org).

Repo creation tutorial (note that this code already exists):

These steps are already done and the results are stored in the repo. To reproduce this step, run:

dvc destroy
git commit -m 'deinit DVC'

Clone repo

1) Add the PixArt dataset as a submodule:

git submodule add http://172.26.40.23:3000/datasets/pixart.git datasets/

2) Load everything from the submodule:

git submodule update --init

Build DVC pipeline

1) Initialize DVC:

dvc init
git commit -m 'add DVC'

2) Set up the pipeline:

dvc run -d train.py -M data/monitors/metrics_log/metrics.json -o data/checkpoints/last/last_checkpoint.zip --no-exec python train.py
dvc run -d predict.py -d data/checkpoints/last/last_checkpoint.zip -o result --no-exec python predict.py

3) Run the pipeline:

dvc repro result.dvc

4) Last steps

After pipeline execution ends, we get a metrics.json file with the metric values and the modified pipeline step files. Let’s add them to the git history:

git add data/checkpoints/last/.gitignore last_checkpoint.zip.dvc result.dvc metrics.json -f

Run another experiment

We add hard negative mining to our training process, so we need to run a new experiment and then compare it with the existing one.

  1. Create a new branch

git checkout -b hnm
dvc checkout

  2. Repeat all steps from the previous section
  3. Compare metrics

dvc metrics show -a

The output will look like this:

hnm:
    metrics.json: {"train": {"jaccard": 0.8874640464782715, "dice": 0.9423233270645142, "loss": 0.7522647976875305}, "validation": {"jaccard": 0.8573445081710815, "dice": 0.9246319532394409, "loss": 0.7623925805091858}}
master:
    metrics.json: {"train": {"jaccard": 0.8774164915084839, "dice": 0.9357065558433533, "loss": 0.7595105767250061}, "validation": {"jaccard": 0.8574965596199036, "dice": 0.927370011806488, "loss": 0.7602806687355042}}
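The two branches can also be compared programmatically. The sketch below parses the two metrics.json payloads shown above and prints the per-metric difference (hnm minus master). The helper name metric_diff is hypothetical. Here the JSON strings are pasted inline rather than read from files.

```python
import json

hnm = json.loads(
    '{"train": {"jaccard": 0.8874640464782715, "dice": 0.9423233270645142, "loss": 0.7522647976875305}, '
    '"validation": {"jaccard": 0.8573445081710815, "dice": 0.9246319532394409, "loss": 0.7623925805091858}}'
)
master = json.loads(
    '{"train": {"jaccard": 0.8774164915084839, "dice": 0.9357065558433533, "loss": 0.7595105767250061}, '
    '"validation": {"jaccard": 0.8574965596199036, "dice": 0.927370011806488, "loss": 0.7602806687355042}}'
)


def metric_diff(new, base):
    """Per-stage, per-metric difference new - base."""
    return {stage: {name: new[stage][name] - base[stage][name] for name in new[stage]}
            for stage in new}


diff = metric_diff(hnm, master)
for stage, metrics in diff.items():
    for name, value in metrics.items():
        print("{:<10} {:<8} {:+.6f}".format(stage, name, value))
```

For these numbers, the hnm branch improves every train metric, while the validation metrics are slightly worse than on master.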

Show DVC pipeline

dvc pipeline show --ascii result.dvc

You may see this output:

+-------------------------+
| last_checkpoint.zip.dvc |
+-------------------------+

result.dvc

Reproduce results

Calling dvc repro runs the pipeline, but we need to specify the last step of the pipeline, so we pass the file name of the last pipeline step as a parameter:

dvc repro result.dvc

After the pipeline finishes executing, you can see the metrics (-a shows metrics from all branches):

dvc metrics show -a