Commit eb88cda0 authored by Yuxin Wu

JoinData supports dict

parent 93d707bc
...@@ -505,7 +505,9 @@ class ConcatData(DataFlow): ...@@ -505,7 +505,9 @@ class ConcatData(DataFlow):
class JoinData(DataFlow): class JoinData(DataFlow):
""" """
Join the components from each DataFlow. Join the components from each DataFlow. See below for its behavior.
Dataflow that produces lists and dataflow that produces dicts
cannot be joined.
Example: Example:
...@@ -514,6 +516,10 @@ class JoinData(DataFlow): ...@@ -514,6 +516,10 @@ class JoinData(DataFlow):
df1 produces: [c1, c2] df1 produces: [c1, c2]
df2 produces: [c3, c4] df2 produces: [c3, c4]
joined: [c1, c2, c3, c4] joined: [c1, c2, c3, c4]
df1 produces: {"a":c1, "b":c2}
df2 produces: {"c":c3}
joined: {"a":c1, "b":c2, "c":c3}
""" """
def __init__(self, df_lists): def __init__(self, df_lists):
...@@ -549,9 +555,13 @@ class JoinData(DataFlow): ...@@ -549,9 +555,13 @@ class JoinData(DataFlow):
itrs = [k.__iter__() for k in self.df_lists] itrs = [k.__iter__() for k in self.df_lists]
try: try:
while True: while True:
dp = [] all_dps = [next(itr) for itr in itrs]
for itr in itrs: if isinstance(all_dps[0], (list, tuple)):
dp.extend(next(itr)) dp = list(itertools.chain(*all_dps))
else:
dp = {}
for x in all_dps:
dp.update(x)
yield dp yield dp
except StopIteration: # some of them are exhausted except StopIteration: # some of them are exhausted
pass pass
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment