
Redshift fails to import empty string values into fields that are not varchar(65535)

  • sphinks  · Tech Community  · 6 years ago

    I ran into strange behavior importing data from Postgres into Redshift with AWS Glue. I have a field lastname varchar(255) in a Postgres table. I move that table with AWS Glue using code like this:

    import sys, boto3
    from pyspark.context import SparkContext
    from awsglue.job import Job
    from awsglue.utils import getResolvedOptions
    from awsglue.context import GlueContext
    
    
    # Build a JDBC URL (with credentials) from a Glue catalog connection
    def getDBUrl(database):
        dbConnection = glue.get_connection(Name=database)
        jdbc_url = dbConnection['Connection']['ConnectionProperties']['JDBC_CONNECTION_URL']
        username = dbConnection['Connection']['ConnectionProperties']['USERNAME']
        password = dbConnection['Connection']['ConnectionProperties']['PASSWORD']
        jdbc_url = jdbc_url + '?user=' + username + '&password=' + password
        print jdbc_url
        return jdbc_url
    
    
    args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])
    
    sc = sc if 'sc' in vars() else SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    
    source_database_connection = 'Postgres'
    target_database_connection = 'Redshift'
    
    # Subquery returning the current max id, used as the partition upper bound
    bound_query = """
    (
        select COALESCE(max(id),0)
        from {0}
    ) as temp
    """
    
    glue = boto3.client(service_name='glue', region_name='us-east-1')
    
    # Create connection urls
    jdbc_url_source = getDBUrl(database=source_database_connection)
    jdbc_url_target = getDBUrl(database=target_database_connection)
    
    
    # Read the source table partitioned on id and write it to Redshift through S3
    def extract_and_save(source_table, target_table, source_bound_query):
        print "loading {0}".format(target_table)
        (upper_bound,) = (spark.read
                          .jdbc(url=jdbc_url_source, table=source_bound_query)
                          .first())
    
        df = spark.read.jdbc(url=jdbc_url_source,
                             table=source_table,
                             column='id',
                             lowerBound=1,
                             upperBound=upper_bound + 10,
                             numPartitions=50)
    
        df.write.format("com.databricks.spark.redshift") \
            .option("url", jdbc_url_target) \
            .option("dbtable", target_table) \
            .option("tempdir", args["TempDir"]) \
            .option("aws_iam_role", "AWS_ROLE") \
            .mode("overwrite") \
            .option("jdbcdriver", "com.amazon.redshift.jdbc41.Driver") \
            .save()
    
    # Source query: every varchar column is cast to VARCHAR(65535)
    source_user = """
    (
    SELECT 
        cast(firstname as VARCHAR(65535)),
        last_updated,
        registration_date,
        date_created,
        cast(sex as VARCHAR(65535)),
        id,
        cast(email as VARCHAR(65535)),
        cast(lastname as VARCHAR(65535)),
        cast(username as VARCHAR(65535))
    FROM user
    ) as temp
    """
    
    
    # do extract
    extract_and_save(
        source_user,
        "user",
        bound_query.format("user"))
    
    job.commit()
    

    And it works fine. But as soon as I use the original size varchar(255) for the varchar fields instead of VARCHAR(65535), the import fails with the error: Missing data for not-null field. From stl_load_errors I can see that it is an empty string in the lastname field that fails to load, although in stl_load_errors the value is marked as @NULL@. There is no not null constraint in the Redshift table definition.
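
    For reference, this is roughly how the failing rows can be pulled out of stl_load_errors from the same Spark session. It is only a minimal sketch: it reuses spark and jdbc_url_target from the job above and assumes the Redshift JDBC driver is available to a plain spark.read.jdbc call.

    # Sketch: fetch the most recent COPY errors so colname, raw_field_value and
    # err_reason can be compared for the rows that fail to load.
    recent_errors = spark.read.jdbc(
        url=jdbc_url_target,
        table="""
        (
            select starttime, colname, type, col_length,
                   raw_field_value, err_code, err_reason
            from stl_load_errors
            order by starttime desc
            limit 20
        ) as recent_errors
        """,
        properties={'driver': 'com.amazon.redshift.jdbc41.Driver'})
    recent_errors.show(truncate=False)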

    So why does VARCHAR(255) have a problem handling empty strings, while varchar(65535) handles them perfectly?
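
    For what it's worth, the @NULL@ value shown in stl_load_errors matches the default csvnullstring that com.databricks.spark.redshift documents for its CSV tempformat, so how the temporary S3 files are written may be relevant. Below is a minimal sketch of setting those writer options explicitly; the option names come from the spark-redshift documentation, the values are only illustrative, and it reuses df, jdbc_url_target, target_table and args from extract_and_save above.

    # Illustrative only: tempformat and csvnullstring control how the temporary
    # files that COPY loads from are written (@NULL@ is the documented default).
    df.write.format("com.databricks.spark.redshift") \
        .option("url", jdbc_url_target) \
        .option("dbtable", target_table) \
        .option("tempdir", args["TempDir"]) \
        .option("aws_iam_role", "AWS_ROLE") \
        .option("tempformat", "CSV") \
        .option("csvnullstring", "@NULL@") \
        .mode("overwrite") \
        .save()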

    0 Replies  |  6 years ago