
    ɯeiN                        d dl Z d dlmZ d dlmZ d dlmZmZmZm	Z	m
Z
mZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZmZ d dlmZ d dlmZ d dl m!Z! erd dl"Zd dl#Z ee$      Z% G d de      Z& G d d      Z'y)    N)Enum)	getLogger)TYPE_CHECKINGIteratorListLiteralOptionalUnion)ASYNC_RETRY_PATTERN)DatabaseError)pandas)result_scan_statement)Query)is_in_stored_procedureresult_set_to_iterresult_set_to_rows)SnowparkSQLException)col)Rowc                   0    e Zd ZdZdZdZdZdZdZdZ	dZ
d	Zy
)_AsyncResultTyperowrow_iteratorr   pandas_batchescount	no_resultupdatedeletemergeN)__name__
__module____qualname__ROWITERATORPANDASPANDAS_BATCHCOUNT	NO_RESULTUPDATEDELETEMERGE     ^/var/www/html/glpi_dashboard/venv/lib/python3.12/site-packages/snowflake/snowpark/async_job.pyr   r      s/    
CHF#LEIFFEr-   r   c                   >   e Zd ZdZej
                  ddddfdedee   ddd	ed
eee	      de
de
dee   ddfdZedee   fd       ZddZde
fdZde
fdZdefdZd dZdeee   ee   f   d	eded   fdZ	 d!d	eed      deee   ee   ded   eddddf	   fdZy)"AsyncJoba  
    Provides a way to track an asynchronous query in Snowflake. A :class:`DataFrame` object can be
    evaluated asynchronously and an :class:`AsyncJob` object will be returned. With this instance,
    you can:

        - retrieve results;
        - check the query status (still running or done);
        - cancel the running query;
        - retrieve the query ID and perform other operations on this query ID manually.

    :class:`AsyncJob` can be created by :meth:`Session.create_async_job` or action methods in
    :class:`DataFrame` and other classes. All methods in :class:`DataFrame` with a suffix of
    ``_nowait`` execute asynchronously and create an :class:`AsyncJob` instance. They are also
    equivalent to corresponding functions in :class:`DataFrame` and other classes that set
    ``block=False``. Therefore, to use it, you need to create a dataframe first. Here we demonstrate
    how to do that:

    First, we create a dataframe:
        >>> from snowflake.snowpark.functions import when_matched, when_not_matched
        >>> from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType
        >>> df = session.create_dataframe([[float(4), 3, 5], [2.0, -4, 7], [3.0, 5, 6],[4.0,6,8]], schema=["a", "b", "c"])

    Example 1
        :meth:`DataFrame.collect` can be performed asynchronously::

            >>> async_job = df.collect_nowait()
            >>> async_job.result()
            [Row(A=4.0, B=3, C=5), Row(A=2.0, B=-4, C=7), Row(A=3.0, B=5, C=6), Row(A=4.0, B=6, C=8)]


        You can also do::

            >>> async_job = df.collect(block=False)
            >>> async_job.result()
            [Row(A=4.0, B=3, C=5), Row(A=2.0, B=-4, C=7), Row(A=3.0, B=5, C=6), Row(A=4.0, B=6, C=8)]

    Example 2
        :meth:`DataFrame.to_pandas` can be performed asynchronously::

            >>> async_job = df.to_pandas(block=False)
            >>> async_job.result()
                 A  B  C
            0  4.0  3  5
            1  2.0 -4  7
            2  3.0  5  6
            3  4.0  6  8

    Example 3
        :meth:`DataFrame.first` can be performed asynchronously::

            >>> async_job = df.first(block=False)
            >>> async_job.result()
            [Row(A=4.0, B=3, C=5)]

    Example 4
        :meth:`DataFrame.count` can be performed asynchronously::

            >>> async_job = df.count(block=False)
            >>> async_job.result()
            4

    Example 5
        Save a dataframe to table or copy it into a stage file can also be performed asynchronously::

            >>> table_name = "name"
            >>> async_job = df.write.save_as_table(table_name, block=False)
            >>> # copy into a stage file
            >>> remote_location = f"{session.get_session_stage()}/name.csv"
            >>> async_job = df.write.copy_into_location(remote_location, block=False)
            >>> async_job.result()[0]['rows_unloaded']
            4

    Example 7
        :meth:`Table.merge`, :meth:`Table.update`, :meth:`Table.delete` can also be performed asynchronously::

            >>> schema = StructType([StructField("key", IntegerType()), StructField("value", StringType())])
            >>> target_df = session.create_dataframe([(10, "old"), (10, "too_old"), (11, "old")], schema=schema)
            >>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
            >>> target = session.table("my_table")
            >>> source = session.create_dataframe([(10, "new"), (12, "new"), (13, "old")], schema=schema)
            >>> async_job = target.merge(source,target["key"] == source["key"],[when_matched().update({"value": source["value"]}),when_not_matched().insert({"key": source["key"]})],block=False)
            >>> async_job.result()
            MergeResult(rows_inserted=2, rows_updated=2, rows_deleted=0)

    Example 8
        Cancel the running query associated with the dataframe::

            >>> df = session.sql("select SYSTEM$WAIT(3)")
            >>> async_job = df.collect_nowait()
            >>> async_job.cancel()

    Example 9
        Creating an :class:`AsyncJob` from an existing query ID, retrieving results and converting it back to a :class:`DataFrame`:

            >>> from snowflake.snowpark.functions import col
            >>> query_id = session.sql("select 1 as A, 2 as B, 3 as C").collect_nowait().query_id
            >>> async_job = session.create_async_job(query_id)
            >>> async_job.query # doctest: +SKIP
            'select 1 as A, 2 as B, 3 as C'
            >>> async_job.result()
            [Row(A=1, B=2, C=3)]
            >>> async_job.result(result_type="pandas")
               A  B  C
            0  1  2  3
            >>> df = async_job.to_df()
            >>> df.select(col("A").as_("D"), "B").collect()
            [Row(D=1, B=2)]

    Example 10
        Checking the status of a failed query (division by zero) using the new status APIs::

            >>> import time
            >>> failing_query = session.sql("select 1/0 as result")
            >>> async_job = failing_query.collect_nowait()
            >>> while not async_job.is_done():
            ...     time.sleep(1.0)
            >>> async_job.is_done()
            True
            >>> async_job.is_failed()
            True
            >>> async_job.status()
            'FAILED_WITH_ERROR'

    Note:
        - If a dataframe is associated with multiple queries:

            + if you use :meth:`Session.create_dataframe` to create a dataframe from a large amount
              of local data and evaluate this dataframe asynchronously, data will still be loaded
              into Snowflake synchronously, and only fetching data from Snowflake again will be
              performed asynchronously.
            + otherwise, multiple queries will be wrapped into a
              `Snowflake Anonymous Block <https://docs.snowflake.com/en/developer-guide/snowflake-scripting/blocks.html#using-an-anonymous-block>`_
              and executed asynchronously as one query.
        - Temporary objects (e.g., tables) might be created when evaluating dataframes and they will
          be dropped automatically after all queries finish when calling a synchronous API. When you
          evaluate dataframes asynchronously, temporary objects will only be dropped after calling
          :meth:`result`.
    NFTquery_idquerysessionz"snowflake.snowpark.session.Sessionresult_typepost_actionslog_on_exceptioncase_sensitivenum_statementsreturnc	                 "   || _         || _        d| _        || _        |j                  j                  j                         | _        || _        |r|ng | _        || _	        || _
        || _        |	| _        d | _        d| _        d| _        d| _        y )NTF)r1   _query_can_query_be_retrieved_session_conncursor_cursor_result_type_post_actions_log_on_exception_case_sensitive_num_statements_parameters_result_meta	_inserted_updated_deleted)
