R interface to TensorFlow Dataset API
Overview
The TensorFlow Dataset API provides various facilities for creating scalable input pipelines for TensorFlow models, including:
Reading data from a variety of formats including CSV files and TFRecords files (the standard binary format for TensorFlow training data).
Transforming datasets in a variety of ways including mapping arbitrary functions against them.
Shuffling, batching, and repeating datasets over a number of epochs.
Streaming interface to data for reading arbitrarily large datasets.
Reading and transforming data are TensorFlow graph operations, so are executed in C++ and in parallel with model training.
The R interface to TensorFlow datasets provides access to the Dataset API, including high-level convenience functions for easy integration with the keras and tfestimators R packages.
Installation
To use tfdatasets you need to install both the R package as well as TensorFlow itself.
First, install the tfdatasets R package from CRAN as follows:
Then, use the install_tensorflow()
function to install TensorFlow:
Creating a Dataset
To create a dataset, use one of the dataset creation functions. Dataset can be created from delimted text files, TFRecords files, as well as from in-memory data.
Text Files
For example, to create a dataset from a text file, first create a specification for how records will be decoded from the file, then call text_line_dataset()
with the file to be read and the specification:
library(tfdatasets)
# create specification for parsing records from an example file
iris_spec <- csv_record_spec("iris.csv")
# read the datset
dataset <- text_line_dataset("iris.csv", record_spec = iris_spec)
# take a glimpse at the dataset
str(dataset)
<MapDataset shapes: {Sepal.Length: (), Sepal.Width: (), Petal.Length: (), Petal.Width: (),
Species: ()}, types: {Sepal.Length: tf.float32, Sepal.Width: tf.float32, Petal.Length:
tf.float32, Petal.Width: tf.float32, Species: tf.int32}>
In the example above, the csv_record_spec()
function is passed an example file which is used to automatically detect column names and types (done by reading up to the first 1,000 lines of the file). You can also provide explicit column names and/or data types using the names
and types
parameters (note that in this case we don’t pass an example file):
# provide colum names and types explicitly
iris_spec <- csv_record_spec(
names = c("SepalLength", "SepalWidth", "PetalLength", "PetalWidth", "Species"),
types = c("double", "double", "double", "double", "integer"),
skip = 1
)
# read the datset
dataset <- text_line_dataset("iris.csv", record_spec = iris_spec)
Note that we’ve also specified skip = 1
to indicate that the first row of the CSV that contains column names should be skipped.
Supported column types are integer, double, and character. You can also provide types
in a more compact form using single-letter abbreviations (e.g. types = "dddi"
). For example:
Parallel Decoding
Decoding lines of text into a record can be computationally expensive. You can parallelize these computations using the parallel_records
parameter. For example:
You can also parallelize the reading of data from storage by requesting that a buffer of records be prefected. You do this with the dataset_prefetch()
function. For example:
dataset <- text_line_dataset("iris.csv", record_spec = iris_spec, parallel_records = 4) %>%
dataset_prefetch(1000)
If you have multiple input files, you can also parallelize reading of these files both across multiple machines (sharding) and/or on multiple threads per-machine (parallel reads with interleaving). See the section on Reading Multiple Files below for additional details.
TFRecords Files
You can read datasets from TFRecords files using the tfrecord_dataset()
function.
In many cases you’ll want to map the records in the dataset into a set of named columns. You can do this using the dataset_map()
function along with the tf$parse_single_example()
function. for example:
# Creates a dataset that reads all of the examples from two files, and extracts
# the image and label features.
filenames <- c("/var/data/file1.tfrecord", "/var/data/file2.tfrecord")
dataset <- tfrecord_dataset(filenames) %>%
dataset_map(function(example_proto) {
features <- list(
image = tf$FixedLenFeature(shape(), tf$string),
label = tf$FixedLenFeature(shape(), tf$int32)
)
tf$parse_single_example(example_proto, features)
})
SQLite Databases
You can read datasets from SQLite databases using the sqlite_dataset()
function. Note that support for SQLite databases required TensorFlow v1.7 as well as the development version of the tfdatasets package, which you can install via devtools::install_github("rstudio/tfdatasts")
.
To use sqlite_dataset()
you provide the filename of the database, a SQL query to execute, and sql_record_spec()
that describes the names and TensorFlow types of columns within the query. For example:
library(tfdatasets)
record_spec <- sql_record_spec(
names = c("disp", "drat", "vs", "gear", "mpg", "qsec", "hp", "am", "wt", "carb", "cyl"),
types = c(tf$float64, tf$int32, tf$float64, tf$int32, tf$float64, tf$float64,
tf$float64, tf$int32, tf$int32, tf$int32, tf$int32)
)
dataset <- sqlite_dataset(
"data/mtcars.sqlite3",
"select * from mtcars",
record_spec
)
dataset
<MapDataset shapes: {disp: (), drat: (), vs: (), gear: (), mpg: (), qsec: (), hp: (), am: (),
wt: (), carb: (), cyl: ()}, types: {disp: tf.float64, drat: tf.int32, vs: tf.float64, gear:
tf.int32, mpg: tf.float64, qsec: tf.float64, hp: tf.float64, am: tf.int32, wt: tf.int32, carb:
tf.int32, cyl: tf.int32}>
Note that for floating point data you must use tf$float64
(reading tf$float32
is not supported for SQLite databases).
Transformations
Mapping
You can map arbitrary transformation functions onto dataset records using the dataset_map()
function. For example, to transform the “Species” column into a one-hot encoded vector you would do this:
dataset <- dataset %>%
dataset_map(function(record) {
record$Species <- tf$one_hot(record$Species, 3L)
record
})
Note that while dataset_map()
is defined using an R function, there are some special constraints on this function which allow it to execute not within R but rather within the TensorFlow graph.
For a dataset created with the csv_dataset()
function, the passed record will be named list of tensors (one for each column of the dataset). The return value should be another set of tensors which were created from TensorFlow functions (e.g. tf$one_hot
as illustrated above). This function will be converted to a TensorFlow graph operation that performs the transformation within native code.
Parallel Mapping
If these transformations are computationally expensive they can be executed on multiple threads using the num_parallel_calls
parameter. For example:
dataset <- dataset %>%
dataset_map(num_parallel_calls = 4, function(record) {
record$Species <- tf$one_hot(record$Species, 3L)
record
})
You can control the maximum number of processed elements that will be buffered when processing in parallel using the dataset_prefetch()
transformation. For example:
Filtering
You can filter the elements of a dataset using the dataset_filter()
function, which takes a predicate
function that returns a boolean tensor for records that should be included. For example:
dataset <- csv_dataset("mtcars.csv") %>%
dataset_filter(function(record) {
record$mpg >= 20
})
dataset <- csv_dataset("mtcars.csv") %>%
dataset_filter(function(record) {
record$mpg >= 20 & record$cyl >= 6L
})
Note that the functions used inside the predicate must be tensor operations (e.g. tf$not_equal
, tf$less
, etc.). R generic methods for relational operators (e.g. <, >, <=, etc.) and logical operators (e.g. !, &, |, etc.) are provided so you can use shorthand syntax for most common comparisons (as illustrated above).
Features and Response
A common transformation is taking a column oriented dataset (e.g. one created by csv_dataset()
or tfrecord_dataset()
) and transforming it into a two-element list with features (“x”) and response (“y”). You can use the dataset_prepare()
function to do this type of transformation. For example:
mtcars_dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
dataset_prepare(x = c(mpg, disp), y = cyl)
iris_dataset <- text_line_dataset("iris.csv", record_spec = iris_spec) %>%
dataset_prepare(x = -Species, y = Species)
The dataset_prepare()
function also accepts standard R formula syntax for defining features and response:
Shuffling and Batching
There are several functions which control how batches are drawn from the dataset. For example, the following specifies that data will be drawn in batches of 128 from a shuffled window of 1000 records, and that the dataset will be repeated for 10 epochs:
Complete Example
Here’s a complete example of using the various dataset transformation functions together. We’ll read the mtcars
dataset from a CSV, filter it on some threshold values, map it into x
and y
components for modeling, and specify desired shuffling and batch iteration behavior:
Reading Datasets
The method for reading data from a TensorFlow Dataset varies depending upon which API you are using to build your models. If you are using the keras or tfestimators packages, then TensorFlow Datasets can be used much like in-memory R matrices and arrays. If you are using the lower-level tensorflow core API then you’ll use explicit dataset iteration functions.
The sections below provide additional details and examples for each of the supported APIs.
keras package
IMPORTANT NOTE: Using TensorFlow Datasets with Keras requires that you are running the very latest versions of Keras (v2.2), TensorFlow (v1.8), and the development versions of the keras and tfdatasets R packages. You can install these packages with:
You can ensure that you have the latest version of the core Keras library with:
Keras models are often trained by passing in-memory arrays directly to the fit
function. For example:
However, this requires loading data into an R data frame or matrix before calling fit. You can use the train_on_batch()
function to stream data one batch at a time, however the reading and processing of the input data is still being done serially and outside of native code.
Alternatively, Keras enables you to pass a dataset directly as the x
argument to fit()
and evaluate()
. Here’s a complete example that uses datasets to read from TFRecord files containing MNIST digits:
library(keras)
library(tfdatasets)
batch_size = 128
steps_per_epoch = 500
# function to read and preprocess mnist dataset
mnist_dataset <- function(filename) {
dataset <- tfrecord_dataset(filename) %>%
dataset_map(function(example_proto) {
# parse record
features <- tf$parse_single_example(
example_proto,
features = list(
image_raw = tf$FixedLenFeature(shape(), tf$string),
label = tf$FixedLenFeature(shape(), tf$int64)
)
)
# preprocess image
image <- tf$decode_raw(features$image_raw, tf$uint8)
image <- tf$cast(image, tf$float32) / 255
# convert label to one-hot
label <- tf$one_hot(tf$cast(features$label, tf$int32), 10L)
# return
list(image, label)
}) %>%
dataset_repeat() %>%
dataset_shuffle(1000) %>%
dataset_batch(batch_size, drop_remainder = TRUE) %>%
dataset_prefetch(1)
}
model <- keras_model_sequential() %>%
layer_dense(units = 256, activation = 'relu', input_shape = c(784)) %>%
layer_dropout(rate = 0.4) %>%
layer_dense(units = 128, activation = 'relu') %>%
layer_dropout(rate = 0.3) %>%
layer_dense(units = 10, activation = 'softmax')
model %>% compile(
loss = 'categorical_crossentropy',
optimizer = optimizer_rmsprop(),
metrics = c('accuracy')
)
history <- model %>% fit(
mnist_dataset("mnist/train.tfrecords"),
steps_per_epoch = steps_per_epoch,
epochs = 20,
validation_data = mnist_dataset("mnist/validation.tfrecords"),
validation_steps = steps_per_epoch
)
score <- model %>% evaluate(
mnist_dataset("mnist/test.tfrecords"),
steps = steps_per_epoch
)
print(score)
Note that all data preprocessing (e.g. one-hot encoding of the response variable) is done within the dataset_map()
operation.
tfestimators package
Models created with tfestimators use an input function to consume data for training, evaluation, and prediction. For example, here is an example of using an input function to feed data from an in-memory R data frame to an estimators model:
model %>% train(
input_fn(mtcars, features = c(mpg, disp), response = cyl,
batch_size = 128, epochs = 3)
)
If you are using tfdatasets with the tfestimators package, you can create an estimators input function directly from a dataset as follows:
library(tfestimators)
library(tfdatasets)
mtcars_spec <- csv_record_spec("mtcars.csv")
dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
dataset_batch(128) %>%
dataset_repeat(3)
cols <- feature_columns(
column_numeric("mpg"),
column_numeric("disp")
)
model <- linear_regressor(feature_columns = cols)
model %>% train(
input_fn(dataset, features = c(mpg, disp), response = cyl)
)
Note that we don’t use the dataset_prepare()
function in this example. Rather, this function is used under the hood to provide the input_fn
interface expected by tfestimators models.
As with dataset_prepare()
, you can alternatively use an R formula to specify features and response:
tensorflow package
You read batches of data from a dataset by using tensors that yield the next batch. You can obtain this tensor from a dataset via the make_iterator_one_shot()
and iterator_get_next()
functions. For example:
dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
dataset_prepare(cyl ~ mpg + disp) %>%
dataset_shuffle(20) %>%
dataset_batch(5)
iter <- make_iterator_one_shot(dataset)
next_batch <- iterator_get_next(iter)
next_batch
$x
Tensor("IteratorGetNext_13:0", shape=(?, 2), dtype=float32)
$y
Tensor("IteratorGetNext_13:1", shape=(?,), dtype=int32)
As you can see next_batch
isn’t the data itself but rather a tensor that will yield the next batch of data when it is evaluated:
$x
[,1] [,2]
[1,] 21.0 160
[2,] 21.0 160
[3,] 22.8 108
[4,] 21.4 258
[5,] 18.7 360
$y
[1] 6 6 4 6 8
If you are iterating over a dataset using these functions, you will need to determine at what point to stop iteration. One approach to this is to use the dataset_repeat()
function to create an dataset that yields values infinitely. For example:
library(tfdatasets)
sess <- tf$Session()
mtcars_spec <- csv_record_spec("mtcars.csv")
dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
dataset_prepare(x = c(mpg, disp), y = cyl) %>%
dataset_shuffle(5000) %>%
dataset_batch(128) %>%
dataset_repeat() # repeat infinitely
iter <- make_iterator_one_shot(dataset)
next_batch <- iterator_get_next(iter)
steps <- 200
for (i in 1:steps) {
batch <- sess$run(next_batch)
# use batch for training, etc.
}
In this case the steps
variable is used to determine when to stop drawing new batches of training data (we could have equally included code to detect a learning plateau or any other custom method of determining when to stop training).
Another approach is to detect when all batches have been yielded from the dataset. When a dataset iterator reaches the end, an out of range runtime error will occur. You can catch and ignore the error when it occurs by using out_of_range_handler
as the error
argument to tryCatch()
. For example:
library(tfdatasets)
sess <- tf$Session()
mtcars_spec <- csv_record_spec("mtcars.csv")
dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
dataset_prepare(x = c(mpg, disp), y = cyl) %>%
dataset_batch(128) %>%
dataset_repeat(10)
iter <- make_iterator_one_shot(dataset)
next_batch <- iterator_get_next(iter)
tryCatch({
while(TRUE) {
batch <- sess$run(next_batch)
# use batch$x and batch$y tensors
}
}, error = out_of_range_handler)
Reading Multiple Files
If you have multiple input files you can process them in parallel both across machines (sharding) and/or on multiple threads per-machine (parallel reads with interleaving). The read_files()
function provides a high-level interface to parallel file reading.
The read_files()
function takes a set of files and a read function along with various options to orchestrate parallel reading. For example, the following function reads all CSV files in a directory using the text_line_dataset()
function:
dataset <- read_files("data/*.csv", text_line_dataset, record_spec = mtcars_spec,
parallel_files = 4, parallel_interleave = 16) %>%
dataset_prefetch(5000) %>%
dataset_shuffle(1000) %>%
dataset_batch(128) %>%
dataset_repeat(3)
The parallel_files
argument requests that 4 files be processed in parallel and the parallel_interleave
argument requests that blocks of 16 consecutive records from each file be interleaved in the resulting dataset.
Note that because we are processing files in parallel we do not pass the parallel_records
argument to text_line_dataset()
, since we are already parallelizing at the file level.
Multiple Machines
If you are training on multiple machines and the training supervisor passes a shard index to your training script, you can also parallelizing reading by sharding the file list. For example:
# command line flags for training script (shard info is passed by training
# supervisor that executes the script)
FLAGS <- flags(
flag_integer("num_shards", 1),
flag_integer("shard_index", 1)
)
# forward shard info to read_files
dataset <- read_files("data/*.csv", text_line_dataset, record_spec = mtcars_spec,
parallel_files = 4, parallel_interleave = 16,
num_shards = FLAGS$num_shards, shard_index = FLAGS$shard_index) %>%
dataset_prefetch(5000) %>%
dataset_shuffle(1000) %>%
dataset_batch(128) %>%
dataset_repeat(3)