enh!: Deprecate the streamz interface in holoviews.streams
hoxbro committed Jan 15, 2025
1 parent b66153a commit 009f096
Showing 8 changed files with 7 additions and 369 deletions.
3 changes: 0 additions & 3 deletions doc/user_guide/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ concepts in HoloViews:
`Working with large data <Large_Data.html>`_
Leverage Datashader to interactively explore millions or billions of datapoints.

`Working with Streaming Data <Streaming_Data.html>`_
Demonstrates how to leverage the streamz library with HoloViews to work with streaming datasets.

`Creating interactive dashboards <Dashboards.html>`_
Use external widget libraries to build custom, interactive dashboards.

Expand Down
2 changes: 0 additions & 2 deletions examples/conftest.py
Expand Up @@ -19,8 +19,6 @@
collect_ignore_glob = [
# Needs selenium, phantomjs, firefox, and geckodriver to save a png picture
"user_guide/Plotting_with_Bokeh.ipynb",
# Streaming data use streamz which is no longer maintained
"user_guide/16-Streaming_Data.ipynb",
# Possible timeout error
"user_guide/17-Dashboards.ipynb",
# Give file not found
Expand Down
357 changes: 1 addition & 356 deletions examples/user_guide/16-Streaming_Data.ipynb
Expand Up @@ -10,9 +10,7 @@
"\n",
"This user guide shows a third way of building an interactive plot, using ``DynamicMap`` and streams. Here, instead of pushing plot metadata (such as zoom ranges, user triggered events such as ``Tap`` and so on) to a ``DynamicMap`` callback, the underlying data in the visualized elements are updated directly using a HoloViews ``Stream``.\n",
"\n",
"In particular, we will show how the HoloViews ``Pipe`` and ``Buffer`` streams can be used to work with streaming data sources without having to fetch or generate the data from inside the ``DynamicMap`` callable. Apart from simply setting element data from outside a ``DynamicMap``, we will also explore ways of working with streaming data coordinated by the separate [``streamz``](http://matthewrocklin.com/blog/work/2017/10/16/streaming-dataframes-1) library from Matt Rocklin, which can make building complex streaming pipelines much simpler.\n",
"\n",
"As this notebook makes use of the ``streamz`` library, you will need to install it with ``conda install streamz`` or ``pip install streamz``."
"In particular, we will show how the HoloViews ``Pipe`` and ``Buffer`` streams can be used to work with streaming data sources without having to fetch or generate the data from inside the ``DynamicMap`` callable, simply by setting element data from outside the ``DynamicMap``.\n"
]
},
{
Expand All @@ -25,8 +23,6 @@
"import numpy as np\n",
"import pandas as pd\n",
"import holoviews as hv\n",
"import streamz\n",
"import streamz.dataframe\n",
"\n",
"from holoviews import opts\n",
"from holoviews.streams import Pipe, Buffer\n",
Expand Down Expand Up @@ -206,357 +202,6 @@
"Note that when using the ``Buffer`` stream the view will always follow the current range of the data by default; this behavior may be disabled by setting ``buffer.following=False`` or by passing ``following=False`` to the constructor."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using the Streamz library\n",
"\n",
"Now that we have discovered what ``Pipe`` and ``Buffer`` can do it's time to show how you can use them together with the ``streamz`` library. Although HoloViews does not depend on ``streamz`` and you can use the streaming functionality without needing to learn about it, the two libraries work well together, allowing you to build pipelines to manage continuous streams of data. Streamz is easy to use for simple tasks, but also supports complex pipelines that involve branching, joining, flow control, feedback and more. Here we will mostly focus on connecting streamz output to ``Pipe`` and then ``Buffer`` so for more details about the streamz API, consult the [streamz documentation](https://streamz.readthedocs.io/en/latest/).\n",
"\n",
"#### Using ``streamz.Stream`` together with ``Pipe``\n",
"\n",
"Let's start with a fairly simple example:\n",
"\n",
"1. Declare a ``streamz.Stream`` and a ``Pipe`` object and connect them into a pipeline into which we can push data. \n",
"2. Use a ``sliding_window`` of 20, which will first wait for 20 sets of stream updates to accumulate. At that point and for every subsequent update, it will apply ``pd.concat`` to combine the most recent 20 updates into a new dataframe.\n",
"3. Use the ``sink`` method on the ``streamz.Stream`` to ``send`` the resulting collection of 10 updates to ``Pipe``.\n",
"4. Declare a ``DynamicMap`` that takes the sliding window of concatenated DataFrames and displays it using a ``Scatter`` Element.\n",
"5. Color the ``Scatter`` points by their 'count' and set a range, then display:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"point_source = streamz.Stream()\n",
"pipe = Pipe(data=pd.DataFrame({'x': [], 'y': [], 'count': []}))\n",
"point_source.sliding_window(20).map(pd.concat).sink(pipe.send) # Connect streamz to the Pipe\n",
"scatter_dmap = hv.DynamicMap(hv.Scatter, streams=[pipe])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"After setting up our streaming pipeline we can display it:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"scatter_dmap.opts(bgcolor='black', color='count', ylim=(-4, 4), show_legend=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img class=\"gif\" src=\"https://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz1.gif\"></img>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There is now a pipeline, but initially this plot will be empty, because no data has been sent to it. To see the plot update, let's use the ``emit`` method of ``streamz.Stream`` to send small chunks of random pandas ``DataFrame``s to our plot:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for i in range(100):\n",
" df = pd.DataFrame({'x': np.random.rand(100), 'y': np.random.randn(100), 'count': i},\n",
" columns=['x', 'y', 'count'])\n",
" point_source.emit(df)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Using StreamingDataFrame and StreamingSeries\n",
"\n",
"The streamz library provides ``StreamingDataFrame`` and ``StreamingSeries`` as a powerful way to easily work with live sources of tabular data. This makes it perfectly suited to work with ``Buffer``. With the ``StreamingDataFrame`` we can easily stream data, apply computations such as cumulative and rolling statistics and then visualize the data with HoloViews.\n",
"\n",
"The ``streamz.dataframe`` module provides a ``Random`` utility that generates a ``StreamingDataFrame`` that emits random data with a certain frequency at a specified interval. The ``example`` attribute lets us see the structure and dtypes of the data we can expect:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"simple_sdf = streamz.dataframe.Random(freq='10ms', interval='100ms')\n",
"print(simple_sdf.index)\n",
"simple_sdf.example.dtypes"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since the ``StreamingDataFrame`` provides a pandas-like API, we can specify operations on the data directly. In this example we subtract a fixed offset and then compute the cumulative sum, giving us a randomly drifting timeseries. We can then pass the x-values of this dataframe to the HoloViews ``Buffer`` and supply ``hv.Curve`` as the ``DynamicMap`` callback to stream the data into a HoloViews ``Curve`` (with the default key and value dimensions):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sdf = (simple_sdf-0.5).cumsum()\n",
"hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x)]).opts(width=500, show_grid=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img class=\"gif\" src=\"https://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz3.gif\"></img>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The ``Random`` StreamingDataFrame will asynchronously emit events, driving the visualization forward, until it is explicitly stopped, which we can do by calling the ``stop`` method."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"simple_sdf.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Making use of the ``StreamingDataFrame`` API\n",
"\n",
"So far we have only computed the cumulative sum, but the ``StreamingDataFrame`` actually has an extensive API that lets us run a broad range of streaming computations on our data. For example, let's apply a rolling mean to our x-values with a window of 500ms and overlay it on top of the 'raw' data:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"source_df = streamz.dataframe.Random(freq='5ms', interval='100ms')\n",
"sdf = (source_df-0.5).cumsum()\n",
"raw_dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x)])\n",
"smooth_dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x.rolling('500ms').mean())])\n",
"\n",
"(raw_dmap.relabel('raw') * smooth_dmap.relabel('smooth')).opts(\n",
" opts.Curve(width=500, show_grid=True))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img class=\"gif\" src=\"https://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz4.gif\"></img>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"source_df.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Customizing elements with ``functools.partial``\n",
"\n",
"In this notebook we have avoided defining custom functions for ``DynamicMap`` by simply supplying the element class and using the element constructor instead. Although this works well for examples, it often won't generalize to real-life situations, because you don't have an opportunity to use anything other than the default dimensions. One simple way to get around this limitation is to use ``functools.partial``:\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from functools import partial"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now you can easily create an inline callable that creates an element with custom key and value dimensions by supplying them to ``partial`` in the form ``partial(hv.Element, kdims=[...], vdims=[...])``. In the next section, we will see an example of this pattern using ``hv.BoxWhisker``."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Controlling the length\n",
"\n",
"By default the ``Buffer`` accumulates a ``length`` of 1000 samples. In many cases this may be excessive, but we can specify a shorter (or longer) length value to control how much history we accumulate, often depending on the element type.\n",
"\n",
"In the following example, a custom ``length`` is used together with a ``partial`` wrapping ``hv.BoxWhisker`` in order to display a cumulative sum generated from a stream of random dataframes:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"multi_source = streamz.dataframe.Random(freq='50ms', interval='500ms')\n",
"sdf = (multi_source-0.5).cumsum()\n",
"hv.DynamicMap(hv.Table, streams=[Buffer(sdf.x, length=10)]) +\\\n",
"hv.DynamicMap(partial(hv.BoxWhisker, kdims=[], vdims='x'), streams=[Buffer(sdf.x, length=100)])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img class=\"gif\" src=\"https://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz5.gif\"></img>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here the given stream ``sdf`` is being consumed by a table showing a short length (where only the items visible in the table need to be kept), along with a plot computing averages and variances over a longer length (100 items).\n",
"\n",
"#### Updating multiple cells\n",
"\n",
"Since a ``StreamingDataFrame`` will emit data until it is stopped, we can subscribe multiple plots across different cells to the same stream. Here, let's add a ``Scatter`` plot of the same data stream as in the preceding cell:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"hv.DynamicMap(hv.Scatter, streams=[Buffer(sdf.x)]).redim.label(x='value', index='time')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img class=\"gif\" src=\"https://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz6.gif\"></img>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here we let the ``Scatter`` elements use the column names from the supplied ``DataFrames``, which are relabelled using the ``redim`` method. Stopping the stream will now stop updates to all three of these DynamicMaps:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"multi_source.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Operations over streaming data\n",
"\n",
"As we discovered above, the ``Buffer`` lets us set a ``length``, which defines how many rows we want to accumulate. We can use this to our advantage and apply an operation over this length window. In this example we declare a ``Dataset`` and then apply the ``histogram`` operation to compute a ``Histogram`` over the specified ``length`` window:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"hist_source = streamz.dataframe.Random(freq='5ms', interval='100ms')\n",
"sdf = (hist_source-0.5).cumsum()\n",
"dmap = hv.DynamicMap(hv.Dataset, streams=[Buffer(sdf.x, length=500)])\n",
"hv.operation.histogram(dmap, dimension='x')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img class=\"gif\" src=\"https://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz7.gif\"></img>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"hist_source.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Datashading\n",
"\n",
"The same approach will also work for the datashader operation, letting us datashade the entire ``length`` window even if we make it very large, such as 1 million samples:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from holoviews.operation.datashader import datashade\n",
"from bokeh.palettes import Blues8\n",
"\n",
"large_source = streamz.dataframe.Random(freq='100us', interval='200ms')\n",
"sdf = (large_source-0.5).cumsum()\n",
"dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x, length=1000000)])\n",
"datashade(dmap, streams=[hv.streams.PlotSize], cnorm='linear', cmap=list(Blues8)).opts(width=600)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img class=\"gif\" src=\"https://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz8.gif\"></img>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"large_source.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
Expand Down