selfr1   r2   r3   r4   r5   r6   r7   r8   kwargss
             r.   __init__zAsyncJob.__init__   s     &'+$}}**113'-9\r!1--! r-   c                 X   | j                   syd| j                   }| j                  s	 | j                  j	                  d      j                  t        d      | j                  k(        j                  d      j                         }t        |t              sJ t        |      dk(  r t        j                  | d       d| _         yt        |d   d         | _        | j                  S | j                  S # t        $ r+}t        j                  | d	|        d| _         Y d}~yd}~ww xY w)
zs
        The SQL text of of the executed query. Returns ``None`` if it cannot be retrieved from Snowflake.
        Nz(query cannot be retrieved from query ID z information_schema.query_historyr1   
query_textr   z: result is emptyFz: )r<   r1   r;   r=   table_functionwherer   select'_internal_collect_with_tag_no_telemetry
isinstancelistlen_loggerdebugstrr   )rK   error_messageresultexs       r.   r2   zAsyncJob.query   s   
 ++Ft}}oVM;;8445WXs:$--?@-@@B	  &fd3336{a'7H&IJ7<4#&)&)A,&7;;4;; ,  MM]O2bT":;38D0 s   AC5 5	D)>!D$$D)c                 ^    | j                   j                  t        | j                              S )z^
        Returns a :class:`DataFrame` built from the result of this asynchronous job.
        )r=   sqlr   r1   )rK   s    r.   to_dfzAsyncJob.to_df   s#     }}  !6t}}!EFFr-   c                     | j                   j                  j                  j                  | j                        }| j                   j                  j                  j	                  |      }| S )z
        Checks the status of the query associated with this instance and returns a bool value
        indicating whether the query has finished.
        )r=   r>   get_query_statusr1   is_still_running)rK   status
is_runnings      r.   is_donezAsyncJob.is_done   sP    
 $$**;;DMMJ]]((..??G
~r-   c                     | j                   j                  j                  j                  | j                        }| j                   j                  j                  j	                  |      S )z
        Checks the status of the query associated with this instance and returns a bool value
        indicating whether the query has failed.
        )r=   r>   ra   r1   is_an_errorrK   rc   s     r.   	is_failedzAsyncJob.is_failed  sI    
 $$**;;DMMJ}}""((44V<<r-   c                     | j                   j                  j                  j                  | j                        }|j                  S )a|  
        Returns the current query status as a string.

        Possible values are the names from `snowflake.connector.cursor.QueryStatus`, e.g.
        `RUNNING`, `SUCCESS`, `FAILED_WITH_ERROR`, `ABORTING`, `ABORTED`, `QUEUED`,
        `FAILED_WITH_INCIDENT`, `DISCONNECTED`, `RESUMING_WAREHOUSE`, `QUEUED_REPARING_WAREHOUSE`,
        `RESTARTED`, `BLOCKED`, `NO_DATA`.
        )r=   r>   ra   r1   namerh   s     r.   rc   zAsyncJob.status
  s2     $$**;;DMMJ{{r-   c                 N   t               r| j                  j                  j                  dd      r|ddl}ddl}ddl}	 |j                  | j                         |j                  | j                        }d}	 |j                  |      }|j                  dd      s|}|rt        d
|       y| j                   j#                  d| j                   d       y# t        $ r t        d| j                   d      w xY w# t        |j                  f$ r}dd| d	}Y d}~d}~ww xY w)z0Cancels the query associated with this instance.)ENABLE_ASYNC_QUERY_IN_PYTHON_STORED_PROCSFr   NzInvalid UUID: ''successzError parsing response: )ro   errorz+Failed to cancel query. Returned response: zselect SYSTEM$CANCEL_QUERY('z'))r   r=   r>   "_get_client_side_session_parameter
_snowflakejsonuuidUUIDr1   
ValueErrorcancel_queryloadsget	TypeErrorJSONDecodeErrorr   r@   execute)rK   rr   rs   rt   raw_cancel_respfailure_responseparsed_cancel_respes           r.   cancelzAsyncJob.cancel  s1    #$##FF;U E		$--( )55dmmDO
  $%)ZZ%@")--i?'9$  #ABRAST   
 LL  #?b!QR1  E ?4==/!CDDE t334 $7s;$ s#   C 7%C= #C:=D$DD$result_data)snowflake.snowpark.MergeResultsnowflake.snowpark.UpdateResultsnowflake.snowpark.DeleteResultc                 :   |t         j                  k(  r>t        j                  j	                  t        |d   d         t        |d   d               S |t         j                  k(  r.t        j                  j                  t        |d   d               S d}d\  }}}| j                  rt        |d   |         }|dz  }| j                  rt        |d   |         }|dz  }| j                  rt        |d   |         }t        j                  j                  |||      S )Nr      )r   r   r   )r   r)   	snowflakesnowparkUpdateResultintr*   DeleteResultrH   rI   rJ   MergeResult)rK   r   r4   idxrows_insertedrows_updatedrows_deleteds          r.   _table_resultzAsyncJob._table_result?  s    *111%%22KN1%&KN1,=(>  ,333%%223{1~a7H3IJJC8?5M<~~ #KN3$7 8q}}";q>##67q}}";q>##67%%11|\ r-   )r   r   r   r   r   zpandas.DataFramer   r   r   c                 ,   |rt        |j                               n| j                  }| j                  j	                  | j
                         | j                  t        | j                  dz
        D ]  }| j                  j                           |t         j                  t         j                  fv r3| j                  j                  d| j                  j                   d       |t         j                  k(  rd}	 | j                  j                  j!                  | j
                        }| j                  j                  j#                  |      sn:t%        j&                  dt(        |   z         |t+        t(              dz
  k  r|dz  }d}n|t         j                  k(  r7| j                  j,                  j/                  | j                  dd	      d
   }nY|t         j                  k(  r7| j                  j,                  j/                  | j                  dd	      d
   }n| j                  j1                         }| j                  j2                  | _        |t         j6                  k(  r#t9        || j4                  | j:                        }n|t         j<                  k(  r#t?        || j4                  | j:                        }nn|t         j@                  k(  r	|d   d   }nR|t         jB                  t         jD                  t         jF                  fv r| jI                  ||      }ntK        | d      | jL                  D ]T  } | j                  j,                  jN                  |jP                  f|jR                  | jT                  d| jV                   V |S )aH  
        Blocks and waits until the query associated with this instance finishes, then returns query
        results. This acts like executing query in a synchronous way. The data type of returned
        query results is determined by how you create this :class:`AsyncJob` instance. For example,
        if this instance is returned by :meth:`DataFrame.collect_nowait`, you will get a list of
        :class:`Row` s from this method.

        Args:
            result_type: Specifies the data type of returned query results. Currently
                it only supports the following return data types:

                - "row": returns a list of :class:`Row` objects, which is the same as the return
                  type of :meth:`DataFrame.collect`.
                - "row_iterator": returns an iterator of :class:`Row` objects, which is the same as
                  the return type of :meth:`DataFrame.to_local_iterator`.
                - "pandas": returns a ``pandas.DataFrame``, which is the same as the return type of
                  :meth:`DataFrame.to_pandas`.
                - "pandas_batches": returns an iterator of ``pandas.DataFrame`` s, which is the same
                  as the return type of :meth:`DataFrame.to_pandas_batches`.
                - "no_result": returns ``None``. You can use this option when you intend to execute
                  the query but don't care about query results (the client will not fetch results
                  either).

                When you create an :class:`AsyncJob` by :meth:`Session.create_async_job` and
                retrieve results with this method, ``result_type`` should be specified to determine
                the result data type. Otherwise, it will return a list of :class:`Row` objects by default.
                When you create an :class:`AsyncJob` by action methods in :class:`DataFrame` and
                other classes, ``result_type`` is optional and it will return results with
                corresponding type. If you still provide a value for it, this value will overwrite
                the original result data type.
        Nr   z!select * from table(result_scan('z'))r   Tg      ?F)	to_pandasto_iterdata)r7   z is not supported)is_ddl_on_temp_objectr6   ),r   lowerrA   r@   get_results_from_sfqidr1   rE   rangenextsetr%   r&   r|   sfqidr(   r=   
connectionra   rb   timesleepr   rV   r>   _to_data_or_iterfetchalldescriptionrG   r#   r   rD   r$   r   r'   r)   r*   r+   r   rv   rB   	run_queryr^   r   rC   rF   )	rK   r4   async_result_type_retry_pattern_posrc   r[   r   actions	            r.   r[   zAsyncJob.result]  s?   ` 6A[..01dFWFW 	 	++DMM:+4//!34 '$$&' ! '' --%  $$78J8J7K3O  0 : :: !"11BB4==Q}}//@@H

-.?@@ %,?(@1(DE%*%  F"2"9"99]]((99e : F "2"?"??]]((99d : F ,,//1K $ 8 8D $4$8$88+%%#'#7#7
 #&6&?&??+%%#'#7#7
 #&6&<&<<$Q*" '' '' &&' 
 ++K9JK $5#66G!HII(( 	F)DMM))

&,&B&B!%!7!7 ""		 r-   )r9   z&snowflake.snowpark.dataframe.DataFrame)r9   N)N)r    r!   r"   __doc__r   r#   rY   r	   r   r   boolr   rM   propertyr2   r_   re   ri   rc   r   r
   tupledictr   r   r   r   r[   r,   r-   r.   r0   r0   +   s|   I` )9(<(<.2!&#(, } 6	
 & tE{+   ! 
8 x}  >G =4 =
 
'SR4;T
23 & 
	+
	D 	zRS
z
 
S	#$())	

zr-   r0   )(r   enumr   loggingr   typingr   r   r   r   r	   r
   snowflake.snowparkr   snowflake.connector.cursorr   snowflake.connector.errorsr   snowflake.connector.optionsr   4snowflake.snowpark._internal.analyzer.analyzer_utilsr   4snowflake.snowpark._internal.analyzer.snowflake_planr   "snowflake.snowpark._internal.utilsr   r   r   snowflake.snowpark.exceptionsr   snowflake.snowpark.functionsr   snowflake.snowpark.rowr   snowflake.snowpark.dataframesnowflake.snowpark.sessionr    rW   r   r0   r,   r-   r.   <module>r      sh       J J  : 4 . V F 
 ? , &'%
H
	t 	l lr-   