o
    g)1                  	   @   s`  d dl Z d dlZd dlZd dlZd dlmZ d dlZd dlmZ d dl	m
Z
mZmZmZmZmZmZmZmZ d dlmZ d dlmZmZ d dlmZ d dlmZ d d	lmZmZmZm Z  d d
l!m"Z"m#Z# d dlm$Z$m%Z% d dl m&Z&m'Z'm(Z( d dl)Z*de
eeef ddf fddZ+de
eeef ddf fddZ,deeg e
eeef ddf f  fddZ-ej.de- dde$de
eeef ddf fddZ/ej.dddee fddZ0G dd dZ1dee2 dee2 ddfd d!Z3d"ee d#ee ddfd$d%Z4ej5j deeef dee d&eddfd'd(Z6ej5j deeef dee ddfd)d*Z7ej5j deeef dee ddfd+d,Z8ej5j deeef dee d&eddfd-d.Z9ej5j deeef dee d&eddfd/d0Z:ej5j deeef dee ddfd1d2Z;ej5j deeef dee d&eddfd3d4Z<ej5j deeef dee ddfd5d6Z=dS )7    N)UUID)count)		GeneratorListCallableOptionalDictUnionIteratorSequenceTuple)BatchSizeExceededError)ProducerConsumer)SqliteDB)
ProducerFn)OperationRecord	Operation	LogRecordScalarEncoding)SystemSettings)FixtureRequestapprox)Eventwait_forTimeoutErrorreturnc                  c   s8    t tdd} | t}|   ||fV  |   dS )z0Fixture generator for sqlite Producer + ConsumerT)allow_resetN)r   r   requirer   startstop)systemdb r$   b/var/www/visachat/venv/lib/python3.10/site-packages/chromadb/test/ingest/test_producer_consumer.pysqlite#   s   

r&   c                  c   s^    t  } ttdd| d}|t}|  ||fV  |  tj	
| r-t|  dS dS )z;Fixture generator for sqlite_persistent Producer + ConsumerT)r   is_persistentpersist_directoryN)tempfilemkdtempr   r   r   r   r    r!   ospathexistsshutilrmtree)	save_pathr"   r#   r$   r$   r%   sqlite_persistent,   s   

r1   c                  C   s   t tg} dtjv rg } | S )NCHROMA_CLUSTER_TEST_ONLY)r&   r1   r+   environ)fixturesr$   r$   r%   r4   :   s   
r4   module)scopeparamsrequestc                 c   s    t |  V  d S N)nextparam)r8   r$   r$   r%   producer_consumerC   s   r<   )r6   c                      s&   dt dtfdd  fddt D S )Nir   c                 S   sn   t | | d  | d | d  g}| d dkrd }nd|  | | | d  d}td|  |tj|tjd}|S )	Ng?      r   value_)str_keyint_key	float_key
embedding_)id	embeddingencodingmetadata	operation)nparrayr   r   FLOAT32r   ADD)r=   vectorrH   recordr$   r$   r%   create_recordL   s   "z(sample_embeddings.<locals>.create_recordc                 3   s    | ]} |V  qd S r9   r$   ).0r=   rP   r$   r%   	<genexpr>]   s    z$sample_embeddings.<locals>.<genexpr>)intr   r   r$   r$   rR   r%   sample_embeddingsJ   s   rU   c                   @   sj   e Zd ZU ee ed< eeeef  ed< dddZ	de
e ddfddZdd
edede
e fddZdS )CapturingConsumeFn
embeddingswaitersr   Nc                 C   s   g | _ g | _t | _dS )zA function that captures embeddings and allows you to wait for a certain
        number of embeddings to be available. It must be constructed in the thread with
        the main event loop
        N)rW   rX   asyncioget_event_loop_loop)selfr$   r$   r%   __init__d   s   zCapturingConsumeFn.__init__c                 C   s<   | j | | jD ]\}}t| j |kr| j|j q	d S r9   )rW   extendrX   lenr[   call_soon_threadsafeset)r\   rW   neventr$   r$   r%   __call__m   s   zCapturingConsumeFn.__call__
   rb   timeout_secsc                    sV   t | j|kr| jd| S t }| j||f t| |I dH  | jd| S )zJWait until at least N embeddings are available, then return all embeddingsN)r_   rW   r   rX   appendr   wait)r\   rb   rf   rc   r$   r$   r%   gett   s   zCapturingConsumeFn.get)r   N)re   )__name__
__module____qualname__r   r   __annotations__r   rT   r   r]   r   rd   ri   r$   r$   r$   r%   rV   `   s   
 
	 rV   abc                 C   s,   t | |D ]\}}t|t|ksJ qd S r9   )zipr   )rn   ro   r=   jr$   r$   r%   assert_approx_equal   s   rr   inserted_recordsconsumed_recordsc                 C   s   t |t | ks
