Skip to content
Snippets Groups Projects
Verified Commit 6b93d906 authored by Frank Sauerburger's avatar Frank Sauerburger
Browse files

Use fixed-size inputs

parent 02db354b
No related branches found
No related tags found
1 merge request!1Resolve "Track training with mlflow"
Pipeline #9627 passed
......@@ -7,13 +7,16 @@ import argparse
import collections
import json
import os
from pprint import pprint
import random
import re
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
import tensorflow_recommenders as tfrs
import mlflow
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, TensorSpec
def load_movielense(basedir):
......@@ -189,7 +192,10 @@ def load_dataset(basedir):
titles = {int(k): v for k, v in titles.items()}
movies_tensor = tf.constant([[x] for x in titles.keys()])
movies_tensor = tf.constant(
[[x] for x in titles.keys()],
dtype=tf.uint64,
)
movie_ds = tf.data.Dataset.from_tensor_slices(movies_tensor)
# Cache and batch
......@@ -220,7 +226,11 @@ class RecommenderModel(tfrs.Model):
embedded_cand = self.cand_model(label_movie_id)
return self.retrieval_task(embedded_query, embedded_cand, compute_metrics=not training)
return self.retrieval_task(
embedded_query,
embedded_cand,
compute_metrics=not training
)
def build_embedding_model(movies, embedding_dimension):
......@@ -235,7 +245,10 @@ def build_embedding_model(movies, embedding_dimension):
cand_model = tf.keras.Sequential([
tf.keras.layers.IntegerLookup(vocabulary=vocab, mask_token=None, input_shape=(1,)),
tf.keras.layers.Embedding(len(movies) + 1, embedding_dimension),
tf.keras.layers.Reshape((32,), input_shape=(1, 32))
tf.keras.layers.Reshape(
(embedding_dimension,),
input_shape=(1, embedding_dimension)
)
])
return query_model, cand_model
......@@ -245,6 +258,9 @@ def build_model(movies,
learning_rate=0.1,
embedding_dimension=32):
"""Return the full keras/tensorflow model"""
mlflow.log_param("learning_rate", learning_rate)
mlflow.log_param("embedding_dimension", embedding_dimension)
query_model, cand_model = build_embedding_model(
movies, embedding_dimension=embedding_dimension
)
......@@ -264,10 +280,26 @@ def build_model(movies,
return model
# pylint: disable=R0903
class MLflowKerasCallback(keras.callbacks.Callback):
"""Callback to record loss"""
@staticmethod
def on_epoch_end(epoch, logs=None):
"""Log non-empty metrics after each step"""
if not logs:
return
# Remove empty logs
logs = {k: v for k, v in logs.items() if v != 0}
mlflow.log_metrics(logs, epoch)
def fit_model(model, training_ds, epochs=3):
"""Fit the model to the training dataset"""
model.fit(training_ds, epochs=epochs)
mlflow.log_param("epochs", epochs)
model.fit(training_ds,
epochs=epochs,
callbacks=MLflowKerasCallback())
def lookup(titles, movie_ids):
......@@ -277,19 +309,62 @@ def lookup(titles, movie_ids):
def eval_model(model, test_ds):
"""Print the test set performance"""
result = model.evaluate(test_ds, return_dict=True)
pprint(result)
result = model.evaluate(test_ds, return_dict=True, verbose=0)
val_result = {"val_" + k: v for k, v in result.items()}
mlflow.log_metrics(val_result)
def build_prediction_model(model, movies):
"""Build and return into recommend movies"""
index = tfrs.layers.factorized_top_k.BruteForce(model.query_model)
index.index_from_dataset(tf.data.Dataset.zip(
(movies.batch(100), movies.batch(100).map(model.cand_model))
))
return index
# Use layer-model for now, with fixed-size shape
# inputs = tf.keras.Input(shape=(None, ))
# outputs = index(inputs)
# model = tf.keras.Model(inputs=inputs, outputs=outputs)
# return model
def log_preduction_model(model):
"""Log the prediction model to mlflow"""
# The model does not know its input shape. Set the shape to
# a fixed size for how
model(tf.constant([[1, 2, 3, 4]]))
input_schema = Schema([
TensorSpec(np.dtype(np.int32), (-1, -1))
])
output_schema = Schema([
TensorSpec(np.dtype(np.uint64), (-1, ), name="recommendations"),
])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)
mlflow.keras.log_model(model, "models",
signature=signature,
pip_requirements="requirements.txt")
def train(args):
"""Load the dataset, train and evaluate the model"""
mlflow.set_experiment(args.mlflow_experiment)
train_ds, test_ds, _, movies = load_dataset(args.input)
with tf.device("cpu:0"):
model = build_model(movies)
fit_model(model, train_ds)
eval_model(model, test_ds)
if not args.debug:
eval_model(model, test_ds)
index = build_prediction_model(model, movies)
log_preduction_model(index)
commands = {
"prepare": prepare_dataset,
......@@ -301,6 +376,8 @@ def get_default_parser():
parser = argparse.ArgumentParser()
parser.add_argument("command", metavar="CMD", choices=commands,
help="Operation to execute")
parser.add_argument("--mlflow-experiment", "-E", default="movie-recommender",
help="URI to the mlflow tracking backend")
parser.add_argument("-i", "--input", metavar="PATH",
help="Path to input file")
parser.add_argument("-o", "--output", metavar="PATH",
......
......@@ -2,3 +2,4 @@ numpy~=1.23.2
pandas~=1.4.3
tensorflow-recommenders~=0.7.0
tensorflow~=2.9.1
mlflow~=1.27.0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment