Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Duplicate Collection error with Melvis and Haystack #8563

Closed
cksrc opened this issue Nov 21, 2024 · 1 comment
Closed

Duplicate Collection error with Melvis and Haystack #8563

cksrc opened this issue Nov 21, 2024 · 1 comment

Comments

@cksrc
Copy link

cksrc commented Nov 21, 2024

Describe the bug
Using Milvus lite as document store with default configuration causes Failed to create collection: HaystackCollection

Error message
Assert "!name_ids_.count(field_name)" at /Users/zilliz/milvus-lite/thirdparty/milvus/internal/core/src/common/Schema.h:172
=> duplicated field name
Assert "!name_ids_.count(field_name)" at /Users/zilliz/milvus-lite/thirdparty/milvus/internal/core/src/common/Schema.h:172
=> duplicated field name
Assert "!name_ids_.count(field_name)" at /Users/zilliz/milvus-lite/thirdparty/milvus/internal/core/src/common/Schema.h:172
=> duplicated field name
Assert "!name_ids_.count(field_name)" at /Users/zilliz/milvus-lite/thirdparty/milvus/internal/core/src/common/Schema.h:172
=> duplicated field name
RPC error: [create_collection], <MilvusException: (code=2000, message=Assert "!name_ids_.count(field_name)" at /Users/zilliz/milvus-lite/thirdparty/milvus/internal/core/src/common/Schema.h:172
=> duplicated field name: segcore error)>, <Time:{'RPC start': '2024-11-20 23:29:25.490017', 'RPC error': '2024-11-20 23:32:32.365641'}>
Failed to create collection: HaystackCollection error: <MilvusException: (code=2000, message=Assert "!name_ids_.count(field_name)" at /Users/zilliz/milvus-lite/thirdparty/milvus/internal/core/src/common/Schema.h:172
=> duplicated field name: segcore error)>
ERROR: Exception in ASGI application
Traceback (most recent call last):
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py", line 401, in run_asgi
result = await app( # type: ignore[func-returns-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/uvicorn/middleware/proxy_headers.py", line 60, in call
return await self.app(scope, receive, send)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/fastapi/applications.py", line 1054, in call
await super().call(scope, receive, send)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/applications.py", line 113, in call
await self.middleware_stack(scope, receive, send)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/middleware/errors.py", line 187, in call
raise exc
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/middleware/errors.py", line 165, in call
await self.app(scope, receive, _send)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/middleware/exceptions.py", line 62, in call
await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
raise exc
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
await app(scope, receive, sender)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/routing.py", line 715, in call
await self.middleware_stack(scope, receive, send)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/routing.py", line 735, in app
await route.handle(scope, receive, send)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/routing.py", line 288, in handle
await self.app(scope, receive, send)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/routing.py", line 76, in app
await wrap_app_handling_exceptions(app, request)(scope, receive, send)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
raise exc
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
await app(scope, receive, sender)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/routing.py", line 74, in app
await response(scope, receive, send)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/responses.py", line 158, in call
await self.background()
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/background.py", line 41, in call
await task()
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/starlette/background.py", line 26, in call
await self.func(*self.args, **self.kwargs)
File "/Users/ck/Projects/kycfast/repo/backend/preprocessing/pre_02_index/services/index_eu_policies.py", line 56, in index
pipeline.run(
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py", line 471, in run
res: Dict[str, Any] = self._run_component(name, components_inputs[name], parent_span=span)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py", line 76, in _run_component
res: Dict[str, Any] = instance.run(**inputs)
^^^^^^^^^^^^^^^^^^^^^^
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/haystack/components/writers/document_writer.py", line 101, in run
documents_written = self.document_store.write_documents(documents=documents, policy=policy)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/milvus_haystack/document_store.py", line 384, in write_documents
self._init(**kwargs)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/milvus_haystack/document_store.py", line 560, in _init
self._create_collection(embeddings, metas)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/milvus_haystack/document_store.py", line 629, in _create_collection
raise err
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/milvus_haystack/document_store.py", line 618, in _create_collection
self.col = Collection(
^^^^^^^^^^^
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/pymilvus/orm/collection.py", line 150, in init
conn.create_collection(self.name, schema, **kwargs)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/pymilvus/decorators.py", line 148, in handler
raise e from e
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/pymilvus/decorators.py", line 144, in handler
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/pymilvus/decorators.py", line 183, in handler
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/pymilvus/decorators.py", line 123, in handler
raise e from e
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/pymilvus/decorators.py", line 87, in handler
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/pymilvus/client/grpc_handler.py", line 308, in create_collection
check_status(status)
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/pymilvus/client/utils.py", line 63, in check_status
raise MilvusException(status.code, status.reason, status.error_code)
pymilvus.exceptions.MilvusException: <MilvusException: (code=2000, message=Assert "!name_ids
.count(field_name)" at /Users/zilliz/milvus-lite/thirdparty/milvus/internal/core/src/common/Schema.h:172
=> duplicated field name: segcore error)>
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<RequestResponseCycle.run_asgi() done, defined at /Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py:399> exception=TypeError('an integer is required')>
Traceback (most recent call last):
File "/Users/ck/.pyenv/versions/3.12.7/envs/venv/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py", line 425, in run_asgi
self.on_response = lambda: None
^^^^^^^^^^^^
File "", line 69, in cfunc.to_py.__Pyx_CFunc_7f6725__29_pydevd_sys_monitoring_cython_object__lParen__etc_to_py_4code_11from_offset_9to_offset.wrap
File "_pydevd_sys_monitoring\_pydevd_sys_monitoring_cython.pyx", line 1367, in _pydevd_sys_monitoring_cython._jump_event
TypeError: an integer is required
Expected behavior
A clear and concise description of what you expected to happen.

Additional context
Inside venv/lib/python3.12/site-packages/milvus_haystack/document_store.py the following code is triggering the error:

    def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
   ...
    # If the collection hasn't been initialized yet, perform all steps to do so
        kwargs: Dict[str, Any] = {}
        if not isinstance(self.col, Collection):
            kwargs = {"embeddings": embeddings, "metas": metas}
            if self.partition_names:
                kwargs["partition_names"] = self.partition_names
            if self.replica_number:
                kwargs["replica_number"] = self.replica_number
            if self.timeout:
                kwargs["timeout"] = self.timeout
            self._init(**kwargs)

To Reproduce

@lru_cache
def get_vector_db():
    # Get document store from database
    return MilvusDocumentStore(
        connection_args={
            "uri": get_settings().milvus_db_path
        },  # Milvus Lite
        drop_old=True
    )

def run_pipeline():
  file_type_router = FileTypeRouter(
          mime_types=[
              "text/plain"
          ]
      )
    # Converter plain text files to Document objects
    text_converter = TextFileToDocument()
    # Join Documents coming from different branches of a pipeline
    document_joiner = DocumentJoiner()
    # Clean the text of the documents
    document_cleaner = DocumentCleaner()
    # Split the documents into smaller documents
    document_splitter = DocumentSplitter(split_by="sentence", split_length=2)
    # Create embeddings from the Documents
    document_embedder = SentenceTransformersDocumentEmbedder(
        model="sentence-transformers/all-MiniLM-L6-v2"
    )
    # Write the documents to the DocumentStore
    document_writer = DocumentWriter(document_store, policy=DuplicatePolicy.NONE)

    # Build the Indexing pipeline
    preprocessing_pipeline = Pipeline()
    preprocessing_pipeline.add_component(
        name="file_type_router", instance=file_type_router
    )
    preprocessing_pipeline.add_component(name="text_converter", instance=text_converter)
    preprocessing_pipeline.add_component(
        name="document_joiner", instance=document_joiner
    )
    preprocessing_pipeline.add_component(
        name="document_cleaner", instance=document_cleaner
    )
    preprocessing_pipeline.add_component(
        name="document_splitter", instance=document_splitter
    )
    preprocessing_pipeline.add_component(
        name="document_embedder", instance=document_embedder
    )
    preprocessing_pipeline.add_component(
        name="document_writer", instance=document_writer
    )

    # Connect components
    preprocessing_pipeline.connect(
        "file_type_router.plain/text, "text_converter.sources"
    )
    preprocessing_pipeline.connect("text_converter", "document_joiner")
    preprocessing_pipeline.connect("document_joiner", "document_cleaner")
    preprocessing_pipeline.connect("document_cleaner", "document_splitter")
    preprocessing_pipeline.connect("document_splitter", "document_embedder")
    preprocessing_pipeline.connect("document_embedder", "duplicate_checker")
    preprocessing_pipeline.connect(
        "duplicate_checker.documents_to_index", "document_writer.documents"
    )

pipeline.run(
        {
            "file_type_router": {
                "sources": [...],
                "meta": [..],
            }
        }
    )

**FAQ Check**
- [x] Have you had a look at [our new FAQ page]

**System:**
 - OS: Mac M3 Pro sonoma 14.6.1
 - GPU/CPU:
 - Haystack version (commit or version number): 2.7.0
 - DocumentStore: Milvus
 - Reader:
 - Retriever:
@anakin87
Copy link
Member

Hello!

The integration with Milvus is not maintained by us but by Milvus,
so I would open an issue in their repository: https://github.com/milvus-io/milvus-haystack/issues

@github-actions github-actions bot added the stale label Dec 22, 2024
@github-actions github-actions bot closed this as not planned Won't fix, can't repro, duplicate, stale Jan 1, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants