Resolve "Track training with mlflow"

Merged Frank Sauerburger requested to merge 1-track-training-with-mlflow into main
2 files changed: +96 −10
@@ -7,13 +7,16 @@ import argparse
import collections
import json
import os
from pprint import pprint
import random
import re
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
import tensorflow_recommenders as tfrs
import mlflow
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, TensorSpec
def load_movielense(basedir):
@@ -189,7 +192,10 @@ def load_dataset(basedir):
titles = {int(k): v for k, v in titles.items()}
movies_tensor = tf.constant([[x] for x in titles.keys()])
movies_tensor = tf.constant(
[[x] for x in titles.keys()],
dtype=tf.uint64,
)
movie_ds = tf.data.Dataset.from_tensor_slices(movies_tensor)
# Cache and batch
@@ -220,7 +226,11 @@ class RecommenderModel(tfrs.Model):
embedded_cand = self.cand_model(label_movie_id)
return self.retrieval_task(embedded_query, embedded_cand, compute_metrics=not training)
return self.retrieval_task(
embedded_query,
embedded_cand,
compute_metrics=not training
)
def build_embedding_model(movies, embedding_dimension):
@@ -235,7 +245,10 @@ def build_embedding_model(movies, embedding_dimension):
cand_model = tf.keras.Sequential([
tf.keras.layers.IntegerLookup(vocabulary=vocab, mask_token=None, input_shape=(1,)),
tf.keras.layers.Embedding(len(movies) + 1, embedding_dimension),
tf.keras.layers.Reshape((32,), input_shape=(1, 32))
tf.keras.layers.Reshape(
(embedding_dimension,),
input_shape=(1, embedding_dimension)
)
])
return query_model, cand_model
@@ -245,6 +258,9 @@ def build_model(movies,
learning_rate=0.1,
embedding_dimension=32):
"""Return the full keras/tensorflow model"""
mlflow.log_param("learning_rate", learning_rate)
mlflow.log_param("embedding_dimension", embedding_dimension)
query_model, cand_model = build_embedding_model(
movies, embedding_dimension=embedding_dimension
)
@@ -264,10 +280,26 @@ def build_model(movies,
return model
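Note on the new mlflow.log_param calls in build_model: they record one key/value pair each on the active run, and the fluent API starts a run implicitly if none is active. A minimal, self-contained sketch of that pattern (the experiment name and values below are illustrative only):

import mlflow

# Minimal sketch: log_param records a single key/value pair on the active run.
mlflow.set_experiment("movie-recommender")
with mlflow.start_run():
    mlflow.log_param("learning_rate", 0.1)
    mlflow.log_param("embedding_dimension", 32)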
# pylint: disable=R0903
class MLflowKerasCallback(keras.callbacks.Callback):
"""Callback to record loss"""
@staticmethod
def on_epoch_end(epoch, logs=None):
"""Log non-empty metrics after each step"""
if not logs:
return
# Drop zero-valued metrics
logs = {k: v for k, v in logs.items() if v != 0}
mlflow.log_metrics(logs, epoch)
def fit_model(model, training_ds, epochs=3):
"""Fit the model to the training dataset"""
model.fit(training_ds, epochs=epochs)
mlflow.log_param("epochs", epochs)
model.fit(training_ds,
epochs=epochs,
callbacks=MLflowKerasCallback())
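MLflowKerasCallback forwards the per-epoch Keras logs dict to mlflow.log_metrics, using the epoch number as the step. A hedged, standalone sketch of that flow; only the callback comes from this file, the toy model and data are placeholders:

import numpy as np
import mlflow
from tensorflow import keras

# Toy model and data, for illustration only.
toy_model = keras.Sequential([keras.layers.Dense(1, input_shape=(4,))])
toy_model.compile(optimizer="sgd", loss="mse")
x = np.random.rand(32, 4).astype("float32")
y = np.random.rand(32, 1).astype("float32")

with mlflow.start_run():
    # Each epoch, on_epoch_end receives {"loss": ...} and logs the
    # non-zero entries with the epoch number as the step.
    toy_model.fit(x, y, epochs=2, verbose=0,
                  callbacks=[MLflowKerasCallback()])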
def lookup(titles, movie_ids):
@@ -277,19 +309,64 @@ def lookup(titles, movie_ids):
def eval_model(model, test_ds):
"""Print the test set performance"""
result = model.evaluate(test_ds, return_dict=True)
pprint(result)
result = model.evaluate(test_ds, return_dict=True, verbose=0)
val_result = {"val_" + k: v for k, v in result.items()}
mlflow.log_metrics(val_result)
def build_prediction_model(model, movies):
"""Build and return into recommend movies"""
index = tfrs.layers.factorized_top_k.BruteForce(model.query_model)
index.index_from_dataset(tf.data.Dataset.zip(
(movies.batch(100), movies.batch(100).map(model.cand_model))
))
return index
# Use the index layer as the model for now; a keras.Model wrapper with
# a fixed input shape would look like this:
# inputs = tf.keras.Input(shape=(None, ))
# outputs = index(inputs)
# model = tf.keras.Model(inputs=inputs, outputs=outputs)
# return model
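A hedged usage sketch for the returned index: the BruteForce layer maps a batch of queries to a tuple of affinity scores and candidate movie ids, mirroring the fixed-size call used in log_prediction_model below. The ids here are illustrative only:

index = build_prediction_model(model, movies)
# Illustrative query with placeholder ids; returns the top-k scores and
# the corresponding movie ids for each query row.
scores, recommended_ids = index(tf.constant([[1, 2, 3, 4]]))
print(recommended_ids.numpy())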
def log_prediction_model(model):
"""Log the prediction model to mlflow"""
# The model does not know its input shape. Call it once to set the
# shape to a fixed size for now.
model(tf.constant([[1, 2, 3, 4]]))
input_schema = Schema([
TensorSpec(np.dtype(np.int32), (-1, -1))
])
output_schema = Schema([
TensorSpec(np.dtype(np.uint64), (-1, ), name="recommendations"),
])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)
mlflow.keras.log_model(model, "models",
signature=signature,
pip_requirements="requirements.txt")
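For completeness, a hedged sketch of loading the logged model back from the tracking backend. The run id is a placeholder; "models" matches the artifact path used above:

import mlflow

# Placeholder run id, e.g. copied from the MLflow UI.
run_id = "<run-id>"
reloaded = mlflow.keras.load_model(f"runs:/{run_id}/models")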
def train(args):
"""Load the dataset, train and evaluate the model"""
mlflow.set_experiment(args.mlflow_experiment)
train_ds, test_ds, _, movies = load_dataset(args.input)
with tf.device("cpu:0"):
model = build_model(movies)
fit_model(model, train_ds)
eval_model(model, test_ds)
model = build_model(movies,
embedding_dimension=args.embedding_dimension,
learning_rate=args.learning_rate)
fit_model(model, train_ds, epochs=args.epochs)
if not args.debug:
eval_model(model, test_ds)
index = build_prediction_model(model, movies)
log_prediction_model(index)
commands = {
"prepare": prepare_dataset,
@@ -301,12 +378,20 @@ def get_default_parser():
parser = argparse.ArgumentParser()
parser.add_argument("command", metavar="CMD", choices=commands,
help="Operation to execute")
parser.add_argument("--mlflow-experiment", "-E", default="movie-recommender",
help="URI to the mlflow tracking backend")
parser.add_argument("-i", "--input", metavar="PATH",
help="Path to input file")
parser.add_argument("-o", "--output", metavar="PATH",
help="Path to output file(s)")
parser.add_argument("--debug", default=False, action="store_true",
help="Limit test data size")
parser.add_argument("-e", "--epochs", default=3, type=int,
help="Number of training epochs")
parser.add_argument("-l", "--learning-rate", default=0.1, type=float,
help="Learning rate")
parser.add_argument("-d", "--embedding-dimension", default=32, type=int,
help="Dimension of embedding space")
return parser
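A hedged example of the new flags going through get_default_parser; the "train" command key and the input path are assumptions based on the surrounding code:

# The command name and input path are assumed, for illustration only.
parser = get_default_parser()
args = parser.parse_args([
    "train", "-i", "data/ml-latest-small",
    "-E", "movie-recommender", "-e", "5", "-l", "0.05", "-d", "64",
])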