J t| |D ]O\}}|d |d d ksJ |d |d d ks+J |d |d d ks7J |d |d d ksCJ |d dur^|d d dusSJ t|d |d d  qdS )zCGiven a list of inserted and consumed records, make sure they matchrE   rO   rI   rG   rH   rF   N)r_   rp   rr   )rs   rt   insertedconsumedr$   r$   r%   assert_records_match   s   rw   produce_fnsc           	         sl   | \}}|   |   td}||||dd }t }|j||| d |dI d H }t|| d S )N$00000000-0000-0000-0000-000000000000   r   r    )reset_stater   rV   	subscribe	min_seqidri   rw   )	r<   rU   rx   producerconsumercollection_idrW   
consume_fnrecievedr$   r$   r%   test_backfill   s   r   c           
         s   | \}}|   |   td}g }t }|j||| d tdD ] }t|}|| ||| |	|d I d H }	t
||	 q$d S )Nry   r{   re   r>   r|   r   rV   r}   r~   ranger:   rg   submit_embeddingri   rw   )
r<   rU   r   r   r   rW   r   r=   ereceivedr$   r$   r%   test_notifications   s   
r   c                    s   | \}}|   |   td}td}g }g }t }t }	|j||| d |j||	| d tdD ]>}
t|}|| ||| |	|
d I d H }t
|| t|}|| ||| |		|
d I d H }t
|| q7d S )Nz$00000000-0000-0000-0000-000000000001z$00000000-0000-0000-0000-000000000002r{   re   r>   r   )r<   rU   r   r   collection_1collection_2embeddings_1embeddings_2consume_fn_1consume_fn_2r=   e_1	results_2e_2r$   r$   r%   test_multiple_collections   s0   


r   c                    s   | \}}|   |   td}t }t }|j||| d ||||dd }|dI d H }	t||	 |jd d }
|j|||
d ||||dd }t|t	sVJ |
| |dI d H }t|dd  | d S )Nry   r{      r   
log_offset)r|   r   rV   r}   r~   ri   rw   rW   
isinstancelistr^   )r<   rU   rx   r   r   
collectionr   r   rW   	results_1r    second_embeddingsr   r$   r$   r%   test_start_seq_id   s$   

r   c                    s   | \}}|   |   td}t }t }|j||| d ||||dd }|dI d H }	t||	 |jd d }
|j||| |
d |dI d H }t|d d | t	t
 t|d	d
dI d H }W d    d S 1 sww   Y  d S )Nry   r{   re   r   r   r   )r    end      r>   )timeout)r|   r   rV   r}   r~   ri   rw   rW   pytestraisesr   r   )r<   rU   rx   r   r   r   r   r   rW   r   r   r   _r$   r$   r%   test_end_seq_id  s$   
"r   c                    s~   | \}}|   |   td} fddtdD }|j||d t }|j||| d |dI d H }t|| d S )Nry   c                       g | ]}t  qS r$   r:   rQ   r   rU   r$   r%   
<listcomp>+      z%test_submit_batch.<locals>.<listcomp>d   rW   r{   )	r|   r   r   submit_embeddingsrV   r}   r~   ri   rw   )r<   rU   r   r   r   rW   r   r   r$   r   r%   test_submit_batch!  s   r   c              	      s   | \}}|   |   d}dd t|D }t|D ]}|jtd| || | d qdd t|D }d}	d}
d	}t|
|	 D ]3}t|D ](}|| ||td| ||	d	  || ||	 I d H }t|| | qK||	7 }qEd S )
Nr?   c                 S   s   g | ]}t  qS r$   )rV   r   r$   r$   r%   r   A  s    z3test_multiple_collections_batch.<locals>.<listcomp>z#00000000-0000-0000-0000-00000000000r{   c                 S   s   g | ]}g qS r$   r$   r   r$   r$   r%   r   I  s    re   r   r   )r|   r   r}   r   r~   r^   ri   rw   )r<   rU   rx   r   r   N_TOPICSconsume_fnsr=   embeddings_nPRODUCE_BATCH_SIZEN_TO_PRODUCEtotal_producedrb   r   r$   r$   r%   test_multiple_collections_batch6  s@   
r   c           
         s   | \}}|   |   td}|j}|dksJ  fddt|D }t }|j||| d |j||d |j|ddI d H }t	||  fd	dt|d
 D }t
t}	|j||d W d    n1 slw   Y  dt|	jv szJ d S )Nry   r   c                    r   r$   r   r   r   r$   r%   r   j  r   z'test_max_batch_size.<locals>.<listcomp>r{   r   x   )rf   c                    r   r$   r   r   r   r$   r%   r   q  r   r>   zCannot submit more than)r|   r   max_batch_sizer   rV   r}   r~   r   ri   rw   r   r   r   strvalue)
r<   rU   r   r   r   r   rW   r   r   r   r$   r   r%   test_max_batch_size]  s$   
r   )>rY   r+   r.   r)   uuidr   r   	itertoolsr   typingr   r   r   r   r   r	   r
   r   r   chromadb.errorsr   chromadb.ingestr   r   chromadb.db.impl.sqliter   chromadb.test.conftestr   chromadb.typesr   r   r   r   chromadb.configr   r   r   r   r   r   r   numpyrJ   r&   r1   r4   fixturer<   rU   rV   floatrr   rw   markr   r   r   r   r   r   r   r   r$   r$   r$   r%   <module>   s    ,  	,	
 



!



&
