From 6b93d9060a4257fa8be1dc7cc8f7a9061b8a9fc3 Mon Sep 17 00:00:00 2001 From: Frank Sauerburger <frank@sauerburger.com> Date: Sun, 28 Aug 2022 12:56:01 +0200 Subject: [PATCH] Use fixed-size inputs --- movies.py | 93 +++++++++++++++++++++++++++++++++++++++++++----- requirements.txt | 1 + 2 files changed, 86 insertions(+), 8 deletions(-) diff --git a/movies.py b/movies.py index a78639e..f212dfc 100644 --- a/movies.py +++ b/movies.py @@ -7,13 +7,16 @@ import argparse import collections import json import os -from pprint import pprint import random import re import numpy as np import pandas as pd import tensorflow as tf +from tensorflow import keras import tensorflow_recommenders as tfrs +import mlflow +from mlflow.models.signature import ModelSignature +from mlflow.types.schema import Schema, TensorSpec def load_movielense(basedir): @@ -189,7 +192,10 @@ def load_dataset(basedir): titles = {int(k): v for k, v in titles.items()} - movies_tensor = tf.constant([[x] for x in titles.keys()]) + movies_tensor = tf.constant( + [[x] for x in titles.keys()], + dtype=tf.uint64, + ) movie_ds = tf.data.Dataset.from_tensor_slices(movies_tensor) # Cache and batch @@ -220,7 +226,11 @@ class RecommenderModel(tfrs.Model): embedded_cand = self.cand_model(label_movie_id) - return self.retrieval_task(embedded_query, embedded_cand, compute_metrics=not training) + return self.retrieval_task( + embedded_query, + embedded_cand, + compute_metrics=not training + ) def build_embedding_model(movies, embedding_dimension): @@ -235,7 +245,10 @@ def build_embedding_model(movies, embedding_dimension): cand_model = tf.keras.Sequential([ tf.keras.layers.IntegerLookup(vocabulary=vocab, mask_token=None, input_shape=(1,)), tf.keras.layers.Embedding(len(movies) + 1, embedding_dimension), - tf.keras.layers.Reshape((32,), input_shape=(1, 32)) + tf.keras.layers.Reshape( + (embedding_dimension,), + input_shape=(1, embedding_dimension) + ) ]) return query_model, cand_model @@ -245,6 +258,9 @@ def build_model(movies, learning_rate=0.1, embedding_dimension=32): """Return the full keras/tensorflow model""" + mlflow.log_param("learning_rate", learning_rate) + mlflow.log_param("embedding_dimension", embedding_dimension) + query_model, cand_model = build_embedding_model( movies, embedding_dimension=embedding_dimension ) @@ -264,10 +280,26 @@ def build_model(movies, return model +# pylint: disable=R0903 +class MLflowKerasCallback(keras.callbacks.Callback): + """Callback to record loss""" + + @staticmethod + def on_epoch_end(epoch, logs=None): + """Log non-empty metrics after each step""" + if not logs: + return + + # Remove empty logs + logs = {k: v for k, v in logs.items() if v != 0} + mlflow.log_metrics(logs, epoch) def fit_model(model, training_ds, epochs=3): """Fit the model to the training dataset""" - model.fit(training_ds, epochs=epochs) + mlflow.log_param("epochs", epochs) + model.fit(training_ds, + epochs=epochs, + callbacks=MLflowKerasCallback()) def lookup(titles, movie_ids): @@ -277,19 +309,62 @@ def lookup(titles, movie_ids): def eval_model(model, test_ds): """Print the test set performance""" - result = model.evaluate(test_ds, return_dict=True) - pprint(result) + result = model.evaluate(test_ds, return_dict=True, verbose=0) + val_result = {"val_" + k: v for k, v in result.items()} + mlflow.log_metrics(val_result) + + +def build_prediction_model(model, movies): + """Build and return into recommend movies""" + index = tfrs.layers.factorized_top_k.BruteForce(model.query_model) + index.index_from_dataset(tf.data.Dataset.zip( + (movies.batch(100), movies.batch(100).map(model.cand_model)) + )) + + return index + + # Use layer-model for now, with fixed-size shape + # inputs = tf.keras.Input(shape=(None, )) + # outputs = index(inputs) + + # model = tf.keras.Model(inputs=inputs, outputs=outputs) + # return model + + +def log_preduction_model(model): + """Log the prediction model to mlflow""" + # The model does not know its input shape. Set the shape to + # a fixed size for how + model(tf.constant([[1, 2, 3, 4]])) + + input_schema = Schema([ + TensorSpec(np.dtype(np.int32), (-1, -1)) + ]) + output_schema = Schema([ + TensorSpec(np.dtype(np.uint64), (-1, ), name="recommendations"), + ]) + + signature = ModelSignature(inputs=input_schema, outputs=output_schema) + mlflow.keras.log_model(model, "models", + signature=signature, + pip_requirements="requirements.txt") def train(args): """Load the dataset, train and evaluate the model""" + mlflow.set_experiment(args.mlflow_experiment) + train_ds, test_ds, _, movies = load_dataset(args.input) with tf.device("cpu:0"): model = build_model(movies) fit_model(model, train_ds) - eval_model(model, test_ds) + if not args.debug: + eval_model(model, test_ds) + + index = build_prediction_model(model, movies) + log_preduction_model(index) commands = { "prepare": prepare_dataset, @@ -301,6 +376,8 @@ def get_default_parser(): parser = argparse.ArgumentParser() parser.add_argument("command", metavar="CMD", choices=commands, help="Operation to execute") + parser.add_argument("--mlflow-experiment", "-E", default="movie-recommender", + help="URI to the mlflow tracking backend") parser.add_argument("-i", "--input", metavar="PATH", help="Path to input file") parser.add_argument("-o", "--output", metavar="PATH", diff --git a/requirements.txt b/requirements.txt index 9055648..b0742e6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ numpy~=1.23.2 pandas~=1.4.3 tensorflow-recommenders~=0.7.0 tensorflow~=2.9.1 +mlflow~=1.27.0 -- GitLab